Source code for kumoai.pquery.training_table

from __future__ import annotations

import asyncio
import logging
import time
from concurrent.futures import Future
from typing import List, Mapping, Optional, Tuple

import pandas as pd
from kumoapi.common import JobStatus
from kumoapi.jobs import GenerateTrainTableJobResource, JobStatusReport
from tqdm.auto import tqdm
from typing_extensions import override

from kumoai import global_state
from kumoai.client.jobs import (
    GenerateTrainTableJobAPI,
    GenerateTrainTableJobID,
)
from kumoai.formatting import pretty_print_error_details
from kumoai.futures import KumoFuture, create_future

logger = logging.getLogger(__name__)


[docs]class TrainingTable: r"""A training table in the Kumo platform. A training table can be initialized from a job ID of a completed training table generation job. .. code-block:: python import kumoai # Create a Training Table from a training table generation job. Note # that the job ID passed here must be in a completed state: training_table = kumoai.TrainingTable("gen-traintable-job-...") # Read the training table as a Pandas DataFrame: training_df = training_table.data_df() # Get URLs to download the training table: training_download_urls = training_table.data_urls() Args: job_id: ID of the training table generation job which generated this training table. """
[docs] def __init__(self, job_id: GenerateTrainTableJobID): self.job_id = job_id status = _get_status(job_id).status if status != JobStatus.DONE: raise ValueError( f"Job {job_id} is not yet complete (status: {status}). If you " f"would like to create a future (waiting for training table " f"generation success), please use `TrainingTableJob`.")
[docs] def data_urls(self) -> List[str]: r"""Returns a list of URLs that can be used to view generated training table data. The list will contain more than one element if the table is partitioned; paths will be relative to the location of the Kumo data plane. """ api: GenerateTrainTableJobAPI = ( global_state.client.generate_train_table_job_api) return api._get_table_data(self.job_id, presigned=True, raw_path=True)
[docs] def data_df(self) -> pd.DataFrame: r"""Returns a :class:`~pandas.DataFrame` object representing the generated training data. .. warning:: This method will load the full training table into memory as a :class:`~pandas.DataFrame` object. If you are working on a machine with limited resources, please use :meth:`~kumoai.pquery.TrainingTable.data_urls` instead to download the data and perform analysis per-partition. """ urls = self.data_urls() try: return pd.concat([pd.read_parquet(x) for x in urls]) except Exception as e: raise ValueError( f"Could not create a Pandas DataFrame object from data paths " f"{urls}. Please construct the DataFrame manually.") from e
def __repr__(self) -> str: return f'{self.__class__.__name__}(job_id={self.job_id})'
# Training Table Future #######################################################
[docs]class TrainingTableJob(KumoFuture[TrainingTable]): r"""A representation of an ongoing training table generation job in the Kumo platform. .. code-block:: python import kumoai # See `PredictiveQuery` documentation: pquery = kumoai.PredictiveQuery(...) # If a training table is generated in nonblocking mode, the response # will be of type `TrainingTableJob`: training_table_job = pquery.generate_training_table(non_blocking=True) # You can also construct a `TrainingTableJob` from a job ID, e.g. # one that is present in the Kumo Jobs page: training_table_job = kumoai.TrainingTableJob("trainingjob-...") # Get the status of the job: print(training_table_job.status()) # Attach to the job, and poll progress updates: training_table_job.attach() # Cancel the job: training_table_job.cancel() # Wait for the job to complete, and return a `TrainingTable`: training_table_job.result() Args: job_id: ID of the training table generation job. """
[docs] def __init__( self, job_id: GenerateTrainTableJobID, ) -> None: self.job_id = job_id # A training table holds a reference to the future that tracks the # execution of its generation. self._fut: Future[TrainingTable] = create_future(_poll(job_id))
@property def id(self) -> GenerateTrainTableJobID: r"""The unique ID of this training table generation process.""" return self.job_id
[docs] @override def result(self) -> TrainingTable: return self._fut.result()
[docs] @override def future(self) -> Future[TrainingTable]: return self._fut
[docs] def status(self) -> JobStatusReport: r"""Returns the status of a running training table generation job.""" return _get_status(self.job_id)
[docs] def attach(self) -> TrainingTable: r"""Allows a user to observe the status of the future while it is not complete. """ print(f"Attaching to training table generation job {self.job_id}. " f"Tracking this job in the Kumo UI is coming soon. To detach " f"from this job, please enter Ctrl+C (the job will continue to " f"run, and you can re-attach anytime).") api = global_state.client.generate_train_table_job_api def _get_progress() -> Optional[Tuple[int, int]]: progress = api.get_progress(self.job_id) if len(progress) == 0: return None expected_iter = progress['num_expected_iterations'] completed_iter = progress['num_finished_iterations'] return (expected_iter, completed_iter) # Print progress bar: print("Training table generation is in progress. If your task is " "temporal, progress per timeframe will be loaded shortly.") # Wait for either timeframes to become available, or the job is done: progress = _get_progress() while progress is None: progress = _get_progress() # Not a temporal task, and it's done: if self.status().status.is_terminal: return self.result() time.sleep(3) # Wait for timeframes to become available: progress = _get_progress() assert progress is not None total, prog = progress pbar = tqdm(total=total, unit="timeframe", desc="Generating Training Table") pbar.update(pbar.n - prog) while not self.done(): progress = _get_progress() assert progress is not None total, prog = progress pbar.reset(total) pbar.update(prog) time.sleep(3) pbar.update(pbar.total) pbar.close() # Future is done: return self.result()
[docs] def delete_tags(self, tags: List[str]) -> bool: r"""Removes the tags from the job. Args: tags (List[str]): The tags to remove. """ api = global_state.client.generate_train_table_job_api return api.delete_tags(self.job_id, tags)
[docs] def update_tags(self, tags: Mapping[str, Optional[str]]) -> bool: r"""Updates the tags of the job. Args: tags (Mapping[str, Optional[str]]): The tags to update. Note that the value 'none' will remove the tag. If the tag is not present, it will be added. """ api = global_state.client.generate_train_table_job_api return api.update_tags(self.job_id, tags)
[docs] def cancel(self) -> None: r"""Cancels a running training table generation job, and raises an error if cancellation failed. """ api = global_state.client.generate_train_table_job_api return api.cancel(self.job_id)
def _get_status(job_id: str) -> JobStatusReport: api = global_state.client.generate_train_table_job_api resource: GenerateTrainTableJobResource = api.get(job_id) return resource.job_status_report async def _poll(job_id: str) -> TrainingTable: # TODO(manan): make asynchronous natively with aiohttp: status = _get_status(job_id).status while not status.is_terminal: await asyncio.sleep(10) status = _get_status(job_id).status if status != JobStatus.DONE: api = global_state.client.generate_train_table_job_api error_details = api.get_job_error(job_id) error_str = pretty_print_error_details(error_details) raise RuntimeError( f"Training table generation for job {job_id} failed with " f"job status {status}. Encountered below error(s):" f'{error_str}') return TrainingTable(job_id)