Skip to content

Commit 224c269

Browse files
authored
fix: Use a separate cached thread pool for handling ack and modack response callback for EOD-enabled subscriptions (#2505)
1 parent f911cb0 commit 224c269

File tree

3 files changed

+34
-2
lines changed

3 files changed

+34
-2
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@
6161
import java.util.Set;
6262
import java.util.UUID;
6363
import java.util.concurrent.ConcurrentHashMap;
64+
import java.util.concurrent.Executor;
65+
import java.util.concurrent.ExecutorService;
6466
import java.util.concurrent.ScheduledExecutorService;
6567
import java.util.concurrent.TimeUnit;
6668
import java.util.concurrent.atomic.AtomicBoolean;
@@ -97,6 +99,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
9799
private final String subscription;
98100
private final SubscriptionName subscriptionNameObject;
99101
private final ScheduledExecutorService systemExecutor;
102+
private final ExecutorService eodAckCallbackExecutor;
100103
private final MessageDispatcher messageDispatcher;
101104

102105
private final FlowControlSettings flowControlSettings;
@@ -128,6 +131,7 @@ private StreamingSubscriberConnection(Builder builder) {
128131
subscription = builder.subscription;
129132
subscriptionNameObject = SubscriptionName.parse(builder.subscription);
130133
systemExecutor = builder.systemExecutor;
134+
eodAckCallbackExecutor = builder.eodAckCallbackExecutor;
131135

132136
// We need to set the default stream ack deadline on the initial request, this will be
133137
// updated by modack requests in the message dispatcher
@@ -455,7 +459,7 @@ private void sendAckOperations(
455459
.setSubscription(subscription)
456460
.addAllAckIds(ackIdsInRequest)
457461
.build());
458-
ApiFutures.addCallback(ackFuture, callback, directExecutor());
462+
ApiFutures.addCallback(ackFuture, callback, getCallbackExecutor());
459463
pendingOperations++;
460464
}
461465
ackOperationsWaiter.incrementPendingCount(pendingOperations);
@@ -504,7 +508,7 @@ private void sendModackOperations(
504508
.addAllAckIds(ackIdsInRequest)
505509
.setAckDeadlineSeconds(modackRequestData.getDeadlineExtensionSeconds())
506510
.build());
507-
ApiFutures.addCallback(modackFuture, callback, directExecutor());
511+
ApiFutures.addCallback(modackFuture, callback, getCallbackExecutor());
508512
pendingOperations++;
509513
}
510514
}
@@ -716,6 +720,13 @@ public void run() {
716720
};
717721
}
718722

723+
private Executor getCallbackExecutor() {
724+
if (!getExactlyOnceDeliveryEnabled()) {
725+
return directExecutor();
726+
}
727+
return eodAckCallbackExecutor;
728+
}
729+
719730
/** Builder of {@link StreamingSubscriberConnection StreamingSubscriberConnections}. */
720731
public static final class Builder {
721732
private MessageReceiver receiver;
@@ -736,6 +747,7 @@ public static final class Builder {
736747
private boolean useLegacyFlowControl;
737748
private ScheduledExecutorService executor;
738749
private ScheduledExecutorService systemExecutor;
750+
private ExecutorService eodAckCallbackExecutor;
739751
private ApiClock clock;
740752

741753
private boolean enableOpenTelemetryTracing;
@@ -826,6 +838,11 @@ public Builder setSystemExecutor(ScheduledExecutorService systemExecutor) {
826838
return this;
827839
}
828840

841+
public Builder setEodAckCallbackExecutor(ExecutorService eodAckCallbackExecutor) {
842+
this.eodAckCallbackExecutor = eodAckCallbackExecutor;
843+
return this;
844+
}
845+
829846
public Builder setClock(ApiClock clock) {
830847
this.clock = clock;
831848
return this;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.io.IOException;
5252
import java.util.ArrayList;
5353
import java.util.List;
54+
import java.util.concurrent.ExecutorService;
5455
import java.util.concurrent.Executors;
5556
import java.util.concurrent.ScheduledExecutorService;
5657
import java.util.concurrent.ThreadFactory;
@@ -150,6 +151,9 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
150151
// An instantiation of the SystemExecutorProvider used for processing acks
151152
// and other system actions.
152153
@Nullable private final ScheduledExecutorService alarmsExecutor;
154+
// An executor used for handling ack and modack callbacks when exactly-once delivery is enabled.
155+
private final ExecutorService eodAckCallbackExecutor;
156+
153157
private final Distribution ackLatencyDistribution =
154158
new Distribution(Math.toIntExact(MAX_STREAM_ACK_DEADLINE.getSeconds()) + 1);
155159

@@ -200,6 +204,15 @@ private Subscriber(Builder builder) {
200204
backgroundResources.add(new ExecutorAsBackgroundResource((alarmsExecutor)));
201205
}
202206

207+
// A cached thread pool will create new threads as needed but can reuse previously constructed
208+
// threads when available, which helps to improve performance.
209+
ThreadFactory eodAckCallbackThreadFactory =
210+
new ThreadFactoryBuilder()
211+
.setDaemon(true)
212+
.setNameFormat("Subscriber-EOD-CallbackExecutor-%d")
213+
.build();
214+
eodAckCallbackExecutor = Executors.newCachedThreadPool(eodAckCallbackThreadFactory);
215+
203216
TransportChannelProvider channelProvider = builder.channelProvider;
204217
if (channelProvider.acceptsPoolSize()) {
205218
channelProvider = channelProvider.withPoolSize(numPullers);
@@ -416,6 +429,7 @@ private void startStreamingConnections() {
416429
.setUseLegacyFlowControl(useLegacyFlowControl)
417430
.setExecutor(executor)
418431
.setSystemExecutor(alarmsExecutor)
432+
.setEodAckCallbackExecutor(eodAckCallbackExecutor)
419433
.setClock(clock)
420434
.setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
421435
.setTracer(tracer)

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,7 @@ private StreamingSubscriberConnection getStreamingSubscriberConnectionFromBuilde
587587
.setFlowController(mock(FlowController.class))
588588
.setExecutor(executor)
589589
.setSystemExecutor(systemExecutor)
590+
.setEodAckCallbackExecutor(systemExecutor)
590591
.setClock(clock)
591592
.setMinDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION)
592593
.setMinDurationPerAckExtensionDefaultUsed(true)

0 commit comments

Comments
 (0)