Skip to content

Commit da95872

Browse files
authored
Merge pull request #1203 from danielballan/patch-query-params
Move array patch specification from `PUT` body to query parameters
2 parents 7edb969 + 4670a63 commit da95872

File tree

8 files changed

+103
-37
lines changed

8 files changed

+103
-37
lines changed

CHANGELOG.md

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,45 @@ Write the date in place of the "Unreleased" in the case a new version is release
99

1010
- Tests to ensure that CSVAdapter can be used with a subset of columns.
1111
- `locking` key-word argument in HDFAdapter and HDF5Adapter.
12+
- Support for streaming uploaded tabular data.
1213

1314
### Changed
1415

1516
- Enable Tiled server to accept bearer access tokens for authentication.
1617
- Modernize implementation of Device Code Flow.
18+
- The websocket messages contain a `"type"` field and are not formally
19+
specified and validated using pydantic models in `tiled.stream_messages`.
1720
- In previous releases, the method `Subscription.start` launched a background
1821
thread. This was renamed to `Subscription.start_in_thread`. Now
1922
`Subscription.start` blocks the current thread, leaving any thread management
2023
up to the caller.
2124
- Encapsulated redis code in adapter.py into a StreamingCache object
22-
- Renamed `Subscription.stop` to `Subscription.close` to match the property
23-
`Subscription.closed`.
25+
- Renamed `Subscription.stop` to `Subscription.disconnect`.
2426
- In `Subscription`, use a configurable `concurrent.futures.Executor` to
2527
execute callbacks.
28+
- Removed `Subscription.add_callback`, replaced by separate callback registries
29+
connected to specific types of updates:
30+
31+
```python
32+
Subscription.stream_closed # writer-initiated
33+
Subscription.disconnected # subscriber-initiated
34+
ContainerSubscription.child_created
35+
ContainerSubscription.child_metadata_updated
36+
ArraySubscription.new_data
37+
TableSubscription.new_data
38+
```
39+
40+
- The signature of subscription callbacks has been changed. The
41+
connection-related callbacks `stream_closed` and `disconnected`
42+
receive `f(subscription: Subscription)`. The other callbacks
43+
receive an instance of `tiled.client.stream.LiveUpdate`,
44+
which provide a reference to the subscription
45+
`update.subscription`, all the data from the websocket message,
46+
and update-specific convenience methods such as
47+
`LiveContainerUpdate.child()` when a new child is created in a
48+
container and `ArraySubscription.new_data()` or
49+
`TableSubscription.new_data()` to conveniently access an array
50+
or table respectively.
2651

2752
### Fixed
2853

docs/source/tutorials/streaming.md

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
Streaming is an experimental feature in Tiled, subject to backward-compatible
66
changes without notice.
77
8-
Additionally, it is only currently supported on arrays and containers.
9-
Support for tables and other structures is planned but not yet implemented.
8+
Additionally, it is only currently supported on containers, arrays (both
9+
uploaded and external registered data), and tables (uploaded data only).
10+
Support for external registered tabular data and other structures (sparse,
11+
awkward) is planned but not yet implemented.
1012
1113
```
1214

@@ -177,5 +179,6 @@ serve` CLI they are `--cache-data-ttl` and `--cache-seq-ttl`.)
177179

178180
This feature is in a very early preview stage.
179181

180-
- Other data structures (table, sparse, awkward) are not yet supported.
181-
- Updates to metadata and deletions of nodes are not yet visible to subscribers.
182+
- External registered tabular data is not yet supported.
183+
- Other data structures (sparse, awkward) are not yet supported.
184+
- Deletions of nodes are not yet visible to subscribers.

tiled/_tests/test_subscription.py

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from ..structures.core import StructureFamily
1818
from ..structures.data_source import Asset, DataSource, Management
1919
from ..utils import safe_json_dump
20+
from .utils import fail_with_status_code
2021

2122
pytestmark = pytest.mark.skipif(
2223
sys.platform == "win32", reason="Requires Redis service"
@@ -377,14 +378,27 @@ def on_child_created(update):
377378
num=3,
378379
),
379380
)
381+
params = {
382+
"patch_shape": ",".join(map(str, [1, 7, 13])),
383+
"patch_offset": ",".join(map(str, [2, 0, 0])),
384+
}
385+
386+
# First test invalid requests that will be bounced by the server.
387+
for key in params:
388+
bad_params = params.copy()
389+
bad_params.pop(key) # missing one of the patch params!
390+
with fail_with_status_code(400):
391+
x.context.http_client.put(
392+
x.uri.replace("/metadata/", "/data_source/", 1),
393+
content=safe_json_dump({"data_source": updated_data_source}),
394+
params=bad_params,
395+
).raise_for_status()
396+
397+
# Now do a request that is valid.
380398
x.context.http_client.put(
381399
x.uri.replace("/metadata/", "/data_source/", 1),
382-
content=safe_json_dump(
383-
{
384-
"data_source": updated_data_source,
385-
"patch": {"shape": (1, 7, 13), "offset": (2, 0, 0), "extend": True},
386-
}
387-
),
400+
content=safe_json_dump({"data_source": updated_data_source}),
401+
params=params,
388402
).raise_for_status()
389403
assert event.wait(timeout=5.0), "Timeout waiting for messages"
390404
x.close_stream()
@@ -395,7 +409,6 @@ def on_child_created(update):
395409
(update,) = updates
396410
assert update.patch.shape == (1, 7, 13)
397411
assert update.patch.offset == (2, 0, 0)
398-
assert update.patch.extend
399412
actual_streamed = update.data()
400413
np.testing.assert_array_equal(actual_streamed, arr[2:])
401414

