import os
from typing import List
from kumoapi.data_source import DeleteUploadedFileRequest
from kumoapi.source_table import (
DataSourceType,
FileType,
S3SourceTableRequest,
SourceTableConfigRequest,
SourceTableConfigResponse,
)
from typing_extensions import override
from kumoai import global_state
from kumoai.connector.base import Connector
from kumoai.connector.utils import (
CONNECTOR_ID_MAP,
MAX_PARTITION_SIZE,
MIN_PARTITION_SIZE,
_upload_partitioned_csv,
_upload_partitioned_parquet,
_upload_single_file,
logger,
)
class FileUploadConnector(Connector):
    r"""Defines a connector to files directly uploaded to Kumo, either as
    'parquet' or 'csv' (non-partitioned) data.

    To get started with file upload, please first upload a table with
    the :meth:`upload` method in the :class:`FileUploadConnector` class.
    You can then access this table behind the file upload connector as
    follows:

    .. code-block:: python

        import kumoai

        # Create the file upload connector:
        connector = kumoai.FileUploadConnector(file_type="parquet")

        # Upload the table; assume it is stored at `/data/users.parquet`
        connector.upload(name="users", path="/data/users.parquet")

        # Check that the file upload connector has a `users` table:
        assert connector.has_table("users")

    Args:
        file_type: The file type of uploaded data. Can be either ``"csv"``
            or ``"parquet"``.
    """

    def __init__(self, file_type: str) -> None:
        r"""Creates the connector to uploaded files of type
        :obj:`file_type`.

        Args:
            file_type: Either ``"csv"`` or ``"parquet"`` (matched
                case-insensitively and stored in lower case).

        Raises:
            ValueError: If :obj:`file_type` is neither ``"csv"`` nor
                ``"parquet"``.
        """
        normalized = file_type.lower()
        # Raise explicitly rather than `assert`: assertions are removed when
        # Python runs with `-O`, which would let an unsupported type through.
        if normalized not in {'parquet', 'csv'}:
            raise ValueError(f"Unsupported file type '{file_type}'; expected "
                             f"'parquet' or 'csv'.")
        self._file_type = normalized

    @property
    def name(self) -> str:
        r"""The connector identifier, ``"<file_type>_upload_connector"``."""
        return f'{self._file_type}_upload_connector'

    @override
    @property
    def source_type(self) -> DataSourceType:
        r"""The data source type of this connector; always
        ``DataSourceType.S3``.
        """
        return DataSourceType.S3

    @property
    def file_type(self) -> FileType:
        r"""The connector's file type as a :class:`FileType` member
        (``PARQUET`` or ``CSV``).
        """
        return (FileType.PARQUET
                if self._file_type == 'parquet' else FileType.CSV)

    def _get_table_config(self, table_name: str) -> SourceTableConfigResponse:
        r"""Requests the configuration of the table :obj:`table_name` behind
        this connector from the source table API.
        """
        # NOTE(review): `file_type` is sent as `None` even though the
        # connector knows its own file type -- presumably the backend
        # resolves it from the connector id; confirm.
        req = SourceTableConfigRequest(connector_id=self.name,
                                       table_name=table_name,
                                       source_type=self.source_type,
                                       file_type=None)
        return global_state.client.source_table_api.get_table_config(req)

    @override
    def _source_table_request(self,
                              table_names: List[str]) -> S3SourceTableRequest:
        r"""Builds the source table request for :obj:`table_names`, using an
        empty S3 root directory and this connector's id.
        """
        return S3SourceTableRequest(s3_root_dir="", connector_id=self.name,
                                    table_names=table_names, file_type=None)

    def upload(
        self,
        name: str,
        path: str,
        auto_partition: bool = True,
        partition_size_mb: int = 250,
    ) -> None:
        r"""Synchronously uploads a table located on your
        local machine to the Kumo data plane.

        Tables uploaded in this way can be accessed with
        this ``FileUploadConnector`` using the provided name,
        for example: ``connector_obj["my_table"]``

        For files larger than 1GB, the table will be automatically
        partitioned into smaller chunks and uploaded with a common prefix
        that allows ``FileUploadConnector`` to union them when reading.

        .. warning::
            Uploaded tables must be single files, either in parquet or CSV
            format (must match connector type).
            Partitioned tables are not currently supported.

        .. code-block:: python

            import kumoai
            connector = kumoai.FileUploadConnector(file_type="parquet")

            # Upload a small table
            connector.upload(name="users", path="/data/users.parquet")

            # Upload a large parquet table (will be automatically
            # partitioned)
            connector.upload(name="transactions",
                             path="/data/large_transactions.parquet")

            # Disable auto-partitioning (will raise error for large files)
            connector.upload(name="users", path="/data/users.parquet",
                             auto_partition=False)

            # Create a file upload connector for CSV files.
            connectorCSV = kumoai.FileUploadConnector(file_type="csv")

            # Upload a large CSV table (will be automatically partitioned)
            connectorCSV.upload(name="sales", path="/data/large_sales.csv")

        Args:
            name: The name of the table to be uploaded. The uploaded table
                can be accessed from the
                :class:`~kumoai.connector.FileUploadConnector` with this
                name.
            path: The full path of the table to be uploaded, on the local
                machine. File Type must match the connector type.
            auto_partition: Whether to automatically
                partition large files (>1GB).
                If False and file is >1GB, raises ValueError. Supports both
                Parquet and CSV files.
            partition_size_mb: The size of each partition in MB. Only used
                if auto_partition is True.

        Raises:
            ValueError: If the extension of :obj:`path` does not match the
                connector file type, if the file is too large for a single
                upload while :obj:`auto_partition` is ``False``, or if
                :obj:`partition_size_mb` lies outside the range
                ``[MIN_PARTITION_SIZE, MAX_PARTITION_SIZE]``.
            OSError: If the size of :obj:`path` cannot be read (e.g., the
                file does not exist).
        """
        # Validate file type matches connector type
        if not path.lower().endswith("." + self._file_type):
            raise ValueError(f"File {path} must match connector path type: "
                             f"{self._file_type}.")

        # Validate file type
        # NOTE(review): unlike the check above, this one is case-sensitive,
        # so a path such as `X.PARQUET` is rejected here. Kept as-is because
        # how the upload helpers treat upper-case extensions is not visible
        # from this file.
        if not path.endswith((".parquet", ".csv")):
            raise ValueError(f"Path {path} must be either a CSV or Parquet "
                             f"file. Partitioned data is not currently "
                             f"supported.")

        file_size = os.path.getsize(path)

        # Route based on file size
        if file_size < MAX_PARTITION_SIZE:
            return _upload_single_file(name, path)

        if not auto_partition:
            raise ValueError(f"File {path} is {file_size / (1024**3):.2f}GB, "
                             f"which exceeds the 1GB limit. Enable "
                             f"auto_partition=True to automatically partition "
                             f"large files.")

        # Partition and upload large files
        partition_size = partition_size_mb * 1024**2
        if (partition_size > MAX_PARTITION_SIZE
                or partition_size < MIN_PARTITION_SIZE):
            raise ValueError(f"Partition size {partition_size_mb}MB must be "
                             f"between {MIN_PARTITION_SIZE / 1024**2}MB and "
                             f"{MAX_PARTITION_SIZE / 1024**2}MB.")

        logger.info(
            "File %s is large with size %s, partitioning for upload...", path,
            file_size)
        if path.endswith('.parquet'):
            _upload_partitioned_parquet(name, path, partition_size)
        else:
            _upload_partitioned_csv(name, path, partition_size)

    def delete(
        self,
        name: str,
        file_type: str,
    ) -> None:
        r"""Synchronously deletes a previously uploaded table from the Kumo
        data plane.

        .. code-block:: python

            # Assume we have uploaded a `.parquet` table named `users`, and
            # a `FileUploadConnector` has been created called `connector`,
            # and we want to delete this table from Kumo:
            connector.delete(name="users", file_type="parquet")

        Args:
            name: The name of the table to be deleted. This table must have
                previously been uploaded with a call to
                :meth:`~kumoai.connector.FileUploadConnector.upload`.
            file_type: The file type of the table to be deleted; this can
                either be :obj:`"parquet"` or :obj:`"csv"`, and must match
                the connector file_type (compared case-insensitively).

        Raises:
            ValueError: If :obj:`file_type` does not match the connector
                file type, or if no table called :obj:`name` exists behind
                this connector.
        """
        if file_type.lower() != self._file_type:
            raise ValueError(f"File type {file_type} does not match "
                             f"connector file type {self._file_type}.")

        if not self.has_table(name):
            raise ValueError(f"The table '{name}' does not exist in {self}. "
                             f"Please check the existence of the source data.")

        # Look the connector id up with the normalized (lower-case) file
        # type: the check above accepts e.g. "PARQUET", and indexing the map
        # with the raw argument would then fail with a `KeyError`.
        req = DeleteUploadedFileRequest(
            source_table_name=name,
            connector_id=CONNECTOR_ID_MAP[self._file_type],
        )
        global_state.client.connector_api.delete_file_upload(req)
        logger.info("Successfully deleted table %s from Kumo.", name)