Snowflake Connector

KumoRFM can connect directly to Snowflake data warehouses, enabling predictions on enterprise-scale data without moving it out of Snowflake.

Installation

The Snowflake backend requires the Snowflake connector:

pip install "kumoai[snowflake]"

Quick Start

From a Snowflake notebook (uses the active session automatically):

import kumoai.experimental.rfm as rfm

graph = rfm.Graph.from_snowflake(schema="MY_SCHEMA")

With explicit credentials:

graph = rfm.Graph.from_snowflake(
    connection={
        "account": "my_account",
        "user": "my_user",
        "password": "my_password",
        "warehouse": "my_warehouse",
    },
    database="MY_DATABASE",
    schema="MY_SCHEMA",
)

Either call will:

  1. Connect to the Snowflake database and schema

  2. Discover all tables in the schema

  3. Infer column metadata (data types, semantic types, primary keys, time columns)

  4. Detect foreign key relationships

  5. Print a summary of the inferred metadata and links
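To make step 4 more concrete, here is a toy sketch of what name-based foreign key detection could look like. It is an illustration only and not KumoRFM's actual inference logic: the rule (a column whose name equals another table's primary key), the `Schema` shape, and the `guess_foreign_keys` helper are all assumptions made up for this example.

```python
from typing import Dict, List, Tuple

# Hypothetical, simplified schema description:
# table name -> (primary key column, list of all columns).
Schema = Dict[str, Tuple[str, List[str]]]


def guess_foreign_keys(schema: Schema) -> List[Tuple[str, str, str]]:
    """Guess (source table, foreign key column, destination table) triples.

    Naive rule (an assumption for illustration, not KumoRFM's algorithm):
    a column is a foreign key candidate when its name equals the primary
    key name of a different table.
    """
    edges = []
    for src_table, (_, columns) in schema.items():
        for dst_table, (dst_pk, _) in schema.items():
            if src_table != dst_table and dst_pk in columns:
                edges.append((src_table, dst_pk, dst_table))
    return sorted(edges)


example_schema: Schema = {
    "USERS": ("USER_ID", ["USER_ID", "SIGNUP_DATE"]),
    "ITEMS": ("ITEM_ID", ["ITEM_ID", "PRICE"]),
    "ORDERS": ("ORDER_ID", ["ORDER_ID", "USER_ID", "ITEM_ID", "ORDER_DATE"]),
}
inferred_edges = guess_foreign_keys(example_schema)
print(inferred_edges)
```

A rule this simple misses foreign keys whose column names differ from the referenced primary key, which is one reason to review the printed summary before relying on the inferred links.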

Specifying Tables

Control which tables to include and customize their configuration:

graph = rfm.Graph.from_snowflake(
    tables=[
        "USERS",                                             # Include by name
        dict(name="ORDERS", source_name="ORDERS_SNAPSHOT"),  # Expose Snowflake table ORDERS_SNAPSHOT as ORDERS
        dict(name="ITEMS", schema="OTHER_SCHEMA"),           # Read from a different schema
    ],
    database="DEFAULT_DB",
    schema="DEFAULT_SCHEMA",
)

Table configuration options:

| Key         | Description                                                 | Required |
|-------------|-------------------------------------------------------------|----------|
| name        | The table name used in PQL queries                          | Yes      |
| source_name | The actual table name in Snowflake (if different from name) | No       |
| database    | Override the default database for this table                | No       |
| schema      | Override the default schema for this table                  | No       |
| primary_key | Override the auto-detected primary key                      | No       |
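Because the `tables` list mixes bare names and dictionaries, it can help to check entries before opening a connection. The following is a minimal sketch under the assumption that only the five keys listed above are accepted; `normalize_table_specs` is a hypothetical helper written for this example, not part of `kumoai`.

```python
from typing import Any, Dict, List, Union

# Keys taken from the table configuration options listed above.
ALLOWED_KEYS = {"name", "source_name", "database", "schema", "primary_key"}

TableSpec = Union[str, Dict[str, Any]]


def normalize_table_specs(tables: List[TableSpec]) -> List[Dict[str, Any]]:
    """Turn a mixed list of names and dicts into dicts, rejecting bad entries."""
    normalized = []
    for spec in tables:
        if isinstance(spec, str):
            # A bare string is treated as shorthand for a name-only entry.
            spec = {"name": spec}
        unknown = set(spec) - ALLOWED_KEYS
        if unknown:
            raise ValueError(f"Unknown table option(s): {sorted(unknown)}")
        if "name" not in spec:
            raise ValueError("Each table entry requires a 'name'")
        normalized.append(dict(spec))  # Copy so the caller's dict is untouched.
    return normalized


specs = normalize_table_specs([
    "USERS",
    dict(name="ORDERS", source_name="ORDERS_SNAPSHOT"),
    dict(name="ITEMS", schema="OTHER_SCHEMA"),
])
print(specs)
```

A misspelled key such as `primary_keys` then fails early with a readable message rather than surfacing later during graph construction.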

Connection Options

There are three ways to establish a Snowflake connection:

1. Active session (Snowflake notebooks):

# No connection argument needed; the active Snowpark session is used
graph = rfm.Graph.from_snowflake(schema="MY_SCHEMA")

2. Credentials dictionary:

graph = rfm.Graph.from_snowflake(connection={
    "account": "my_account",
    "user": "my_user",
    "password": "my_password",
    "warehouse": "my_warehouse",
})

3. Existing connection:

import snowflake.connector
conn = snowflake.connector.connect(...)
graph = rfm.Graph.from_snowflake(connection=conn, schema="MY_SCHEMA")
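For the credentials-dictionary option, a common practice is to keep the password out of source code. The sketch below builds the same four-key dictionary from environment variables. The variable names (`SNOWFLAKE_ACCOUNT` and so on) and the `credentials_from_env` helper are assumptions for this example; use whatever names your deployment defines.

```python
import os
from typing import Dict, Mapping, Optional

# Hypothetical environment variable names mapped to the dictionary keys
# used in the credentials example above.
ENV_TO_KEY = {
    "SNOWFLAKE_ACCOUNT": "account",
    "SNOWFLAKE_USER": "user",
    "SNOWFLAKE_PASSWORD": "password",
    "SNOWFLAKE_WAREHOUSE": "warehouse",
}


def credentials_from_env(env: Optional[Mapping[str, str]] = None) -> Dict[str, str]:
    """Build the credentials dictionary from environment variables."""
    env = os.environ if env is None else env
    missing = [name for name in ENV_TO_KEY if not env.get(name)]
    if missing:
        raise KeyError(f"Missing environment variables: {missing}")
    return {key: env[name] for name, key in ENV_TO_KEY.items()}


# Intended use (requires kumoai and real credentials, so it is left commented out):
# graph = rfm.Graph.from_snowflake(connection=credentials_from_env(), schema="MY_SCHEMA")
```

Passing a mapping explicitly instead of relying on `os.environ` keeps the helper easy to test.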

Database and Schema Defaults

The database and schema parameters set defaults for all tables. If not specified, the current database and schema from the active session are used.

Individual tables can override these defaults using the database and schema keys in their configuration dictionary.
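The precedence described above (per-table keys first, then the call-level defaults, then the session) can be sketched as a small resolver. This is an illustration of the documented behavior, not the library's code; `resolve_source` is a hypothetical helper, and the assumption is that a per-table `schema` override still combines with the default `database`.

```python
from typing import Dict, Optional


def resolve_source(
    spec: Dict[str, str],
    default_database: Optional[str] = None,
    default_schema: Optional[str] = None,
) -> str:
    """Return the (possibly partially) qualified name a table entry points at."""
    # Per-table keys win over the call-level defaults.
    database = spec.get("database", default_database)
    schema = spec.get("schema", default_schema)
    # source_name, when given, is the real Snowflake table name.
    table = spec.get("source_name", spec["name"])
    # Parts left as None are omitted; Snowflake would then fall back to the
    # session's current database and schema.
    parts = [part for part in (database, schema, table) if part is not None]
    return ".".join(parts)


print(resolve_source({"name": "ITEMS", "schema": "OTHER_SCHEMA"},
                     "DEFAULT_DB", "DEFAULT_SCHEMA"))
print(resolve_source({"name": "ORDERS", "source_name": "ORDERS_SNAPSHOT"},
                     "DEFAULT_DB", "DEFAULT_SCHEMA"))
```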

Controlling Metadata Inference

graph = rfm.Graph.from_snowflake(
    schema="MY_SCHEMA",
    infer_metadata=False,  # Skip automatic type inference
    verbose=False,         # Suppress output
)

# Run metadata and link inference explicitly later, when needed:
graph.infer_metadata()
graph.infer_links()

Manual Edge Specification

graph = rfm.Graph.from_snowflake(
    schema="MY_SCHEMA",
    edges=[
        # Each edge: (source table, foreign key column, destination table)
        ("ORDERS", "USER_ID", "USERS"),
        ("ORDERS", "ITEM_ID", "ITEMS"),
    ],
)
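Hand-written edges are easy to mistype, so a quick consistency check before building the graph can save a round trip. The sketch below assumes each edge is a `(source table, foreign key column, destination table)` triple, as the example above suggests, and that you already know each table's columns. `check_edges` is a hypothetical helper, not a `kumoai` function.

```python
from typing import Dict, List, Sequence, Tuple

# Assumed edge layout: (source table, foreign key column, destination table).
Edge = Tuple[str, str, str]


def check_edges(edges: Sequence[Edge], columns: Dict[str, List[str]]) -> List[str]:
    """Return readable problems; an empty list means the edges look consistent."""
    problems = []
    for src_table, fkey, dst_table in edges:
        if src_table not in columns:
            problems.append(f"unknown source table {src_table}")
        elif fkey not in columns[src_table]:
            problems.append(f"{src_table} has no column {fkey}")
        if dst_table not in columns:
            problems.append(f"unknown destination table {dst_table}")
    return problems


known_columns = {
    "USERS": ["USER_ID"],
    "ITEMS": ["ITEM_ID"],
    "ORDERS": ["ORDER_ID", "USER_ID", "ITEM_ID"],
}
edges = [("ORDERS", "USER_ID", "USERS"), ("ORDERS", "ITEM_ID", "ITEMS")]
print(check_edges(edges, known_columns))
```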

Supported Snowflake Types

KumoRFM maps Snowflake data types as follows:

| Snowflake Type                              | KumoRFM Dtype | Default Stype      |
|---------------------------------------------|---------------|--------------------|
| NUMBER, DECIMAL, INT, BIGINT, FLOAT, DOUBLE | float or int  | numerical          |
| VARCHAR, STRING, TEXT, CHAR                 | string        | categorical / text |
| BOOLEAN                                     | bool          | categorical        |
| DATE, TIMESTAMP, TIMESTAMP_*                | date          | timestamp          |
| ARRAY                                       | stringlist    | multicategorical   |
| VECTOR                                      | floatlist     | sequence           |
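The mapping above can be written as a small lookup, which is handy for predicting how a column will be treated before loading it. This is only a restatement of the table, not KumoRFM's implementation: the `lookup_type` helper, the stripping of arguments such as `(38,0)`, and the prefix match for `TIMESTAMP_*` are assumptions made for this sketch.

```python
import re
from typing import Dict, Tuple

# Rows copied from the mapping above: Snowflake type -> (dtype, default stype).
TYPE_MAP: Dict[str, Tuple[str, str]] = {}
for names, mapping in [
    (("NUMBER", "DECIMAL", "INT", "BIGINT", "FLOAT", "DOUBLE"), ("float or int", "numerical")),
    (("VARCHAR", "STRING", "TEXT", "CHAR"), ("string", "categorical / text")),
    (("BOOLEAN",), ("bool", "categorical")),
    (("DATE", "TIMESTAMP"), ("date", "timestamp")),
    (("ARRAY",), ("stringlist", "multicategorical")),
    (("VECTOR",), ("floatlist", "sequence")),
]:
    for name in names:
        TYPE_MAP[name] = mapping


def lookup_type(snowflake_type: str) -> Tuple[str, str]:
    """Return (dtype, default stype) for a Snowflake type name."""
    # Drop arguments such as NUMBER(38,0) or VARCHAR(255).
    base = re.sub(r"\(.*\)$", "", snowflake_type.strip().upper())
    if base.startswith("TIMESTAMP"):
        base = "TIMESTAMP"  # Covers the TIMESTAMP_* variants.
    if base not in TYPE_MAP:
        raise KeyError(f"Type not listed in the mapping: {snowflake_type}")
    return TYPE_MAP[base]


print(lookup_type("NUMBER(38,0)"))
print(lookup_type("timestamp_ntz(9)"))
```

Types that are not listed raise a `KeyError` here, because the source does not say how they are handled.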