Source code for kumoai.experimental.rfm.local_table

from typing import Optional

import pandas as pd
from kumoapi.typing import Dtype, Stype
from typing_extensions import Self

from kumoai.experimental.rfm import utils
from kumoai.experimental.rfm.base import Column, Table


class LocalTable(Table):
    r"""A table backed by a :class:`pandas.DataFrame`.

    A :class:`LocalTable` fully specifies the relevant metadata, *i.e.*
    selected columns, column semantic types, primary keys and time columns.
    :class:`LocalTable` is used to create a :class:`LocalGraph`.

    .. code-block:: python

        import pandas as pd
        import kumoai.experimental.rfm as rfm

        # Load data from a CSV file:
        df = pd.read_csv("data.csv")

        # Create a table from a `pandas.DataFrame` and infer its metadata ...
        table = rfm.LocalTable(df, name="my_table").infer_metadata()

        # ... or create a table explicitly:
        table = rfm.LocalTable(
            df=df,
            name="my_table",
            primary_key="id",
            time_column="time",
            end_time_column=None,
        )

        # Verify metadata:
        table.print_metadata()

        # Change the semantic type of a column:
        table[column].stype = "text"

    Args:
        df: The data frame to create this table from.
        name: The name of this table.
        primary_key: The name of the primary key of this table, if it exists.
        time_column: The name of the time column of this table, if it exists.
        end_time_column: The name of the end time column of this table, if it
            exists.
    """
    def __init__(
        self,
        df: pd.DataFrame,
        name: str,
        primary_key: Optional[str] = None,
        time_column: Optional[str] = None,
        end_time_column: Optional[str] = None,
    ) -> None:
        r"""Validates :obj:`df` and registers its columns with the base class.

        Raises:
            ValueError: If the data frame has no rows, has no columns, uses a
                multi-index for its columns, has duplicated column names, or
                has an empty-string column name.
        """
        # `DataFrame.empty` is true when *either* axis has length zero, so the
        # two cases are checked separately to report the actual problem.
        if len(df) == 0:
            raise ValueError("Data frame must have at least one row")
        if len(df.columns) == 0:
            raise ValueError("Data frame must have at least one column")
        if isinstance(df.columns, pd.MultiIndex):
            raise ValueError("Data frame must not have a multi-index")
        if not df.columns.is_unique:
            raise ValueError("Data frame must have unique column names")
        if any(col == '' for col in df.columns):
            raise ValueError("Data frame must have non-empty column names")

        # Shallow copy: a new `DataFrame` object that shares the underlying
        # data with `df`. The data must be stored before the base class is
        # initialized, since the `_get_source_*` hooks below read from it.
        self._data = df.copy(deep=False)

        super().__init__(
            name=name,
            columns=list(df.columns),
            primary_key=primary_key,
            time_column=time_column,
            end_time_column=end_time_column,
        )

    def infer_metadata(self, verbose: bool = True) -> Self:
        r"""Infers metadata, *i.e.*, primary keys and time columns, in the
        table.

        Metadata that is already set is left untouched.

        Args:
            verbose: Whether to print verbose output.

        Returns:
            This table, to allow for call chaining.
        """
        logs = []

        # Try to detect primary key if not set:
        if not self.has_primary_key():
            # Whether no column at all carries the ID semantic type. This does
            # not depend on the column being tested, so it is computed once
            # rather than once per column.
            no_id_column = all(col.stype != Stype.ID for col in self.columns)

            def is_candidate(column: Column) -> bool:
                if column.stype == Stype.ID:
                    return True
                if not no_id_column:
                    return False
                # Without any ID column, fall back to name matching: a column
                # named like the table, or like the table name without its
                # trailing 's' (e.g., table "users" and column "user").
                if self.name == column.name:
                    return True
                return (self.name.endswith('s')
                        and self.name[:-1] == column.name)

            candidates = [
                column.name for column in self.columns if is_candidate(column)
            ]

            if primary_key := utils.detect_primary_key(
                    table_name=self.name,
                    df=self._data,
                    candidates=candidates,
            ):
                self.primary_key = primary_key
                logs.append(f"primary key '{primary_key}'")

        # Try to detect time column if not set:
        if not self.has_time_column():
            # The end time column must not double as the time column.
            candidates = [
                column.name for column in self.columns
                if column.stype == Stype.timestamp
                and column.name != self._end_time_column
            ]
            if time_column := utils.detect_time_column(self._data, candidates):
                self.time_column = time_column
                logs.append(f"time column '{time_column}'")

        if verbose and len(logs) > 0:
            print(f"Detected {' and '.join(logs)} in table '{self.name}'")

        return self

    def _has_source_column(self, name: str) -> bool:
        r"""Returns whether the data frame holds a column called
        :obj:`name`.
        """
        return name in self._data.columns

    def _get_source_dtype(self, name: str) -> Dtype:
        r"""Returns the data type of column :obj:`name`, as determined by
        :meth:`utils.to_dtype`.
        """
        return utils.to_dtype(self._data[name])

    def _get_source_stype(self, name: str, dtype: Dtype) -> Stype:
        r"""Returns the semantic type of column :obj:`name`, as inferred by
        :meth:`utils.infer_stype` from its values, name and data type.
        """
        return utils.infer_stype(self._data[name], name, dtype)

    def _num_rows(self) -> Optional[int]:
        r"""Returns the number of rows of the data frame."""
        return len(self._data)