Skip to content

Commit 325f313

Browse files
committed
Fix PubsubUnboundedSink not following 1000 messages per batch limit
This commit fixes the issue where PubsubUnboundedSink's WriterFn was not properly enforcing the message count limit per publish batch. The publishBatchSize parameter was being passed to the constructor but not stored or used in the processElement method.

Changes:
- Add publishBatchSize field to WriterFn class
- Store publishBatchSize in both WriterFn constructors
- Add message count check in processElement alongside existing byte size check
- Update both PubsubSinkDynamicDestinations and PubsubSink to pass publishBatchSize when creating WriterFn instances

The fix ensures that batches are split when they reach either the message count limit or the byte size limit, preventing Pubsub from rejecting batches that exceed the 1000 messages per request limit.

Fixes #36885
1 parent b313c30 commit 325f313

File tree

1 file changed

+12
-5
lines changed

1 file changed

+12
-5
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java

Lines changed: 12 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -248,6 +248,7 @@ private static class OutgoingData {
248248
private final @Nullable ValueProvider<TopicPath> topic;
249249
private final String timestampAttribute;
250250
private final String idAttribute;
251+
private final int publishBatchSize;
251252
private final int publishBatchBytes;
252253

253254
private final String pubsubRootUrl;
@@ -270,6 +271,7 @@ private static class OutgoingData {
270271
this.topic = topic;
271272
this.timestampAttribute = timestampAttribute;
272273
this.idAttribute = idAttribute;
274+
this.publishBatchSize = publishBatchSize;
273275
this.publishBatchBytes = publishBatchBytes;
274276
this.pubsubRootUrl = null;
275277
}
@@ -279,12 +281,14 @@ private static class OutgoingData {
279281
@Nullable ValueProvider<TopicPath> topic,
280282
String timestampAttribute,
281283
String idAttribute,
284+
int publishBatchSize,
282285
int publishBatchBytes,
283286
String pubsubRootUrl) {
284287
this.pubsubFactory = pubsubFactory;
285288
this.topic = topic;
286289
this.timestampAttribute = timestampAttribute;
287290
this.idAttribute = idAttribute;
291+
this.publishBatchSize = publishBatchSize;
288292
this.publishBatchBytes = publishBatchBytes;
289293
this.pubsubRootUrl = pubsubRootUrl;
290294
}
@@ -354,13 +358,14 @@ public void processElement(ProcessContext c) throws Exception {
354358
if (currentBatch == null) {
355359
currentBatch = new OutgoingData();
356360
orderingKeyBatches.put(currentOrderingKey, currentBatch);
357-
} else if (currentBatch.bytes + message.getMessage().getData().size() > publishBatchBytes) {
361+
} else if (currentBatch.messages.size() >= publishBatchSize
362+
|| currentBatch.bytes + message.getMessage().getData().size() > publishBatchBytes) {
358363
// TODO(sjvanrossum): https://github.com/apache/beam/issues/31800
359364

360-
// Break large (in bytes) batches into smaller.
361-
// (We've already broken by batch size using the trigger below, though that may
362-
// run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since
363-
// the hard limit from Pubsub is by bytes rather than number of messages.)
365+
// Break large batches into smaller by message count or byte size.
366+
// The trigger below breaks by batch size, but may run slightly over.
367+
// Pubsub has hard limits on both bytes and number of messages per batch.
368+
// See https://cloud.google.com/pubsub/quotas#resource_limits for details.
364369
// BLOCKS until published.
365370
publishBatch(currentBatch.messages, currentBatch.bytes);
366371
currentBatch.messages.clear();
@@ -659,6 +664,7 @@ public PDone expand(PCollection<KV<String, byte[]>> input) {
659664
outer.topic,
660665
outer.timestampAttribute,
661666
outer.idAttribute,
667+
outer.publishBatchSize,
662668
outer.publishBatchBytes,
663669
outer.pubsubRootUrl)));
664670
return PDone.in(input.getPipeline());
@@ -711,6 +717,7 @@ public PDone expand(PCollection<byte[]> input) {
711717
outer.topic,
712718
outer.timestampAttribute,
713719
outer.idAttribute,
720+
outer.publishBatchSize,
714721
outer.publishBatchBytes,
715722
outer.pubsubRootUrl)));
716723
return PDone.in(input.getPipeline());

0 commit comments

Comments
 (0)