Source code for kumoai.experimental.rfm.base.table

from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Sequence

import pandas as pd
from kumoapi.source_table import UnavailableSourceTable
from kumoapi.table import Column as ColumnDefinition
from kumoapi.table import TableDefinition
from kumoapi.typing import Dtype, Stype
from typing_extensions import Self

from kumoai import in_notebook
from kumoai.experimental.rfm.base import Column


class Table(ABC):
    r"""Describes the metadata of a single table.

    A :class:`Table` records which columns of the table are selected,
    together with their data types and semantic types, and which of them
    act as primary key, time column and end time column.

    Args:
        name: The name of this table.
        columns: The selected columns of this table.
        primary_key: The name of the primary key of this table, if it
            exists.
        time_column: The name of the time column of this table, if it
            exists.
        end_time_column: The name of the end time column of this table, if
            it exists.
    """
def __init__(
    self,
    name: str,
    columns: Optional[Sequence[str]] = None,
    primary_key: Optional[str] = None,
    time_column: Optional[str] = None,
    end_time_column: Optional[str] = None,
) -> None:
    r"""Registers the selected columns and assigns the special roles.

    Every name in ``columns`` is added via :meth:`add_column`. A column
    named as primary key, time column or end time column is added first if
    it is not yet part of the table, and then assigned through the
    corresponding property setter.
    """
    self._name = name
    self._columns: Dict[str, Column] = {}
    self._primary_key: Optional[str] = None
    self._time_column: Optional[str] = None
    self._end_time_column: Optional[str] = None

    for column_name in (columns if columns else []):
        self.add_column(column_name)

    # Roles are assigned in a fixed order: primary key, time column, end
    # time column. Each assignment goes through the property setter.
    roles = (
        ('primary_key', primary_key),
        ('time_column', time_column),
        ('end_time_column', end_time_column),
    )
    for role, column_name in roles:
        if column_name is None:
            continue
        if column_name not in self:
            self.add_column(column_name)
        setattr(self, role, column_name)
@property
def name(self) -> str:
    r"""Returns the name this table was created with."""
    table_name = self._name
    return table_name

# Data column #############################################################
def has_column(self, name: str) -> bool:
    r"""Checks whether a column named ``name`` is part of this table.

    Args:
        name: The name of the column.

    Returns:
        ``True`` if the table holds a column with that name; ``False``
        otherwise.
    """
    return name in self._columns.keys()
def column(self, name: str) -> Column:
    r"""Looks up the data column called ``name`` in this table.

    Args:
        name: The name of the column.

    Returns:
        The :class:`Column` stored under ``name``.

    Raises:
        KeyError: If ``name`` is not present in this table.
    """
    if self.has_column(name):
        return self._columns[name]

    raise KeyError(f"Column '{name}' not found in table '{self.name}'")
@property
def columns(self) -> List[Column]:
    r"""The :class:`Column` objects of this table, as a new list.

    The list follows the order in which the columns are stored.
    """
    return [*self._columns.values()]
def add_column(self, name: str) -> Column:
    r"""Adds a column of the underlying source table to this table.

    The data type and semantic type of the new column are obtained from
    the source via :meth:`_get_source_dtype` and
    :meth:`_get_source_stype`.

    Args:
        name: The name of the column.

    Returns:
        The newly created :class:`Column`.

    Raises:
        KeyError: If ``name`` is already present in this table, or if the
            source table does not have such a column.
        RuntimeError: If the data type or the semantic type of the column
            could not be obtained.
    """
    already_present = name in self
    if already_present:
        raise KeyError(f"Column '{name}' already exists in table "
                       f"'{self.name}'")

    in_source = self._has_source_column(name)
    if not in_source:
        raise KeyError(f"Column '{name}' does not exist in the underlying "
                       f"source table")

    # Any failure of the source hooks is re-raised as a `RuntimeError`
    # that names the offending column; the original error stays chained.
    try:
        source_dtype = self._get_source_dtype(name)
    except Exception as e:
        raise RuntimeError(f"Could not obtain data type for column "
                           f"'{name}' in table '{self.name}'. Change "
                           f"the data type of the column in the source "
                           f"table or remove it from the table.") from e

    try:
        source_stype = self._get_source_stype(name, source_dtype)
    except Exception as e:
        raise RuntimeError(f"Could not obtain semantic type for column "
                           f"'{name}' in table '{self.name}'. Change "
                           f"the data type of the column in the source "
                           f"table or remove it from the table.") from e

    new_column = Column(name=name, dtype=source_dtype, stype=source_stype)
    self._columns[name] = new_column
    return new_column
