Skip to content

Commit c9394d0

Browse files
committed
Add client sending dictionary updates to test and remove unnecessary started_ boolean flag
1 parent 71f69c0 commit c9394d0

File tree

2 files changed

+16
-9
lines changed

2 files changed

+16
-9
lines changed

cpp/src/arrow/flight/transport_server.cc

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,17 +206,16 @@ class TransportMessageWriter final : public FlightMessageWriter {
206206

207207
Status Begin(const std::shared_ptr<Schema>& schema,
208208
const ipc::IpcWriteOptions& options) override {
209-
ipc_options_ = options;
210209
if (batch_writer_) {
211210
return Status::Invalid("This writer has already been started.");
212211
}
212+
ipc_options_ = options;
213213
std::unique_ptr<ipc::internal::IpcPayloadWriter> payload_writer(
214214
new TransportMessagePayloadWriter(stream_, &app_metadata_));
215215

216216
ARROW_ASSIGN_OR_RAISE(batch_writer_,
217217
ipc::internal::OpenRecordBatchWriter(std::move(payload_writer),
218218
schema, ipc_options_));
219-
started_ = true;
220219
return Status::OK();
221220
}
222221

@@ -264,7 +263,7 @@ class TransportMessageWriter final : public FlightMessageWriter {
264263

265264
private:
266265
Status CheckStarted() {
267-
if (!started_) {
266+
if (!batch_writer_) {
268267
return Status::Invalid("This writer is not started. Call Begin() with a schema");
269268
}
270269
return Status::OK();
@@ -274,7 +273,6 @@ class TransportMessageWriter final : public FlightMessageWriter {
274273
std::unique_ptr<ipc::RecordBatchWriter> batch_writer_;
275274
std::shared_ptr<Buffer> app_metadata_;
276275
::arrow::ipc::IpcWriteOptions ipc_options_;
277-
bool started_ = false;
278276
int64_t extra_messages_ = 0;
279277
};
280278

python/pyarrow/tests/test_flight.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2542,13 +2542,17 @@ def test_flight_dictionary_deltas_do_exchange():
25422542
class DeltaFlightServer(ConstantFlightServer):
25432543
def do_exchange(self, context, descriptor, reader, writer):
25442544
if descriptor.command == b'dict_deltas':
2545-
table = simple_dicts_table()
2545+
expected_table = simple_dicts_table()
2546+
received_table = reader.read_all()
2547+
assert received_table.equals(expected_table)
2548+
25462549
options = pa.ipc.IpcWriteOptions(emit_dictionary_deltas=True)
2547-
writer.begin(table.schema, options=options)
2548-
writer.write_table(table)
2550+
writer.begin(expected_table.schema, options=options)
2551+
writer.write_table(expected_table)
25492552

25502553
with DeltaFlightServer() as server, \
25512554
FlightClient(('localhost', server.port)) as client:
2555+
expected_table = simple_dicts_table()
25522556

25532557
descriptor = flight.FlightDescriptor.for_command(b"dict_deltas")
25542558
writer, reader = client.do_exchange(descriptor,
@@ -2557,7 +2561,12 @@ def do_exchange(self, context, descriptor, reader, writer):
25572561
emit_dictionary_deltas=True)
25582562
)
25592563
)
2564+
# Send client table with dictionary updates (deltas should be sent)
2565+
with writer:
2566+
writer.begin(expected_table.schema, options=pa.ipc.IpcWriteOptions(
2567+
emit_dictionary_deltas=True))
2568+
writer.write_table(expected_table)
2569+
writer.done_writing()
2570+
received_table = reader.read_all()
25602571

2561-
received_table = reader.read_all()
2562-
expected_table = simple_dicts_table()
25632572
assert received_table.equals(expected_table)

0 commit comments

Comments
 (0)