Source code for kumoai.connector.s3_connector
import logging
from typing import List, Optional
from kumoapi.data_source import DataSourceType, FileConnectorResourceConfig
from kumoapi.source_table import (
S3SourceTableRequest,
SourceTableConfigRequest,
SourceTableConfigResponse,
SourceTableListRequest,
)
from typing_extensions import Self, override
from kumoai import global_state
from kumoai.connector import Connector
from kumoai.connector.source_table import SourceTable
logger = logging.getLogger(__name__)
_DEFAULT_NAME = 's3_connector'
[docs]class S3Connector(Connector):
r"""Defines a connector to a table stored as a file (or partitioned
set of files) on the Amazon `S3 <https://aws.amazon.com/s3/>`__ object
store. Any table behind an S3 bucket accessible by the shared external IAM
role can be accessed through this connector.
.. code-block:: python
import kumoai
connector = kumoai.S3Connector(root_dir="s3://...") # an S3 path.
# List all tables:
print(connector.table_names()) # Returns: ['articles', 'customers', 'users']
# Check whether a table is present:
assert "articles" in connector
# Fetch a source table (both approaches are equivalent):
source_table = connector["articles"]
source_table = connector.table("articles")
Args:
root_dir: The root directory of this connector. If provided, the root
directory is used as a prefix for tables in this connector. If not
provided, all tables must be specified by their full S3 paths.
""" # noqa
[docs] def __init__(
self,
root_dir: Optional[str] = None,
_connector_id: Optional[str] = None,
) -> None:
if _connector_id is not None:
# UI S3Connector, named:
self._connector_id = _connector_id
self.root_dir = None
return
self._connector_id = _DEFAULT_NAME
if root_dir is not None:
# Remove trailing / to be consistent with boto s3
root_dir = root_dir.rstrip('/')
self.root_dir = root_dir
if global_state.is_spcs and root_dir is not None \
and root_dir.startswith('s3://'):
raise ValueError(
"S3 connectors are not supported when running Kumo in "
"Snowpark container services. Please use a Snowflake "
"connector instead.")
@override
@property
def name(self) -> str:
r"""Not supported by :class:`S3Connector`; returns an internal
specifier.
"""
return self._connector_id
@override
@property
def source_type(self) -> DataSourceType:
return DataSourceType.S3
@override
def _source_table_request(
self,
table_names: List[str],
) -> S3SourceTableRequest:
root_dir = self.root_dir
if not root_dir and self.name == _DEFAULT_NAME:
# Handle None root directories (table name is a path):
table_path = S3URI(table_names[0]).validate()
root_dir = table_path.root_dir
for i, v in enumerate(table_names):
uri = S3URI(v)
if uri.root_dir != root_dir:
# TODO(manan): fix
raise ValueError(
f"Please ensure that all of your tables are behind "
f"the same root directory ({root_dir}).")
table_names[i] = uri.object_name
connector_id = self.name if self.name != _DEFAULT_NAME else None
root_dir = root_dir if self.name == _DEFAULT_NAME else ""
# TODO(manan): file type?
return S3SourceTableRequest(
s3_root_dir=root_dir,
connector_id=connector_id,
table_names=table_names,
)
[docs] @override
def table(self, name: str) -> SourceTable:
r"""Returns a :class:`~kumoai.connector.SourceTable` object
corresponding to a source table on Amazon S3.
Args:
name: The name of the table on S3. If :obj:`root_dir` is provided,
the path will be specified as :obj:`root_dir/name`. If
:obj:`root_dir` is not provided, the name should be the full
path (e.g. starting with ``s3://``).
Raises:
:class:`ValueError`: if ``name`` does not exist in the backing
connector.
"""
# NOTE only overridden for documentation purposes.
return super().table(name)
@override
def _list_tables(self) -> List[str]:
connector_id = self.name if self.name != _DEFAULT_NAME else None
root_dir = self.root_dir if self.name == _DEFAULT_NAME else None
if root_dir is None and connector_id is None:
raise ValueError(
"Listing tables without a specified root directory is not "
"supported. Please specify a root directory to continue; "
"alternatively, please access individual tables with their "
"full S3 paths.")
req = SourceTableListRequest(connector_id=connector_id,
root_dir=root_dir,
source_type=DataSourceType.S3)
return global_state.client.source_table_api.list_tables(req)
@override
def _get_table_config(self, table_name: str) -> SourceTableConfigResponse:
root_dir = self.root_dir
if not root_dir and self.name == _DEFAULT_NAME:
# Handle None root directories (table name is a path):
table_path = S3URI(table_name).validate()
root_dir = table_path.root_dir
table_name = table_path.object_name
connector_id = self.name if self.name != _DEFAULT_NAME else None
root_dir = root_dir if self.name == _DEFAULT_NAME else None
req = SourceTableConfigRequest(
connector_id=connector_id,
root_dir=root_dir,
table_name=table_name,
source_type=self.source_type,
)
return global_state.client.source_table_api.get_table_config(req)
# Class properties ########################################################
[docs] @classmethod
def get_by_name(cls, name: str) -> Self:
r"""Returns an instance of a named S3 Connector, created in the Kumo UI.
.. note::
Named S3 connectors are read-only: if you would like to modify the
root directory, please do so from the UI.
Args:
name: The name of the existing connector.
Example:
>>> import kumoai
>>> connector = kumoai.S3Connector.get_by_name("name") # doctest: +SKIP # noqa: E501
"""
api = global_state.client.connector_api
resp = api.get(name)
if resp is None:
raise ValueError(
f"There does not exist an existing stored connector with name "
f"{name}.")
config = resp.config
assert isinstance(config, FileConnectorResourceConfig)
return cls(
root_dir=None,
_connector_id=config.name,
)
def __repr__(self) -> str:
if self.name != _DEFAULT_NAME:
return f'{self.__class__.__name__}(name={self.name})'
root_dir_name = f"\"{self.root_dir}\"" if self.root_dir else "None"
return f'{self.__class__.__name__}(root_dir={root_dir_name})'
class S3URI:
r"""A utility class to parse and navigate S3 URIs."""
def __init__(self, uri: str):
self.uri: str = uri
if uri.endswith('/'): # remove trailing slash
self.uri = uri[:-1]
@property
def is_valid(self) -> bool:
# TODO(zeyuan): For SPCS, the path can be a local filesystem path
# For train/pred table.
if global_state.is_spcs:
return True
# TODO(manan): implement more checks...
return self.uri.startswith("s3://")
def validate(self) -> Self:
if not self.is_valid:
raise ValueError(f"Path {self.uri} is not a valid S3 URI.")
return self
@property
def root_dir(self) -> str:
self.validate()
return self.uri.rsplit('/', 1)[0]
@property
def object_name(self) -> str:
self.validate()
return self.uri.rsplit('/', 1)[1]
# Class properties ########################################################
def __repr__(self) -> str:
return (f'{self.__class__.__name__}('
f'uri={self.uri}, valid={self.is_valid})')