kumoai.experimental.rfm.Graph
- class kumoai.experimental.rfm.Graph
Bases: object
A graph of Table objects, akin to relationships between tables in a relational database.
Creating a graph is the final step of data definition; after a Graph is created, you can use it to initialize the Kumo Relational Foundation Model (KumoRFM).
>>> import pandas as pd
>>> import kumoai.experimental.rfm as rfm
>>> # Load data frames into memory:
>>> df1 = pd.DataFrame(...)
>>> df2 = pd.DataFrame(...)
>>> df3 = pd.DataFrame(...)
>>> # Define tables from data frames:
>>> table1 = rfm.LocalTable(name="table1", data=df1)
>>> table2 = rfm.LocalTable(name="table2", data=df2)
>>> table3 = rfm.LocalTable(name="table3", data=df3)
>>> # Create a graph from a dictionary of tables:
>>> graph = rfm.Graph({
...     "table1": table1,
...     "table2": table2,
...     "table3": table3,
... })
>>> # Infer table metadata:
>>> graph.infer_metadata()
>>> # Infer links/edges:
>>> graph.infer_links()
>>> # Inspect table metadata:
>>> for table in graph.tables.values():
...     table.print_metadata()
>>> # Visualize graph:
>>> graph.visualize()
>>> # Add/Remove edges between tables:
>>> graph.link(src_table="table1", fkey="id1", dst_table="table2")
>>> graph.unlink(src_table="table1", fkey="id1", dst_table="table2")
>>> # Validate graph:
>>> graph.validate()
- classmethod from_data(df_dict, edges=None, infer_metadata=True, verbose=True)
Creates a Graph from a dictionary of pandas.DataFrame objects.
Automatically infers table metadata and links by default.
>>> import pandas as pd
>>> import kumoai.experimental.rfm as rfm
>>> # Load data frames into memory:
>>> df1 = pd.DataFrame(...)
>>> df2 = pd.DataFrame(...)
>>> df3 = pd.DataFrame(...)
>>> # Create a graph from a dictionary of data frames:
>>> graph = rfm.Graph.from_data({
...     "table1": df1,
...     "table2": df2,
...     "table3": df3,
... })
- Parameters:
df_dict (dict[str, DataFrame]) – A dictionary of data frames, where the keys are the names of the tables and the values hold table data.
edges (Optional[Sequence[Union[Edge, Dict[str, str], Tuple[str, str, str]]]]) – An optional list of Edge objects to add to the graph. If not provided, edges are inferred automatically from the data when infer_metadata=True.
infer_metadata (bool) – Whether to infer metadata for all tables in the graph.
verbose (bool) – Whether to print verbose output.
- Return type:
Self
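The edges argument accepts three shapes: Edge objects, dictionaries, and 3-tuples. The following is a minimal sketch of how such mixed specifications could be normalized into one form. The Edge class below is a local stand-in rather than the kumoai class, and the field names (src_table, fkey, dst_table) are borrowed from the link() signature; that the library's dictionaries and tuples use exactly these keys and this order is an assumption.

```python
from typing import Dict, List, NamedTuple, Sequence, Tuple, Union


class Edge(NamedTuple):
    """Local stand-in for an edge; NOT imported from kumoai."""
    src_table: str
    fkey: str
    dst_table: str


EdgeSpec = Union[Edge, Dict[str, str], Tuple[str, str, str]]


def normalize_edges(edges: Sequence[EdgeSpec]) -> List[Edge]:
    """Convert Edge objects, dicts, and 3-tuples into Edge tuples."""
    normalized = []
    for spec in edges:
        if isinstance(spec, Edge):
            normalized.append(spec)
        elif isinstance(spec, dict):
            # Assumed keys: src_table, fkey, dst_table.
            normalized.append(Edge(**spec))
        else:
            # Assumed positional order: (src_table, fkey, dst_table).
            src_table, fkey, dst_table = spec
            normalized.append(Edge(src_table, fkey, dst_table))
    return normalized


edges = normalize_edges([
    Edge("orders", "user_id", "users"),
    {"src_table": "orders", "fkey": "item_id", "dst_table": "items"},
    ("reviews", "user_id", "users"),
])
for edge in edges:
    print(f"{edge.src_table}.{edge.fkey} -> {edge.dst_table}")
```

The table and column names here are hypothetical. The Edge check comes first because a NamedTuple is also a tuple.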
- classmethod from_sqlite(connection, tables=None, edges=None, infer_metadata=True, verbose=True)
Creates a Graph from a sqlite database.
Automatically infers table metadata and links by default.
>>> import kumoai.experimental.rfm as rfm
>>> # Create a graph from a SQLite database:
>>> graph = rfm.Graph.from_sqlite('data.db')
>>> # Fine-grained control over table specification:
>>> graph = rfm.Graph.from_sqlite('data.db', tables=[
...     'USERS',
...     dict(name='ORDERS', source_name='ORDERS_SNAPSHOT'),
...     dict(name='ITEMS', primary_key='ITEM_ID'),
... ])
- Parameters:
connection (Union[AdbcSqliteConnection, SqliteConnectionConfig, str, Path, dict[str, Any]]) – An open connection from connect() or the path to the database file.
tables (Optional[Sequence[str | dict[str, Any]]]) – Set of table names or SQLiteTable keyword arguments to include. If None, will add all tables present in the database.
edges (Optional[Sequence[Union[Edge, Dict[str, str], Tuple[str, str, str]]]]) – An optional list of Edge objects to add to the graph. If not provided, edges are inferred automatically from the data when infer_metadata=True.
infer_metadata (bool) – Whether to infer missing metadata for all tables in the graph.
verbose (bool) – Whether to print verbose output.
- Return type:
Self
- classmethod from_snowflake(connection=None, tables=None, database=None, schema=None, edges=None, infer_metadata=True, verbose=True)
Creates a Graph from a snowflake database and schema.
Automatically infers table metadata and links by default.
>>> import kumoai.experimental.rfm as rfm
>>> # Create a graph directly in a Snowflake notebook:
>>> graph = rfm.Graph.from_snowflake(schema='my_schema')
>>> # Fine-grained control over table specification:
>>> graph = rfm.Graph.from_snowflake(tables=[
...     'USERS',
...     dict(name='ORDERS', source_name='ORDERS_SNAPSHOT'),
...     dict(name='ITEMS', schema='OTHER_SCHEMA'),
... ], database='DEFAULT_DB', schema='DEFAULT_SCHEMA')
- Parameters:
connection (Union[SnowflakeConnection, dict[str, Any], None]) – An open connection from connect() or the snowflake connector keyword arguments to open a new connection. If None, will re-use an active session if one exists, or create a new connection from credentials stored in environment variables.
tables (Optional[Sequence[str | dict[str, Any]]]) – Set of table names or SnowTable keyword arguments to include. If None, will add all tables present in the current database and schema.
edges (Optional[Sequence[Union[Edge, Dict[str, str], Tuple[str, str, str]]]]) – An optional list of Edge objects to add to the graph. If not provided, edges are inferred automatically from the data when infer_metadata=True.
infer_metadata (bool) – Whether to infer metadata for all tables in the graph.
verbose (bool) – Whether to print verbose output.
- Return type:
Self
- classmethod from_relbench(dataset, verbose=True)
Loads a RelBench dataset into a Graph instance.
>>> import kumoai.experimental.rfm as rfm
>>> graph = rfm.Graph.from_relbench("f1")
- classmethod graph_and_pquery_from_timeseries(df, timeseries_col, timestamps_col=None, time_delta=None, anchor_time=None, entity_col=None, num_timeframes=1)
Creates a Graph and predictive query string from a time-series dataset stored as a single flat table.
Many forecasting datasets arrive as a single table where each row represents one entity and a column holds an array of historical observations. This method converts such a table into the two-table (entity + target) structure expected by KumoRFM and returns a ready-to-use predictive query.
The input df is split into:
- An entity table: one row per input row, containing all columns except timeseries_col (and timestamps_col when given).
- A target table: one row per observation, with a foreign key to the entity table, a timestamp column, and the observed value.
>>> import pandas as pd
>>> import kumoai.experimental.rfm as rfm
>>> df = pd.DataFrame({
...     'customer_id': [1, 2, 3],
...     'sales': [[10, 20, 15, 30], [5, 8, 12], [100, 95, 80]],
... })
>>> anchor = pd.Timestamp('2024-01-10')
>>> graph, pquery = rfm.Graph.graph_and_pquery_from_timeseries(
...     df,
...     timeseries_col='sales',
...     entity_col='customer_id',
...     time_delta=pd.Timedelta('1D'),
...     anchor_time=anchor,
...     num_timeframes=4,
... )
>>> # pquery == ("PREDICT MAX(target.value, 0, 1, days) "
>>> #            "FORECAST 4 TIMEFRAMES "
>>> #            "FOR EACH entity.customer_id")
>>> model = rfm.KumoRFM(graph)
>>> result = model.predict(pquery, anchor_time=anchor)
- Parameters:
df (DataFrame) – Input DataFrame where each row represents one entity and timeseries_col holds a list or array of scalar observations for that entity.
timeseries_col (str) – Name of the column containing per-entity observation arrays.
timestamps_col (Optional[str]) – Optional name of the column containing per-entity timestamp arrays (one timestamp per observation value). Must be the same length as timeseries_col for every row. When None, synthetic timestamps are generated backwards from anchor_time using time_delta.
time_delta (Optional[Timedelta]) – Step size between consecutive observations. Required when timestamps_col is None. Also controls the prediction-window size in the generated pquery. When timestamps_col is provided and time_delta is None, the typical step is inferred from the median inter-observation interval across all entities.
anchor_time (Optional[Timestamp]) – The forecast cutoff timestamp. Required when timestamps_col is None; used to place synthetic timestamps so that the last observation falls at anchor_time - time_delta. Pass the same value to KumoRFM.predict() as anchor_time.
entity_col (Optional[str]) – Name of an existing column to use as the entity primary key. When None, integer IDs [0, 1, ..., n-1] are generated and stored in a new column named entity_id.
num_timeframes (int) – Number of timeframes to forecast.
- Return type:
- Returns:
A tuple (graph, pquery), where graph is a Graph ready for use with KumoRFM and pquery is a predictive-query string for single-step forecasting. Pass pquery to KumoRFM.predict() together with anchor_time to generate forecasts.
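The split into entity and target tables, the backwards placement of synthetic timestamps, and the median-based step inference can be sketched in plain Python. This is an illustration of the behaviour described above under stated assumptions, not the library's implementation: the helper names split_timeseries and infer_time_delta and the target column names "timestamp" and "value" are hypothetical.

```python
from datetime import datetime, timedelta
from statistics import median


def split_timeseries(rows, timeseries_col, entity_col, anchor_time, time_delta):
    """Split flat rows into entity rows and target rows (illustration only)."""
    entity_rows, target_rows = [], []
    for row in rows:
        # Entity table: every column except the time-series column.
        entity_rows.append({k: v for k, v in row.items() if k != timeseries_col})
        values = row[timeseries_col]
        n = len(values)
        for i, value in enumerate(values):
            # Count backwards from the anchor so that the last observation
            # (i == n - 1) falls at anchor_time - time_delta.
            target_rows.append({
                entity_col: row[entity_col],  # foreign key to the entity table
                "timestamp": anchor_time - (n - i) * time_delta,
                "value": value,
            })
    return entity_rows, target_rows


def infer_time_delta(timestamp_lists):
    """Median gap between consecutive timestamps, pooled over all entities."""
    gaps = [
        (later - earlier).total_seconds()
        for stamps in timestamp_lists
        for earlier, later in zip(stamps, stamps[1:])
    ]
    return timedelta(seconds=median(gaps))


rows = [
    {"customer_id": 1, "sales": [10, 20, 15, 30]},
    {"customer_id": 2, "sales": [5, 8, 12]},
]
anchor = datetime(2024, 1, 10)
entity_rows, target_rows = split_timeseries(
    rows, "sales", "customer_id", anchor, timedelta(days=1)
)

# Feed the generated timestamps back in to recover the step size.
stamps_per_entity = [
    [r["timestamp"] for r in target_rows if r["customer_id"] == cid]
    for cid in (1, 2)
]
step = infer_time_delta(stamps_per_entity)
print(len(entity_rows), len(target_rows), step)
```

Entities with different history lengths all end at the same final timestamp, one step before the anchor, which is why the timestamps are counted backwards rather than forwards.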
- has_table(name)
Returns True if the graph has a table with name name; False otherwise.
- Return type:
bool
- add_table(table)
Adds a table to the graph.
- Parameters:
table (Table) – The table to add.
- Raises:
KeyError – If a table with the same name already exists in the graph.
ValueError – If the table belongs to a different backend than the rest of the tables in the graph.
- Return type:
Self
- property metadata: DataFrame
Returns a pandas.DataFrame object containing metadata information about the tables in this graph.
The returned dataframe has columns "Name", "Primary Key", "Time Column", and "End Time Column", which provide an aggregated view of the properties of the tables of this graph.
Example
>>> import kumoai.experimental.rfm as rfm
>>> graph = rfm.Graph(tables=...).infer_metadata()
>>> graph.metadata
    Name Primary Key Time Column End Time Column
0  users     user_id           -               -
- print_metadata()
Prints the metadata of the graph.
- Return type:
None
- infer_metadata(verbose=True)
Infers metadata for all tables in the graph.
- Parameters:
verbose (bool) – Whether to print verbose output.
- Return type:
Self
Note
For more information, please see kumoai.experimental.rfm.Table.infer_metadata().
- link(src_table, fkey, dst_table)
Links two tables (src_table and dst_table) from the foreign key fkey in the source table to the primary key in the destination table.
The link is treated as bidirectional.
- Parameters:
src_table (str | Table) – The name of the source table of the edge. This table must have a foreign key with name fkey that links to the primary key in the destination table.
fkey (str) – The name of the foreign key in the source table.
dst_table (str | Table) – The name of the destination table of the edge. This table must have a primary key that links to the source table's foreign key.
- Raises:
ValueError – If the edge is already present in the graph, if the source or destination table does not exist in the graph, or if the source key does not exist in the source table.
- Return type:
Self
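The four documented ValueError conditions can be illustrated with a toy graph. This is a sketch only: ToyGraph is a hypothetical stand-in that stores each table as a set of column names, and it does not model the bidirectional treatment of links or any other kumoai behaviour.

```python
class ToyGraph:
    """Toy stand-in for a graph; NOT the kumoai implementation."""

    def __init__(self, tables):
        # Maps each table name to the set of its column names.
        self.tables = tables
        self.edges = []

    def link(self, src_table, fkey, dst_table):
        edge = (src_table, fkey, dst_table)
        if edge in self.edges:
            raise ValueError(f"Edge {edge} already exists")
        if src_table not in self.tables:
            raise ValueError(f"Source table '{src_table}' does not exist")
        if dst_table not in self.tables:
            raise ValueError(f"Destination table '{dst_table}' does not exist")
        if fkey not in self.tables[src_table]:
            raise ValueError(f"Column '{fkey}' does not exist in '{src_table}'")
        self.edges.append(edge)
        return self  # returning self allows calls to be chained


graph = ToyGraph({
    "users": {"user_id"},
    "items": {"item_id"},
    "orders": {"order_id", "user_id", "item_id"},
})
graph.link("orders", "user_id", "users").link("orders", "item_id", "items")

# Each of these calls violates one of the four documented conditions.
bad_calls = [
    ("orders", "user_id", "users"),    # edge already present
    ("payments", "user_id", "users"),  # unknown source table
    ("orders", "user_id", "stores"),   # unknown destination table
    ("orders", "store_id", "users"),   # foreign key missing from source table
]
errors = []
for args in bad_calls:
    try:
        graph.link(*args)
    except ValueError as exc:
        errors.append(str(exc))
print("\n".join(errors))
```

Because link() is documented to return Self, the chained form on the two valid calls mirrors how the real method can be used.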
- infer_links(verbose=True)
Infers missing links for the tables and adds them as edges to the graph.
- Parameters:
verbose (bool) – Whether to print verbose output.
- Return type:
Self
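This page does not describe how links are inferred. Purely to illustrate what "missing links" means, here is a sketch of one simple heuristic: a column is linked to another table when its name equals that table's primary key and the column has no link yet. The name-matching rule and the dictionary layout are assumptions made for this example; KumoRFM's actual inference logic may differ substantially.

```python
def infer_missing_links(tables, existing_edges):
    """Add (src_table, fkey, dst_table) edges for unlinked matching columns.

    tables maps a table name to {"primary_key": str or None, "columns": list}.
    Hypothetical name-matching heuristic, for illustration only.
    """
    edges = list(existing_edges)
    linked = {(src, fkey) for src, fkey, _ in edges}
    for src_name, src in tables.items():
        for column in src["columns"]:
            # Skip the table's own primary key and already-linked columns.
            if column == src["primary_key"] or (src_name, column) in linked:
                continue
            for dst_name, dst in tables.items():
                if dst_name != src_name and dst["primary_key"] == column:
                    edges.append((src_name, column, dst_name))
                    linked.add((src_name, column))
                    break
    return edges


tables = {
    "users": {"primary_key": "user_id", "columns": ["user_id", "name"]},
    "items": {"primary_key": "item_id", "columns": ["item_id", "price"]},
    "orders": {
        "primary_key": "order_id",
        "columns": ["order_id", "user_id", "item_id"],
    },
}
existing = [("orders", "user_id", "users")]
inferred = infer_missing_links(tables, existing)
print(inferred)
```

The edge that was already present is kept as-is, and only the unlinked orders.item_id column gains a new edge.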
- validate()
Validates the graph to ensure that all relevant metadata is specified for its tables and edges.
Concretely, validation ensures that edges properly link foreign keys to primary keys between valid tables. It additionally ensures that primary and foreign keys between tables in an Edge are of the same data type.
- Raises:
ValueError – If validation fails.
- Return type:
Self
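The checks described for validation (edges reference existing tables, the destination has a primary key, and foreign and primary keys share a data type) can be sketched as follows. The validate_edges helper, the dictionary layout, and the string dtype names are hypothetical choices for this example, not the library's internals.

```python
def validate_edges(tables, edges):
    """Raise ValueError if an edge does not link a foreign key to a primary key.

    tables maps a table name to
    {"primary_key": str or None, "dtypes": {column: dtype_name}}.
    """
    for src_table, fkey, dst_table in edges:
        for name in (src_table, dst_table):
            if name not in tables:
                raise ValueError(f"Table '{name}' does not exist in the graph")
        src, dst = tables[src_table], tables[dst_table]
        pkey = dst["primary_key"]
        if pkey is None:
            raise ValueError(f"Table '{dst_table}' has no primary key")
        if fkey not in src["dtypes"]:
            raise ValueError(f"Column '{fkey}' does not exist in '{src_table}'")
        fkey_dtype, pkey_dtype = src["dtypes"][fkey], dst["dtypes"][pkey]
        if fkey_dtype != pkey_dtype:
            raise ValueError(
                f"Data type mismatch: {src_table}.{fkey} is {fkey_dtype}, "
                f"but {dst_table}.{pkey} is {pkey_dtype}"
            )


tables = {
    "users": {"primary_key": "user_id", "dtypes": {"user_id": "int"}},
    "orders": {
        "primary_key": "order_id",
        "dtypes": {"order_id": "int", "user_id": "str"},
    },
}
edges = [("orders", "user_id", "users")]

try:
    validate_edges(tables, edges)
    message = "ok"
except ValueError as exc:
    message = str(exc)
print(message)

# Aligning the foreign key's data type with the primary key's fixes the graph.
tables["orders"]["dtypes"]["user_id"] = "int"
validate_edges(tables, edges)
```

In this toy data the first call fails because orders.user_id is stored as a string while users.user_id is an integer; after the types are aligned, the second call passes silently.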
- visualize(path=None, show_columns=True, backend='auto')
Visualizes the tables and edges in this graph.
- Parameters:
path (Union[Path, str, BytesIO, None]) – A path to write the produced image to. If None, the image will not be written to disk.
show_columns (bool) – Whether to show all columns of every table in the graph. If False, will only show the primary key, foreign key(s), and time column of each table.
backend (Literal['auto', 'graphviz', 'mermaid']) – The visualization backend to use. 'auto' chooses the backend based on environment and availability.
- Return type: