Source code for kumoai.connector.file_upload_connector

from typing import List

from kumoapi.source_table import (
    DataSourceType,
    FileType,
    S3SourceTableRequest,
    SourceTableConfigRequest,
    SourceTableConfigResponse,
)
from typing_extensions import override

from kumoai import global_state
from kumoai.connector.base import Connector


[docs]class FileUploadConnector(Connector): r"""Defines a connector to files directly uploaded to Kumo, either as 'parquet' or 'csv' (non-partitioned) data. To get started with file upload, please first upload a table with the :meth:`~kumoai.connector.upload_table` method. You can then access this table behind the file upload connector as follows: .. code-block:: python import kumoai from kumoai.connector import upload_table # Upload the table; assume it is stored at `/data/users.parquet` upload_table(name="users", path="/data/users.parquet") # Create the file upload connector: connector = kumoai.FileUploadConnector(file_type="parquet") # Check that the file upload connector has a `users` table: assert connector.has_table("users") Args: file_type: The file type of uploaded data. Can be either ``"csv"`` or ``"parquet"``. """
[docs] def __init__(self, file_type: str) -> None: r"""Creates the connector to uploaded files of type :obj:`file_type`. """ assert file_type in {'parquet', 'csv'} self._file_type = file_type
@property def name(self) -> str: return f'{self._file_type}_upload_connector' @override @property def source_type(self) -> DataSourceType: return DataSourceType.S3 @property def file_type(self) -> FileType: return (FileType.PARQUET if self._file_type == 'parquet' else FileType.CSV) def _get_table_config(self, table_name: str) -> SourceTableConfigResponse: req = SourceTableConfigRequest(connector_id=self.name, table_name=table_name, source_type=self.source_type, file_type=self.file_type) return global_state.client.source_table_api.get_table_config(req) @override def _source_table_request(self, table_names: List[str]) -> S3SourceTableRequest: return S3SourceTableRequest(s3_root_dir="", connector_id=self.name, table_names=table_names, file_type=self.file_type)