Commit a975186

Fix: Update BigQuery Storage Arrow samples batching logic
Bases batching on size rather than row count to avoid exceeding an internal 10MB limit. Also removes an obsolete assertion in the test.
1 parent 4927067 commit a975186
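
The removed logic derived a fixed row-count chunk size from the table's total byte size, which can still yield an over-limit batch when row sizes are uneven. Below is a minimal sketch of that failure mode, using only pyarrow and made-up values (nothing here comes from the sample itself):

# Illustrative sketch only: why row-count chunking can overshoot the 10 MB
# AppendRowsRequest limit when row sizes are skewed. Table contents are
# invented for the demonstration; they are not part of the sample.
import pyarrow as pa

limit = 10 * 1024 * 1024  # 10 MB hard limit per AppendRowsRequest

# Ten ~1 KB rows followed by ten ~1.5 MB rows (~15 MB total).
values = ["x" * 1024] * 10 + ["y" * (1536 * 1024)] * 10
table = pa.table({"payload": values})

# Removed approach: derive a row-count chunk size from the table's total size.
chunk_num = int(table.nbytes / limit) + 1      # -> 2 chunks
chunk_size = int(table.num_rows / chunk_num)   # -> 10 rows per chunk

for batch in table.to_batches(max_chunksize=chunk_size):
    size = len(batch.serialize().to_pybytes())
    print(f"{batch.num_rows} rows, {size} bytes, over limit: {size > limit}")
# The chunk holding the ten large rows serializes to ~15 MB and would be
# rejected; the new generator instead accumulates rows by serialized size.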

File tree

1 file changed: +36 additions, -10 deletions


packages/google-cloud-bigquery-storage/samples/pyarrow/append_rows_with_arrow.py

Lines changed: 36 additions & 10 deletions
@@ -160,17 +160,43 @@ def generate_pyarrow_table(num_rows=TABLE_LENGTH):
 
 
 def generate_write_requests(pyarrow_table):
-    # Determine max_chunksize of the record batches. Because max size of
-    # AppendRowsRequest is 10 MB, we need to split the table if it's too big.
-    # See: https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#appendrowsrequest
-    max_request_bytes = 10 * 2**20  # 10 MB
-    chunk_num = int(pyarrow_table.nbytes / max_request_bytes) + 1
-    chunk_size = int(pyarrow_table.num_rows / chunk_num)
-
-    # Construct request(s).
-    for batch in pyarrow_table.to_batches(max_chunksize=chunk_size):
+    # Maximum size for a single AppendRowsRequest is 10 MB.
+    # To be safe, we'll aim for a soft limit of 7 MB.
+    max_request_bytes = 7 * 1024 * 1024  # 7 MB
+
+    batches_in_request = []
+    current_size = 0
+
+    # Split table into batches of one row.
+    for row_batch in pyarrow_table.to_batches(max_chunksize=1):
+        serialized_batch = row_batch.serialize().to_pybytes()
+        batch_size = len(serialized_batch)
+
+        if batch_size > max_request_bytes:
+            raise ValueError(
+                f"A single PyArrow batch of one row is larger than the maximum request size (batch size: {batch_size} > max request size: {max_request_bytes}). "
+                "Cannot proceed."
+            )
+
+        if current_size + batch_size > max_request_bytes and batches_in_request:
+            # Combine collected batches and yield request
+            combined_table = pa.Table.from_batches(batches_in_request)
+            request = gapic_types.AppendRowsRequest()
+            request.arrow_rows.rows.serialized_record_batch = combined_table.serialize().to_pybytes()
+            yield request
+
+            # Reset for next request.
+            batches_in_request = []
+            current_size = 0
+
+        batches_in_request.append(row_batch)
+        current_size += batch_size
+
+    # Yield any remaining batches
+    if batches_in_request:
+        combined_table = pa.Table.from_batches(batches_in_request)
         request = gapic_types.AppendRowsRequest()
-        request.arrow_rows.rows.serialized_record_batch = batch.serialize().to_pybytes()
+        request.arrow_rows.rows.serialized_record_batch = combined_table.serialize().to_pybytes()
         yield request
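
A hedged way to exercise the new generator, assuming append_rows_with_arrow imports without side effects and its dependencies (pyarrow, google-cloud-bigquery-storage) are installed; the check below uses only the two functions visible in this diff:

# Hedged check, not part of the commit: confirm every request produced by the
# updated generator stays under the 10 MB AppendRowsRequest limit. Run from
# the samples/pyarrow directory so the sample module is importable.
from append_rows_with_arrow import generate_pyarrow_table, generate_write_requests

HARD_LIMIT = 10 * 1024 * 1024  # 10 MB per AppendRowsRequest

table = generate_pyarrow_table()  # default TABLE_LENGTH rows
sizes = [
    len(request.arrow_rows.rows.serialized_record_batch)
    for request in generate_write_requests(table)
]

print(f"{table.num_rows} rows -> {len(sizes)} request(s), largest {max(sizes)} bytes")
assert all(size <= HARD_LIMIT for size in sizes)

The tradeoff the new code accepts is visible here: serializing every row individually costs extra work per row, but each request is sized by its actual serialized bytes rather than an assumed-uniform row size.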
