Source code for kumoai.experimental.rfm.local_table

import logging
from typing import Any, Dict, List, Optional

import pandas as pd
from kumoapi.typing import Dtype, Stype
from typing_extensions import Self

from kumoai.experimental.rfm import utils
from kumoai.graph.column import Column

logger = logging.getLogger(__name__)

AUTO_PKEY_NAME = 'id'


class LocalTable:
    r"""A table backed by a :class:`pandas.DataFrame`.

    A :class:`LocalTable` fully specifies the relevant metadata, *i.e.*
    selected columns, column semantic types, primary keys and time columns.
    :class:`LocalTable` is used to create a :class:`LocalGraph`.

    .. code-block:: python

        import kumoai.experimental.rfm as rfm
        import pandas as pd

        # Load data from a CSV file:
        df = pd.read_csv("data.csv")

        # Create a table from a `pandas.DataFrame` and infer its metadata:
        table = rfm.LocalTable(df, table_name="my_table").infer_metadata()

        # Create a table explicitly:
        table = rfm.LocalTable(
            df=df,
            table_name="my_table",
            primary_key="id",
            time_column="time",
        )

        # Change the semantic type of a column:
        table[column].stype = "text"

    Args:
        df: The data frame to create the table from.
        table_name: The name of the table.
        primary_key: The name of the primary key of this table, if it exists.
        time_column: The name of the time column of this table, if it exists.
    """
    def __init__(
        self,
        df: pd.DataFrame,
        table_name: str,
        primary_key: Optional[str] = None,
        time_column: Optional[str] = None,
    ) -> None:

        validate_data(df)

        # Shallow copy: columns that are later added to/removed from
        # `self._data` (e.g., the auto-generated key) are applied to this
        # object rather than to the frame object passed in by the caller.
        self._data = df.copy(deep=False)
        self.table_name = table_name

        # One `Column` per data frame column; only the dtype is known here,
        # semantic types are filled in later (setters or `infer_metadata`).
        self._columns: Dict[str, Column] = {}
        for column_name in df.columns:
            self._columns[column_name] = Column(
                name=column_name,
                dtype=utils.to_dtype(df[column_name].dtype, df[column_name]),
            )

        # `_autogen_primary_key` tracks whether the current primary key was
        # created by `_add_default_primary_key` (and thus may be dropped).
        self._autogen_primary_key: bool = False
        self._primary_key: Optional[str] = None
        if primary_key is not None:
            self.primary_key = primary_key

        self._time_column: Optional[str] = None
        if time_column is not None:
            self.time_column = time_column

    # Data column #############################################################

    def has_column(self, name: str) -> bool:
        r"""Returns ``True`` if this table holds a column with name ``name``;
        ``False`` otherwise.
        """
        return name in self._columns

    def column(self, name: str) -> Column:
        r"""Returns the data column named with name ``name`` in this table.

        Raises:
            KeyError: If ``name`` is not present in this table.
        """
        if not self.has_column(name):
            raise KeyError(f"Column '{name}' not found "
                           f"in table '{self.table_name}'")
        return self._columns[name]

    @property
    def columns(self) -> List[Column]:
        r"""Returns a list of :class:`~kumoai.graph.Column` objects that
        represent the columns in this table.
        """
        return list(self._columns.values())

    # Primary key #############################################################

    def has_primary_key(self) -> bool:
        r"""Returns ``True`` if this table has a primary key; ``False``
        otherwise.
        """
        return self._primary_key is not None

    @property
    def primary_key(self) -> Optional[Column]:
        r"""The primary key column of this table.

        The getter returns the primary key column of this table, or ``None``
        if no such primary key is present.

        The setter sets a column as a primary key on this table (or clears
        the primary key when set to ``None``). It raises a :class:`KeyError`
        if the column name does not match a column of this table, and a
        :class:`ValueError` if the column's data type is not supported by the
        ID semantic type. If the setter raises, the table is left unchanged.
        """
        if not self.has_primary_key():
            return None
        assert self._primary_key is not None
        return self._columns[self._primary_key]

    @primary_key.setter
    def primary_key(self, *args: Any, **kwargs: Any) -> None:
        col = Column._cast(*args, **kwargs)

        if getattr(col, 'name', None) == self._primary_key:
            return  # Nothing to do.

        if col is not None:
            # Validate the request *before* mutating any state. Otherwise a
            # failing assignment would already have dropped an auto-generated
            # key column while `_primary_key` still refers to it.
            if col.name not in self._columns:
                raise KeyError(f"Column '{col.name}' does not exist in the "
                               f"underlying data frame")

            dtype = self[col.name].dtype
            assert dtype is not None
            if not Stype.ID.supports_dtype(dtype):
                raise ValueError(f"Column '{col.name}' cannot be set to a "
                                 f"primary key because it has an invalid dtype "
                                 f"(got '{dtype}')")

            self._columns[col.name].stype = Stype.ID

        # An auto-generated key only exists to serve as primary key, so it is
        # removed as soon as it gets replaced or cleared.
        if self._primary_key is not None and self._autogen_primary_key:
            del self._data[self._primary_key]
            del self._columns[self._primary_key]
            self._autogen_primary_key = False

        self._primary_key = None if col is None else col.name

    def _add_default_primary_key(self) -> None:
        r"""Adds a default primary key column to the table.

        The default primary key column is an integer (int64) column ranging
        from ``0`` to ``len(df) - 1`` in the table.

        This is a no-op if the table already has a primary key, or if the
        data already holds a column named ``AUTO_PKEY_NAME``.
        """
        if self.has_primary_key():
            return

        if AUTO_PKEY_NAME in self._data.columns:
            return

        self._autogen_primary_key = True
        self._data[AUTO_PKEY_NAME] = range(len(self._data))
        self._columns[AUTO_PKEY_NAME] = Column(
            name=AUTO_PKEY_NAME,
            dtype=Dtype.int64,
            stype=Stype.ID,
        )
        self._primary_key = AUTO_PKEY_NAME

    # Time column #############################################################

    def has_time_column(self) -> bool:
        r"""Returns ``True`` if this table has a time column; ``False``
        otherwise.
        """
        return self._time_column is not None

    @property
    def time_column(self) -> Optional[Column]:
        r"""The time column of this table.

        The getter returns the time column of this table, or ``None`` if no
        such time column is present.

        The setter sets a column as a time column on this table (or clears
        the time column when set to ``None``). It raises a :class:`KeyError`
        if the column name does not match a column of this table, and a
        :class:`ValueError` if the column's data type is not supported by the
        timestamp semantic type.
        """
        if not self.has_time_column():
            return None
        assert self._time_column is not None
        return self._columns[self._time_column]

    @time_column.setter
    def time_column(self, *args: Any, **kwargs: Any) -> None:
        col = Column._cast(*args, **kwargs)

        if col is None:
            self._time_column = None
            return

        if col.name not in self._columns:
            raise KeyError(f"Column '{col.name}' does not exist in the "
                           f"underlying data frame")

        dtype = self[col.name].dtype
        assert dtype is not None
        if not Stype.timestamp.supports_dtype(dtype):
            raise ValueError(f"Column '{col.name}' cannot be set to a "
                             f"time column because it has an invalid dtype "
                             f"(got '{dtype}')")

        self._columns[col.name].stype = Stype.timestamp
        self._time_column = col.name

    # Metadata ################################################################

    @property
    def metadata(self) -> pd.DataFrame:
        r"""Returns a :class:`pandas.DataFrame` object containing metadata
        information about the columns in this table.

        The returned dataframe has columns ``name``, ``dtype``, ``stype``,
        ``is_primary_key``, and ``is_time_column``, which provide an aggregate
        view of the properties of the columns of this table.

        Example:
            >>> import kumoai.experimental.rfm as rfm
            >>> table = rfm.LocalTable(df=..., table_name=...).infer_metadata()
            >>> table.metadata
                     name    dtype stype  is_primary_key  is_time_column
            0  CustomerID  float64    ID            True           False
        """
        col_names: List[str] = list(self._columns.keys())
        cols: List[Column] = list(self._columns.values())

        # Resolve the key/time columns once instead of once per row.
        primary_key = self.primary_key
        time_column = self.time_column

        return pd.DataFrame({
            'name':
            pd.Series(dtype=str, data=col_names),
            'dtype':
            pd.Series(dtype=str, data=[c.dtype for c in cols]),
            'stype':
            pd.Series(dtype=str, data=[c.stype for c in cols]),
            'is_primary_key':
            pd.Series(dtype=bool, data=[primary_key == c for c in cols]),
            'is_time_column':
            pd.Series(dtype=bool, data=[time_column == c for c in cols]),
        })

    def infer_metadata(self, verbose: bool = False) -> Self:
        r"""Infers metadata for all columns in the table.

        Semantic types are inferred for every column that does not have one
        yet. Afterwards, a primary key and a time column are detected among
        the ID and timestamp columns, respectively, unless already set.

        Args:
            verbose: Whether to log the detected primary key and time column.

        Returns:
            This table, to allow for method chaining.
        """
        for col in self.columns:  # Infer metadata for each column:
            assert col.dtype is not None
            if col.stype is None:
                col.stype = utils.infer_stype(
                    ser=self._data[col.name],
                    column_name=col.name,
                    dtype=col.dtype,
                )

        # Try to detect primary key if not set:
        if not self.has_primary_key():
            candidates = [
                col.name for col in self.columns
                if col.stype is not None and col.stype == Stype.ID
            ]
            if primary_key := utils.detect_primary_key(
                    table_name=self.table_name,
                    df=self._data,
                    candidates=candidates,
            ):
                if verbose:
                    logger.info(f"Detected primary key '{primary_key}' in "
                                f"'{self.table_name}'")
                self.primary_key = primary_key

        # Try to detect time column if not set:
        if not self.has_time_column():
            candidates = [
                col.name for col in self.columns
                if col.stype is not None and col.stype == Stype.timestamp
            ]
            if time_column := utils.detect_time_column(self._data, candidates):
                if verbose:
                    logger.info(f"Detected time column '{time_column}' in "
                                f"'{self.table_name}'")
                self.time_column = time_column

        return self

    def validate(self) -> Self:
        r"""Validates the table configuration.

        Every column needs both a data type and a semantic type, its data
        type has to agree with the underlying data, and its semantic type has
        to support its data type. The primary key and time column (if any)
        must carry the ID and timestamp semantic type, respectively.

        Returns:
            This table, to allow for method chaining.

        Raises:
            ValueError: If validation fails.
        """
        for col in self.columns:  # Validate column definitions:
            if col.dtype is None or col.stype is None:
                raise ValueError(
                    f"Column {col.name} is not fully specified. Please "
                    f"specify this column's data type and semantic type "
                    f"before proceeding. {col.name} currently has a "
                    f"data type of {col.dtype} and semantic type of "
                    f"{col.stype}.")

            ser = self._data[col.name]
            data_dtype = utils.to_dtype(ser.dtype, ser)
            if col.dtype != data_dtype:
                raise ValueError(f"Column '{col.name}' has data type "
                                 f"'{col.dtype}' but data suggests "
                                 f"'{data_dtype}'. It is "
                                 f"not recommended to manually change the "
                                 f"data types in '{self.__class__.__name__}', "
                                 f"instead please modify the underlying data "
                                 f"frame directly.")

            if not col.stype.supports_dtype(col.dtype):
                raise ValueError(f"Column '{col.name}' has an incompatible "
                                 f"semantic type (got dtype='{col.dtype}' "
                                 f"and stype='{col.stype}')")

        # Validate primary key:
        if primary_key := self.primary_key:
            if primary_key.stype != Stype.ID:
                raise ValueError(f"Primary key '{self._primary_key}' must "
                                 f"have 'ID' semantic type "
                                 f"(got '{primary_key.stype}')")

        # Validate time column:
        if time_column := self.time_column:
            if time_column.stype != Stype.timestamp:
                raise ValueError(f"Time column '{self._time_column}' must "
                                 f"have 'timestamp' semantic type "
                                 f"(got '{time_column.stype}')")

        return self

    # Class properties ########################################################

    def __hash__(self) -> int:
        # Hash over the column objects plus the key/time column designation.
        return hash(tuple(self.columns + [self.primary_key, self.time_column]))

    def __contains__(self, name: str) -> bool:
        return self.has_column(name)

    def __getitem__(self, name: str) -> Column:
        return self.column(name)

    def __repr__(self) -> str:
        col_names = str(list(self._columns.keys())).replace("'", "")
        pkey_name = self._primary_key if self.has_primary_key() else "None"
        t_name = self._time_column if self.has_time_column() else "None"
        return (f'{self.__class__.__name__}(\n'
                f' name={self.table_name},\n'
                f' data={self._data},\n'
                f' columns={col_names},\n'
                f' primary_key={pkey_name},\n'
                f' time_column={t_name},\n'
                f')')
