Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,14 @@ Write the date in place of the "Unreleased" in the case a new version is release

## Unreleased

### Changed

- The `write_table` method now attempts to write the data as an appendable table if
the `appendable` argument is set to True (default). If the server does not support
this, it will fall back to writing a regular non-appendable table and issue a warning.

## v0.2.7 (2026-02-27)

### Fixed

- A potential race condition when subscribing to an already started stream
Expand Down
92 changes: 50 additions & 42 deletions tiled/catalog/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
FileStorage,
ObjectStorage,
SQLStorage,
UnsupportedStorageError,
get_storage,
parse_storage,
register_storage,
Expand Down Expand Up @@ -657,7 +658,55 @@ async def create_node(
access_blob = access_blob or {}
key = key or self.context.key_maker()
data_sources = data_sources or []
storage_for_mimetype = {}

# Check that data sources have valid mimetypes and that we have adapters/storage for them
for data_source in data_sources:
if data_source.management != Management.external:
if structure_family == StructureFamily.container:
raise NotImplementedError(structure_family)
if data_source.mimetype is None:
data_source.mimetype = DEFAULT_CREATION_MIMETYPE[
data_source.structure_family
]
if data_source.mimetype not in STORAGE_ADAPTERS_BY_MIMETYPE:
raise HTTPException(
status_code=415,
detail=(
f"The given data source mimetype, {data_source.mimetype}, "
"is not one that the Tiled server knows how to write."
),
)
adapter_cls = STORAGE_ADAPTERS_BY_MIMETYPE[data_source.mimetype]
# Choose writable storage. Use the first writable storage item
# with a scheme that is supported by this adapter.
# For back-compat, if an adapter does not declare `supported_storage`
# assume it supports file-based storage only.
supported_storage = getattr(
adapter_cls, "supported_storage", lambda: {FileStorage}
)()
for storage in self.context.writable_storage.values():
if isinstance(storage, tuple(supported_storage)):
storage_for_mimetype[data_source.mimetype] = storage
break
else:
raise UnsupportedStorageError(
f"Attempted to use {adapter_cls.__name__}, which supports storage types "
f"{[cls.__name__ for cls in supported_storage]}, "
"but the only available storage types "
f"are {[val.__class__.__name__ for val in self.context.writable_storage.values()]}." # noqa
)
else:
if data_source.mimetype not in self.context.adapters_by_mimetype:
raise HTTPException(
status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE,
detail=(
f"The given data source mimetype, {data_source.mimetype}, "
"is not one that the Tiled server knows how to read."
),
)

# If all the checks pass, create the node in the database and the storage for the data sources
node = orm.Node(
key=key,
parent=self.node.id,
Expand All @@ -682,54 +731,13 @@ async def create_node(
await db.refresh(node)
for data_source in data_sources:
if data_source.management != Management.external:
if structure_family == StructureFamily.container:
raise NotImplementedError(structure_family)
if data_source.mimetype is None:
data_source.mimetype = DEFAULT_CREATION_MIMETYPE[
data_source.structure_family
]
if data_source.mimetype not in STORAGE_ADAPTERS_BY_MIMETYPE:
raise HTTPException(
status_code=415,
detail=(
f"The given data source mimetype, {data_source.mimetype}, "
"is not one that the Tiled server knows how to write."
),
)
adapter_cls = STORAGE_ADAPTERS_BY_MIMETYPE[data_source.mimetype]
# Choose writable storage. Use the first writable storage item
# with a scheme that is supported by this adapter.
# For back-compat, if an adapter does not declare `supported_storage`
# assume it supports file-based storage only.
supported_storage = getattr(
adapter_cls, "supported_storage", lambda: {FileStorage}
)()
for storage in self.context.writable_storage.values():
if isinstance(storage, tuple(supported_storage)):
break
else:
raise RuntimeError(
f"The adapter {adapter_cls} supports storage types "
f"{[cls.__name__ for cls in supported_storage]} "
"but the only available storage types "
f"are {self.context.writable_storage.values()}."
)
data_source = await ensure_awaitable(
adapter_cls.init_storage,
storage,
storage_for_mimetype[data_source.mimetype],
data_source,
await self.path_segments() + [key],
)
else:
if data_source.mimetype not in self.context.adapters_by_mimetype:
raise HTTPException(
status_code=HTTP_415_UNSUPPORTED_MEDIA_TYPE,
detail=(
f"The given data source mimetype, {data_source.mimetype}, "
"is not one that the Tiled server knows how to read."
),
)

if data_source.structure is None:
structure_id = None
else:
Expand Down
22 changes: 18 additions & 4 deletions tiled/client/composite.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import itertools as it
import time
from typing import TYPE_CHECKING, Iterable, Optional, Union
from typing import TYPE_CHECKING, Any, Iterable, Optional, Union
from urllib.parse import parse_qs, urlparse

from ..structures.core import StructureFamily
from .container import LENGTH_CACHE_TTL, Container
from .utils import MSGPACK_MIME_TYPE, handle_error, retry_context

if TYPE_CHECKING:
import pandas
import pyarrow


Expand Down Expand Up @@ -251,14 +252,27 @@ def new(
)

def write_table(
self, data, *, key=None, metadata=None, specs=None, access_tags=None
self,
data: Union["pandas.DataFrame", dict[str, Any]],
*,
key=None,
metadata=None,
specs=None,
access_tags=None,
appendable=True,
):
if set(self.keys()).intersection(data.columns):
columns = set(data.keys()) if isinstance(data, dict) else data.columns
if set(self.keys()).intersection(columns):
raise ValueError(
"DataFrame columns must not overlap with existing keys in the composite node."
)
return super().write_table(
data, key=key, metadata=metadata, specs=specs, access_tags=access_tags
data,
key=key,
metadata=metadata,
specs=specs,
access_tags=access_tags,
appendable=appendable,
)

def create_appendable_table(
Expand Down
92 changes: 82 additions & 10 deletions tiled/client/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,7 @@ def write_table(
metadata=None,
specs=None,
access_tags=None,
appendable: bool = True,
):
"""Write tabular data.

Expand All @@ -1160,6 +1161,9 @@ def write_table(
conform to some named standard specification.
access_tags: List[str], optional
Server-specific authZ tags in list form, used to confer access to the node.
appendable: bool, optional
If True, attempt to create an appendable table. If the server does not support this,
a regular non-appendable table will be created instead and a warning will be issued.

See Also
--------
Expand All @@ -1170,6 +1174,27 @@ def write_table(

from ..structures.table import TableStructure

if hasattr(data, "partitions") and not isinstance(
data, dask.dataframe.DataFrame
):
raise NotImplementedError(
f"The `write_table` method does not support {type(data)} with partitions. "
"Please convert it to dask.dataframe.DataFrame first."
)

if Version(self.context.server_info.library_version) <= Version("0.2.8"):
if appendable:
warnings.warn(
"You are trying to create an appendable table, but the `write_table` "
"method can not be used for this with the current server version (<0.2.9)."
"If SQL-backed storage is configured on the server, use the explicit "
"`create_appendable_table` method instead and then `append_partition` "
"to add data to it. "
"Otherwise, to silence this warning and use a fixed-size storage format, "
"set `appendable=False` when calling `write_table`."
)
appendable = False

if isinstance(data, dask.dataframe.DataFrame):
structure = TableStructure.from_dask_dataframe(data)
elif isinstance(data, dict):
Expand All @@ -1178,6 +1203,58 @@ def write_table(
else:
structure = TableStructure.from_pandas(data)

# First attempt to create an appendable table if requested.
# If the server does not support it, fall back to creating a regular table.
if appendable:
import pyarrow

try:
client = self.create_appendable_table(
schema=structure.arrow_schema_decoded,
npartitions=structure.npartitions,
key=key,
metadata=metadata,
specs=specs,
access_tags=access_tags,
)
except ClientError as err:
if (
err.response.status_code == httpx.codes.NOT_ACCEPTABLE
and "Requested storage type is not supported" in err.response.text
):
warnings.warn(
"You are trying to create an appendable table, but the server does not "
"support this; the table will be saved in a fixed-size format instead. "
"We recommend configuring the Tiled server with an SQL-backed storage, "
"which would enable this capability. Otherwise, to silence this warning, "
"set `appendable=False` when calling `write_table`."
)
appendable = False
else:
raise

# If we created an appendable table, write to it and return it.
if appendable:
if hasattr(data, "partitions"):

def _append_partition(x, partition_info, client):
table = pyarrow.Table.from_pandas(x)
client.append_partition(partition_info["number"], table)
return x

data.map_partitions(
functools.partial(_append_partition, client=client), meta=data._meta
).compute()
else:
table = (
pyarrow.Table.from_pydict(data)
if isinstance(data, dict)
else pyarrow.Table.from_pandas(data)
)
client.append_partition(0, table)
return client

# If we didn't create an appendable table, create a regular table instead.
client = self.new(
StructureFamily.table,
[
Expand All @@ -1193,12 +1270,12 @@ def write_table(
)

if hasattr(data, "partitions"):
if isinstance(data, dask.dataframe.DataFrame):
ddf = data
else:
raise NotImplementedError(f"Unsure how to handle type {type(data)}")

ddf.map_partitions(
def _write_partition(x, partition_info, client):
client.write_partition(partition_info["number"], x)
return x

data.map_partitions(
functools.partial(_write_partition, client=client), meta=data._meta
).compute()
else:
Expand Down Expand Up @@ -1302,11 +1379,6 @@ def __call__(self):
return self.obj


def _write_partition(x, partition_info, client):
    # Callback intended for dask's map_partitions: uploads one partition of a
    # dask DataFrame to the Tiled server via the table client.
    # `partition_info` is expected to carry the partition index under the
    # "number" key (the dict shape dask supplies to map_partitions callbacks).
    client.write_partition(partition_info["number"], x)
    # Return the input unchanged so the mapped dask graph remains a valid
    # DataFrame computation (map_partitions requires a DataFrame-like result).
    return x


DEFAULT_STRUCTURE_CLIENT_DISPATCH = {
"numpy": OneShotCachedMap(
{
Expand Down
23 changes: 15 additions & 8 deletions tiled/server/router.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from .. import __version__
from ..links import links_for_node
from ..ndslice import NDSlice
from ..storage import UnsupportedStorageError
from ..stream_messages import ArrayPatch
from ..structures.core import Spec, StructureFamily
from ..type_aliases import AccessTags, Scopes
Expand Down Expand Up @@ -1601,14 +1602,20 @@ async def _create_node(
access_blob_modified = access_blob != {}
access_blob = {}

node = await entry.create_node(
metadata=body.metadata,
structure_family=body.structure_family,
key=key,
specs=body.specs,
data_sources=body.data_sources,
access_blob=access_blob,
)
try:
node = await entry.create_node(
metadata=body.metadata,
structure_family=body.structure_family,
key=key,
specs=body.specs,
data_sources=body.data_sources,
access_blob=access_blob,
)
except UnsupportedStorageError as err:
raise HTTPException(
status_code=HTTP_406_NOT_ACCEPTABLE,
detail=f"Requested storage type is not supported: {err.args[0]}",
)
links = links_for_node(
structure_family, structure, get_base_url(request), path + f"/{node.key}"
)
Expand Down
5 changes: 5 additions & 0 deletions tiled/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@
SUPPORTED_OBJECT_URI_SCHEMES = {"http", "https"} # TODO: Add "s3", "gs", "azure", "az"


class UnsupportedStorageError(ValueError):
    # Subclasses ValueError so existing callers catching ValueError keep working.
    # The docstring alone is a complete class body; the original trailing `pass`
    # was redundant and has been removed.
    "Raised when the Tiled server does not have a suitable storage for a given adapter."


@dataclasses.dataclass(frozen=True)
class Storage:
"Base class for representing storage location"
Expand Down
Loading