[docs] def remove_column(self, name: str) -> Self: r"""Removes a column from this table. Args: name: The name of the column. Raises: KeyError: If ``name`` is not present in this table. """ if name not in self: raise KeyError(f"Column '{name}' not found in table '{self.name}'") if self._primary_key == name: self.primary_key = None if self._time_column == name: self.time_column = None if self._end_time_column == name: self.end_time_column = None del self._columns[name] return self
# Primary key #############################################################
def has_primary_key(self) -> bool:
    r"""Returns ``True`` if this table has a primary key; ``False``
    otherwise.
    """
    return not (self._primary_key is None)
@property
def primary_key(self) -> Optional[Column]:
    r"""The primary key column of this table.

    The getter returns the primary key column of this table, or ``None``
    if no such primary key is present.

    The setter marks the column called ``name`` as primary key and assigns
    it the ID semantic type; passing ``None`` clears the primary key.

    Raises:
        ValueError: If the column is already the time column or the end
            time column of this table. The semantic type assignment on the
            column may also reject the ID type (presumably with a
            :class:`ValueError`; the check lives in :class:`Column`).
        KeyError: If ``name`` does not match a column of this table.
    """
    if self._primary_key is None:
        return None
    return self[self._primary_key]

@primary_key.setter
def primary_key(self, name: Optional[str]) -> None:
    if name is not None and name == self._time_column:
        raise ValueError(f"Cannot specify column '{name}' as a primary "
                         f"key since it is already defined to be a time "
                         f"column")
    if name is not None and name == self._end_time_column:
        raise ValueError(f"Cannot specify column '{name}' as a primary "
                         f"key since it is already defined to be an end "
                         f"time column")

    previous = self.primary_key

    if name is None:
        if previous is not None:
            previous._is_primary_key = False
        self._primary_key = None
        return

    # Resolve the new column before touching any state, so that an unknown
    # name raises `KeyError` while the current primary key is still intact.
    column = self[name]

    # The old flag is still cleared before the semantic type is assigned,
    # as before, because the validation done by `Column` is not visible
    # here. If that assignment fails, the flag is restored so the column
    # flag and `self._primary_key` never disagree.
    if previous is not None:
        previous._is_primary_key = False
    try:
        column.stype = Stype.ID
    except Exception:
        if previous is not None:
            previous._is_primary_key = True
        raise

    column._is_primary_key = True
    self._primary_key = name

# Time column #############################################################
def has_time_column(self) -> bool:
    r"""Tells whether a time column has been set on this table.

    Returns:
        ``True`` if this table has a time column; ``False`` otherwise.
    """
    return not (self._time_column is None)
@property
def time_column(self) -> Optional[Column]:
    r"""The time column of this table.

    The getter returns the time column of this table, or ``None`` if no
    such time column is present.

    The setter marks the column called ``name`` as time column and assigns
    it the timestamp semantic type; passing ``None`` clears the time
    column.

    Raises:
        ValueError: If the column is already the primary key or the end
            time column of this table. The semantic type assignment on the
            column may also reject the timestamp type (presumably with a
            :class:`ValueError`; the check lives in :class:`Column`).
        KeyError: If ``name`` does not match a column of this table.
    """
    if self._time_column is None:
        return None
    return self[self._time_column]

@time_column.setter
def time_column(self, name: Optional[str]) -> None:
    if name is not None and name == self._primary_key:
        raise ValueError(f"Cannot specify column '{name}' as a time "
                         f"column since it is already defined to be a "
                         f"primary key")
    if name is not None and name == self._end_time_column:
        raise ValueError(f"Cannot specify column '{name}' as a time "
                         f"column since it is already defined to be an "
                         f"end time column")

    previous = self.time_column

    if name is None:
        if previous is not None:
            previous._is_time_column = False
        self._time_column = None
        return

    # Resolve the new column before touching any state, so that an unknown
    # name raises `KeyError` while the current time column is still intact.
    column = self[name]

    # The old flag is still cleared before the semantic type is assigned,
    # as before, because the validation done by `Column` is not visible
    # here. If that assignment fails, the flag is restored so the column
    # flag and `self._time_column` never disagree.
    if previous is not None:
        previous._is_time_column = False
    try:
        column.stype = Stype.timestamp
    except Exception:
        if previous is not None:
            previous._is_time_column = True
        raise

    column._is_time_column = True
    self._time_column = name

