Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ private static class OutgoingData {
private final @Nullable ValueProvider<TopicPath> topic;
private final String timestampAttribute;
private final String idAttribute;
private final int publishBatchSize;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when publishBatchSize is not explicitly set? Does it default to 1000 (the Pub/Sub per-request limit)? If so, this change is safe. If it defaults to 0 or some other value, it could cause a regression. It would be helpful to add a test verifying the default behavior when the batch size is not explicitly set.

private final int publishBatchBytes;

private final String pubsubRootUrl;
Expand All @@ -270,6 +271,7 @@ private static class OutgoingData {
this.topic = topic;
this.timestampAttribute = timestampAttribute;
this.idAttribute = idAttribute;
this.publishBatchSize = publishBatchSize;
this.publishBatchBytes = publishBatchBytes;
this.pubsubRootUrl = null;
}
Expand All @@ -279,12 +281,14 @@ private static class OutgoingData {
@Nullable ValueProvider<TopicPath> topic,
String timestampAttribute,
String idAttribute,
int publishBatchSize,
int publishBatchBytes,
String pubsubRootUrl) {
this.pubsubFactory = pubsubFactory;
this.topic = topic;
this.timestampAttribute = timestampAttribute;
this.idAttribute = idAttribute;
this.publishBatchSize = publishBatchSize;
this.publishBatchBytes = publishBatchBytes;
this.pubsubRootUrl = pubsubRootUrl;
}
Expand Down Expand Up @@ -354,13 +358,14 @@ public void processElement(ProcessContext c) throws Exception {
if (currentBatch == null) {
currentBatch = new OutgoingData();
orderingKeyBatches.put(currentOrderingKey, currentBatch);
} else if (currentBatch.bytes + message.getMessage().getData().size() > publishBatchBytes) {
} else if (currentBatch.messages.size() >= publishBatchSize
|| currentBatch.bytes + message.getMessage().getData().size() > publishBatchBytes) {
// TODO(sjvanrossum): https://github.com/apache/beam/issues/31800

// Break large (in bytes) batches into smaller.
// (We've already broken by batch size using the trigger below, though that may
// run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since
// the hard limit from Pubsub is by bytes rather than number of messages.)
// Break large batches into smaller ones by message count or byte size.
// The trigger below already splits by message count, but may run slightly over.
// Pub/Sub enforces hard limits on both bytes and number of messages per batch.
// See https://cloud.google.com/pubsub/quotas#resource_limits for details.
// BLOCKS until published.
publishBatch(currentBatch.messages, currentBatch.bytes);
currentBatch.messages.clear();
Expand Down Expand Up @@ -659,6 +664,7 @@ public PDone expand(PCollection<KV<String, byte[]>> input) {
outer.topic,
outer.timestampAttribute,
outer.idAttribute,
outer.publishBatchSize,
outer.publishBatchBytes,
outer.pubsubRootUrl)));
return PDone.in(input.getPipeline());
Expand Down Expand Up @@ -711,6 +717,7 @@ public PDone expand(PCollection<byte[]> input) {
outer.topic,
outer.timestampAttribute,
outer.idAttribute,
outer.publishBatchSize,
outer.publishBatchBytes,
outer.pubsubRootUrl)));
return PDone.in(input.getPipeline());
Expand Down
Loading