from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import pandas as pd
from kumoapi.source_table import UnavailableSourceTable
from kumoapi.table import Column as ColumnDefinition
from kumoapi.table import TableDefinition
from kumoapi.typing import Dtype, Stype
from typing_extensions import Self
from kumoai import in_notebook
from kumoai.experimental.rfm import utils
@dataclass(init=False, repr=False, eq=False)
class Column:
    r"""A single table column described by its name, data type and semantic
    type.

    The name and data type are fixed at construction time and exposed through
    read-only properties. The semantic type ``stype`` may be re-assigned;
    every assignment is validated in :meth:`__setattr__`.

    Args:
        name: The name of the column.
        dtype: The data type of the column (coerced via ``Dtype(dtype)``).
        stype: The semantic type of the column (coerced via ``Stype(stype)``).
        is_primary_key: Whether the column is flagged as a primary key.
        is_time_column: Whether the column is flagged as a time column.

    Raises:
        ValueError: If ``stype`` is not compatible with ``dtype``, or if it
            conflicts with the primary key / time column flag.
    """
    stype: Stype

    def __init__(
        self,
        name: str,
        dtype: Dtype,
        stype: Stype,
        is_primary_key: bool = False,
        is_time_column: bool = False,
    ) -> None:
        self._name = name
        self._dtype = Dtype(dtype)
        self._is_primary_key = is_primary_key
        self._is_time_column = is_time_column
        # Assigned last: the validation in `__setattr__` reads all of the
        # attributes set above.
        self.stype = Stype(stype)

    @property
    def name(self) -> str:
        r"""The name of the column."""
        return self._name

    @property
    def dtype(self) -> Dtype:
        r"""The data type of the column."""
        return self._dtype

    def __setattr__(self, key: str, val: Any) -> None:
        r"""Sets an attribute, validating any assignment to ``stype``.

        Strings assigned to ``stype`` are converted via ``Stype(val)``.

        Raises:
            ValueError: If the semantic type does not support the column's
                data type, if a primary key receives a non-``ID`` semantic
                type, or if a time column receives a non-``timestamp``
                semantic type.
        """
        if key == 'stype':
            if isinstance(val, str):
                val = Stype(val)
            assert isinstance(val, Stype)
            if not val.supports_dtype(self.dtype):
                raise ValueError(f"Column '{self.name}' received an "
                                 f"incompatible semantic type (got "
                                 f"dtype='{self.dtype}' and stype='{val}')")
            if self._is_primary_key and val != Stype.ID:
                raise ValueError(f"Primary key '{self.name}' must have 'ID' "
                                 f"semantic type (got '{val}')")
            # Check the time column *flag*; comparing the column name against
            # the boolean flag would never be true and disable this guard.
            if self._is_time_column and val != Stype.timestamp:
                raise ValueError(f"Time column '{self.name}' must have "
                                 f"'timestamp' semantic type (got '{val}')")
        super().__setattr__(key, val)

    def __hash__(self) -> int:
        return hash((self.name, self.stype, self.dtype))

    def __eq__(self, other: Any) -> bool:
        r"""Returns ``True`` if ``other`` is a :class:`Column` with the same
        name, semantic type and data type.
        """
        if not isinstance(other, Column):
            return False
        # Compare the fields themselves rather than their hashes, so that a
        # hash collision can never make two different columns compare equal.
        return ((self.name, self.stype, self.dtype) == (other.name,
                                                        other.stype,
                                                        other.dtype))

    def __repr__(self) -> str:
        return (f'{self.__class__.__name__}(name={self.name}, '
                f'stype={self.stype}, dtype={self.dtype})')