# End Time column #########################################################
def has_end_time_column(self) -> bool:
    r"""Tells whether an end time column has been set on this table.

    Returns:
        ``True`` if this table has an end time column; ``False``
        otherwise.
    """
    return not (self._end_time_column is None)
@property
def end_time_column(self) -> Optional[Column]:
    r"""The end time column of this table.

    The getter returns the end time column of this table, or ``None`` if
    no such end time column is present.

    The setter marks the column called ``name`` as end time column and
    assigns it the timestamp semantic type; passing ``None`` clears the
    end time column.

    Raises:
        ValueError: If the column is already the primary key or the time
            column of this table. The semantic type assignment on the
            column may also reject the timestamp type (presumably with a
            :class:`ValueError`; the check lives in :class:`Column`).
        KeyError: If ``name`` does not match a column of this table.
    """
    if self._end_time_column is None:
        return None
    return self[self._end_time_column]

@end_time_column.setter
def end_time_column(self, name: Optional[str]) -> None:
    if name is not None and name == self._primary_key:
        raise ValueError(f"Cannot specify column '{name}' as an end time "
                         f"column since it is already defined to be a "
                         f"primary key")
    if name is not None and name == self._time_column:
        raise ValueError(f"Cannot specify column '{name}' as an end time "
                         f"column since it is already defined to be a "
                         f"time column")

    previous = self.end_time_column

    if name is None:
        if previous is not None:
            previous._is_end_time_column = False
        self._end_time_column = None
        return

    # Resolve the new column before touching any state, so that an unknown
    # name raises `KeyError` while the current end time column is intact.
    column = self[name]

    # The old flag is still cleared before the semantic type is assigned,
    # as before, because the validation done by `Column` is not visible
    # here. If that assignment fails, the flag is restored so the column
    # flag and `self._end_time_column` never disagree.
    if previous is not None:
        previous._is_end_time_column = False
    try:
        column.stype = Stype.timestamp
    except Exception:
        if previous is not None:
            previous._is_end_time_column = True
        raise

    column._is_end_time_column = True
    self._end_time_column = name

# Metadata ################################################################

@property
def metadata(self) -> pd.DataFrame:
    r"""Returns a :class:`pandas.DataFrame` object containing metadata
    information about the columns in this table.

    The returned dataframe has columns ``name``, ``dtype``, ``stype``,
    ``is_primary_key``, ``is_time_column`` and ``is_end_time_column``,
    which provide an aggregate view of the properties of the columns of
    this table. It holds one row per column of the table.

    Example:
        >>> # doctest: +SKIP
        >>> import kumoai.experimental.rfm as rfm
        >>> table = rfm.LocalTable(df=..., name=...).infer_metadata()
        >>> table.metadata
                 name    dtype stype  is_primary_key  is_time_column  is_end_time_column
        0  CustomerID  float64    ID            True           False               False
    """  # noqa: E501
    cols = self.columns

    def role_flags(role_column_name: Optional[str]) -> pd.Series:
        # One boolean per column: does it carry the given role? The roles
        # are read from the names stored on the table.
        return pd.Series(
            dtype=bool,
            data=[role_column_name == c.name for c in cols],
        )

    return pd.DataFrame({
        'name': pd.Series(dtype=str, data=[c.name for c in cols]),
        'dtype': pd.Series(dtype=str, data=[c.dtype for c in cols]),
        'stype': pd.Series(dtype=str, data=[c.stype for c in cols]),
        'is_primary_key': role_flags(self._primary_key),
        'is_time_column': role_flags(self._time_column),
        'is_end_time_column': role_flags(self._end_time_column),
    })
def print_metadata(self) -> None:
    r"""Prints the :meth:`~metadata` of this table.

    The header names the table and, when :meth:`_num_rows` returns a
    number, the row count with thousands separators. Inside a notebook
    the metadata is rendered as Markdown plus a styled data frame without
    its index; otherwise it is printed as plain text.
    """
    num_rows = self._num_rows()
    # This must be an f-string: without the prefix the literal text
    # "{num_rows:,}" was printed instead of the formatted row count.
    num_rows_repr = f' ({num_rows:,} rows)' if num_rows is not None else ''

    if in_notebook():
        from IPython.display import Markdown, display

        md_repr = f"### 🏷️ Metadata of Table `{self.name}`{num_rows_repr}"
        display(Markdown(md_repr))
        df = self.metadata
        try:
            if hasattr(df.style, 'hide'):
                display(df.style.hide(axis='index'))  # pandas=2
            else:
                display(df.style.hide_index())  # pandas<1.3
        except ImportError:
            print(df.to_string(index=False))  # missing jinja2
    else:
        print(f"🏷️ Metadata of Table '{self.name}'{num_rows_repr}")
        print(self.metadata.to_string(index=False))
