Source code for kumoai.connector.utils

import asyncio
import io
import math
import os
from logging import getLogger
from typing import Generator, List, Tuple

import aiohttp
from kumoapi.data_source import (
    CompleteFileUploadRequest,
    DeleteUploadedFileRequest,
    PartUploadMetadata,
    StartFileUploadRequest,
    StartFileUploadResponse,
)
from tqdm.asyncio import tqdm_asyncio

from kumoai import global_state
from kumoai.futures import _KUMO_EVENT_LOOP

CHUNK_SIZE = 100 * 10**6  # 100 MB

logger = getLogger(__name__)

CONNECTOR_ID_MAP = {
    "csv": "csv_upload_connector",
    "parquet": "parquet_upload_connector",
}


async def put(
    session: aiohttp.ClientSession,
    url: str,
    data: bytes,
    part_no: int,
) -> Tuple[int, str]:
    r"""Performs an asynchronous PUT request to upload data to a presigned S3
    URL, and returns a tuple corresponding to the uploaded part number and
    the Etag of the header.

    Args:
        session: the ``aiohttp`` client session to use for the request
        url: the S3 presigned URL to PUT ``data`` to
        data: the data (``bytes``) that should be PUT to ``url``
        part_no: the part number of the data to be PUT
    """
    # TODO(manan): add retry...
    async with session.put(url, data=data) as res:
        logger.debug("PUT part_no=%s bytes=%s", part_no, len(data))
        _ = await res.text()
        if res.status != 200:
            raise RuntimeError(
                f"PUT URL={url} failed: with status {res.status}: "
                f"{res}")
        headers = res.headers
        return (part_no + 1, headers['Etag'])
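

# The TODO above notes that retries are not yet implemented. The sketch below
# shows one possible shape for a retry wrapper, assuming simple exponential
# backoff; ``_put_with_retry`` is illustrative only and not part of this
# module's API.
async def _put_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    data: bytes,
    part_no: int,
    max_attempts: int = 3,
) -> Tuple[int, str]:
    r"""Illustrative sketch: retry :meth:`put` with exponential backoff."""
    delay = 1.0
    for attempt in range(1, max_attempts + 1):
        try:
            return await put(session, url, data, part_no)
        except (RuntimeError, aiohttp.ClientError):
            # Give up after the final attempt; otherwise back off and retry:
            if attempt == max_attempts:
                raise
            logger.debug("Retrying part_no=%s (attempt %s)", part_no, attempt)
            await asyncio.sleep(delay)
            delay *= 2
    raise RuntimeError("unreachable")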


async def multi_put(
    loop: asyncio.AbstractEventLoop,
    urls: List[str],
    data: Generator[bytes, None, None],
) -> List[PartUploadMetadata]:
    r"""Performs multiple asynchronous PUT requests of the data yielded
    from the ``data`` generator to the specified URLs. If the data
    generator is exhausted early, only a subset of URLs are used. If
    the data generator is not exhausted by the URLs, uploaded data may
    be corrupted!
    """
    # TODO(manan): retry
    # TODO(manan): properly stream chunks
    async with aiohttp.ClientSession(
        loop=loop,
        connector=aiohttp.TCPConnector(verify_ssl=False),
        headers={'Content-Type': 'binary'},
    ) as session:
        results = await tqdm_asyncio.gather(*[
            put(session, url, chunk, i)
            for i, (url, chunk) in enumerate(zip(urls, data))
        ])
        for r in results:
            if isinstance(r, BaseException):
                raise r
        return [PartUploadMetadata(v[0], v[1]) for v in results]


def stream_read(
    f: io.BufferedReader,
    chunk_size: int,
) -> Generator[bytes, None, None]:
    r"""Streams ``chunk_size`` contiguous bytes from buffered reader
    ``f`` each time the generator is yielded from.
    """
    while True:
        byte_buf = f.read(chunk_size)
        if len(byte_buf) == 0:
            # End of file; terminate the generator:
            break
        yield byte_buf
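

# A minimal sketch of how ``stream_read`` is meant to be consumed: each
# yielded chunk holds at most ``chunk_size`` bytes, so for a non-empty file
# the number of chunks equals ``math.ceil(size / chunk_size)``, matching the
# multipart sizing in ``_start_table_upload`` below. ``_count_chunks`` is
# illustrative only and not part of this module's API.
def _count_chunks(path: str, chunk_size: int = CHUNK_SIZE) -> int:
    r"""Illustrative helper: count the chunks that ``stream_read`` yields for
    the file at ``path``."""
    with open(path, 'rb') as f:
        return sum(1 for _ in stream_read(f, chunk_size))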


def upload_table(name: str, path: str) -> None:
    r"""Synchronously uploads a table located on your local machine to the
    Kumo data plane. Tables uploaded in this way can be accessed with a
    :class:`~kumoai.connector.FileUploadConnector`.

    .. warning::
        Uploaded tables must be single files, either in parquet or CSV
        format. Partitioned tables are not currently supported.

    .. code-block:: python

        import kumoai
        from kumoai.connector import upload_table

        # Assume we have a table located at /data/users.parquet, and we
        # want to upload this table to Kumo, to be used downstream:
        upload_table(name="users", path="/data/users.parquet")

    Args:
        name: The name of the table to be uploaded. The uploaded table can
            be accessed from the
            :class:`~kumoai.connector.FileUploadConnector` with this name.
        path: The full path of the table to be uploaded, on the local
            machine.
    """
    # TODO(manan): support progress bars

    # Validate:
    if not (path.endswith(".parquet") or path.endswith(".csv")):
        raise ValueError(
            f"Path {path} must be either a CSV or Parquet "
            f"file. Partitioned data is not currently supported.")

    # Prepare upload (number of parts based on total size):
    file_type = 'parquet' if path.endswith('parquet') else 'csv'
    sz = os.path.getsize(path)
    logger.info("Uploading table %s (path: %s), size=%s bytes", name, path,
                sz)
    upload_res = _start_table_upload(
        table_name=name,
        file_type=file_type,
        file_size_bytes=sz,
    )

    # Chunk and upload:
    urls = list(upload_res.presigned_part_urls.values())
    loop = _KUMO_EVENT_LOOP
    part_metadata_list_fut = asyncio.run_coroutine_threadsafe(
        multi_put(loop, urls=urls, data=stream_read(
            open(path, 'rb'), CHUNK_SIZE,
        )), loop)
    part_metadata_list = part_metadata_list_fut.result()

    # Complete:
    logger.info("Upload complete. Validating table %s.", name)
    _complete_table_upload(
        table_name=name,
        file_type=file_type,
        upload_path=upload_res.temp_upload_path,
        upload_id=upload_res.upload_id,
        parts_metadata=part_metadata_list,
    )
    logger.info("Completed uploading table %s to Kumo.", name)


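# :meth:`upload_table` uploads a single file. The sketch below shows one way
# to batch-upload every supported file in a local directory, using each file
# name (without its extension) as the table name; ``_upload_directory`` is
# illustrative only and not part of this module's API.
def _upload_directory(directory: str) -> None:
    r"""Illustrative sketch: upload all CSV/Parquet files in ``directory``."""
    for file_name in sorted(os.listdir(directory)):
        if not file_name.endswith((".csv", ".parquet")):
            continue  # Skip anything that is not a single CSV/Parquet file.
        table_name, _ = os.path.splitext(file_name)
        upload_table(name=table_name, path=os.path.join(directory, file_name))

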
def delete_uploaded_table(
    name: str,
    file_type: str,
) -> None:
    r"""Synchronously deletes a previously uploaded table from the Kumo data
    plane.

    .. code-block:: python

        import kumoai
        from kumoai.connector import delete_uploaded_table

        # Assume we have uploaded a `.parquet` table named `users`,
        # and we want to delete this table from Kumo:
        delete_uploaded_table(name="users", file_type="parquet")

        # Assume we have uploaded a `.csv` table named `orders`,
        # and we want to delete this table from Kumo:
        delete_uploaded_table(name="orders", file_type="csv")

    Args:
        name: The name of the table to be deleted. This table must have
            previously been uploaded with a call to
            :meth:`~kumoai.connector.upload_table`.
        file_type: The file type of the table to be deleted; this can
            either be :obj:`"parquet"` or :obj:`"csv"`.
    """
    assert file_type in {'parquet', 'csv'}
    req = DeleteUploadedFileRequest(
        source_table_name=name,
        connector_id=CONNECTOR_ID_MAP[file_type],
    )
    global_state.client.connector_api.delete_file_upload(req)
    logger.info("Successfully deleted table %s from Kumo.", name)


def replace_table(
    name: str,
    path: str,
    file_type: str,
) -> None:
    r"""Replaces an existing uploaded table on the Kumo data plane with a
    new table.

    .. code-block:: python

        import kumoai
        from kumoai.connector import replace_table

        # Replace an existing `.csv` table named `users`
        # with a new version located at `/data/new_users.csv`:
        replace_table(
            name="users",
            path="/data/new_users.csv",
            file_type="csv",
        )

    Args:
        name: The name of the table to be replaced. This table must have
            previously been uploaded with a call to
            :meth:`~kumoai.connector.upload_table`.
        path: The full path of the new table to be uploaded, on the local
            machine.
        file_type: The file type of the table to be replaced; this can
            either be :obj:`"parquet"` or :obj:`"csv"`.

    Raises:
        ValueError: If the specified path does not point to a valid
            `.csv` or `.parquet` file.
    """
    # Validate:
    if not (path.endswith(".parquet") or path.endswith(".csv")):
        raise ValueError(f"Path {path} must be either a CSV or Parquet file. "
                         f"Partitioned data is not currently supported.")

    try:
        logger.info("Deleting previously uploaded table %s of type %s.",
                    name, file_type)
        delete_uploaded_table(name=name, file_type=file_type)
    except Exception:
        # TODO(manan): fix this...
        pass

    logger.info("Uploading table %s.", name)
    upload_table(name=name, path=path)
    logger.info("Successfully replaced table %s with the new table.", name)


def _start_table_upload(
    table_name: str,
    file_type: str,
    file_size_bytes: float,
) -> StartFileUploadResponse:
    assert file_type in CONNECTOR_ID_MAP.keys()
    req = StartFileUploadRequest(
        source_table_name=table_name,
        connector_id=CONNECTOR_ID_MAP[file_type],
        num_parts=max(1, math.ceil(file_size_bytes / CHUNK_SIZE)),
    )
    return global_state.client.connector_api.start_file_upload(req)


def _complete_table_upload(
    table_name: str,
    file_type: str,
    upload_path: str,
    upload_id: str,
    parts_metadata: List[PartUploadMetadata],
) -> None:
    assert file_type in CONNECTOR_ID_MAP.keys()
    req = CompleteFileUploadRequest(
        source_table_name=table_name,
        connector_id=CONNECTOR_ID_MAP[file_type],
        temp_upload_path=upload_path,
        upload_id=upload_id,
        parts_metadata=parts_metadata,
    )
    return global_state.client.connector_api.complete_file_upload(req)
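

# The multipart sizing above is driven by ``CHUNK_SIZE`` (100 MB). The helper
# below is an illustrative restatement of the part-count arithmetic used in
# ``_start_table_upload``; the example byte sizes in the comments are made up.
def _expected_num_parts(file_size_bytes: int) -> int:
    r"""Illustrative helper mirroring the part-count logic above."""
    # A 250 MB file (250 * 10**6 bytes) yields 3 parts; an empty or very
    # small file still uses a single part:
    return max(1, math.ceil(file_size_bytes / CHUNK_SIZE))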