import asyncio
import io
import math
import os
import re
import tempfile
import time
from logging import getLogger
from typing import Any, Callable, Generator, List, Tuple
import aiohttp
import pandas as pd
from kumoapi.data_source import (
CompleteFileUploadRequest,
DeleteUploadedFileRequest,
PartUploadMetadata,
StartFileUploadRequest,
StartFileUploadResponse,
)
from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio
from kumoai import global_state
from kumoai.exceptions import HTTPException
from kumoai.futures import _KUMO_EVENT_LOOP
CHUNK_SIZE = 100 * 10**6  # 100 MB per uploaded part
MAX_PARTITION_SIZE = 1000 * 1024**2  # ~1 GB (1000 MiB)
MIN_PARTITION_SIZE = 100 * 1024**2  # 100 MiB
logger = getLogger(__name__)
CONNECTOR_ID_MAP = {
"csv": "csv_upload_connector",
"parquet": "parquet_upload_connector",
}
async def put(
session: aiohttp.ClientSession,
url: str,
data: bytes,
part_no: int,
) -> Tuple[int, str]:
r"""Performs an asynchronous PUT request to upload data to a presigned S3
URL, and returns a tuple corresponding to the uploaded part number and
the Etag of the header.
Args:
session: the ``aiohttp`` client session to use for the request
url: the S3 presigned URL to PUT ``data`` to
data: the data (``bytes``) that should be PUT to ``url``
part_no: the part number of the data to be PUT
"""
# TODO(manan): add retry...
async with session.put(url, data=data) as res:
logger.debug("PUT part_no=%s bytes=%s", part_no, len(data))
        text = await res.text()
        if res.status != 200:
            raise RuntimeError(
                f"PUT URL={url} failed with status {res.status}: "
                f"{text}")
        headers = res.headers
        return (part_no + 1, headers['ETag'])
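# A minimal sketch of how the retry TODO above could be addressed (not used
# elsewhere in this module; the retry count and backoff schedule are
# assumptions, not Kumo defaults): retry transient PUT failures with
# exponential backoff.
async def _put_with_retry(
    session: aiohttp.ClientSession,
    url: str,
    data: bytes,
    part_no: int,
    retries: int = 3,
) -> Tuple[int, str]:
    for attempt in range(retries + 1):
        try:
            return await put(session, url, data, part_no)
        except (RuntimeError, aiohttp.ClientError):
            if attempt == retries:
                raise
            await asyncio.sleep(2**attempt)  # back off 1s, 2s, 4s, ...
    raise RuntimeError("unreachable")  # for static type checkers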
async def multi_put(
loop: asyncio.AbstractEventLoop,
urls: List[str],
data: Generator[bytes, None, None],
tqdm_bar_position: int = 0,
) -> List[PartUploadMetadata]:
r"""Performs multiple asynchronous PUT requests of the data yielded
from the ``data`` generator to the specified URLs. If the data
generator is exhausted early, only a subset of URLs are used. If
the data generator is not exhausted by the URLs, uploaded data may
be corrupted!
"""
# TODO(manan): retry
# TODO(manan): properly stream chunks
    async with aiohttp.ClientSession(
            loop=loop,
            connector=aiohttp.TCPConnector(ssl=False),
            headers={'Content-Type': 'binary'},
    ) as session:
results = await tqdm_asyncio.gather(
*[
put(session, url, data, i)
for i, (url, data) in enumerate(zip(urls, data))
], desc="Uploading chunks", position=tqdm_bar_position,
leave=False)
for r in results:
if isinstance(r, BaseException):
raise r
return [PartUploadMetadata(v[0], v[1]) for v in results]
def stream_read(
f: io.BufferedReader,
chunk_size: int,
) -> Generator[bytes, None, None]:
r"""Streams ``chunk_size`` contiguous bytes from buffered reader
``f`` each time the generator is yielded from.
"""
while True:
        byte_buf = f.read(chunk_size)
        if not byte_buf:  # EOF reached
            break
yield byte_buf
def upload_table(
name: str,
path: str,
auto_partition: bool = True,
partition_size_mb: int = 250,
) -> None:
r"""Synchronously uploads a table located on your local machine to the
Kumo data plane. Tables uploaded in this way can be accessed with a
:class:`~kumoai.connector.FileUploadConnector`.
For files larger than 1GB, the table will be automatically partitioned
into smaller chunks and uploaded with common prefix that allows
FileUploadConnector to union them when reading.
.. warning::
Uploaded tables must be single files, either in parquet or CSV
format. Partitioned tables are not currently supported.
.. code-block:: python
import kumoai
from kumoai.connector import upload_table
# Upload a small table
upload_table(name="users", path="/data/users.parquet")
# Upload a large parquet table (will be automatically partitioned)
upload_table(name="transactions",
path="/data/large_transactions.parquet")
# Upload a large CSV table (will be automatically partitioned)
upload_table(name="sales", path="/data/large_sales.csv")
# Disable auto-partitioning (will raise error for large files)
upload_table(name="users", path="/data/users.parquet",
auto_partition=False)
Args:
name: The name of the table to be uploaded. The uploaded table can
be accessed from the :class:`~kumoai.connector.FileUploadConnector`
with this name.
path: The full path of the table to be uploaded, on the local
machine.
auto_partition: Whether to automatically partition large files (>1GB).
If False and file is >1GB, raises ValueError. Supports both
Parquet and CSV files.
"""
# Validate file type
if not (path.endswith(".parquet") or path.endswith(".csv")):
raise ValueError(f"Path {path} must be either a CSV or Parquet "
f"file. Partitioned data is not currently "
f"supported.")
file_size = os.path.getsize(path)
# Route based on file size
if file_size < MAX_PARTITION_SIZE:
return _upload_single_file(name, path)
if not auto_partition:
raise ValueError(f"File {path} is {file_size / (1024**3):.2f}GB, "
f"which exceeds the 1GB limit. Enable "
f"auto_partition=True to automatically partition "
f"large files.")
# Partition and upload large files
partition_size = partition_size_mb * 1024**2
if (partition_size > MAX_PARTITION_SIZE
or partition_size < MIN_PARTITION_SIZE):
raise ValueError(f"Partition size {partition_size_mb}MB must be "
f"between {MIN_PARTITION_SIZE / 1024**2}MB and "
f"{MAX_PARTITION_SIZE / 1024**2}MB.")
logger.info("File %s is large with size %s, partitioning for upload...",
path, file_size)
if path.endswith('.parquet'):
_upload_partitioned_parquet(name, path, partition_size)
else:
_upload_partitioned_csv(name, path, partition_size)
def _sanitize_columns(names: List[str]) -> Tuple[List[str], bool]:
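    """Replaces each run of non-alphanumeric characters in every name with a
    single underscore and strips leading/trailing underscores. Returns the
    sanitized names and whether any name changed.

    >>> _sanitize_columns(["user id", "amount ($)"])
    (['user_id', 'amount'], True)
    """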
_SAN_RE = re.compile(r"[^0-9A-Za-z]+")
new = [_SAN_RE.sub("_", n).strip("_") for n in names]
return new, new != names
def sanitize_file(src_path: str) -> Tuple[str, bool]:
"""
Sanitizes the columns of a CSV or Parquet file by replacing invalid
characters with underscores.
Returns a tuple of the new path and a boolean indicating if the file was
changed. If the file was not changed, the original path is returned.
If the file was changed, a temporary file is created and returned.
The temporary file should be deleted by the caller.
Args:
src_path: The path to the CSV or Parquet file to sanitize.
Returns:
A tuple of the new path and a boolean indicating if the file was
changed. If the file was not changed, the original path is returned.
If the file was changed, a temporary file is created and returned.
"""
if src_path.endswith('.parquet'):
try:
import pyarrow.parquet as pq
except ModuleNotFoundError:
logger.warning("pyarrow is not installed. Skipping column "
"sanitization for parquet file: %s", src_path)
return src_path, False
pf = pq.ParquetFile(src_path)
new_names, changed = _sanitize_columns(pf.schema.names)
if not changed:
return src_path, False
temp_file = tempfile.NamedTemporaryFile(
suffix='.parquet', delete=False)
# Create schema with sanitized column names
import pyarrow as pa
original_schema = pf.schema.to_arrow_schema()
fields = [field.with_name(new_name)
for field, new_name in zip(original_schema, new_names)]
sanitized_schema = pa.schema(fields)
writer = pq.ParquetWriter(temp_file.name, sanitized_schema)
for i in range(pf.num_row_groups):
tbl = pf.read_row_group(i).rename_columns(new_names)
writer.write_table(tbl)
writer.close()
return temp_file.name, True
elif src_path.endswith('.csv'):
cols = pd.read_csv(src_path, nrows=0).columns.tolist()
new_cols, changed = _sanitize_columns(cols)
if not changed:
return src_path, False
tmp = tempfile.NamedTemporaryFile(suffix='.csv', delete=False)
tmp_path = tmp.name
tmp.close()
reader = pd.read_csv(src_path, chunksize=1_000_000)
with open(tmp_path, 'w', encoding='utf-8', newline='') as out:
out.write(','.join(new_cols) + '\n') # header once
for chunk in reader:
chunk.columns = new_cols
chunk.to_csv(out, header=False, index=False)
return tmp_path, True
else:
raise ValueError(
f"File {src_path} must be either a CSV or Parquet file.")
def _upload_single_file(
name: str,
path: str,
tqdm_bar_position: int = 0,
) -> None:
r"""Upload a single file (original upload_table logic)."""
# Validate:
if not (path.endswith(".parquet") or path.endswith(".csv")):
raise ValueError(f"Path {path} must be either a CSV or Parquet "
f"file. Partitioned data is not currently "
f"supported.")
# Prepare upload (number of parts based on total size):
file_type = 'parquet' if path.endswith('parquet') else 'csv'
path, temp_file_created = sanitize_file(path)
sz = os.path.getsize(path)
if tqdm_bar_position == 0:
logger.info("Uploading table %s (path: %s), size=%s bytes", name, path,
sz)
upload_res = _start_table_upload(
table_name=name,
file_type=file_type,
file_size_bytes=sz,
)
# Chunk and upload:
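    # The backend issues one presigned URL per CHUNK_SIZE slice of the file
    # (see _start_table_upload), and stream_read yields slices in the same
    # order, so chunk ``i`` is PUT to ``urls[i]`` as part ``i + 1``.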
urls = list(upload_res.presigned_part_urls.values())
loop = _KUMO_EVENT_LOOP
part_metadata_list_fut = asyncio.run_coroutine_threadsafe(
multi_put(loop, urls=urls, data=stream_read(
open(path, 'rb'),
CHUNK_SIZE,
), tqdm_bar_position=tqdm_bar_position), loop)
part_metadata_list = part_metadata_list_fut.result()
# Complete:
if tqdm_bar_position == 0:
logger.info("Upload complete. Validating table %s.", name)
for i in range(5):
try:
_complete_table_upload(
table_name=name,
file_type=file_type,
upload_path=upload_res.temp_upload_path,
upload_id=upload_res.upload_id,
parts_metadata=part_metadata_list,
)
except HTTPException as e:
if e.status_code == 500 and i < 4:
                # TODO(manan): a transient 500 can happen when a prior DELETE
                # of this table (e.g. from `replace_table`) has not yet
                # propagated, so retry with exponential backoff:
time.sleep(2**(i - 1))
continue
else:
raise e
else:
break
if tqdm_bar_position == 0:
logger.info("Completed uploading table %s to Kumo.", name)
if temp_file_created:
os.unlink(path)
def _upload_partitioned_parquet(
name: str,
path: str,
partition_size: int,
) -> None:
r"""Upload a large parquet file by partitioning it into smaller chunks."""
try:
import pyarrow.parquet as pq
except ModuleNotFoundError:
raise ModuleNotFoundError(
"pyarrow is not installed. "
"Needed to upload large (>1GB) parquet files. "
"Please install it with "
"`pip install pyarrow`.")
pf = pq.ParquetFile(path)
new_columns, _ = _sanitize_columns(pf.schema.names)
# Calculate partitions
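    # Greedily pack contiguous row groups into partitions of roughly
    # ``partition_size`` bytes. Row groups are never split, so a single row
    # group larger than MAX_PARTITION_SIZE cannot be placed and is rejected.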
partitions = []
part_idx = 0
current_size = 0
    current_row_groups: List[int] = []
for rg_idx in range(pf.num_row_groups):
rg_size = pf.metadata.row_group(rg_idx).total_byte_size
if rg_size > MAX_PARTITION_SIZE:
raise ValueError(f"Row group {rg_idx} is larger than the "
f"maximum partition size {MAX_PARTITION_SIZE} "
f"bytes")
if (current_size + rg_size > partition_size and
current_row_groups):
partitions.append((part_idx, current_row_groups.copy()))
part_idx += 1
current_row_groups = []
current_size = 0
current_row_groups.append(rg_idx)
current_size += rg_size
if current_row_groups:
partitions.append((part_idx, current_row_groups))
logger.info("Splitting %s into %d partitions", path, len(partitions))
def writer(path: str, row_groups: List[int]) -> None:
# Create schema with sanitized column names
import pyarrow as pa
original_schema = pf.schema.to_arrow_schema()
fields = [field.with_name(new_name)
for field, new_name in zip(original_schema, new_columns)]
sanitized_schema = pa.schema(fields)
pq_writer = pq.ParquetWriter(path, sanitized_schema)
for rg_idx in row_groups:
tbl = pf.read_row_group(rg_idx).rename_columns(new_columns)
pq_writer.write_table(tbl)
pq_writer.close()
_upload_all_partitions(partitions, name, ".parquet", writer)
# validation done by _upload_single_file on each partition
logger.info("Upload complete. Validated table %s.", name)
def _upload_partitioned_csv(
name: str,
path: str,
partition_size: int,
) -> None:
r"""Upload a large CSV file by partitioning it into smaller chunks."""
# calculate partitions
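    # Each partition is written with the (sanitized) header line so that it
    # is a valid standalone CSV; data lines are packed until the partition
    # reaches roughly ``partition_size`` bytes.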
partitions = []
part_idx = 0
columns = pd.read_csv(path, nrows=0).columns.tolist()
new_columns, _ = _sanitize_columns(columns)
with open(path, 'r', encoding='utf-8') as f:
# preserve header per partition
_ = f.readline() # skip header
header = ','.join(new_columns) + '\n'
header_size = len(header.encode('utf-8'))
current_lines = [header]
current_size = header_size
for line in f:
line_size = len(line.encode('utf-8'))
if (current_size + line_size > partition_size
and len(current_lines) > 1):
partitions.append((part_idx, current_lines.copy()))
part_idx += 1
current_lines = [header] # Start new partition with header
current_size = header_size
current_lines.append(line)
current_size += line_size
if len(current_lines) > 1: # More than just header
partitions.append((part_idx, current_lines))
logger.info("Splitting %s into %d partitions", path, len(partitions))
def writer(path: str, lines: List[str]) -> None:
with open(path, "w", encoding="utf-8") as f:
f.writelines(lines)
_upload_all_partitions(partitions, name, ".csv", writer)
# validation done by _upload_single_file on each partition
logger.info("Upload complete. Validated table %s.", name)
def _upload_all_partitions(
partitions: List[Tuple[int, Any]],
name: str,
file_suffix: str,
writer: Callable[[str, Any], None],
) -> None:
with tqdm(partitions, desc=f"Uploading {name}", position=0) as pbar:
for part_idx, partition_data in pbar:
partition_desc = f"Part {part_idx+1}/{len(partitions)}"
pbar.set_postfix_str(partition_desc)
_create_and_upload_partition(
name=name,
part_idx=part_idx,
file_suffix=file_suffix,
partition_writer=writer,
partition_data=partition_data,
tqdm_bar_position=1
)
def _create_and_upload_partition(
name: str,
part_idx: int,
file_suffix: str,
partition_writer: Callable[[str, Any], None],
partition_data: Any,
tqdm_bar_position: int = 0,
) -> None:
r"""Create a partition file, write to it, upload it, and delete the
local copy.
"""
partition_name = (f"{name}{file_suffix}/"
f"part_{part_idx+1:04d}{file_suffix}")
with tempfile.NamedTemporaryFile(suffix=file_suffix,
delete=False) as temp_file:
partition_path = temp_file.name
try:
partition_writer(partition_path, partition_data)
# Upload partition immediately with a nested progress bar
_upload_single_file(partition_name, partition_path,
tqdm_bar_position=tqdm_bar_position)
finally:
# clean up the temporary file, even if the upload fails
try:
os.unlink(partition_path)
except OSError:
pass # File might already be deleted or not exist
def delete_uploaded_table(
name: str,
file_type: str,
) -> None:
r"""Synchronously deletes a previously uploaded table from the Kumo data
plane.
.. code-block:: python
import kumoai
from kumoai.connector import delete_uploaded_table
# Assume we have uploaded a `.parquet` table named `users`,
# and we want to delete this table from Kumo:
delete_uploaded_table(name="users", file_type="parquet")
# Assume we have uploaded a `.csv` table named `orders`,
# and we want to delete this table from Kumo:
delete_uploaded_table(name="orders", file_type="csv")
Args:
name: The name of the table to be deleted. This table must have
previously been uploaded with a call to
:meth:`~kumoai.connector.upload_table`.
file_type: The file type of the table to be deleted; this can either
            be :obj:`"parquet"` or :obj:`"csv"`.
"""
assert file_type in {'parquet', 'csv'}
req = DeleteUploadedFileRequest(
source_table_name=name,
connector_id=CONNECTOR_ID_MAP[file_type],
)
global_state.client.connector_api.delete_file_upload(req)
logger.info("Successfully deleted table %s from Kumo.", name)
def replace_table(
name: str,
path: str,
file_type: str,
) -> None:
r"""Replaces an existing uploaded table on the Kumo data plane with a new
table.
.. code-block:: python
import kumoai
from kumoai.connector import replace_table
# Replace an existing `.csv` table named `users`
# with a new version located at `/data/new_users.csv`:
replace_table(
name="users",
path="/data/new_users.csv",
file_type="csv",
)
Args:
name: The name of the table to be replaced. This table must have
previously been uploaded with a call to
:meth:`~kumoai.connector.upload_table`.
path: The full path of the new table to be uploaded, on the
local machine.
file_type: The file type of the table to be replaced; this
can either be :obj:`"parquet"` or :obj:`"csv"`.
Raises:
ValueError: If the specified path does not point to a valid
`.csv` or `.parquet` file.
"""
# Validate:
if not (path.endswith(".parquet") or path.endswith(".csv")):
raise ValueError(f"Path {path} must be either a CSV or Parquet "
f"file. Partitioned data is not currently "
f"supported.")
try:
logger.info("Deleting previously uploaded table %s of type %s.", name,
file_type)
delete_uploaded_table(name=name, file_type=file_type)
    except Exception:
        # TODO(manan): fix this; the table may simply not exist yet, in
        # which case deletion is expected to fail and we proceed with the
        # upload.
        pass
logger.info("Uploading table %s.", name)
upload_table(name=name, path=path)
logger.info("Successfully replaced table %s with the new table.", name)
def _start_table_upload(
table_name: str,
file_type: str,
file_size_bytes: float,
) -> StartFileUploadResponse:
assert file_type in CONNECTOR_ID_MAP.keys()
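    # One multipart part per CHUNK_SIZE (100 MB) slice of the file; always at
    # least one part so that empty or tiny files still produce a valid
    # upload.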
req = StartFileUploadRequest(
source_table_name=table_name,
connector_id=CONNECTOR_ID_MAP[file_type],
num_parts=max(1, math.ceil(file_size_bytes / CHUNK_SIZE)),
)
return global_state.client.connector_api.start_file_upload(req)
def _complete_table_upload(
table_name: str,
file_type: str,
upload_path: str,
upload_id: str,
parts_metadata: List[PartUploadMetadata],
) -> None:
assert file_type in CONNECTOR_ID_MAP.keys()
req = CompleteFileUploadRequest(
source_table_name=table_name,
connector_id=CONNECTOR_ID_MAP[file_type],
temp_upload_path=upload_path,
upload_id=upload_id,
parts_metadata=parts_metadata,
)
return global_state.client.connector_api.complete_file_upload(req)