def infer_metadata(self, verbose: bool = True) -> Self:
    r"""Infers metadata, *i.e.*, primary keys and time columns, in the
    table.

    A role that is already set is left untouched. Candidate column names
    are collected here, and the final choice is delegated to
    :meth:`_infer_primary_key` and :meth:`_infer_time_column`.

    Args:
        verbose: Whether to print verbose output.

    Returns:
        This table, so that calls can be chained.
    """
    logs = []

    # Try to detect primary key if not set:
    if not self.has_primary_key():
        columns = self.columns

        # Computed once: the name-based fallback only applies when no
        # column carries the ID semantic type. This used to be re-scanned
        # for every non-ID column.
        no_id_column = all(col.stype != Stype.ID for col in columns)

        def is_candidate(column: Column) -> bool:
            if column.stype == Stype.ID:
                return True
            if no_id_column:
                # Fallback: the column is named like the table, or like
                # the table name without its trailing "s".
                if self.name == column.name:
                    return True
                if (self.name.endswith('s')
                        and self.name[:-1] == column.name):
                    return True
            return False

        candidates = [
            column.name for column in columns if is_candidate(column)
        ]

        if primary_key := self._infer_primary_key(candidates):
            self.primary_key = primary_key
            logs.append(f"primary key '{primary_key}'")

    # Try to detect time column if not set:
    if not self.has_time_column():
        # The end time column is excluded since it cannot also act as the
        # time column.
        candidates = [
            column.name for column in self.columns
            if column.stype == Stype.timestamp
            and column.name != self._end_time_column
        ]
        if time_column := self._infer_time_column(candidates):
            self.time_column = time_column
            logs.append(f"time column '{time_column}'")

    if verbose and len(logs) > 0:
        print(f"Detected {' and '.join(logs)} in table '{self.name}'")

    return self
# Helpers #################################################################

def _to_api_table_definition(self) -> TableDefinition:
    r"""Builds the API :class:`TableDefinition` describing this table."""
    column_definitions = [
        ColumnDefinition(column.name, column.stype, column.dtype)
        for column in self.columns
    ]
    return TableDefinition(
        cols=column_definitions,
        source_table=UnavailableSourceTable(table=self.name),
        pkey=self._primary_key,
        time_col=self._time_column,
        end_time_col=self._end_time_column,
    )

# Python builtins #########################################################

def __hash__(self) -> int:
    # The hash covers all columns followed by the three role columns
    # (each of which may be `None`).
    role_columns = (
        self.primary_key,
        self.time_column,
        self.end_time_column,
    )
    return hash((*self.columns, *role_columns))

def __contains__(self, name: str) -> bool:
    # Enables `name in table`.
    return self.has_column(name)

def __getitem__(self, name: str) -> Column:
    # Enables `table[name]`.
    return self.column(name)

def __delitem__(self, name: str) -> None:
    # Enables `del table[name]`.
    self.remove_column(name)

def __repr__(self) -> str:
    fields = (
        ('name', self.name),
        ('num_columns', len(self.columns)),
        ('primary_key', self._primary_key),
        ('time_column', self._time_column),
        ('end_time_column', self._end_time_column),
    )
    body = ''.join(f'  {key}={value},\n' for key, value in fields)
    return f'{self.__class__.__name__}(\n{body})'

# Abstract method #########################################################

@abstractmethod
def _has_source_column(self, name: str) -> bool:
    r"""Whether the underlying source table has a column ``name``."""

@abstractmethod
def _get_source_dtype(self, name: str) -> Dtype:
    r"""The data type of the source column ``name``."""

@abstractmethod
def _get_source_stype(self, name: str, dtype: Dtype) -> Stype:
    r"""The semantic type of the source column ``name``, given its data
    type ``dtype``.
    """

@abstractmethod
def _infer_primary_key(self, candidates: List[str]) -> Optional[str]:
    r"""Selects a primary key among the ``candidates`` column names, or
    returns ``None``.
    """

@abstractmethod
def _infer_time_column(self, candidates: List[str]) -> Optional[str]:
    r"""Selects a time column among the ``candidates`` column names, or
    returns ``None``.
    """

@abstractmethod
def _num_rows(self) -> Optional[int]:
    r"""The number of rows of this table, or ``None``."""