class LocalTable:
    r"""A table backed by a :class:`pandas.DataFrame`.

    A :class:`LocalTable` fully specifies the relevant metadata, *i.e.*
    selected columns, column semantic types, primary keys and time columns.
    :class:`LocalTable` is used to create a :class:`LocalGraph`.

    .. code-block:: python

        import pandas as pd
        import kumoai.experimental.rfm as rfm

        # Load data from a CSV file:
        df = pd.read_csv("data.csv")

        # Create a table from a `pandas.DataFrame` and infer its metadata ...
        table = rfm.LocalTable(df, name="my_table").infer_metadata()

        # ... or create a table explicitly:
        table = rfm.LocalTable(
            df=df,
            name="my_table",
            primary_key="id",
            time_column="time",
        )

        # Verify metadata:
        table.print_metadata()

        # Change the semantic type of a column:
        table[column].stype = "text"

    Args:
        df: The data frame to create the table from.
        name: The name of the table.
        primary_key: The name of the primary key of this table, if it exists.
        time_column: The name of the time column of this table, if it exists.

    Raises:
        ValueError: If the data frame is empty, has multi-index columns, or
            has duplicated or empty column names (duplicates are also
            rejected after whitespace in column names has been replaced by
            underscores).
        RuntimeError: If the data type or semantic type of a column cannot
            be inferred.
    """
    def __init__(
        self,
        df: pd.DataFrame,
        name: str,
        primary_key: Optional[str] = None,
        time_column: Optional[str] = None,
    ) -> None:
        if df.empty:
            raise ValueError("Data frame must have at least one row")
        if isinstance(df.columns, pd.MultiIndex):
            raise ValueError("Data frame must not have a multi-index")
        if not df.columns.is_unique:
            raise ValueError("Data frame must have unique column names")
        if any(col == '' for col in df.columns):
            raise ValueError("Data frame must have non-empty column names")

        # Shallow copy: only the column labels are rewritten, so the caller's
        # data frame keeps its original column names.
        df = df.copy(deep=False)
        df.columns = df.columns.str.replace(r'\s+', '_', regex=True)
        # Normalization can make distinct names collide (e.g. "a b" and
        # "a_b"); `df[name]` would then no longer select a single column.
        if not df.columns.is_unique:
            raise ValueError("Data frame must have unique column names after "
                             "replacing whitespace with underscores")

        self._data = df
        self._name = name
        self._primary_key: Optional[str] = None
        self._time_column: Optional[str] = None
        self._columns: Dict[str, Column] = {}

        for column_name in df.columns:
            try:
                dtype = utils.to_dtype(df[column_name])
            except Exception as e:
                raise RuntimeError(f"Data type inference for column "
                                   f"'{column_name}' in table '{name}' "
                                   f"failed. Consider changing the data type "
                                   f"of the column or removing it from the "
                                   f"table.") from e
            try:
                stype = utils.infer_stype(df[column_name], column_name, dtype)
            except Exception as e:
                raise RuntimeError(f"Semantic type inference for column "
                                   f"'{column_name}' in table '{name}' "
                                   f"failed. Consider changing the data type "
                                   f"of the column or removing it from the "
                                   f"table.") from e
            self._columns[column_name] = Column(
                name=column_name,
                dtype=dtype,
                stype=stype,
            )

        # Route through the property setters so that the usual validation
        # and flag bookkeeping applies.
        if primary_key is not None:
            self.primary_key = primary_key
        if time_column is not None:
            self.time_column = time_column

    @property
    def name(self) -> str:
        r"""The name of the table."""
        return self._name

    # Data column #############################################################

    def has_column(self, name: str) -> bool:
        r"""Returns ``True`` if this table holds a column with name ``name``;
        ``False`` otherwise.
        """
        return name in self._columns

    def column(self, name: str) -> Column:
        r"""Returns the data column with name ``name`` in this table.

        Args:
            name: The name of the column.

        Raises:
            KeyError: If ``name`` is not present in this table.
        """
        if not self.has_column(name):
            raise KeyError(f"Column '{name}' not found in table '{self.name}'")
        return self._columns[name]

    @property
    def columns(self) -> List[Column]:
        r"""Returns a list of :class:`Column` objects that represent the
        columns in this table.
        """
        return list(self._columns.values())

    def remove_column(self, name: str) -> Self:
        r"""Removes a column from this table.

        If the column is the current primary key or time column, that role
        is unset first.

        Args:
            name: The name of the column.

        Raises:
            KeyError: If ``name`` is not present in this table.
        """
        if not self.has_column(name):
            raise KeyError(f"Column '{name}' not found in table '{self.name}'")
        if self._primary_key == name:
            self.primary_key = None
        if self._time_column == name:
            self.time_column = None
        del self._columns[name]
        return self

    # Primary key #############################################################

    def has_primary_key(self) -> bool:
        r"""Returns ``True`` if this table has a primary key; ``False``
        otherwise.
        """
        return self._primary_key is not None

    @property
    def primary_key(self) -> Optional[Column]:
        r"""The primary key column of this table.

        The getter returns the primary key column of this table, or ``None``
        if no such primary key is present.

        The setter sets a column as a primary key on this table (or unsets
        the primary key when given ``None``). It raises a :class:`ValueError`
        if the column is already the time column or cannot take the ``ID``
        semantic type, and a :class:`KeyError` if the column name does not
        match a column in this table. A failed assignment leaves the current
        primary key untouched.
        """
        if not self.has_primary_key():
            return None
        assert self._primary_key is not None
        return self[self._primary_key]

    @primary_key.setter
    def primary_key(self, name: Optional[str]) -> None:
        if name is not None and name == self._time_column:
            raise ValueError(f"Cannot specify column '{name}' as a primary "
                             f"key since it is already defined to be a time "
                             f"column")

        previous = self.primary_key

        if name is None:
            if previous is not None:
                previous._is_primary_key = False
            self._primary_key = None
            return

        # Resolve and validate the new column *before* touching the current
        # primary key, so that a failure leaves the table in a consistent
        # state.
        column = self[name]
        column.stype = Stype.ID

        if previous is not None:
            previous._is_primary_key = False
        column._is_primary_key = True
        self._primary_key = name

    # Time column #############################################################

    def has_time_column(self) -> bool:
        r"""Returns ``True`` if this table has a time column; ``False``
        otherwise.
        """
        return self._time_column is not None

    @property
    def time_column(self) -> Optional[Column]:
        r"""The time column of this table.

        The getter returns the time column of this table, or ``None`` if no
        such time column is present.

        The setter sets a column as a time column on this table (or unsets
        the time column when given ``None``). It raises a :class:`ValueError`
        if the column is already the primary key or cannot take the
        ``timestamp`` semantic type, and a :class:`KeyError` if the column
        name does not match a column in this table. A failed assignment
        leaves the current time column untouched.
        """
        if not self.has_time_column():
            return None
        assert self._time_column is not None
        return self[self._time_column]

    @time_column.setter
    def time_column(self, name: Optional[str]) -> None:
        if name is not None and name == self._primary_key:
            raise ValueError(f"Cannot specify column '{name}' as a time "
                             f"column since it is already defined to be a "
                             f"primary key")

        previous = self.time_column

        if name is None:
            if previous is not None:
                previous._is_time_column = False
            self._time_column = None
            return

        # Resolve and validate the new column *before* touching the current
        # time column, so that a failure leaves the table in a consistent
        # state.
        column = self[name]
        column.stype = Stype.timestamp

        if previous is not None:
            previous._is_time_column = False
        column._is_time_column = True
        self._time_column = name

    # Metadata ################################################################

    @property
    def metadata(self) -> pd.DataFrame:
        r"""Returns a :class:`pandas.DataFrame` object containing metadata
        information about the columns in this table.

        The returned dataframe has columns ``name``, ``dtype``, ``stype``,
        ``is_primary_key``, and ``is_time_column``, which provide an aggregate
        view of the properties of the columns of this table.

        Example:
            >>> import kumoai.experimental.rfm as rfm
            >>> table = rfm.LocalTable(df=..., name=...).infer_metadata()
            >>> table.metadata
                     name    dtype stype  is_primary_key  is_time_column
            0  CustomerID  float64    ID            True           False
        """
        cols = self.columns
        return pd.DataFrame({
            'name':
            pd.Series(dtype=str, data=[c.name for c in cols]),
            'dtype':
            pd.Series(dtype=str, data=[c.dtype for c in cols]),
            'stype':
            pd.Series(dtype=str, data=[c.stype for c in cols]),
            'is_primary_key':
            pd.Series(
                dtype=bool,
                data=[self._primary_key == c.name for c in cols],
            ),
            'is_time_column':
            pd.Series(
                dtype=bool,
                data=[self._time_column == c.name for c in cols],
            ),
        })

    # Helpers #################################################################

    def _to_api_table_definition(self) -> TableDefinition:
        r"""Builds the :class:`TableDefinition` for this table from its
        columns, primary key name and time column name.
        """
        cols: List[ColumnDefinition] = [
            ColumnDefinition(col.name, col.stype, col.dtype)
            for col in self.columns
        ]
        return TableDefinition(
            cols=cols,
            source_table=UnavailableSourceTable(table=self.name),
            pkey=self._primary_key,
            time_col=self._time_column,
        )

    # Python builtins #########################################################

    def __hash__(self) -> int:
        return hash(tuple(self.columns + [self.primary_key, self.time_column]))

    def __contains__(self, name: str) -> bool:
        return self.has_column(name)

    def __getitem__(self, name: str) -> Column:
        return self.column(name)

    def __delitem__(self, name: str) -> None:
        self.remove_column(name)

    def __repr__(self) -> str:
        return (f'{self.__class__.__name__}(\n'
                f' name={self.name},\n'
                f' num_columns={len(self.columns)},\n'
                f' primary_key={self._primary_key},\n'
                f' time_column={self._time_column},\n'
                f')')