Commit 5806c25

limit size of AppendRows requests
1 parent 3e8c863 commit 5806c25

2 files changed: +48 -7 lines changed

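In outline, the change batches the serialized rows of each DataChunk into multiple AppendRows requests instead of one: WriteChunk tracks an estimated request size, flushes through a new SendAppendRequest helper before a row would push the payload over a 9 MiB soft limit, and flushes again whenever the limit is reached, so no single request grows past the API's per-request size cap. Below is a minimal, dependency-free sketch of the same size-capped batching pattern; Flush, AppendAll, kSoftLimit, and kRowOverhead are hypothetical names chosen for illustration, and only the two limit values mirror the commit's constants.

    #include <cstddef>
    #include <string>
    #include <vector>

    // Illustrative limits mirroring DEFAULT_APPEND_ROWS_SOFT_LIMIT and APPEND_ROWS_ROW_OVERHEAD.
    constexpr std::size_t kSoftLimit = 9 * 1024 * 1024;
    constexpr std::size_t kRowOverhead = 32;

    // Hypothetical sink standing in for SendAppendRequest; a real writer would
    // build and send an AppendRowsRequest here.
    void Flush(std::vector<std::string> &batch) {
        if (batch.empty()) {
            return;
        }
        // ... send the batch ...
        batch.clear();
    }

    void AppendAll(const std::vector<std::string> &serialized_rows) {
        std::vector<std::string> batch;
        std::size_t batch_bytes = 0;
        for (const auto &row : serialized_rows) {
            const std::size_t increase = row.size() + kRowOverhead;
            // Flush first if adding this row would exceed the soft limit.
            if (!batch.empty() && batch_bytes + increase > kSoftLimit) {
                Flush(batch);
                batch_bytes = 0;
            }
            batch.push_back(row);
            batch_bytes += increase;
            // Flush eagerly once the limit is reached (covers a single oversized row).
            if (batch_bytes >= kSoftLimit) {
                Flush(batch);
                batch_bytes = 0;
            }
        }
        Flush(batch); // Send whatever is left.
    }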

src/bigquery_proto_writer.cpp

Lines changed: 43 additions & 6 deletions
@@ -238,11 +238,6 @@ void BigqueryProtoWriter::WriteChunk(DataChunk &chunk, const std::map<std::strin
         throw BinderException("Cannot get message prototype from message descriptor");
     }
 
-    // Create the append request
-    google::cloud::bigquery::storage::v1::AppendRowsRequest request;
-    request.set_write_stream(write_stream.name());
-    msg_descriptor->CopyTo(request.mutable_proto_rows()->mutable_writer_schema()->mutable_proto_descriptor());
-
     vector<idx_t> column_indexes;
     if (column_idxs.empty()) {
         column_indexes.resize(chunk.ColumnCount());
@@ -255,7 +250,18 @@ void BigqueryProtoWriter::WriteChunk(DataChunk &chunk, const std::map<std::strin
         }
     }
 
+    auto create_request = [this]() {
+        google::cloud::bigquery::storage::v1::AppendRowsRequest new_request;
+        new_request.set_write_stream(write_stream.name());
+        msg_descriptor->CopyTo(new_request.mutable_proto_rows()->mutable_writer_schema()->mutable_proto_descriptor());
+        return new_request;
+    };
+
+    auto request = create_request();
     auto *rows = request.mutable_proto_rows()->mutable_rows();
+    idx_t rows_in_batch = 0;
+    size_t current_request_bytes = request.ByteSizeLong();
+
     for (idx_t i = 0; i < chunk.size(); i++) {
         google::protobuf::Message *msg = msg_prototype->New();
         const google::protobuf::Reflection *reflection = msg->GetReflection();
@@ -290,10 +296,42 @@ void BigqueryProtoWriter::WriteChunk(DataChunk &chunk, const std::map<std::strin
         if (!msg->SerializeToString(&serialized_msg)) {
             throw std::runtime_error("Failed to serialize message");
         }
+        auto estimated_size_increase = serialized_msg.size() + APPEND_ROWS_ROW_OVERHEAD;
+
+        if (rows_in_batch > 0 &&
+            current_request_bytes + estimated_size_increase > DEFAULT_APPEND_ROWS_SOFT_LIMIT) {
+            SendAppendRequest(request);
+            request = create_request();
+            rows = request.mutable_proto_rows()->mutable_rows();
+            rows_in_batch = 0;
+            current_request_bytes = request.ByteSizeLong();
+        }
+
         rows->add_serialized_rows(serialized_msg);
+        rows_in_batch++;
+        current_request_bytes += estimated_size_increase;
+
+        if (current_request_bytes >= DEFAULT_APPEND_ROWS_SOFT_LIMIT) {
+            SendAppendRequest(request);
+            request = create_request();
+            rows = request.mutable_proto_rows()->mutable_rows();
+            rows_in_batch = 0;
+            current_request_bytes = request.ByteSizeLong();
+        }
         delete msg;
     }
 
+    if (rows_in_batch > 0) {
+        SendAppendRequest(request);
+    }
+}
+
+void BigqueryProtoWriter::SendAppendRequest(
+    const google::cloud::bigquery::storage::v1::AppendRowsRequest &request) {
+    if (!request.has_proto_rows() || request.proto_rows().rows().serialized_rows_size() == 0) {
+        return;
+    }
+
     int max_retries = 100;
     for (int attempt = 0; attempt < max_retries; attempt++) {
         auto handle_broken_stream = [this](char const *where) {
@@ -322,7 +360,6 @@ void BigqueryProtoWriter::WriteChunk(DataChunk &chunk, const std::map<std::strin
             }
         }
 
-        // GET THE RESPONSE AND ERROR HANDLING
         auto response = grpc_stream->Read().get();
         if (!response) {
             if (attempt < max_retries - 1) {
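A side effect of extracting SendAppendRequest is that the existing retry loop (max_retries = 100) and broken-stream handling now run once per flushed batch rather than once per chunk, and the early return guards against sending a request that carries no serialized rows. Note also that current_request_bytes is re-seeded from request.ByteSizeLong() after every flush, so the writer schema and stream name carried by each fresh request count against the soft limit as well.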

src/include/bigquery_proto_writer.hpp

Lines changed: 5 additions & 1 deletion
@@ -39,6 +39,11 @@ class BigqueryProtoWriter {
     void Finalize();
 
 private:
+    static constexpr idx_t DEFAULT_APPEND_ROWS_SOFT_LIMIT = 9 * 1024 * 1024;
+    static constexpr idx_t APPEND_ROWS_ROW_OVERHEAD = 32;
+
+    void SendAppendRequest(const google::cloud::bigquery::storage::v1::AppendRowsRequest &request);
+
     string table_string;
 
     google::protobuf::DescriptorPool pool;
@@ -51,7 +56,6 @@
     std::unique_ptr<google::cloud::AsyncStreamingReadWriteRpc<google::cloud::bigquery::storage::v1::AppendRowsRequest,
                                                                google::cloud::bigquery::storage::v1::AppendRowsResponse>>
         grpc_stream;
-    // google::cloud::bigquery::storage::v1::AppendRowsRequest append_request;
 };
 
 } // namespace bigquery
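The 9 MiB soft limit presumably stays under the Storage Write API's documented cap on a single AppendRows request while leaving headroom for the writer schema, stream name, and framing that accompany each request. A quick sanity check of that arithmetic; the 10 MB figure is an assumption from the public quota documentation, not something stated in this diff:

    #include <cstddef>

    // Assumed hard cap on one AppendRows request (10 MB, as commonly documented).
    constexpr std::size_t kAssumedAppendRowsHardLimit = 10'000'000;
    // Mirrors DEFAULT_APPEND_ROWS_SOFT_LIMIT: 9 * 1024 * 1024 = 9,437,184 bytes.
    constexpr std::size_t kSoftLimit = 9 * 1024 * 1024;

    // Roughly 550 KiB of headroom remains below the assumed hard limit.
    static_assert(kSoftLimit < kAssumedAppendRowsHardLimit,
                  "soft limit must stay below the per-request hard limit");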
