Skip to content

Commit 01d2efd

Browse files
Tests talk with real DFS
1 parent 68a73e2 commit 01d2efd

File tree

2 files changed

+144
-245
lines changed

2 files changed

+144
-245
lines changed

nisystemlink/clients/dataframe/_data_frame_client.py

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -312,10 +312,14 @@ def append_table_data(
312312
return
313313

314314
if isinstance(data, models.DataFrame):
315-
self._append_table_data_json(
316-
id,
317-
models.AppendTableDataRequest(frame=data, end_of_data=end_of_data)
318-
)
315+
# Only include end_of_data in the request if explicitly provided to avoid sending null
316+
if end_of_data is None:
317+
request_model = models.AppendTableDataRequest(frame=data)
318+
else:
319+
request_model = models.AppendTableDataRequest(
320+
frame=data, end_of_data=end_of_data
321+
)
322+
self._append_table_data_json(id, request_model)
319323
return
320324

321325
if isinstance(data, Iterable):
@@ -344,30 +348,36 @@ def append_table_data(
344348
)
345349

346350
def _generate_body() -> Iterable[memoryview]:
347-
data_iter = (b for b in (first_batch, *iterator))
348-
first = True
351+
data_iter = iter(data)
352+
try:
353+
batch = next(data_iter)
354+
except StopIteration:
355+
return
349356
with BytesIO() as buf:
350-
writer = None
351-
for batch in data_iter:
352-
if first:
353-
options = pa.ipc.IpcWriteOptions(compression="zstd")
354-
writer = pa.ipc.new_stream(buf, batch.schema, options=options)
355-
first = False
357+
options = pa.ipc.IpcWriteOptions(compression="zstd")
358+
writer = pa.ipc.new_stream(buf, batch.schema, options=options)
359+
360+
while True:
356361
writer.write_batch(batch)
357362
with buf.getbuffer() as view, view[0 : buf.tell()] as slice:
358363
yield slice
359364
buf.seek(0)
360-
if writer is not None:
361-
writer.close()
362-
with buf.getbuffer() as view:
363-
with view[0 : buf.tell()] as slice:
364-
yield slice
365+
try:
366+
batch = next(data_iter)
367+
except StopIteration:
368+
break
369+
370+
writer.close()
371+
with buf.getbuffer() as view:
372+
with view[0 : buf.tell()] as slice:
373+
yield slice
365374

366375
try:
376+
# Only include the endOfData query when caller specified it.
367377
self._append_table_data_arrow(
368378
id,
369379
_generate_body(),
370-
("true" if (end_of_data or False) else "false"),
380+
(end_of_data if end_of_data is not None else None),
371381
)
372382
except core.ApiException as ex:
373383
if ex.http_status_code == 400:

0 commit comments

Comments
 (0)