Source code for kumoai.pquery.predictive_query

import logging
from typing import List, Literal, Mapping, Optional, Tuple, Union, overload

from kumoapi.jobs import (
    GeneratePredictionTableRequest,
    GenerateTrainTableRequest,
)
from kumoapi.model_plan import (
    InferredType,
    PredictionTableGenerationPlan,
    RunMode,
    SuggestModelPlanRequest,
    TrainingTableGenerationPlan,
)
from kumoapi.pquery import PQueryResource
from kumoapi.task import TaskType
from typing_extensions import Self

from kumoai import global_state
from kumoai.client.jobs import (
    GeneratePredictionTableJobID,
    GenerateTrainTableJobID,
    TrainingJobAPI,
)
from kumoai.graph import Graph
from kumoai.pquery.prediction_table import PredictionTable, PredictionTableJob
from kumoai.pquery.training_table import TrainingTable, TrainingTableJob
from kumoai.trainer import (
    BaselineTrainer,
    ModelPlan,
    Trainer,
    TrainingJob,
    TrainingJobResult,
)
from kumoai.trainer.job import BaselineJob, BaselineJobResult

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Alias used as the return annotation of the `save` / `save_as_template`
# methods below; a predictive query identifier is represented as a `str`.
PredictiveQueryID = str


