Skip to content

Commit be78e64

Browse files
feat: track batch size using serialized size of PublishRequest (#2113)
* feat: track batch size using serialized size of PublishRequest * fix: compare against batchedBytes instead of messageSize in flush condition * fix: also count static overhead in flow control * fix: adjust thresholds in tests * fix: clean up merge issue * fix: revert use of topicNameSize in MessageFlowController * fix: store topicNameSize as initialBatchedBytes in MessagesBatch --------- Co-authored-by: Mike Prieto <[email protected]>
1 parent 6c67798 commit be78e64

File tree

2 files changed

+20
-9
lines changed

2 files changed

+20
-9
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import com.google.cloud.pubsub.v1.stub.PublisherStub;
4747
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
4848
import com.google.common.base.Preconditions;
49+
import com.google.protobuf.CodedOutputStream;
4950
import com.google.pubsub.v1.PublishRequest;
5051
import com.google.pubsub.v1.PublishResponse;
5152
import com.google.pubsub.v1.PubsubMessage;
@@ -99,6 +100,7 @@ public class Publisher implements PublisherInterface {
99100
private static final String OPEN_TELEMETRY_TRACER_NAME = "com.google.cloud.pubsub.v1";
100101

101102
private final String topicName;
103+
private final int topicNameSize;
102104

103105
private final BatchingSettings batchingSettings;
104106
private final boolean enableMessageOrdering;
@@ -145,6 +147,8 @@ public static long getApiMaxRequestBytes() {
145147

146148
private Publisher(Builder builder) throws IOException {
147149
topicName = builder.topicName;
150+
topicNameSize =
151+
CodedOutputStream.computeStringSize(PublishRequest.TOPIC_FIELD_NUMBER, this.topicName);
148152

149153
this.batchingSettings = builder.batchingSettings;
150154
FlowControlSettings flowControl = this.batchingSettings.getFlowControlSettings();
@@ -309,7 +313,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
309313
}
310314
MessagesBatch messagesBatch = messagesBatches.get(orderingKey);
311315
if (messagesBatch == null) {
312-
messagesBatch = new MessagesBatch(batchingSettings, orderingKey);
316+
messagesBatch = new MessagesBatch(batchingSettings, topicNameSize, orderingKey);
313317
messagesBatches.put(orderingKey, messagesBatch);
314318
}
315319

@@ -636,7 +640,9 @@ private static final class OutstandingPublish {
636640
OutstandingPublish(PubsubMessageWrapper messageWrapper) {
637641
this.publishResult = SettableApiFuture.create();
638642
this.messageWrapper = messageWrapper;
639-
this.messageSize = messageWrapper.getPubsubMessage().getSerializedSize();
643+
this.messageSize =
644+
CodedOutputStream.computeMessageSize(
645+
PublishRequest.MESSAGES_FIELD_NUMBER, messageWrapper.getPubsubMessage());
640646
}
641647
}
642648

@@ -1093,12 +1099,15 @@ void release(long messageSize) {
10931099

10941100
private class MessagesBatch {
10951101
private List<OutstandingPublish> messages;
1102+
private int initialBatchedBytes;
10961103
private int batchedBytes;
10971104
private String orderingKey;
10981105
private final BatchingSettings batchingSettings;
10991106

1100-
private MessagesBatch(BatchingSettings batchingSettings, String orderingKey) {
1107+
private MessagesBatch(
1108+
BatchingSettings batchingSettings, int initialBatchedBytes, String orderingKey) {
11011109
this.batchingSettings = batchingSettings;
1110+
this.initialBatchedBytes = initialBatchedBytes;
11021111
this.orderingKey = orderingKey;
11031112
reset();
11041113
}
@@ -1111,7 +1120,7 @@ private OutstandingBatch popOutstandingBatch() {
11111120

11121121
private void reset() {
11131122
messages = new LinkedList<>();
1114-
batchedBytes = 0;
1123+
batchedBytes = initialBatchedBytes;
11151124
}
11161125

11171126
private boolean isEmpty() {
@@ -1150,7 +1159,9 @@ && getBatchedBytes() + outstandingPublish.messageSize >= getMaxBatchBytes()) {
11501159
// immediately.
11511160
// Alternatively if after adding the message we have reached the batch max messages then we
11521161
// have a batch to send.
1153-
if ((hasBatchingBytes() && outstandingPublish.messageSize >= getMaxBatchBytes())
1162+
// Note that exceeding {@link Publisher#getApiMaxRequestBytes()} will result in failed
1163+
// publishes without compression and may yet fail if a request is not sufficiently compressed.
1164+
if ((hasBatchingBytes() && getBatchedBytes() >= getMaxBatchBytes())
11541165
|| getMessagesCount() == batchingSettings.getElementCountThreshold()) {
11551166
batchesToSend.add(popOutstandingBatch());
11561167
}

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/PublisherImplTest.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -447,7 +447,7 @@ public void testLargeMessagesDoNotReorderBatches() throws Exception {
447447
Publisher.Builder.DEFAULT_BATCHING_SETTINGS
448448
.toBuilder()
449449
.setElementCountThreshold(10L)
450-
.setRequestByteThreshold(20L)
450+
.setRequestByteThreshold(64L)
451451
.setDelayThreshold(Duration.ofSeconds(100))
452452
.build())
453453
.setEnableMessageOrdering(true)
@@ -1150,7 +1150,7 @@ public void testPublishFlowControl_throwException() throws Exception {
11501150
.setLimitExceededBehavior(
11511151
FlowController.LimitExceededBehavior.ThrowException)
11521152
.setMaxOutstandingElementCount(1L)
1153-
.setMaxOutstandingRequestBytes(10L)
1153+
.setMaxOutstandingRequestBytes(13L)
11541154
.build())
11551155
.build())
11561156
.build();
@@ -1192,7 +1192,7 @@ public void testPublishFlowControl_throwExceptionWithOrderingKey() throws Except
11921192
.setLimitExceededBehavior(
11931193
FlowController.LimitExceededBehavior.ThrowException)
11941194
.setMaxOutstandingElementCount(1L)
1195-
.setMaxOutstandingRequestBytes(10L)
1195+
.setMaxOutstandingRequestBytes(13L)
11961196
.build())
11971197
.build())
11981198
.setEnableMessageOrdering(true)
@@ -1238,7 +1238,7 @@ public void testPublishFlowControl_block() throws Exception {
12381238
FlowControlSettings.newBuilder()
12391239
.setLimitExceededBehavior(FlowController.LimitExceededBehavior.Block)
12401240
.setMaxOutstandingElementCount(2L)
1241-
.setMaxOutstandingRequestBytes(10L)
1241+
.setMaxOutstandingRequestBytes(13L)
12421242
.build())
12431243
.build())
12441244
.build();

0 commit comments

Comments
 (0)