Skip to content

Commit ffeb017

Browse files
authored
fix: Use the system executor instead of a separate thread pool for EOD ack/modack callbacks (#2526)
* fix: Use the system executor instead of a separate thread pool for EOD ack/modack callbacks * fix: Add obsolete method to clirr ignored file * fix: Update clirr ignore rule
1 parent 409398a commit ffeb017

File tree

4 files changed

+10
-24
lines changed

4 files changed

+10
-24
lines changed

google-cloud-pubsub/clirr-ignored-differences.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,10 @@
1515
<method>*(org.threeten.bp.Duration)</method>
1616
<to>*(java.time.Duration)</to>
1717
</difference>
18+
<difference>
19+
<differenceType>7002</differenceType>
20+
<!--Ignore changes in this class because it's package private-->
21+
<className>com/google/cloud/pubsub/v1/StreamingSubscriberConnection$Builder</className>
22+
<method>* setEodAckCallbackExecutor(*)</method>
23+
</difference>
1824
</differences>

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@
6262
import java.util.UUID;
6363
import java.util.concurrent.ConcurrentHashMap;
6464
import java.util.concurrent.Executor;
65-
import java.util.concurrent.ExecutorService;
6665
import java.util.concurrent.ScheduledExecutorService;
6766
import java.util.concurrent.TimeUnit;
6867
import java.util.concurrent.atomic.AtomicBoolean;
@@ -99,7 +98,6 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
9998
private final String subscription;
10099
private final SubscriptionName subscriptionNameObject;
101100
private final ScheduledExecutorService systemExecutor;
102-
private final ExecutorService eodAckCallbackExecutor;
103101
private final MessageDispatcher messageDispatcher;
104102

105103
private final FlowControlSettings flowControlSettings;
@@ -131,7 +129,6 @@ private StreamingSubscriberConnection(Builder builder) {
131129
subscription = builder.subscription;
132130
subscriptionNameObject = SubscriptionName.parse(builder.subscription);
133131
systemExecutor = builder.systemExecutor;
134-
eodAckCallbackExecutor = builder.eodAckCallbackExecutor;
135132

136133
// We need to set the default stream ack deadline on the initial request, this will be
137134
// updated by modack requests in the message dispatcher
@@ -720,11 +717,14 @@ public void run() {
720717
};
721718
}
722719

720+
// If exactly-once is enabled, we hold a lock for the ack/modack response callback, so we want to
721+
// avoid using the directExecutor() which runs the callback on the invoking thread. Instead, we
722+
// want to schedule the callback to be run on a different thread.
723723
private Executor getCallbackExecutor() {
724724
if (!getExactlyOnceDeliveryEnabled()) {
725725
return directExecutor();
726726
}
727-
return eodAckCallbackExecutor;
727+
return systemExecutor;
728728
}
729729

730730
/** Builder of {@link StreamingSubscriberConnection StreamingSubscriberConnections}. */
@@ -747,7 +747,6 @@ public static final class Builder {
747747
private boolean useLegacyFlowControl;
748748
private ScheduledExecutorService executor;
749749
private ScheduledExecutorService systemExecutor;
750-
private ExecutorService eodAckCallbackExecutor;
751750
private ApiClock clock;
752751

753752
private boolean enableOpenTelemetryTracing;
@@ -838,11 +837,6 @@ public Builder setSystemExecutor(ScheduledExecutorService systemExecutor) {
838837
return this;
839838
}
840839

841-
public Builder setEodAckCallbackExecutor(ExecutorService eodAckCallbackExecutor) {
842-
this.eodAckCallbackExecutor = eodAckCallbackExecutor;
843-
return this;
844-
}
845-
846840
public Builder setClock(ApiClock clock) {
847841
this.clock = clock;
848842
return this;

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@
5151
import java.io.IOException;
5252
import java.util.ArrayList;
5353
import java.util.List;
54-
import java.util.concurrent.ExecutorService;
5554
import java.util.concurrent.Executors;
5655
import java.util.concurrent.ScheduledExecutorService;
5756
import java.util.concurrent.ThreadFactory;
@@ -151,8 +150,6 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
151150
// An instantiation of the SystemExecutorProvider used for processing acks
152151
// and other system actions.
153152
@Nullable private final ScheduledExecutorService alarmsExecutor;
154-
// An executor used for handling ack and modack callbacks when exactly-once delivery is enabled.
155-
private final ExecutorService eodAckCallbackExecutor;
156153

157154
private final Distribution ackLatencyDistribution =
158155
new Distribution(Math.toIntExact(MAX_STREAM_ACK_DEADLINE.getSeconds()) + 1);
@@ -204,15 +201,6 @@ private Subscriber(Builder builder) {
204201
backgroundResources.add(new ExecutorAsBackgroundResource((alarmsExecutor)));
205202
}
206203

207-
// A cached thread pool will create new threads as needed but can reuse previously constructed
208-
// threads when available, which helps to improve performance.
209-
ThreadFactory eodAckCallbackThreadFactory =
210-
new ThreadFactoryBuilder()
211-
.setDaemon(true)
212-
.setNameFormat("Subscriber-EOD-CallbackExecutor-%d")
213-
.build();
214-
eodAckCallbackExecutor = Executors.newCachedThreadPool(eodAckCallbackThreadFactory);
215-
216204
TransportChannelProvider channelProvider = builder.channelProvider;
217205
if (channelProvider.acceptsPoolSize()) {
218206
channelProvider = channelProvider.withPoolSize(numPullers);
@@ -429,7 +417,6 @@ private void startStreamingConnections() {
429417
.setUseLegacyFlowControl(useLegacyFlowControl)
430418
.setExecutor(executor)
431419
.setSystemExecutor(alarmsExecutor)
432-
.setEodAckCallbackExecutor(eodAckCallbackExecutor)
433420
.setClock(clock)
434421
.setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
435422
.setTracer(tracer)

google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -587,7 +587,6 @@ private StreamingSubscriberConnection getStreamingSubscriberConnectionFromBuilde
587587
.setFlowController(mock(FlowController.class))
588588
.setExecutor(executor)
589589
.setSystemExecutor(systemExecutor)
590-
.setEodAckCallbackExecutor(systemExecutor)
591590
.setClock(clock)
592591
.setMinDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION)
593592
.setMinDurationPerAckExtensionDefaultUsed(true)

0 commit comments

Comments
 (0)