Source code for kumoai.pquery.prediction_table
from __future__ import annotations
import asyncio
import logging
from concurrent.futures import Future
from datetime import datetime
from functools import cached_property
from typing import List, Optional, Union
import pandas as pd
from kumoapi.common import JobStatus
from kumoapi.jobs import (
GeneratePredictionTableJobResource,
GeneratePredictionTableRequest,
JobStatusReport,
)
from typing_extensions import override
from kumoai import global_state
from kumoai.client.jobs import (
GeneratePredictionTableJobAPI,
GeneratePredictionTableJobID,
)
from kumoai.connector.s3_connector import S3URI
from kumoai.formatting import pretty_print_error_details
from kumoai.futures import KumoFuture, create_future
from kumoai.jobs import JobInterface
logger = logging.getLogger(__name__)
_DEFAULT_INTERVAL_S = 20
class PredictionTable:
r"""A prediction table in the Kumo platform. A prediction table can
either be initialized from a job ID of a completed prediction table
generation job, or a path on a supported object store (S3 for a SaaS or
Databricks deployment, and Snowflake session storage for Snowflake).
.. warning::
Custom prediction table is an experimental feature; please work
with your Kumo POC to ensure you are using it correctly!
.. code-block:: python
import kumoai
# Create a Prediction Table from a prediction table generation job.
# Note that the job ID passed here must be in a completed state:
prediction_table = kumoai.PredictionTable("gen-predtable-job-...")
# Read the prediction table as a Pandas DataFrame:
prediction_df = prediction_table.data_df()
# Get URLs to download the prediction table:
prediction_download_urls = prediction_table.data_urls()
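# Alternatively (an illustrative sketch; the S3 path below is only a
# placeholder), create a PredictionTable from a custom table data path
# that Kumo can read. Custom prediction tables are experimental; see
# the warning above:
prediction_table = kumoai.PredictionTable(
    table_data_path="s3://your-bucket/path/to/prediction-table/")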
Args:
job_id: ID of the prediction table generation job which
generated this prediction table. If a custom table data path is
specified, this parameter should be left as ``None``.
table_data_path: S3 path of the table data location, for which Kumo
must at least have read access. If a job ID is specified, this
parameter should be left as ``None``.
"""
def __init__(
self,
job_id: Optional[GeneratePredictionTableJobID] = None,
table_data_path: Optional[str] = None,
) -> None:
# Validation:
if not (job_id or table_data_path):
raise ValueError(
"A PredictionTable must either be initialized with a table "
"data path, or a job ID of a completed prediction table "
"generation job.")
if job_id and table_data_path:
raise ValueError(
"Please either pass a table data path, or a job ID of a "
"completed prediction table generation job; passing both "
"is not allowed.")
# Custom path:
self.table_data_uri: Optional[Union[str, S3URI]] = None
if table_data_path is not None:
if table_data_path.startswith('dbfs:/'):
raise ValueError(
"Files from Databricks UC Volumes are not supported")
if global_state.is_spcs:
if table_data_path.startswith('s3://'):
raise ValueError(
"SPCS does not support S3 paths for prediction tables."
)
# TODO(zeyuan): support custom stage path on SPCS:
self.table_data_uri = table_data_path
else:
self.table_data_uri = S3URI(table_data_path).validate()
# Job ID:
self.job_id = job_id
if job_id:
status = _get_status(job_id).status
if status != JobStatus.DONE:
raise ValueError(
f"Job {job_id} is not yet complete (status: {status}). If "
f"you would like to create a future (waiting for "
f"prediction table generation success), please use "
f"`PredictionTableJob`.")
def data_urls(self) -> List[str]:
r"""Returns a list of URLs that can be used to view generated
prediction table data; if a custom data path was passed, this path is
simply returned.
The list will contain more than one element if the table is
partitioned; paths will be relative to the location of the Kumo data
plane.
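For example, a partitioned table can be processed one partition at a
time (an illustrative sketch; it assumes each returned URL points to a
Parquet file that ``pandas.read_parquet`` can read directly):

.. code-block:: python

    import pandas as pd
    import kumoai

    prediction_table = kumoai.PredictionTable("gen-predtable-job-...")

    # Read and process one partition at a time rather than loading the
    # full table into memory:
    for url in prediction_table.data_urls():
        partition_df = pd.read_parquet(url)
        print(partition_df.shape)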
"""
api = global_state.client.generate_prediction_table_job_api
if not self.job_id:
# Custom prediction table:
if global_state.is_spcs:
assert isinstance(self.table_data_uri, str)
return [self.table_data_uri]
else:
assert isinstance(self.table_data_uri, S3URI)
return [self.table_data_uri.uri]
return api.get_table_data(self.job_id, presigned=True)
def data_df(self) -> pd.DataFrame:
r"""Returns a Pandas DataFrame object representing the generated
or custom-specified prediction table data.
.. warning::
This method will load the full prediction table into memory as a
:class:`~pandas.DataFrame` object. If you are working on a machine
with limited resources, please use
:meth:`~kumoai.pquery.PredictionTable.data_urls` instead to
download the data and perform analysis per-partition.
"""
if global_state.is_spcs:
from kumoai.spcs import _parquet_dataset_to_df
# TODO(dm): return type hint is wrong
return _parquet_dataset_to_df(self.data_urls())
else:
urls = self.data_urls()
try:
return pd.concat([pd.read_parquet(x) for x in urls])
except Exception as e:
raise ValueError(
f"Could not create a Pandas DataFrame object from data "
f"paths {urls}. Please construct the DataFrame manually."
) from e
@property
def anchor_time(self) -> Optional[datetime]:
r"""Returns the anchor time corresponding to the generated prediction
table data, if the data was not custom-specified.
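A small usage sketch (the job ID is a placeholder; the anchor time is
``None`` when the table was created from a custom data path):

.. code-block:: python

    prediction_table = kumoai.PredictionTable("gen-predtable-job-...")
    anchor = prediction_table.anchor_time
    if anchor is not None:
        print(f"Prediction table anchored at {anchor.isoformat()}")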
"""
if self.job_id is None:
logger.warning(
"Fetching the anchor time is not supported for a custom "
"prediction table (path: %s)", self.table_data_uri)
return None
api = global_state.client.generate_prediction_table_job_api
return api.get_anchor_time(self.job_id)
# Prediction Table Future #####################################################
class PredictionTableJob(JobInterface[GeneratePredictionTableJobID,
GeneratePredictionTableRequest,
GeneratePredictionTableJobResource],
KumoFuture[PredictionTable]):
r"""A representation of an ongoing prediction table generation job in the
Kumo platform.
.. code-block:: python
import kumoai
# See `PredictiveQuery` documentation:
pquery = kumoai.PredictiveQuery(...)
# If a prediction table is generated in non-blocking mode, the response
# will be of type `PredictionTableJob`:
prediction_table_job = pquery.generate_prediction_table(non_blocking=True)
# You can also construct a `PredictionTableJob` from a job ID, e.g.
# one that is present in the Kumo Jobs page:
prediction_table_job = kumoai.PredictionTableJob("gen-predtable-job-...")
# Get the status of the job:
print(prediction_table_job.status())
# Cancel the job:
prediction_table_job.cancel()
# Wait for the job to complete, and return a `PredictionTable`:
prediction_table_job.result()
Args:
job_id: ID of the prediction table generation job.
""" # noqa
@override
@staticmethod
def _api() -> GeneratePredictionTableJobAPI:
return global_state.client.generate_prediction_table_job_api
def __init__(
self,
job_id: GeneratePredictionTableJobID,
) -> None:
self.job_id = job_id
self.job: Optional[GeneratePredictionTableJobResource] = None
@cached_property
def _fut(self) -> Future:
return create_future(self._poll())
@override
@property
def id(self) -> GeneratePredictionTableJobID:
r"""The unique ID of this prediction table generation process."""
return self.job_id
@override
def result(self) -> PredictionTable:
return self._fut.result()
@override
def future(self) -> Future[PredictionTable]:
return self._fut
@override
def status(self) -> JobStatusReport:
r"""Returns the status of a running prediction table generation job."""
return self._poll_job().job_status_report
def cancel(self) -> None:
r"""Cancels a running prediction table generation job, and raises an
error if cancellation fails.
"""
return self._api().cancel(self.job_id)
# TODO(manan): make asynchronous natively with aiohttp:
def _poll_job(self) -> GeneratePredictionTableJobResource:
# Skip polling if job is already in terminal state.
if not self.job or not self.job.job_status_report.status.is_terminal:
self.job = self._api().get(self.job_id)
return self.job
async def _poll(self) -> PredictionTable:
while not self.status().status.is_terminal:
await asyncio.sleep(_DEFAULT_INTERVAL_S)
status = self.status().status
if status != JobStatus.DONE:
error_details = self._api().get_job_error(self.job_id)
error_str = pretty_print_error_details(error_details)
raise RuntimeError(
f"Prediction table generation for job {self.job_id} failed "
f"with job status {status}. Encountered below"
f" errors: {error_str}")
return PredictionTable(self.job_id)
@override
def load_config(self) -> GeneratePredictionTableRequest:
r"""Load the full configuration for this
prediction table generation job.
Returns:
GeneratePredictionTableRequest:
Complete configuration including plan,
pquery_id, graph_snapshot_id, etc.
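A minimal usage sketch (the job ID below is a placeholder; attribute
access mirrors the fields listed above):

.. code-block:: python

    job = kumoai.PredictionTableJob("gen-predtable-job-...")
    config = job.load_config()
    print(config.pquery_id)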
"""
return self._api().get_config(self.job_id)
def _get_status(job_id: str) -> JobStatusReport:
api = global_state.client.generate_prediction_table_job_api
resource: GeneratePredictionTableJobResource = api.get(job_id)
return resource.job_status_report