# helpers


def validate_data(data: pd.DataFrame) -> None:
    r"""Checks that ``data`` is a data frame a table can be created from.

    Args:
        data: The data frame to check.

    Raises:
        ValueError: If ``data`` is empty, if its index or its columns are a
            :class:`pandas.MultiIndex`, or if its column names are not
            unique, not strings, empty, or contain characters other than
            alphanumerics and underscores.
    """
    if data.empty:
        raise ValueError("Input DataFrame must have at least one row")

    if isinstance(data.index, pd.MultiIndex):
        raise ValueError("Input DataFrame must not have a multi-index")

    if isinstance(data.columns, pd.MultiIndex):
        raise ValueError("Input DataFrame must not have a multi-index")

    if not data.columns.is_unique:
        raise ValueError("Input DataFrame must have unique column names")

    column_names = list(data.columns)

    # Non-string labels (e.g., the default integer labels) have no
    # `str.replace`; reject them explicitly rather than failing with an
    # `AttributeError` in the alphanumeric check below.
    if not all(isinstance(name, str) for name in column_names):
        raise ValueError("Input DataFrame must have string column names")

    # Needs to run before the alphanumeric check: `''.isalnum()` is `False`,
    # so an empty name would otherwise be reported as "not alphanumeric".
    if '' in column_names:
        raise ValueError("Input DataFrame must have non-empty column names")

    if not all(name.replace('_', '').isalnum() for name in column_names):
        raise ValueError("Input DataFrame must have alphanumeric column names")