class PredictiveQuery:
    r"""The Kumo predictive query is a declarative syntax for describing a
    machine learning task. Predictive queries are written using the
    predictive query language (PQL), a concise SQL-like syntax that allows
    you to define a model for a new business problem.

    A predictive query object can be created from a
    :class:`~kumoai.graph.Graph` and a query string. For information on the
    construction of a query string, please visit the Kumo
    `documentation <https://docs.kumo.ai/docs/pquery-structure/>`__.

    .. code-block:: python

        import kumoai

        # See `Graph` documentation for more information:
        graph = kumoai.Graph(...)

        # Create a predictive query representing a machine learning problem
        # over this Graph:
        pquery = kumoai.PredictiveQuery(
            graph=graph,
            query=(
                "PREDICT MAX(transaction.Quantity, 0, 30) "
                "FOR EACH customer.CustomerID"
            ),
        )

        # Validate the predictive query configuration, for syntax and
        # correctness:
        pquery.validate(verbose=True)

        # Get the machine learning task type corresponding to this predictive
        # query (e.g. binary classification, regression, link prediction, etc.)
        print(pquery.get_task_type())

        # Suggest a training table generation plan and use it to generate a
        # training table from this query, to be used in `Trainer.fit`:
        training_table_plan = pquery.suggest_training_table_plan()
        training_table = pquery.generate_training_table(training_table_plan)

        # Suggest a prediction table generation plan and use it to generate a
        # prediction table from this query, to be used in `Trainer.predict`:
        pred_table_plan = pquery.suggest_prediction_table_plan()
        pred_table = pquery.generate_prediction_table(pred_table_plan)

    Args:
        graph: The :class:`~kumoai.graph.Graph` object which the predictive
            query is defined over.
        query: A string representation of the predictive query.
    """
    def __init__(
        self,
        graph: Graph,
        query: str,
    ) -> None:
        self.graph = graph
        self.query = query

        # The most recently generated training / prediction table (or the
        # in-flight job producing it) is cached on the instance so that `fit`
        # can reuse it. Users can inspect these through the read-only
        # `train_table` / `prediction_table` properties but cannot set them;
        # any advanced configuration must be done directly via `Trainer`:
        self._train_table: Optional[Union[TrainingTable,
                                          TrainingTableJob]] = None
        self._prediction_table: Optional[Union[PredictionTable,
                                               PredictionTableJob]] = None

    # Metadata ################################################################

    @property
    def id(self) -> str:
        r"""Returns the unique ID for this predictive query, determined from
        its schema and the schema of its associated graph. Two queries that
        differ either in their syntax or in their graph will have different
        ids.

        .. note::
            Accessing this property calls :meth:`save`, which validates and
            persists the query.
        """
        return self.save()

    @property
    def train_table(self) -> Union[TrainingTable, TrainingTableJob]:
        r"""Returns the training table that was last generated by this
        predictive query. If the predictive query has not yet generated a
        training table, raises a :class:`ValueError`. Note that the training
        table may be of type :class:`~kumoai.pquery.TrainingTable` or
        :class:`~kumoai.pquery.TrainingTableJob`, depending on whether the
        training table was generated with or without waiting for its
        completion, respectively.
        """
        # Compare against `None` explicitly (as `fit` does): a generated
        # table object must never be mistaken for "missing" just because it
        # happens to evaluate as falsy.
        if self._train_table is None:
            raise ValueError(
                "This predictive query has not yet generated a training "
                "table. Please call `generate_training_table` to generate "
                "a training table before proceeding.")
        return self._train_table

    @property
    def prediction_table(self) -> Union[PredictionTable, PredictionTableJob]:
        r"""Returns the prediction table that was last generated by this
        predictive query. If the predictive query has not yet generated a
        prediction table, raises a :class:`ValueError`. Note that the
        prediction table may be of type
        :class:`~kumoai.pquery.PredictionTable` or
        :class:`~kumoai.pquery.PredictionTableJob`, depending on whether the
        prediction table was generated with or without waiting for its
        completion, respectively.
        """
        # Explicit `None` check, for the same reason as in `train_table`.
        if self._prediction_table is None:
            raise ValueError(
                "This predictive query has not yet generated a prediction "
                "table. Please call `generate_prediction_table` to generate a "
                "prediction table before proceeding.")
        return self._prediction_table

    def get_task_type(self) -> TaskType:
        r"""Returns the task type of this predictive query. The task type of
        the query corresponds to the machine learning problem that this query
        translates to in the Kumo platform; for more information about
        possible task types, please visit the Kumo
        `documentation <https://docs.kumo.ai/docs/task-types/>`__.

        Raises:
            ValueError: if the query fails validation.
        """
        try:
            self.validate(verbose=False)
        except ValueError as e:
            raise ValueError(
                f"Predictive query {self.query} is improperly configured, so "
                f"a task type cannot be obtained. Please ensure your query "
                f"has a valid configuration before proceeding. You can use "
                f"the `validate` method to verify the validity of your query."
            ) from e

        # The API returns a pair; only the task type is of interest here.
        task_type, _ = global_state.client.pquery_api.infer_task_type(
            pquery_string=self.query, graph_id=self.graph.id)
        return task_type

    def validate(self, verbose: bool = True) -> Self:
        r"""Validates the syntax of this predictive query, ensuring that the
        query is formulated correctly in Kumo's Predictive Query Language and
        that the query makes semantic sense (defines a suitable predictive
        problem) on this :class:`~kumoai.graph.Graph`.

        Args:
            verbose: Whether to log non-error output of this validation.

        Returns:
            This predictive query, so that calls can be chained.

        Raises:
            ValueError: if validation fails.

        Example:
            >>> import kumoai
            >>> query = kumoai.PredictiveQuery(...)  # doctest: +SKIP
            >>> query = query.validate()  # doctest: +SKIP
        """
        self.graph.save()  # Need a valid graph ID; also validates graph.

        resp = global_state.client.pquery_api.validate(
            self._to_api_pquery_resource())
        if not resp.ok:
            raise ValueError(resp.error_message())

        if verbose:
            if resp.empty():
                logger.info("Query %s is configured correctly.", self.query)
            else:
                logger.warning(resp.message())
        return self

    # Persistence #############################################################

    def _to_api_pquery_resource(
        self,
        name: Optional[str] = None,
    ) -> PQueryResource:
        r"""Builds the API resource describing this query and its graph.

        Args:
            name: Optional template name to attach to the resource.
        """
        return PQueryResource(
            name=name,
            query_string=self.query,
            graph=self.graph._to_api_graph_definition(),
            desc="",
        )

    def save(self) -> PredictiveQueryID:
        r"""Saves a predictive query to Kumo, returning a unique ID for this
        query. The unique ID can later be used to load the predictive query
        object.

        Raises:
            ValueError: if the query fails validation.

        Example:
            >>> import kumoai
            >>> query = kumoai.PredictiveQuery(...)  # doctest: +SKIP
            >>> query.save()  # doctest: +SKIP
            pquery-xxx
        """
        try:
            self.validate(verbose=False)
        except ValueError as e:
            raise ValueError(
                f"Predictive query {self.query} is improperly configured, so "
                f"it cannot be saved. Please ensure your query "
                f"has a valid configuration before proceeding. You can use "
                f"the `validate` method to verify the validity of your query."
            ) from e

        self.graph.save()
        return global_state.client.pquery_api.create(
            pquery=self._to_api_pquery_resource(name=None))

    def save_as_template(self, name: str) -> PredictiveQueryID:
        r"""Saves a predictive query as a named, re-usable template to Kumo,
        and returns the identifier reported by the Kumo API for the saved
        query. This method can be used to "templatize" / name a query
        configuration for ease of future reusability.

        Args:
            name: The name of the template to save the query as. If the name
                is already associated with another query, that query will be
                overwritten (a warning is logged in that case).

        Raises:
            ValueError: if the query fails validation.

        Example:
            >>> import kumoai
            >>> query = kumoai.PredictiveQuery(...)  # doctest: +SKIP
            >>> query.save_as_template("name")  # doctest: +SKIP
            >>> loaded = kumoai.PredictiveQuery.load("name")  # doctest: +SKIP
            >>> loaded.query == query.query  # doctest: +SKIP
            True
        """
        try:
            self.validate(verbose=False)
        except ValueError as e:
            raise ValueError(
                f"Predictive query {self.query} is improperly configured, so "
                f"it cannot be saved. Please ensure your query "
                f"has a valid configuration before proceeding. You can use "
                f"the `validate` method to verify the validity of your query."
            ) from e

        # Let the user know when an existing template is about to be replaced:
        template_resource = global_state.client.pquery_api.get_if_exists(name)
        if template_resource is not None:
            template_string = template_resource.query_string
            logger.warning(
                ("Predictive query template %s already exists, with "
                 "query string %s. This template will be overridden with "
                 "configuration %s."), name, template_string, self.query)

        self.graph.save()
        return global_state.client.pquery_api.create(
            pquery=self._to_api_pquery_resource(name))

    @classmethod
    def load(cls, pq_id_or_template: str) -> 'PredictiveQuery':
        r"""Loads a predictive query from either a predictive query ID or a
        named template. Returns a :class:`~kumoai.pquery.PredictiveQuery`
        object that contains the loaded query along with its associated
        graph, tables, etc.

        Raises:
            ValueError: if no query with the given ID or name was found.
        """
        api = global_state.client.pquery_api
        res = api.get_if_exists(pq_id_or_template)
        if not res:
            raise ValueError(
                f"Predictive query {pq_id_or_template} was not found.")
        return cls(
            graph=Graph._from_api_graph_definition(res.graph),
            query=res.query_string,
        )

    @classmethod
    def load_from_training_job(cls, training_job_id: str) -> 'PredictiveQuery':
        r"""Loads a predictive query from a training job, regardless of the
        training job's status. Returns a
        :class:`~kumoai.pquery.PredictiveQuery` object that contains the
        loaded query along with its associated graph, tables, etc.
        """
        train_api: TrainingJobAPI = global_state.client.training_job_api
        job = train_api.get(training_job_id)
        id_or_name = job.config.pquery_id
        # Dispatch through `cls` (as `load` itself does) so that subclasses
        # get an instance of their own type back:
        return cls.load(pq_id_or_template=id_or_name)

    # Training & Prediction Table Generation ##################################

    @overload
    def generate_training_table(
        self,
        plan: Optional[TrainingTableGenerationPlan] = None,
        *,
        custom_tags: Optional[Mapping[str, str]] = None,
    ) -> TrainingTable:
        pass

    @overload
    def generate_training_table(
        self,
        plan: Optional[TrainingTableGenerationPlan] = None,
        *,
        non_blocking: Literal[False],
        custom_tags: Optional[Mapping[str, str]] = None,
    ) -> TrainingTable:
        pass

    @overload
    def generate_training_table(
        self,
        plan: Optional[TrainingTableGenerationPlan] = None,
        *,
        non_blocking: Literal[True],
        custom_tags: Optional[Mapping[str, str]] = None,
    ) -> TrainingTableJob:
        pass

    @overload
    def generate_training_table(
        self,
        plan: Optional[TrainingTableGenerationPlan] = None,
        *,
        non_blocking: bool,
        custom_tags: Optional[Mapping[str, str]] = None,
    ) -> Union[TrainingTable, TrainingTableJob]:
        pass

    def generate_training_table(
        self,
        plan: Optional[TrainingTableGenerationPlan] = None,
        *,
        non_blocking: bool = False,
        custom_tags: Optional[Mapping[str, str]] = None,
    ) -> Union[TrainingTable, TrainingTableJob]:
        r"""Generates a training table from the specified :attr:`query`
        string.

        Args:
            plan: A specification of the parameters for training table
                generation. If not provided, will use an intelligently
                generated default plan based on the query and graph. This
                plan is the one returned by calling
                ``suggest_training_table_plan()`` with its default run mode.
            non_blocking: Whether this operation should return immediately
                after launching the training table generation job, or await
                completion of the generated training table.
            custom_tags: Additional, customer defined k-v tags to be
                associated with the job to be launched. Job tags are useful
                for grouping and searching jobs. ``None`` (the default) is
                treated as no tags.

        Returns:
            Union[TrainingTable, TrainingTableJob]:
                If ``non_blocking=False``, returns a training table object. If
                ``non_blocking=True``, returns a training table future object.
        """
        pq_id = self.save()

        # TODO(manan): improve this...
        if not plan:
            plan = self.suggest_training_table_plan()

        train_table_job_api = global_state.client.generate_train_table_job_api
        job_id: GenerateTrainTableJobID = train_table_job_api.create(
            GenerateTrainTableRequest(
                # Copy the tags so the request never aliases caller state:
                dict(custom_tags or {}),
                pq_id,
                plan,
                graph_snapshot_id=self.graph.snapshot(
                    non_blocking=non_blocking),
            ))

        self._train_table = TrainingTableJob(job_id=job_id)
        if non_blocking:
            return self._train_table
        self._train_table = self._train_table.attach()
        return self._train_table

    @overload
    def generate_prediction_table(
        self,
        plan: Optional[PredictionTableGenerationPlan] = None,
        *,
        custom_tags: Optional[Mapping[str, str]] = None,
    ) -> PredictionTable:
        pass

    @overload
    def generate_prediction_table(
        self,
        plan: Optional[PredictionTableGenerationPlan] = None,
        *,
        non_blocking: Literal[False],
        custom_tags: Optional[Mapping[str, str]] = None,
    ) -> PredictionTable:
        pass

    @overload
    def generate_prediction_table(
        self,
        plan: Optional[PredictionTableGenerationPlan] = None,
        *,
        non_blocking: Literal[True],
        custom_tags: Optional[Mapping[str, str]] = None,
    ) -> PredictionTableJob:
        pass

    @overload
    def generate_prediction_table(
        self,
        plan: Optional[PredictionTableGenerationPlan] = None,
        *,
        non_blocking: bool,
        custom_tags: Optional[Mapping[str, str]] = None,
    ) -> Union[PredictionTable, PredictionTableJob]:
        pass

    def generate_prediction_table(
        self,
        plan: Optional[PredictionTableGenerationPlan] = None,
        *,
        non_blocking: bool = False,
        custom_tags: Optional[Mapping[str, str]] = None,
    ) -> Union[PredictionTable, PredictionTableJob]:
        r"""Generates a prediction table from the predictive query
        :attr:`query` string.

        Args:
            plan: A specification of the parameters for prediction table
                generation. If not provided, will use the default plan
                returned by ``suggest_prediction_table_plan()``.
            non_blocking: Whether this operation should return immediately
                after launching the prediction table generation job, or await
                completion of the generated prediction table.
            custom_tags: Additional, customer defined k-v tags to be
                associated with the job to be launched. Job tags are useful
                for grouping and searching jobs. ``None`` (the default) is
                treated as no tags.

        Returns:
            Union[PredictionTable, PredictionTableJob]:
                If ``non_blocking=False``, returns a prediction table object.
                If ``non_blocking=True``, returns a prediction table future
                object.
        """
        pq_id = self.save()

        if not plan:
            plan = self.suggest_prediction_table_plan()

        bp_table_api = global_state.client.generate_prediction_table_job_api
        job_id: GeneratePredictionTableJobID = bp_table_api.create(
            GeneratePredictionTableRequest(
                # Copy the tags so the request never aliases caller state:
                dict(custom_tags or {}),
                pq_id,
                plan,
                graph_snapshot_id=self.graph.snapshot(
                    non_blocking=non_blocking),
            ))

        self._prediction_table = PredictionTableJob(job_id=job_id)
        if non_blocking:
            return self._prediction_table
        self._prediction_table = self._prediction_table.result()
        return self._prediction_table

    # Training & Prediction ###################################################

    def suggest_training_table_plan(
        self,
        run_mode: RunMode = RunMode.FAST,
    ) -> TrainingTableGenerationPlan:
        r"""Suggests a training table generation plan given the predictive
        query and graph. This training table generation plan can be used to
        alter the approach Kumo uses to generate the training table for your
        predictive query.

        Args:
            run_mode: A representation of how quickly you would like your
                predictive query to complete. Faster run modes correspond to
                lower training times, at the cost of potentially lower
                performance.
        """
        self.graph.save()  # The request below needs a valid graph ID.
        req = SuggestModelPlanRequest(
            query_string=self.query,
            graph_id=self.graph.id,
            run_mode=run_mode,
        )
        return global_state.client.pquery_api.suggest_training_table_plan(req)

    def suggest_prediction_table_plan(self, ) -> PredictionTableGenerationPlan:
        r"""Suggests a prediction table generation plan given the predictive
        query and graph. This prediction table generation plan can be used to
        alter the approach Kumo uses to generate the prediction table for
        your predictive query.

        The suggested plan is built locally, with its anchor time set to
        ``InferredType.VALUE``.
        """
        return PredictionTableGenerationPlan(anchor_time=InferredType.VALUE)

    def suggest_model_plan(
        self,
        run_mode: RunMode = RunMode.FAST,
    ) -> ModelPlan:
        r"""Suggests a modeling plan given the predictive query and graph.
        This model plan can be used to alter the approach Kumo uses to train
        your machine learning model.

        Args:
            run_mode: A representation of how quickly you would like your
                predictive query to complete. Faster run modes correspond to
                lower training times, at the cost of potentially lower
                performance.
        """
        self.graph.save()  # The request below needs a valid graph ID.
        req = SuggestModelPlanRequest(
            query_string=self.query,
            graph_id=self.graph.id,
            run_mode=run_mode,
        )
        return global_state.client.pquery_api.suggest_model_plan(req)

    @overload
    def fit(
        self,
        training_table_plan: Optional[TrainingTableGenerationPlan] = None,
        model_plan: Optional[ModelPlan] = None,
    ) -> Tuple[Trainer, TrainingJobResult]:
        pass

    @overload
    def fit(
        self,
        training_table_plan: Optional[TrainingTableGenerationPlan] = None,
        model_plan: Optional[ModelPlan] = None,
        *,
        non_blocking: Literal[False],
    ) -> Tuple[Trainer, TrainingJobResult]:
        pass

    @overload
    def fit(
        self,
        training_table_plan: Optional[TrainingTableGenerationPlan] = None,
        model_plan: Optional[ModelPlan] = None,
        *,
        non_blocking: Literal[True],
    ) -> Tuple[Trainer, TrainingJob]:
        pass

    @overload
    def fit(
        self,
        training_table_plan: Optional[TrainingTableGenerationPlan] = None,
        model_plan: Optional[ModelPlan] = None,
        *,
        non_blocking: bool,
    ) -> Tuple[Trainer, Union[TrainingJobResult, TrainingJob]]:
        pass

    def fit(
        self,
        training_table_plan: Optional[TrainingTableGenerationPlan] = None,
        model_plan: Optional[ModelPlan] = None,
        *,
        non_blocking: bool = False,
    ) -> Tuple[Trainer, Union[TrainingJobResult, TrainingJob]]:
        r"""Trains a Kumo model on this predictive query, given optional
        additional specifications of the training table generation plan and
        the model plan.

        Args:
            training_table_plan: A specification of the parameters for
                training table generation. If not provided, will use an
                intelligently generated default plan based on the query and
                graph (the plan returned by ``suggest_training_table_plan()``
                with its default run mode). This argument is only used when
                no training table has been generated by this query yet; if
                one already exists, it is reused and a warning is logged.
            model_plan: A specification of the parameters for model training.
                If not provided, will use an intelligently generated default
                plan based on the query and graph (the plan returned by
                ``suggest_model_plan()`` with its default run mode).
            non_blocking: Whether this operation should return immediately
                after launching the training job, or await completion of the
                training job.

        Returns:
            Tuple[Trainer, Union[TrainingJobResult, TrainingJob]]:
                A tuple with two elements. The first element is the trainer
                object used to launch the training job. The second element is
                either a training job result object (if
                ``non_blocking=False``) or a training job future object (if
                ``non_blocking=True``).
        """
        if self._train_table is None:
            # No training table yet; launch its generation without waiting
            # for it to complete:
            self._train_table = self.generate_training_table(
                training_table_plan, non_blocking=True)
        elif training_table_plan is not None:
            # A previously generated training table is reused as-is, so make
            # it visible that the supplied plan has no effect:
            logger.warning(
                "A training table has already been generated for this "
                "predictive query, so the provided `training_table_plan` is "
                "ignored. Call `generate_training_table` with the new plan "
                "before calling `fit` to use it.")

        # TODO(manan): what if `self._train_table` represents a failed job?
        model_plan = model_plan or self.suggest_model_plan()
        trainer = Trainer(model_plan)
        return (trainer,
                trainer.fit(self.graph, self.train_table,
                            non_blocking=non_blocking))

    def generate_baseline(
        self,
        metrics: List[str],
        train_table: Union[TrainingTable, TrainingTableJob],
        *,
        non_blocking: bool = False,
    ) -> Union[BaselineJob, BaselineJobResult]:
        r"""Runs a baseline model on this predictive query, given the metrics
        to evaluate and the training table to evaluate them on.

        Args:
            metrics (List[str]): A list of metrics that the baseline model
                will be evaluated on.
            train_table (Union[TrainingTable, TrainingTableJob]): The
                :class:`~kumoai.pquery.TrainingTable`, or in-progress
                :class:`~kumoai.pquery.TrainingTableJob` that represents the
                training data produced by a
                :class:`~kumoai.pquery.PredictiveQuery` on :obj:`graph`.
            non_blocking (bool): Whether this operation should return
                immediately after launching the baseline job, or await
                completion of the baseline job. Defaults to False.

        Returns:
            Union[BaselineJob, BaselineJobResult]:
                Either a baseline job result object (if
                ``non_blocking=False``) or a baseline job future object (if
                ``non_blocking=True``).
        """  # noqa
        baseline_trainer = BaselineTrainer(metrics)
        return baseline_trainer.run(self.graph, train_table,
                                    non_blocking=non_blocking)