Commit 0fa4dcf

Merge pull request #136 from hafenkran/fix_appendrows_with_over_10mb

Soft-limit AppendRows requests and fix post-create scans for BigQuery extension

2 parents 3e8c863 + 8f9c6b3

4 files changed: +96 -7 lines changed

src/bigquery_arrow_scan.cpp

Lines changed: 9 additions & 0 deletions
@@ -285,10 +285,17 @@ void BigqueryArrowScanFunction::BigqueryArrowScanExecute(ClientContext &ctx,
                                       NumericCast<idx_t>(state.chunk->arrow_array.length) - state.chunk_offset);
     data.lines_read += output_size;
 
+    auto ensure_column_id_coverage = [&](DataChunk &chunk) {
+        if (!state.column_ids.empty() && state.column_ids.size() != chunk.ColumnCount()) {
+            state.column_ids.clear();
+        }
+    };
+
     if (gstate.CanRemoveFilterColumns()) {
         state.all_columns.Reset();
         state.all_columns.SetCardinality(output_size);
 
+        ensure_column_id_coverage(state.all_columns);
         ArrowTableFunction::ArrowToDuckDB(state,
                                           data.arrow_table.GetColumns(),
                                           state.all_columns,
@@ -358,13 +365,15 @@ void BigqueryArrowScanFunction::BigqueryArrowScanExecute(ClientContext &ctx,
     bool do_cast = data.requires_cast || geometry_cast_needed;
     if (!do_cast) {
         // Direct write to output
+        ensure_column_id_coverage(output);
         ArrowTableFunction::ArrowToDuckDB(state,
                                           data.arrow_table.GetColumns(),
                                           output,
                                           data.lines_read - output_size);
     } else {
         state.all_columns.Reset();
         state.all_columns.SetCardinality(output_size);
+        ensure_column_id_coverage(state.all_columns);
         ArrowTableFunction::ArrowToDuckDB(state,
                                           data.arrow_table.GetColumns(),
                                           state.all_columns,
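
The ensure_column_id_coverage lambda covers the "post-create scans" part of the commit title: when the projected column ids no longer match the number of columns in the chunk being filled, which appears to be the situation hit by scans issued right after creating the table, the ids are cleared before ArrowToDuckDB runs. Below is a minimal standalone sketch of that guard with hypothetical names and a plain std::vector in place of DuckDB's scan-state types; it is an illustration of the check, not the extension's actual API.

// Illustrative sketch only (hypothetical names, simplified types): drop a stale
// column-id projection when it cannot cover the chunk being filled, mirroring
// the intent of the lambda in the diff above.
#include <cstddef>
#include <vector>

void EnsureColumnIdCoverage(std::vector<std::size_t> &column_ids, std::size_t chunk_column_count) {
    // An empty id list already means "all columns"; a non-empty list whose size
    // does not match the chunk's width is stale, so clear it instead of risking
    // a mismatched mapping.
    if (!column_ids.empty() && column_ids.size() != chunk_column_count) {
        column_ids.clear();
    }
}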

src/bigquery_proto_writer.cpp

Lines changed: 41 additions & 6 deletions
@@ -238,11 +238,6 @@ void BigqueryProtoWriter::WriteChunk(DataChunk &chunk, const std::map<std::strin
         throw BinderException("Cannot get message prototype from message descriptor");
     }
 
-    // Create the append request
-    google::cloud::bigquery::storage::v1::AppendRowsRequest request;
-    request.set_write_stream(write_stream.name());
-    msg_descriptor->CopyTo(request.mutable_proto_rows()->mutable_writer_schema()->mutable_proto_descriptor());
-
     vector<idx_t> column_indexes;
     if (column_idxs.empty()) {
         column_indexes.resize(chunk.ColumnCount());
@@ -255,7 +250,18 @@ void BigqueryProtoWriter::WriteChunk(DataChunk &chunk, const std::map<std::strin
         }
     }
 
+    auto create_request = [this]() {
+        google::cloud::bigquery::storage::v1::AppendRowsRequest new_request;
+        new_request.set_write_stream(write_stream.name());
+        msg_descriptor->CopyTo(new_request.mutable_proto_rows()->mutable_writer_schema()->mutable_proto_descriptor());
+        return new_request;
+    };
+
+    auto request = create_request();
     auto *rows = request.mutable_proto_rows()->mutable_rows();
+    idx_t rows_in_batch = 0;
+    size_t current_request_bytes = request.ByteSizeLong();
+
     for (idx_t i = 0; i < chunk.size(); i++) {
         google::protobuf::Message *msg = msg_prototype->New();
         const google::protobuf::Reflection *reflection = msg->GetReflection();
@@ -290,10 +296,40 @@ void BigqueryProtoWriter::WriteChunk(DataChunk &chunk, const std::map<std::strin
         if (!msg->SerializeToString(&serialized_msg)) {
             throw std::runtime_error("Failed to serialize message");
         }
+        auto estimated_size_increase = serialized_msg.size() + APPEND_ROWS_ROW_OVERHEAD;
+
+        if (rows_in_batch > 0 && current_request_bytes + estimated_size_increase > DEFAULT_APPEND_ROWS_SOFT_LIMIT) {
+            SendAppendRequest(request);
+            request = create_request();
+            rows = request.mutable_proto_rows()->mutable_rows();
+            rows_in_batch = 0;
+            current_request_bytes = request.ByteSizeLong();
+        }
+
         rows->add_serialized_rows(serialized_msg);
+        rows_in_batch++;
+        current_request_bytes += estimated_size_increase;
+
+        if (current_request_bytes >= DEFAULT_APPEND_ROWS_SOFT_LIMIT) {
+            SendAppendRequest(request);
+            request = create_request();
+            rows = request.mutable_proto_rows()->mutable_rows();
+            rows_in_batch = 0;
+            current_request_bytes = request.ByteSizeLong();
+        }
         delete msg;
     }
 
+    if (rows_in_batch > 0) {
+        SendAppendRequest(request);
+    }
+}
+
+void BigqueryProtoWriter::SendAppendRequest(const google::cloud::bigquery::storage::v1::AppendRowsRequest &request) {
+    if (!request.has_proto_rows() || request.proto_rows().rows().serialized_rows_size() == 0) {
+        return;
+    }
+
     int max_retries = 100;
     for (int attempt = 0; attempt < max_retries; attempt++) {
         auto handle_broken_stream = [this](char const *where) {
@@ -322,7 +358,6 @@ void BigqueryProtoWriter::WriteChunk(DataChunk &chunk, const std::map<std::strin
             }
         }
 
-        // GET THE RESPONSE AND ERROR HANDLING
         auto response = grpc_stream->Read().get();
         if (!response) {
            if (attempt < max_retries - 1) {
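
The rewritten WriteChunk above batches rows by estimated size: each serialized row adds its byte length plus APPEND_ROWS_ROW_OVERHEAD to a running total, and once that total would cross DEFAULT_APPEND_ROWS_SOFT_LIMIT the current request is sent and a fresh one is started, with a final flush after the loop. Below is a minimal self-contained sketch of the same batching pattern using plain standard-library types instead of the gRPC request objects; the function and constant names are illustrative, not part of the extension.

// Illustrative sketch only (hypothetical names): split serialized rows into batches
// whose estimated payload stays under a soft byte budget, the same idea WriteChunk
// uses to keep each AppendRows request below the server-side request cap.
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

static constexpr std::size_t kSoftLimitBytes = 9 * 1024 * 1024; // mirrors DEFAULT_APPEND_ROWS_SOFT_LIMIT
static constexpr std::size_t kPerRowOverhead = 32;              // mirrors APPEND_ROWS_ROW_OVERHEAD

std::vector<std::vector<std::string>> BatchRows(const std::vector<std::string> &serialized_rows) {
    std::vector<std::vector<std::string>> batches;
    std::vector<std::string> current;
    std::size_t current_bytes = 0;

    for (const std::string &row : serialized_rows) {
        const std::size_t increase = row.size() + kPerRowOverhead;
        // Flush before appending if this row would push the batch over the budget,
        // but never emit an empty batch, so a single oversized row still ships alone.
        if (!current.empty() && current_bytes + increase > kSoftLimitBytes) {
            batches.push_back(std::move(current));
            current.clear();
            current_bytes = 0;
        }
        current.push_back(row);
        current_bytes += increase;
    }
    if (!current.empty()) {
        batches.push_back(std::move(current));
    }
    return batches;
}

The diff checks the budget both before and after appending a row; the sketch keeps only the pre-check for brevity, which is enough to guarantee no batch exceeds the soft limit by more than one row's estimate.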

src/include/bigquery_proto_writer.hpp

Lines changed: 5 additions & 1 deletion
@@ -39,6 +39,11 @@ class BigqueryProtoWriter {
     void Finalize();
 
   private:
+    static constexpr idx_t DEFAULT_APPEND_ROWS_SOFT_LIMIT = 9 * 1024 * 1024;
+    static constexpr idx_t APPEND_ROWS_ROW_OVERHEAD = 32;
+
+    void SendAppendRequest(const google::cloud::bigquery::storage::v1::AppendRowsRequest &request);
+
     string table_string;
 
     google::protobuf::DescriptorPool pool;
@@ -51,7 +56,6 @@ class BigqueryProtoWriter {
     std::unique_ptr<google::cloud::AsyncStreamingReadWriteRpc<google::cloud::bigquery::storage::v1::AppendRowsRequest,
                                                               google::cloud::bigquery::storage::v1::AppendRowsResponse>>
         grpc_stream;
-    // google::cloud::bigquery::storage::v1::AppendRowsRequest append_request;
 };
 
 } // namespace bigquery
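
For reference, the new soft limit works out to 9 * 1024 * 1024 = 9,437,184 bytes, leaving a few hundred kilobytes of headroom under the roughly 10 MB AppendRows request cap that the branch name (fix_appendrows_with_over_10mb) alludes to. The exact cap is not part of this diff, so the 10,000,000-byte figure in the compile-time check below is an assumption, not a value from the codebase.

// Sanity check on the headroom the new soft limit leaves. The 10'000'000-byte cap
// is an assumption drawn from the PR's "over 10 MB" wording, not a constant defined
// in this codebase.
static constexpr unsigned long long kSoftLimit = 9ULL * 1024 * 1024;
static_assert(kSoftLimit == 9'437'184ULL, "9 MiB in bytes");
static_assert(kSoftLimit < 10'000'000ULL, "stays under an assumed 10 MB request cap");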
test/sql/storage/attach_large_insert.test

Lines changed: 41 additions & 0 deletions

@@ -0,0 +1,41 @@
+# name: test/sql/storage/attach_large_insert.test
+# description: Ensure large string inserts (single chunk > 10 MiB) succeed by batching AppendRows requests
+# group: [storage]
+
+require bigquery
+
+require-env BQ_TEST_PROJECT
+
+require-env BQ_TEST_DATASET
+
+statement ok
+ATTACH 'project=${BQ_TEST_PROJECT} dataset=${BQ_TEST_DATASET}' AS bq (TYPE bigquery);
+
+statement ok
+CREATE OR REPLACE TABLE bq.${BQ_TEST_DATASET}.large_json_insert (
+    id INTEGER,
+    payload_a STRING,
+    payload_b STRING,
+    payload_c STRING,
+    payload_d STRING,
+    payload_e STRING
+);
+
+statement ok
+INSERT INTO bq.${BQ_TEST_DATASET}.large_json_insert
+SELECT
+    i,
+    '{"payload_a":"' || repeat('A', 100000) || '"}',
+    '{"payload_b":"' || repeat('B', 100000) || '"}',
+    '{"payload_c":"' || repeat('C', 100000) || '"}',
+    '{"payload_d":"' || repeat('D', 100000) || '"}',
+    '{"payload_e":"' || repeat('E', 100000) || '"}'
+FROM range(100) tbl(i);
+
+query I
+SELECT COUNT(*) FROM bq.${BQ_TEST_DATASET}.large_json_insert;
+----
+100
+
+statement ok
+DROP TABLE bq.${BQ_TEST_DATASET}.large_json_insert;
