import logging
from typing import List, Literal, Mapping, Optional, Tuple, Union, overload
from kumoapi.jobs import (
GeneratePredictionTableRequest,
GenerateTrainTableRequest,
)
from kumoapi.model_plan import (
InferredType,
PredictionTableGenerationPlan,
RunMode,
SuggestModelPlanRequest,
TrainingTableGenerationPlan,
)
from kumoapi.pquery import PQueryResource
from kumoapi.task import TaskType
from typing_extensions import Self
from kumoai import global_state
from kumoai.client.jobs import (
GeneratePredictionTableJobID,
GenerateTrainTableJobID,
TrainingJobAPI,
)
from kumoai.graph import Graph
from kumoai.pquery.prediction_table import PredictionTable, PredictionTableJob
from kumoai.pquery.training_table import TrainingTable, TrainingTableJob
from kumoai.trainer import (
BaselineTrainer,
ModelPlan,
Trainer,
TrainingJob,
TrainingJobResult,
)
from kumoai.trainer.job import BaselineJob, BaselineJobResult
logger = logging.getLogger(__name__)
PredictiveQueryID = str
[docs]class PredictiveQuery:
r"""The Kumo predictive query is a declarative syntax for describing a
machine learning task. Predictive queries are written using the predictive
query language (PQL), a concise SQL-like syntax that allows you to define a
model for a new business problem.
A predictive query object can be created from a
:class:`~kumoai.graph.Graph` and a query string. For information on the
construction of a query string, please visit the Kumo
`documentation <https://docs.kumo.ai/docs/pquery-structure/>`__.
.. code-block:: python
import kumoai
# See `Graph` documentation for more information:
graph = kumoai.Graph(...)
# Create a predictive query representing a machine learning problem
# over this Graph:
pquery = kumoai.PredictiveQuery(
graph=graph,
query=(
"PREDICT MAX(transaction.Quantity, 0, 30) "
"FOR EACH customer.CustomerID"
),
)
# Validate the predictive query configuration, for syntax and
# correctness:
pquery.validate(verbose=True)
# Get the machine learning task type corresponding to this predictive
# query (e.g. binary classification, regression, link prediction, etc.)
print(pquery.get_task_type())
# Suggest a training table generation plan and use it to generate a
# training table from this query, to be used in `Trainer.fit`:
training_table_plan = pquery.suggest_training_table_plan()
training_table = pquery.generate_training_table(training_table_plan)
# Suggest a prediction table generation plan and use it to generate a
# prediction table from this query, to be used in `Trainer.predict`:
pred_table_plan = pquery.suggest_prediction_table_plan()
pred_table = pquery.generate_prediction_table(pred_table_plan)
Args:
graph: The :class:`~kumoai.graph.Graph` object which the predictive
query is defined over.
query: A string representation of the predictive query.
"""
[docs] def __init__(
self,
graph: Graph,
query: str,
) -> None:
self.graph = graph
self.query = query
# A predictive query owns a trainer object, which is used internally
# to support `fit` and `predict` directly on this object. A user can
# also inspect the training table, prediction table, and trainer
# objects, but cannot set them; any advanced configuration must be
# done directly via `Trainer`:
self._train_table: Optional[Union[TrainingTable,
TrainingTableJob]] = None
self._prediction_table: Optional[Union[PredictionTable,
PredictionTableJob]] = None
# Metadata ################################################################
@property
def id(self) -> str:
r"""Returns the unique ID for this predictive query, determined from
its schema and the schema of its associated graph. Two queries that
differ either in their syntax or in their graph will have different
ids.
"""
return self.save()
@property
def train_table(self) -> Union[TrainingTable, TrainingTableJob]:
r"""Returns the training table that was last generated by this
predictive query. If the predictive query has not yet generated a
training table, raises a :class:`ValueError`.
Note that the training table may be of type
:class:`~kumoai.pquery.TrainingTable` or
:class:`~kumoai.pquery.TrainingTableJob`, depending on whether the
training table was generated with or without waiting for its
completion, respectively.
"""
if not self._train_table:
raise ValueError(
"This predictive query has not yet generated a training "
"table. Please call `generate_training_table` to generate "
"a training table before proceeding.")
return self._train_table
@property
def prediction_table(self) -> Union[PredictionTable, PredictionTableJob]:
r"""Returns the prediction table that was last generated by this
predictive query. If the predictive query has not yet generated a
prediction table, raises a :class:`ValueError`.
Note that the prediction table may be of type
:class:`~kumoai.pquery.PredictionTable` or
:class:`~kumoai.pquery.PredictionTableJob`, depending on whether the
prediction table was generated with or without waiting for its
completion, respectively.
"""
if not self._prediction_table:
raise ValueError(
"This predictive query has not yet generated a prediction "
"table. Please call `generate_prediction_table` to generate a "
"prediction table before proceeding.")
return self._prediction_table
[docs] def get_task_type(self) -> TaskType:
r"""Returns the task type of this predictive query. The task type of
the query corresponds to the machine learning problem that this query
translates to in the Kumo platform; for more information about possible
task types, please visit the Kumo `documentation
<https://docs.kumo.ai/docs/task-types/>`__.
"""
try:
self.validate(verbose=False)
except ValueError as e:
raise ValueError(
f"Predictive query {self.query} is improperly configured, so "
f"a task type cannot be obtained. Please ensure your query "
f"has a valid configuration before proceeding. You can use "
f"the `validate` method to verify the validity of your query."
) from e
task_type, _ = global_state.client.pquery_api.infer_task_type(
pquery_string=self.query, graph_id=self.graph.id)
return task_type
[docs] def validate(self, verbose: bool = True) -> Self:
r"""Validates the syntax of this predictive query, ensuring that
the query is formulated correctly in Kumo's Predictive Query Language
and that the query makes semantic sense (defines a suitable predictive
problem) on this :class:`~kumoai.graph.Graph`.
Args:
verbose: Whether to log non-error output of this validation.
Raises:
ValueError:
if validation fails.
Example:
>>> import kumoai
>>> query = kumoai.PredictiveQuery(...) # doctest: +SKIP
>>> query.validate() # doctest: +SKIP
ValidationResponse(warnings=[], errors=[])
"""
self.graph.save() # Need a valid graph ID; also validates graph.
resp = global_state.client.pquery_api.validate(
self._to_api_pquery_resource())
if not resp.ok:
raise ValueError(resp.error_message())
if verbose:
if resp.empty():
logger.info("Query %s is configured correctly.", self.query)
else:
logger.warning(resp.message())
return self
# Persistence #############################################################
def _to_api_pquery_resource(
self,
name: Optional[str] = None,
) -> PQueryResource:
return PQueryResource(
name=name,
query_string=self.query,
graph=self.graph._to_api_graph_definition(),
desc="",
)
[docs] def save(self) -> PredictiveQueryID:
r"""Saves a predictive query to Kumo, returning a unique ID for this
query. The unique ID can later be used to load the predictive query
object.
Example:
>>> import kumoai
>>> query = kumoai.PredictiveQuery(...) # doctest: +SKIP
>>> query.save() # doctest: +SKIP
pquery-xxx
"""
try:
self.validate(verbose=False)
except ValueError as e:
raise ValueError(
f"Predictive query {self.query} is improperly configured, so "
f"it cannot be saved. Please ensure your query "
f"has a valid configuration before proceeding. You can use "
f"the `validate` method to verify the validity of your query."
) from e
self.graph.save()
return global_state.client.pquery_api.create(
pquery=self._to_api_pquery_resource(name=None))
[docs] def save_as_template(self, name: str) -> PredictiveQueryID:
r"""Saves a predictive query as a named, re-usable template to Kumo,
and returns the saved name as a response. This method can be used to
"templatize" / name a query configuration for ease of future
reusability.
Args:
name: The name of the template to save the query as. If the name
is already associated with another query, that query will be
overwritten.
Example:
>>> import kumoai
>>> query = kumoai.PredictiveQuery(...) # doctest: +SKIP
>>> query.save_as_template("name") # doctest: +SKIP
>>> loaded = kumoai.PredictiveQuery.load("name") # doctest: +SKIP
>>> loaded == query # doctest: +SKIP
True
"""
try:
self.validate(verbose=False)
except ValueError as e:
raise ValueError(
f"Predictive query {self.query} is improperly configured, so "
f"it cannot be saved. Please ensure your query "
f"has a valid configuration before proceeding. You can use "
f"the `validate` method to verify the validity of your query."
) from e
template_resource = global_state.client.pquery_api.get_if_exists(name)
if template_resource is not None:
template_string = template_resource.query_string
logger.warning(
("Predictive query template %s already exists, with "
"query string %s. This template will be overridden with "
"configuration %s."), name, template_string, self.query)
self.graph.save()
return global_state.client.pquery_api.create(
pquery=self._to_api_pquery_resource(name))
[docs] @classmethod
def load(cls, pq_id_or_template: str) -> 'PredictiveQuery':
r"""Loads a predictive query from either a predictive query ID or a
named template. Returns a :class:`~kumoai.pquery.PredictiveQuery`
object that contains the loaded query along with its associated graph,
tables, etc.
"""
api = global_state.client.pquery_api
res = api.get_if_exists(pq_id_or_template)
if not res:
raise ValueError(
f"Predictive query {pq_id_or_template} was not found.")
return cls(
graph=Graph._from_api_graph_definition(res.graph),
query=res.query_string,
)
[docs] @classmethod
def load_from_training_job(cls, training_job_id: str) -> 'PredictiveQuery':
r"""Loads a predictive query from a training job, regardless of the
training job's status. Returns a
:class:`~kumoai.pquery.PredictiveQuery` object that contains the loaded
query along with its associated graph, tables, etc.
"""
train_api: TrainingJobAPI = global_state.client.training_job_api
job = train_api.get(training_job_id)
id_or_name = job.config.pquery_id
return PredictiveQuery.load(pq_id_or_template=id_or_name)
# Training & Prediction Table Generation ##################################
@overload
def generate_training_table(
self,
plan: Optional[TrainingTableGenerationPlan] = None,
) -> TrainingTable:
pass
@overload
def generate_training_table(
self,
plan: Optional[TrainingTableGenerationPlan] = None,
*,
non_blocking: Literal[False],
) -> TrainingTable:
pass
@overload
def generate_training_table(
self,
plan: Optional[TrainingTableGenerationPlan] = None,
*,
non_blocking: Literal[True],
) -> TrainingTableJob:
pass
@overload
def generate_training_table(
self,
plan: Optional[TrainingTableGenerationPlan] = None,
*,
non_blocking: bool,
) -> Union[TrainingTable, TrainingTableJob]:
pass
[docs] def generate_training_table(
self,
plan: Optional[TrainingTableGenerationPlan] = None,
*,
non_blocking: bool = False,
custom_tags: Mapping[str, str] = {},
) -> Union[TrainingTable, TrainingTableJob]:
r"""Generates a training table from the specified :attr:`query`
string.
Args:
plan: A specification of the parameters for training table
generation. If not provided, will use an intelligently
generated default plan based on the query and graph. This plan
is equivalent to the plan inferred with
``suggest_training_table_plan(run_mode=RunMode.NORMAL)``.
non_blocking: Whether this operation should return immediately
after launching the training table generation job, or await
completion of the generated training table.
custom_tags: Additional, customer defined k-v tags to be associated
with the job to be launched. Job tags are useful for grouping
and searching jobs.
Returns:
Union[TrainingTable, TrainingTableJob]:
If ``non_blocking=False``, returns a training table object. If
``non_blocking=True``, returns a training table future object.
"""
pq_id = self.save()
# TODO(manan): improve this...
if not plan:
plan = self.suggest_training_table_plan()
train_table_job_api = global_state.client.generate_train_table_job_api
job_id: GenerateTrainTableJobID = train_table_job_api.create(
GenerateTrainTableRequest(
dict(custom_tags), pq_id, plan,
graph_snapshot_id=self.graph.snapshot(
non_blocking=non_blocking)))
self._train_table = TrainingTableJob(job_id=job_id)
if non_blocking:
return self._train_table
self._train_table = self._train_table.attach()
return self._train_table
@overload
def generate_prediction_table(
self,
plan: Optional[PredictionTableGenerationPlan] = None,
) -> PredictionTable:
pass
@overload
def generate_prediction_table(
self,
plan: Optional[PredictionTableGenerationPlan] = None,
*,
non_blocking: Literal[False],
) -> PredictionTable:
pass
@overload
def generate_prediction_table(
self,
plan: Optional[PredictionTableGenerationPlan] = None,
*,
non_blocking: Literal[True],
) -> PredictionTableJob:
pass
@overload
def generate_prediction_table(
self,
plan: Optional[PredictionTableGenerationPlan] = None,
*,
non_blocking: bool,
) -> Union[PredictionTable, PredictionTableJob]:
pass
[docs] def generate_prediction_table(
self,
plan: Optional[PredictionTableGenerationPlan] = None,
*,
non_blocking: bool = False,
custom_tags: Mapping[str, str] = {},
) -> Union[PredictionTable, PredictionTableJob]:
r"""Generates a prediction table from the predictive query
:attr:`query` string.
Args:
plan: A specification of the parameters for prediction table
generation. If not provided, will use an intelligently
generated default plan based on the query and graph. This plan
is equivalent to the plan inferred with
``suggest_prediction_table_plan(run_mode=RunMode.NORMAL)``.
non_blocking: Whether this operation should return immediately
after launching the prediction table generation job, or await
completion of the generated prediction table.
custom_tags: Additional, customer defined k-v tags to be associated
with the job to be launched. Job tags are useful for grouping
and searching jobs.
Returns:
Union[PredictionTable, PredictionTableJob]:
If ``non_blocking=False``, returns a prediction table object.
If ``non_blocking=True``, returns a prediction table future
object.
"""
pq_id = self.save()
if not plan:
plan = self.suggest_prediction_table_plan()
bp_table_api = global_state.client.generate_prediction_table_job_api
job_id: GeneratePredictionTableJobID = bp_table_api.create(
GeneratePredictionTableRequest(
dict(custom_tags), pq_id, plan,
graph_snapshot_id=self.graph.snapshot(
non_blocking=non_blocking)))
self._prediction_table = PredictionTableJob(job_id=job_id)
if non_blocking:
return self._prediction_table
self._prediction_table = self._prediction_table.result()
return self._prediction_table
# Training & Prediction ###################################################
[docs] def suggest_training_table_plan(
self,
run_mode: RunMode = RunMode.FAST,
) -> TrainingTableGenerationPlan:
r"""Suggests a training table generation plan given the predictive
query and graph. This training table generation plan can be used to
alter the approach Kumo uses to generate the training table for your
predictive query.
Args:
run_mode: A representation of how quickly you would like your
predictive query to complete. Faster run modes correspond to
lower training times, at the cost of potentially lower
performance.
"""
self.graph.save()
req = SuggestModelPlanRequest(
query_string=self.query,
graph_id=self.graph.id,
run_mode=run_mode,
)
return global_state.client.pquery_api.suggest_training_table_plan(req)
[docs] def suggest_prediction_table_plan(self, ) -> PredictionTableGenerationPlan:
r"""Suggests a prediction table generation plan given the predictive
query and graph. This prediction table generation plan can be used to
alter the approach Kumo uses to generate the prediction table for your
predictive query.
"""
return PredictionTableGenerationPlan(anchor_time=InferredType.VALUE)
[docs] def suggest_model_plan(
self,
run_mode: RunMode = RunMode.FAST,
) -> ModelPlan:
r"""Suggests a modeling plan given the predictive query and graph. This
model plan can be used to alter the approach Kumo uses to train your
machine learning model.
Args:
run_mode: A representation of how quickly you would like your
predictive query to complete. Faster run modes correspond to
lower training times, at the cost of potentially lower
performance.
"""
self.graph.save()
req = SuggestModelPlanRequest(
query_string=self.query,
graph_id=self.graph.id,
run_mode=run_mode,
)
return global_state.client.pquery_api.suggest_model_plan(req)
@overload
def fit(
self,
training_table_plan: Optional[TrainingTableGenerationPlan] = None,
model_plan: Optional[ModelPlan] = None,
) -> Tuple[Trainer, TrainingJobResult]:
pass
@overload
def fit(
self,
training_table_plan: Optional[TrainingTableGenerationPlan] = None,
model_plan: Optional[ModelPlan] = None,
*,
non_blocking: Literal[False],
) -> Tuple[Trainer, TrainingJobResult]:
pass
@overload
def fit(
self,
training_table_plan: Optional[TrainingTableGenerationPlan] = None,
model_plan: Optional[ModelPlan] = None,
*,
non_blocking: Literal[True],
) -> Tuple[Trainer, TrainingJob]:
pass
@overload
def fit(
self,
training_table_plan: Optional[TrainingTableGenerationPlan] = None,
model_plan: Optional[ModelPlan] = None,
*,
non_blocking: bool,
) -> Tuple[Trainer, Union[TrainingJobResult, TrainingJob]]:
pass
[docs] def fit(
self,
training_table_plan: Optional[TrainingTableGenerationPlan] = None,
model_plan: Optional[ModelPlan] = None,
*,
non_blocking: bool = False,
) -> Tuple[Trainer, Union[TrainingJobResult, TrainingJob]]:
r"""Trains a Kumo model on this predictive query, given optional
additional specifications of the training table generation plan and
the model plan.
Args:
training_table_plan: A specification of the parameters for training
table generation. If not provided, will use an intelligently
generated default plan based on the query and graph. This plan
is equivalent to the plan inferred with
``suggest_training_table_plan(run_mode=RunMode.NORMAL)``.
model_plan: A specification of the parameters for model training.
If not provided, will use an intelligently generated default
plan based on the query and graph. This plan
is equivalent to the plan inferred with
``suggest_model_plan(run_mode=RunMode.NORMAL)``.
non_blocking: Whether this operation should return immediately
after launching the training job, or await completion of the
training job.
Returns:
Tuple[Trainer, Union[TrainingJobResult, TrainingJob]]:
A tuple with two elements. The first element is the trainer
object used to launch the training job. The second element
is either a training job object (if ``non_blocking=True``)
or a training job future object (if ``non_blocking=False``).
"""
# If we have already generated the training table, use it with Trainer:
if self._train_table is None:
# Nonblocking generate:
self._train_table = self.generate_training_table(
training_table_plan, non_blocking=True)
# TODO(manan): what if `self._train_table` represents a failed job?
model_plan = model_plan or self.suggest_model_plan()
trainer = Trainer(model_plan)
return (trainer,
trainer.fit(self.graph, self.train_table,
non_blocking=non_blocking))
[docs] def generate_baseline(
self,
metrics: List[str],
train_table: Union[TrainingTable, TrainingTableJob],
*,
non_blocking: bool = False,
) -> Union[BaselineJob, BaselineJobResult]:
r"""Runs a baseline model on this predictive query, given metrics and
optional additional specifications of the training table generation
plan.
Args:
metrics (List[str]): A list to metrics that baseline model will be
evaluated on.
train_table (Union[TrainingTable, TrainingTableJob]): The
:class:`~kumoai.pquery.TrainingTable`, or in-progress
:class:`~kumoai.pquery.TrainingTableJob` that represents
the training data produced by a
:class:`~kumoai.pquery.PredictiveQuery` on :obj:`graph`.
non_blocking (bool): Whether this operation should
return immediately after launching the baseline job, or await
completion of the baseline job. Defaults to False.
Returns:
Union[BaselineJob, BaselineJobResult]: either a baseline job
object (if ``non_blocking=True``) or a baseline job future
object (if ``non_blocking=False``).
""" # noqa
baseline_trainer = BaselineTrainer(metrics)
return baseline_trainer.run(self.graph, train_table,
non_blocking=non_blocking)