Introduction by Example#

Note

This tutorial is also available as a Google Colab, which you can copy and run end-to-end with your API key.

The Kumo platform makes machine learning on relational data simple, performant, and scalable. The Kumo SDK exposes an intuitive and composable interface atop the Kumo platform, enabling you to easily integrate Kumo with your existing iteration, testing, and production workflows.

Here, we briefly introduce the fundamental concepts of Kumo and their use in the Kumo SDK, motivated by a simple and well-known business problem: predicting customer lifetime value from past behavior (represented as a relational dataset of customers, product stock, and transactions). The full dataset is located at s3://kumo-public-datasets/customerltv_mini/; please feel free to follow along.

Note

For a complete overview of the Kumo platform, please also read the Kumo documentation alongside this document.

Initializing the SDK#

Please refer to Installation for information about installing the SDK, and to API Key Provisioning and Management to set up the SDK to work with your Kumo platform. Once you have completed these sections, you should be able to run the init() method successfully, which will look like either

import kumoai as kumo
kumo.init(url="https://<customer_id>.kumoai.cloud/api", api_key=API_KEY)

for the Kumo SaaS and Databricks editions, or

import kumoai as kumo
credentials = {
    "user": <your username>,
    "password": <your password>,
    "account": <your account>,
}
kumo.init(url="https://<kumo_spcs_deployment_url>/api", snowflake_credentials=credentials)

for the Kumo Snowpark Container Services edition.

Inspecting Source Tables#

Once you’ve initialized the SDK, the first step to working with your data is defining a connector to your source tables. The Kumo SDK supports creating connectors to data on Amazon S3 with an S3Connector, Snowflake with a SnowflakeConnector, or Databricks with a DatabricksConnector. Here, we work with data on S3, but equivalent steps can be taken with other supported data warehouses. Connecting tables across multiple connectors is also supported (for example, you can use S3 and Snowflake together).

Warning

If you are using the Kumo Snowpark Container Services edition, only SnowflakeConnector is supported.

Creating a connector to a dataset on S3 is as simple as specifying the root directory of your data:

connector = kumo.S3Connector(root_dir="s3://kumo-public-datasets/customerltv_mini/")

after which tables can be accessed with Python indexing semantics, or with the table() method. The following code shows three equivalent ways to access tables under the customerltv_mini directory:

# Access the 'customer' table by indexing into the connector:
customer_src = connector['customer']

# Access the 'transaction' table by explicitly calling the `.table`
# method on the connector:
transaction_src = connector.table('transaction')

# Create a connector without a root directory, and obtain a table by
# passing the full table path:
stock_src = kumo.S3Connector().table('s3://kumo-public-datasets/customerltv_mini/stock')

The tables customer_src, transaction_src and stock_src are objects of type SourceTable, which support basic operations to verify the types and raw data you have connected to Kumo. While the package reference provides a full set of details, some examples include viewing a sample of the source data (as a DataFrame) or viewing the source columns and their data types:

print(customer_src.head())
>>
    CustomerID
428    16909.0
312    14002.0
306    17101.0
141    13385.0
273    14390.0

print(len(transaction_src.columns))
>> 8

Note

For tables with semantically meaningful text columns, Kumo supports a language model integration that allows modeling to utilize powerful large language model embeddings, e.g. from OpenAI’s GPT. Please see add_llm() for more details.

Alongside viewing raw source table data, you can also perform data transformations in your own data platform and use the results directly with the Kumo SDK. For example, with pyspark, it is possible to transform the transaction table as follows:

from pyspark.sql.functions import col

root_dir = "s3://kumo-public-datasets/customerltv_mini/"

# An output directory (e.g. on S3) that you can write to, and Kumo can
# read from:
output_dir = ...

# Perform transformation with Spark
spark.read.parquet(f"{root_dir}/transaction") \
    .withColumn("TotalPrice", col("Quantity") * col("UnitPrice")) \
    .write.format("parquet").option("header","true").mode("Overwrite") \
    .save(f"{output_dir}/transaction_altered/")

# Access the altered table from a connector rooted at the output directory:
output_connector = kumo.S3Connector(root_dir=output_dir)
assert output_connector.has_table("transaction_altered")
print("Transaction price: ", output_connector["transaction_altered"].head(num_rows=2)["TotalPrice"])

Creating Kumo Tables#

Once you’ve connected your source tables and applied any necessary transformations, you can next construct a Graph consisting of Tables.

A Kumo Graph represents a connected set of Tables, with each Table fully specifying the metadata of its SourceTable (including selected source columns, column data types and semantic types, and relational constraint information) for modeling purposes.

A Table can be constructed from a SourceTable in multiple ways, and modified as necessary. The simplest approach is to call from_source_table(), as follows:

# NOTE if `columns` is not specified, all source columns are included:
customer = kumo.Table.from_source_table(
    source_table=customer_src,
    primary_key='CustomerID',
).infer_metadata()

transaction = kumo.Table.from_source_table(
    source_table=transaction_src,
    time_column='InvoiceDate',
).infer_metadata()

Here, we ask Kumo to convert source tables to Kumo tables and infer all unspecified metadata. To verify the metadata that was inferred for these tables, we can inspect the metadata property, which shows a condensed view of the information associated with a table:

# Formatted with `tabulate`:
>>> print(customer.metadata)

+----+------------+---------+---------+------------------+------------------+----------------------+
|    | name       | dtype   | stype   | is_primary_key   | is_time_column   | is_end_time_column   |
|----+------------+---------+---------+------------------+------------------+----------------------|
|  0 | CustomerID | float   | ID      | True             | False            | False                |
+----+------------+---------+---------+------------------+------------------+----------------------+

If any column properties were not inferred to your liking, you can edit them by accessing the corresponding column on the table and modifying its attributes directly.
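
As a minimal sketch (reusing the customer table from above, and the column() accessor demonstrated later in this section), overriding an inferred semantic type might look like:

# Override the inferred semantic type of the 'CustomerID' column; the same
# pattern applies to the data type and other column properties:
customer.column('CustomerID').stype = 'ID'
print(customer.column('CustomerID').stype)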

You can also choose to specify the table from the ground up, optionally inferring metadata for any columns that are not fully specified:

stock = kumo.Table(
    source_table=stock_src,
    columns=dict(name='StockCode', stype='ID'),  # will infer dtype='string'
    primary_key='StockCode',
).infer_metadata()

# Validate the table's correctness:
stock.validate(verbose=True)

No matter how you create your table, Table additionally exposes methods to inspect a table’s metadata and adjust included columns, data types, semantic types, and other relevant metadata.

# Set and access a data type for a column ("StockCode") in the stock table;
# this can be done for all properties of the table.
stock.column("StockCode").dtype = "string"
print(stock["StockCode"].dtype)

Note that column() returns a Column object, which contains the relevant metadata for the column of a table.

Creating a Graph#

After defining all Table objects, we next construct a Graph over these tables. A Graph connects the Tables by their primary key / foreign key relationships, and can be constructed by specifying the tables that partake in it along with these relationships.

graph = kumo.Graph(
    # These are the tables that participate in the graph: the keys of this
    # dictionary are the names of the tables, and the values are the Table
    # objects that correspond to these names:
    tables={
        'customer': customer,
        'stock': stock,
        'transaction': transaction,
    },

    # These are the edges that define the primary key / foreign key
    # relationships between the tables defined above. Here, `src_table`
    # is the table that has the foreign key `fkey`, which maps to the
    # table `dst_table`'s primary key:
    edges=[
        dict(src_table='transaction', fkey='StockCode', dst_table='stock'),
        dict(src_table='transaction', fkey='CustomerID', dst_table='customer'),
    ],
)

# Validate the graph's correctness:
graph.validate(verbose=True)

Writing a Predictive Query#

Once you’ve set up the Graph of your Tables, you can define a machine learning problem as a Kumo PredictiveQuery on your Graph. Predictive queries are written using the predictive query language (PQL), a concise SQL-like syntax that allows you to define a model for a new business problem. For information on the construction of a query string, please visit the Kumo documentation.

In this example, we’ll be predicting customer lifetime value, modeled as a regression problem: for each customer, predict the maximum transaction quantity over the next 30 days, assuming the customer’s transactions sum to more than 15 in unit price over the next 7 days:

pquery = kumo.PredictiveQuery(
    graph=graph,
    query=(
        "PREDICT MAX(transaction.Quantity, 0, 30)\n"
        "FOR EACH customer.CustomerID\n"
        "ASSUMING SUM(transaction.UnitPrice, 0, 7, days) > 15"
    ),
)

# Validate the predictive query syntax:
pquery.validate(verbose=True)

Training a Model and Generating Predictions#

To recap: starting with raw data (in the form of SourceTable objects), we created a Graph of Kumo Table objects, with the graph specifying the relationships between tables and each table specifying its machine learning metadata. We next defined a PredictiveQuery to represent a machine learning problem as a statement in Kumo’s querying language.

We can now train a Kumo model with two simple steps:

model_plan = pquery.suggest_model_plan()
trainer = kumo.Trainer(model_plan)
training_job = trainer.fit(
    graph=graph,
    train_table=pquery.generate_training_table(non_blocking=True),
    non_blocking=False,
)
print(f"Training metrics: {training_job.metrics()}")

Let’s step through each of these lines of code. The first line defines the Kumo modeling plan that the predictive query suggests for use in training; you can either use this default model plan directly (as is done above) or adjust any of its parameters to your liking. The second line creates a Trainer object initialized with the model plan, which manages the training of your query. The call to fit() accepts a graph (created above) and a training table (produced by the predictive query), and trains a model. The final line prints the job’s metrics. That’s it!
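
If you would rather customize the plan before training, you can inspect the suggested plan and edit it before constructing the Trainer. The sketch below is illustrative only; the commented-out option name is hypothetical, so consult the package reference for the fields your model plan actually exposes.

# Inspect the suggested model plan before (optionally) editing it:
model_plan = pquery.suggest_model_plan()
print(model_plan)

# Hypothetical adjustment (the field name below is illustrative only):
# model_plan.training_job.num_experiments = 2

trainer = kumo.Trainer(model_plan)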

Note

The Kumo SDK makes extensive use of non_blocking as an optional parameter for long-running operations. Setting this flag to True lets a long-running operation return immediately with a Future object that tracks the operation as it runs in the background. Setting this flag to False makes the operation block until completion (success or failure) before returning. Please see the package reference for more details.
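
As a small sketch of the two modes, using the training table generation from above (and assuming the blocking variant returns the generated training table directly):

# Blocking: returns only once the training table has been generated.
train_table = pquery.generate_training_table(non_blocking=False)

# Non-blocking: returns a future immediately, which can be passed directly
# to `Trainer.fit(...)` as shown above.
train_table_future = pquery.generate_training_table(non_blocking=True)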

Once a model has been trained, we can use it to generate batch predictions that we can write to an external data source. This can be achieved with the following code:

prediction_job = trainer.predict(
    graph=graph,
    prediction_table=pquery.generate_prediction_table(non_blocking=True),
    output_types={'predictions', 'embeddings'},
    output_connector=connector,  # the same S3 connector from the start
    output_table_name='kumo_predictions',
    training_job_id=training_job.job_id,  # use our training job's model
    non_blocking=False,
)
print(f'Batch prediction job summary: {prediction_job.summary()}')

which writes batch predictions (and embeddings) back to the same connector that contains our source data.
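
Assuming the output lands as a table named kumo_predictions under the connector’s root directory (an assumption for this sketch), you can read a sample of it back just like any other source table:

# Read back a sample of the batch predictions we just wrote:
predictions_src = connector.table('kumo_predictions')
print(predictions_src.head())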

Next Steps#

While this example covered many of the core concepts underpinning the Kumo SDK, the SDK provides much more advanced functionality to help improve model iteration speed, evaluate champion/challenger models in production use-cases, integrate cleanly with upstream and downstream data pipelines, and more. Please avail yourself of the full set of package documentation and reach out to your sales engineer with any further questions, comments, and concerns.

Full Code Example#

A full code example on the CustomerLTV dataset discussed above follows.

import kumoai as kumo

# Initialize the SDK:
kumo.init(url="https://<customer_id>.kumoai.cloud/api", api_key=API_KEY)

# Create a Connector:
connector = kumo.S3Connector(root_dir="s3://kumo-public-datasets/customerltv_mini/")

# Create Tables from SourceTables:
customer = kumo.Table.from_source_table(
    source_table=connector.table('customer'),
    primary_key='CustomerID',
).infer_metadata()

stock = kumo.Table.from_source_table(
    source_table=connector.table('stock'),
    primary_key='StockCode',
).infer_metadata()

transaction = kumo.Table.from_source_table(
    source_table=connector.table('transaction'),
    time_column='InvoiceDate',
).infer_metadata()

# Create a Graph:
graph = kumo.Graph(
    tables={
        'customer': customer,
        'stock': stock,
        'transaction': transaction,
    },
    edges=[
        dict(src_table='transaction', fkey='StockCode', dst_table='stock'),
        dict(src_table='transaction', fkey='CustomerID', dst_table='customer'),
    ],
)

# Validate the Graph:
graph.validate(verbose=True)

# Create a Predictive Query on the Graph:
pquery = kumo.PredictiveQuery(
    graph=graph,
    query=(
        "PREDICT MAX(transaction.Quantity, 0, 30)\n"
        "FOR EACH customer.CustomerID\n"
        "ASSUMING SUM(transaction.UnitPrice, 0, 7, days) > 15"
    ),
)

# Validate the predictive query syntax:
pquery.validate(verbose=True)

# Create a modeling plan, and a Trainer object to train a model:
model_plan = pquery.suggest_model_plan()
trainer = kumo.Trainer(model_plan)

# Train a model:
training_job = trainer.fit(
    graph=graph,
    train_table=pquery.generate_training_table(non_blocking=True),
    non_blocking=False,
)
print(f"Training metrics: {training_job.metrics()}")

# Predict on your trained model:
prediction_job = trainer.predict(
    graph=graph,
    prediction_table=pquery.generate_prediction_table(non_blocking=True),
    output_types={'predictions', 'embeddings'},
    output_connector=connector,
    output_table_name='kumo_predictions',
    training_job_id=training_job.job_id,  # use our training job's model
    non_blocking=False,
)
print(f'Batch prediction job summary: {prediction_job.summary()}')