
Commit 5f128b1: Address review feedback

Parent: 05605dd

File tree

7 files changed: +138 −23 lines

* README.rst
* docs/api_reference/dataframe.rst
* examples/dataframe/create_write_data.py
* nisystemlink/clients/dataframe/_data_frame_client.py
* poetry.lock
* pyproject.toml
* tests/integration/dataframe/test_dataframe.py

README.rst

Lines changed: 10 additions & 0 deletions

@@ -34,6 +34,16 @@ To install **nisystemlink-clients**, use one of the following methods:
 
    $ python -m easy_install nisystemlink-clients
 
+Optional Arrow (pyarrow) Support
+--------------------------------
+The base install does not pull in ``pyarrow``. Install the optional extra if you
+plan to use Arrow RecordBatch ingestion with ``DataFrameClient.append_table_data``::
+
+   $ python -m pip install "nisystemlink-clients[pyarrow]"
+
+Without the extra, Arrow-specific code paths will raise a ``RuntimeError`` when
+attempting to append ``pyarrow.RecordBatch`` instances.
+
 .. _usage_section:
 
 Usage
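
The README's RuntimeError note implies a guarded-import pattern for callers that want to degrade gracefully on a base install. A minimal sketch; the helper name append_rows, the fallback wiring, and the column layout are illustrative, not part of this commit:

# Prefer the Arrow path when the optional extra is installed; otherwise JSON.
from nisystemlink.clients.dataframe import DataFrameClient
from nisystemlink.clients.dataframe.models import AppendTableDataRequest, DataFrame

try:
    import pyarrow as pa  # only available with nisystemlink-clients[pyarrow]
except ImportError:
    pa = None

client = DataFrameClient()

def append_rows(table_id: str, rows: list) -> None:
    if pa is not None:
        # Arrow IPC path: build one RecordBatch from the row-oriented data.
        columns = list(zip(*rows))
        batch = pa.record_batch(
            [pa.array(list(col)) for col in columns],
            names=["ix", "Float_Column", "Timestamp_Column"],
        )
        client.append_table_data(table_id, batch)
    else:
        # JSON path: the DataFrame model carries values as strings.
        frame = DataFrame(data=[[str(v) for v in row] for row in rows])
        client.append_table_data(table_id, AppendTableDataRequest(frame=frame))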

docs/api_reference/dataframe.rst

Lines changed: 24 additions & 0 deletions

@@ -25,3 +25,27 @@ nisystemlink.clients.dataframe
 .. automodule:: nisystemlink.clients.dataframe.models
    :members:
    :imported-members:
+
+Arrow / JSON Ingestion Notes
+----------------------------
+``append_table_data`` accepts multiple data forms:
+
+* ``AppendTableDataRequest`` (JSON)
+* ``DataFrame`` model (JSON)
+* Single ``pyarrow.RecordBatch`` (Arrow IPC)
+* Iterable of ``pyarrow.RecordBatch`` (Arrow IPC)
+* ``None`` with ``end_of_data`` (flush only)
+
+Arrow support is optional and requires installing the ``pyarrow`` extra::
+
+   pip install "nisystemlink-clients[pyarrow]"
+
+If ``pyarrow`` is not installed and a RecordBatch (or iterable) is passed, a
+``RuntimeError`` is raised. When Arrow is used, the batches are serialized into
+an IPC stream and sent with content type
+``application/vnd.apache.arrow.stream``; the ``end_of_data`` flag is sent as a
+query parameter. JSON ingestion places ``endOfData`` in the request body.
+
+If the target SystemLink DataFrame Service does not yet support Arrow and
+responds with HTTP 400, the client raises an explanatory ``ApiException``
+advising to upgrade or fall back to JSON ingestion.
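
Callers can implement the fall-back themselves. A sketch of the upgrade-or-fallback handling the note describes; it assumes the re-raised ApiException preserves the 400 status code, and append_batch_with_fallback is an illustrative helper, not client API:

import pyarrow as pa
from nisystemlink.clients.core import ApiException
from nisystemlink.clients.dataframe import DataFrameClient
from nisystemlink.clients.dataframe.models import AppendTableDataRequest, DataFrame

client = DataFrameClient()

def append_batch_with_fallback(table_id: str, batch: pa.RecordBatch) -> None:
    try:
        client.append_table_data(table_id, batch)  # Arrow IPC path
    except ApiException as ex:
        if ex.http_status_code != 400:
            raise
        # Older DataFrame Service: re-send the same rows via JSON ingestion.
        rows = zip(*batch.to_pydict().values())
        frame = DataFrame(data=[[str(v) for v in row] for row in rows])
        client.append_table_data(table_id, AppendTableDataRequest(frame=frame))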

examples/dataframe/create_write_data.py

Lines changed: 46 additions & 6 deletions

@@ -1,6 +1,10 @@
 import random
 from datetime import datetime
 
+try:
+    import pyarrow as pa  # type: ignore
+except Exception:
+    pa = None
 from nisystemlink.clients.dataframe import DataFrameClient
 from nisystemlink.clients.dataframe.models import (
     AppendTableDataRequest,
@@ -25,12 +29,48 @@
     )
 )
 
-# Generate example data
-frame = DataFrame(
-    data=[[i, random.random(), datetime.now().isoformat()] for i in range(100)]
+# Append via explicit AppendTableDataRequest (JSON)
+frame_request = DataFrame(
+    data=[[i, random.random(), datetime.now().isoformat()] for i in range(3)]
 )
+client.append_table_data(table_id, AppendTableDataRequest(frame=frame_request))
 
-# Write example data to table
-client.append_table_data(
-    table_id, data=AppendTableDataRequest(frame=frame, endOfData=True)
+# Append via DataFrame model directly (JSON)
+frame_direct = DataFrame(
+    data=[[i + 3, random.random(), datetime.now().isoformat()] for i in range(3)]
 )
+client.append_table_data(table_id, frame_direct)
+
+if pa is not None:
+    # Append via single RecordBatch (Arrow)
+    batch_single = pa.record_batch(
+        [
+            pa.array([6, 7, 8]),
+            pa.array([0.1, 0.2, 0.3]),
+            pa.array([datetime.now().isoformat()] * 3),
+        ],
+        names=["ix", "Float_Column", "Timestamp_Column"],
+    )
+    client.append_table_data(table_id, batch_single)
+
+    # Append via iterable of RecordBatches (Arrow)
+    batch_list = [
+        pa.record_batch(
+            [
+                pa.array([9, 10]),
+                pa.array([0.4, 0.5]),
+                pa.array([datetime.now().isoformat(), datetime.now().isoformat()]),
+            ],
+            names=["ix", "Float_Column", "Timestamp_Column"],
+        )
+    ]
+    client.append_table_data(table_id, batch_list)
+
+    # Flush only (no additional rows, mark end_of_data)
+    client.append_table_data(table_id, None, end_of_data=True)
+
+    # Empty iterable (must supply end_of_data)
+    client.append_table_data(table_id, [], end_of_data=True)
+else:
+    # If pyarrow not installed, flush via JSON path
+    client.append_table_data(table_id, None, end_of_data=True)
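
To confirm the appends landed, the example table can be read back. A sketch assuming the client's get_table_data method and a response exposing frame.data; the exact signature and response shape may differ across releases:

# Optional read-back (assumption: get_table_data(table_id) returns paged rows).
response = client.get_table_data(table_id)
for row in response.frame.data:
    print(row)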

nisystemlink/clients/dataframe/_data_frame_client.py

Lines changed: 38 additions & 14 deletions

@@ -4,7 +4,10 @@
 from io import BytesIO
 from typing import List, Optional, Union
 
-import pyarrow as pa  # type: ignore
+try:
+    import pyarrow as pa  # type: ignore
+except Exception:
+    pa = None
 from nisystemlink.clients import core
 from nisystemlink.clients.core._uplink._base_client import BaseClient
 from nisystemlink.clients.core._uplink._methods import (
@@ -265,7 +268,7 @@ def _append_table_data_json(
         content_type="application/vnd.apache.arrow.stream",
     )
     def _append_table_data_arrow(
-        self, id: str, data: bytes, end_of_data: Optional[bool] = None
+        self, id: str, data: Iterable[bytes], end_of_data: Optional[bool] = None
     ) -> None:
         """Internal uplink-implemented Arrow (binary) append call."""
         ...
@@ -277,6 +280,7 @@ def append_table_data(
             Union[
                 models.AppendTableDataRequest,
                 models.DataFrame,
+                "pa.RecordBatch",  # type: ignore[name-defined]
                 Iterable["pa.RecordBatch"],  # type: ignore[name-defined]
             ]
         ],
@@ -291,9 +295,12 @@
             * AppendTableDataRequest: Sent as-is via JSON; ``end_of_data`` must be ``None``.
             * DataFrame (service model): Wrapped into an AppendTableDataRequest (``end_of_data``
              optional) and sent as JSON.
+            * Single pyarrow.RecordBatch: Treated the same as an iterable containing one batch
+              and streamed as Arrow IPC. ``end_of_data`` (if provided) is sent as a query
+              parameter.
             * Iterable[pyarrow.RecordBatch]: Streamed as Arrow IPC. ``end_of_data`` (if provided)
-              is sent as a query parameter. If the iterator yields no batches, it is treated as
-              ``None`` and requires ``end_of_data``.
+              is sent as a query parameter. If the iterator yields no batches, it is treated
+              as ``None`` and requires ``end_of_data``.
             * None: ``end_of_data`` must be provided; sends JSON containing only the
              ``endOfData`` flag.
         end_of_data: Whether additional rows may be appended in future requests. Required when
@@ -323,6 +330,9 @@
            self._append_table_data_json(id, request_model)
            return
 
+        if pa is not None and isinstance(data, pa.RecordBatch):
+            data = [data]
+
        if isinstance(data, Iterable):
            iterator = iter(data)
            try:
@@ -348,22 +358,36 @@
                    "Iterable provided to data must yield pyarrow.RecordBatch objects."
                )
 
-        def _build_body() -> bytes:
+        def _generate_body() -> Iterable[memoryview]:
+            data_iter = iter(data)
+            try:
+                batch = next(data_iter)
+            except StopIteration:
+                return
             with BytesIO() as buf:
                 options = pa.ipc.IpcWriteOptions(compression="zstd")
-                with pa.ipc.new_stream(
-                    buf, first_batch.schema, options=options
-                ) as writer:
-                    writer.write_batch(first_batch)
-                    for batch in iterator:
-                        writer.write_batch(batch)
-                    return buf.getvalue()
+                writer = pa.ipc.new_stream(buf, batch.schema, options=options)
+
+                while True:
+                    writer.write_batch(batch)
+                    with buf.getbuffer() as view, view[0 : buf.tell()] as slice:
+                        yield slice
+                    buf.seek(0)
+                    try:
+                        batch = next(data_iter)
+                    except StopIteration:
+                        break
+
+                writer.close()
+                with buf.getbuffer() as view:
+                    with view[0 : buf.tell()] as slice:
+                        yield slice
 
        try:
            self._append_table_data_arrow(
                id,
-                _build_body(),
-                (end_of_data if end_of_data is not None else None),
+                _generate_body(),
+                end_of_data,
            )
        except core.ApiException as ex:
            if ex.http_status_code == 400:
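
The heart of this change is _generate_body: instead of buffering the whole IPC stream, each batch is written into one reusable BytesIO, the filled prefix is yielded, and the buffer is rewound so the next batch overwrites it. A standalone sketch of the same technique under stated assumptions; stream_ipc_chunks is an illustrative name, and it copies bytes on yield rather than yielding memoryviews into the live buffer:

from io import BytesIO
from typing import Iterable, Iterator

import pyarrow as pa


def stream_ipc_chunks(batches: Iterable[pa.RecordBatch]) -> Iterator[bytes]:
    it = iter(batches)
    try:
        batch = next(it)
    except StopIteration:
        return  # empty input: emit nothing, mirroring the client's None handling
    with BytesIO() as buf:
        options = pa.ipc.IpcWriteOptions(compression="zstd")
        # Creating the writer emits the schema, so the first chunk carries it.
        writer = pa.ipc.new_stream(buf, batch.schema, options=options)
        while True:
            writer.write_batch(batch)
            yield buf.getvalue()[: buf.tell()]  # copy of the valid prefix
            buf.seek(0)  # rewind: the next batch reuses the same buffer
            try:
                batch = next(it)
            except StopIteration:
                break
        writer.close()  # writes the end-of-stream marker at position 0
        yield buf.getvalue()[: buf.tell()]


# Round-trip check: the concatenated chunks form one valid Arrow IPC stream.
chunks = b"".join(stream_ipc_chunks([pa.record_batch([pa.array([1, 2, 3])], names=["a"])]))
table = pa.ipc.open_stream(chunks).read_all()
assert table.num_rows == 3

This keeps peak memory proportional to the largest batch rather than to the whole stream, at the cost of handing the HTTP layer an iterable body instead of bytes (hence the _append_table_data_arrow signature change above).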

poetry.lock

Lines changed: 6 additions & 2 deletions
Some generated files are not rendered by default.

pyproject.toml

Lines changed: 4 additions & 1 deletion

@@ -43,7 +43,10 @@ uplink = [
 pydantic = "^2.11.3"
 pyyaml = "^6.0.1"
 pandas = "^2.1.0"
-pyarrow = "^21.0.0"
+pyarrow = { version = "^21.0.0", optional = true }
+
+[tool.poetry.extras]
+pyarrow = ["pyarrow"]
 
 [tool.poetry.group.dev.dependencies]
 black = ">=22.10,<25.0"
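
With pyarrow now optional, development environments opt in via the extra. Standard Poetry/pip usage, not part of the diff:

$ poetry install --extras pyarrow
$ python -m pip install "nisystemlink-clients[pyarrow]"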

tests/integration/dataframe/test_dataframe.py

Lines changed: 10 additions & 0 deletions

@@ -698,6 +698,16 @@ def test__append_table_data__arrow_ingestion_success(
         with pytest.raises(ApiException):
             client.append_table_data(table_id, None, end_of_data=True)
 
+    def test__append_table_data__single_recordbatch_success(
+        self, client: DataFrameClient, create_table
+    ):
+        pa = pytest.importorskip("pyarrow")
+        table_id = self._new_single_int_table(create_table)
+        batch = pa.record_batch([pa.array([1, 2, 3])], names=["a"])
+        client.append_table_data(table_id, batch, end_of_data=True)
+        with pytest.raises(ApiException):
+            client.append_table_data(table_id, None, end_of_data=True)
+
     def test__append_table_data__arrow_ingestion_with_end_of_data_query_param_false(
         self, client: DataFrameClient, create_table
     ):
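
Because the new test uses pytest.importorskip, it skips cleanly on a base install instead of failing. The Arrow ingestion tests can be targeted directly; the -k expression below is illustrative:

$ pytest tests/integration/dataframe/test_dataframe.py -k "recordbatch or arrow_ingestion" -v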