@@ -519,7 +532,7 @@ def collect(update):
519532
assert_frame_equal(actual_updated, df2)
520533

521534

522-
def test_streaming_table_appends(tiled_websocket_context):
535+
def test_streaming_table_append(tiled_websocket_context):
523536
context = tiled_websocket_context
524537
client = from_context(context)
525538
updates = []

tiled/client/stream.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -655,11 +655,11 @@ class LiveTableData(TableData):
655655
def data(self):
656656
"Get table"
657657
# Registration occurs on import. Ensure this is imported.
658-
from ..serialization import array
658+
from ..serialization import table
659659

660-
del array
660+
del table
661661

662-
# Decode payload (bytes) into array.
662+
# Decode payload (bytes) into table.
663663
deserializer = default_deserialization_registry.dispatch("table", self.mimetype)
664664
return deserializer(self.payload)
665665

tiled/server/dependencies.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,3 +160,25 @@ def offset_param(
160160
):
161161
"Specify and parse an offset parameter."
162162
return tuple(map(int, offset.split(",")))
163+
164+
165+
def patch_shape_param(
166+
patch_shape: Optional[str] = Query(
167+
None, min_length=1, pattern="^[0-9]+(,[0-9]+)*$|^scalar$"
168+
),
169+
):
170+
"Specify and parse an array patch shape parameter."
171+
if patch_shape is None:
172+
return None
173+
return tuple(map(int, patch_shape.split(",")))
174+
175+
176+
def patch_offset_param(
177+
patch_offset: Optional[str] = Query(
178+
None, min_length=1, pattern="^[0-9]+(,[0-9]+)*$"
179+
),
180+
):
181+
"Specify and parse an array patch offset parameter."
182+
if patch_offset is None:
183+
return None
184+
return tuple(map(int, patch_offset.split(",")))

tiled/server/router.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
from .. import __version__
5353
from ..links import links_for_node
5454
from ..ndslice import NDSlice
55+
from ..stream_messages import ArrayPatch
5556
from ..structures.core import Spec, StructureFamily
5657
from ..type_aliases import AccessTags, Scopes
5758
from ..utils import BrokenLink, ensure_awaitable, patch_mimetypes, path_from_uri
@@ -89,6 +90,8 @@
8990
get_entry,
9091
get_root_tree,
9192
offset_param,
93+
patch_offset_param,
94+
patch_shape_param,
9295
shape_param,
9396
)
9497
from .file_response_with_range import FileResponseWithRange
@@ -1636,6 +1639,8 @@ async def put_data_source(
16361639
session_state: dict = Depends(get_session_state),
16371640
authn_access_tags: Optional[AccessTags] = Depends(get_current_access_tags),
16381641
authn_scopes: Scopes = Depends(get_current_scopes),
1642+
patch_shape: Optional[tuple[int, ...]] = Depends(patch_shape_param),
1643+
patch_offset: Optional[tuple[int, ...]] = Depends(patch_offset_param),
16391644
_=Security(check_scopes, scopes=["write:metadata", "register"]),
16401645
):
16411646
entry = await get_entry(
@@ -1650,7 +1655,23 @@ async def put_data_source(
16501655
None,
16511656
getattr(request.app.state, "access_policy", None),
16521657
)
1653-
await entry.put_data_source(data_source=body.data_source, patch=body.patch)
1658+
patch_params = {
1659+
"shape": patch_shape,
1660+
"offset": patch_offset,
1661+
}
1662+
if all(value is None for value in patch_params.values()):
1663+
patch = None
1664+
elif all(value is not None for value in patch_params.values()):
1665+
patch = ArrayPatch(**patch_params)
1666+
else:
1667+
raise HTTPException(
1668+
status_code=HTTP_400_BAD_REQUEST,
1669+
detail=(
1670+
"The query parameters patch_shape and patch_offset"
1671+
"go together; either all or none must be specified."
1672+
),
1673+
)
1674+
await entry.put_data_source(data_source=body.data_source, patch=patch)
16541675

16551676
@router.delete("/metadata/{path:path}")
16561677
async def delete(

tiled/server/schemas.py

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,7 @@
33
import enum
44
import uuid
55
from datetime import datetime
6-
from typing import (
7-
TYPE_CHECKING,
8-
Any,
9-
Dict,
10-
Generic,
11-
List,
12-
Optional,
13-
Tuple,
14-
TypeVar,
15-
Union,
16-
)
6+
from typing import TYPE_CHECKING, Any, Dict, Generic, List, Optional, TypeVar, Union
177

188
import pydantic.generics
199
from pydantic import ConfigDict, Field, StringConstraints
@@ -153,12 +143,6 @@ def from_orm(cls, orm: tiled.catalog.orm.Revision) -> Revision:
153143
)
154144

155145

156-
class Patch(pydantic.BaseModel):
157-
offset: Tuple[int, ...]
158-
shape: Tuple[int, ...]
159-
extend: bool
160-
161-
162146
class DataSource(pydantic.BaseModel, Generic[StructureT]):
163147
id: Optional[int] = None
164148
structure_family: StructureFamily
@@ -465,7 +449,6 @@ def narrow_structure_type(self):
465449

466450
class PutDataSourceRequest(pydantic.BaseModel):
467451
data_source: DataSource
468-
patch: Optional[Patch] = None
469452

470453

471454
class PostMetadataResponse(pydantic.BaseModel, Generic[ResourceLinksT]):

tiled/stream_messages.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,6 @@ class ArrayData(Update):
8888
class ArrayPatch(BaseModel):
8989
offset: tuple[int, ...]
9090
shape: tuple[int, ...]
91-
extend: bool
9291

9392

9493
class ArrayRef(Update):

0 commit comments

Comments
 (0)