Skip to content

Commit 8195f6f

Browse files
authored
feat: Implement SubscriberShutdownSettings (#2569)
* feat: Implement SubscriberShutdownSettings chore: Format and cleanup sample chore: Remove hardcoded version from samples pom.xml * fix: Fix typo in Subscriber class comments
1 parent 73f6c42 commit 8195f6f

File tree

11 files changed

+916
-25
lines changed

11 files changed

+916
-25
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ class MessageDispatcher {
6262

6363
@InternalApi static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
6464
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
65+
@InternalApi static final long FINAL_NACK_TIMEOUT = Duration.ofSeconds(1).toMillis();
6566

6667
private final Executor executor;
6768
private final SequentialExecutorService.AutoExecutor sequentialExecutor;
@@ -108,6 +109,8 @@ class MessageDispatcher {
108109
private final SubscriptionName subscriptionNameObject;
109110
private final boolean enableOpenTelemetryTracing;
110111
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
112+
private final SubscriberShutdownSettings subscriberShutdownSettings;
113+
private final AtomicBoolean nackImmediatelyShutdownInProgress = new AtomicBoolean(false);
111114

112115
/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
113116
public enum AckReply {
@@ -170,12 +173,18 @@ public void onFailure(Throwable t) {
170173
public void onSuccess(AckReply reply) {
171174
switch (reply) {
172175
case ACK:
173-
pendingAcks.add(this.ackRequestData);
174-
// Record the latency rounded to the next closest integer.
175-
ackLatencyDistribution.record(
176-
Ints.saturatedCast(
177-
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
178-
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack");
176+
if (nackImmediatelyShutdownInProgress.get() && exactlyOnceDeliveryEnabled.get()) {
177+
this.ackRequestData.setResponse(AckResponse.OTHER, true);
178+
tracer.endSubscribeProcessSpan(
179+
this.ackRequestData.getMessageWrapper(), "ack failed_with_nack_immediately");
180+
} else {
181+
pendingAcks.add(this.ackRequestData);
182+
// Record the latency rounded to the next closest integer.
183+
ackLatencyDistribution.record(
184+
Ints.saturatedCast(
185+
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
186+
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack");
187+
}
179188
break;
180189
case NACK:
181190
pendingNacks.add(this.ackRequestData);
@@ -231,6 +240,7 @@ private MessageDispatcher(Builder builder) {
231240
if (builder.tracer != null) {
232241
tracer = builder.tracer;
233242
}
243+
this.subscriberShutdownSettings = builder.subscriberShutdownSettings;
234244
}
235245

236246
private boolean shouldSetMessageFuture() {
@@ -294,8 +304,60 @@ public void run() {
294304
}
295305
}
296306

307+
private void nackAllOutstandingMessages() {
308+
nackImmediatelyShutdownInProgress.set(true);
309+
List<AckHandler> handlersToNack = new ArrayList<>(pendingMessages.values());
310+
for (AckHandler ackHandler : handlersToNack) {
311+
pendingNacks.add(ackHandler.getAckRequestData());
312+
ackHandler.forget(); // This removes from pendingMessages, releases flow control, etc.
313+
}
314+
}
315+
297316
void stop() {
298-
messagesWaiter.waitComplete();
317+
switch (subscriberShutdownSettings.getMode()) {
318+
case WAIT_FOR_PROCESSING:
319+
logger.log(
320+
Level.FINE,
321+
"WAIT_FOR_PROCESSING shutdown mode: Waiting for outstanding messages to complete processing.");
322+
java.time.Duration timeout = subscriberShutdownSettings.getTimeout();
323+
if (timeout.isNegative()) {
324+
// Indefinite wait use existing blocking wait
325+
messagesWaiter.waitComplete();
326+
} else {
327+
// Wait for (timeout - 1 second) for messages to complete
328+
long gracePeriodMillis = Math.max(0, timeout.toMillis() - FINAL_NACK_TIMEOUT);
329+
boolean completedWait = messagesWaiter.tryWait(gracePeriodMillis, clock);
330+
if (!completedWait) {
331+
logger.log(
332+
Level.WARNING,
333+
"Grace period expired for WAIT_FOR_PROCESSING shutdown. Nacking remaining messages.");
334+
// Switch to NACK_IMMEDIATELY behavior for remaining messages
335+
nackAllOutstandingMessages();
336+
}
337+
}
338+
cancelBackgroundJob();
339+
processOutstandingOperations(); // Send any remaining acks/nacks.
340+
break;
341+
342+
case NACK_IMMEDIATELY:
343+
logger.log(Level.FINE, "NACK_IMMEDIATELY shutdown mode: Nacking all outstanding messages.");
344+
// Stop extending deadlines immediately.
345+
cancelBackgroundJob();
346+
nackAllOutstandingMessages();
347+
processOutstandingOperations(); // Send all pending nacks.
348+
break;
349+
350+
default:
351+
logger.log(Level.WARNING, "Unknown shutdown mode: " + subscriberShutdownSettings.getMode());
352+
// Default to WAIT_FOR_PROCESSING behavior
353+
messagesWaiter.waitComplete();
354+
cancelBackgroundJob();
355+
processOutstandingOperations();
356+
break;
357+
}
358+
}
359+
360+
private void cancelBackgroundJob() {
299361
jobLock.lock();
300362
try {
301363
if (backgroundJob != null) {
@@ -309,7 +371,6 @@ void stop() {
309371
} finally {
310372
jobLock.unlock();
311373
}
312-
processOutstandingOperations();
313374
}
314375

315376
@InternalApi
@@ -364,6 +425,11 @@ void setMessageOrderingEnabled(boolean messageOrderingEnabled) {
364425
this.messageOrderingEnabled.set(messageOrderingEnabled);
365426
}
366427

428+
@InternalApi
429+
boolean getNackImmediatelyShutdownInProgress() {
430+
return nackImmediatelyShutdownInProgress.get();
431+
}
432+
367433
private static class OutstandingMessage {
368434
private final AckHandler ackHandler;
369435

@@ -661,7 +727,7 @@ void processOutstandingOperations() {
661727

662728
List<AckRequestData> ackRequestDataReceipts = new ArrayList<AckRequestData>();
663729
pendingReceipts.drainTo(ackRequestDataReceipts);
664-
if (!ackRequestDataReceipts.isEmpty()) {
730+
if (!ackRequestDataReceipts.isEmpty() && !getNackImmediatelyShutdownInProgress()) {
665731
ModackRequestData receiptModack =
666732
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts);
667733
receiptModack.setIsReceiptModack(true);
@@ -705,6 +771,7 @@ public static final class Builder {
705771
private String subscriptionName;
706772
private boolean enableOpenTelemetryTracing;
707773
private OpenTelemetryPubsubTracer tracer;
774+
private SubscriberShutdownSettings subscriberShutdownSettings;
708775

709776
protected Builder(MessageReceiver receiver) {
710777
this.receiver = receiver;
@@ -791,6 +858,12 @@ public Builder setTracer(OpenTelemetryPubsubTracer tracer) {
791858
return this;
792859
}
793860

861+
public Builder setSubscriberShutdownSettings(
862+
SubscriberShutdownSettings subscriberShutdownSettings) {
863+
this.subscriberShutdownSettings = subscriberShutdownSettings;
864+
return this;
865+
}
866+
794867
public MessageDispatcher build() {
795868
return new MessageDispatcher(this);
796869
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
9898
private final String subscription;
9999
private final SubscriptionName subscriptionNameObject;
100100
private final ScheduledExecutorService systemExecutor;
101+
private final ApiClock clock;
101102
private final MessageDispatcher messageDispatcher;
102103

103104
private final FlowControlSettings flowControlSettings;
@@ -124,11 +125,13 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
124125

125126
private final boolean enableOpenTelemetryTracing;
126127
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
128+
private final SubscriberShutdownSettings subscriberShutdownSettings;
127129

128130
private StreamingSubscriberConnection(Builder builder) {
129131
subscription = builder.subscription;
130132
subscriptionNameObject = SubscriptionName.parse(builder.subscription);
131133
systemExecutor = builder.systemExecutor;
134+
clock = builder.clock;
132135

133136
// We need to set the default stream ack deadline on the initial request, this will be
134137
// updated by modack requests in the message dispatcher
@@ -163,6 +166,7 @@ private StreamingSubscriberConnection(Builder builder) {
163166
if (builder.tracer != null) {
164167
tracer = builder.tracer;
165168
}
169+
this.subscriberShutdownSettings = builder.subscriberShutdownSettings;
166170

167171
messageDispatcher =
168172
messageDispatcherBuilder
@@ -181,6 +185,7 @@ private StreamingSubscriberConnection(Builder builder) {
181185
.setSubscriptionName(subscription)
182186
.setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
183187
.setTracer(tracer)
188+
.setSubscriberShutdownSettings(subscriberShutdownSettings)
184189
.build();
185190

186191
flowControlSettings = builder.flowControlSettings;
@@ -218,8 +223,21 @@ protected void doStop() {
218223
}
219224

220225
private void runShutdown() {
226+
java.time.Duration timeout = subscriberShutdownSettings.getTimeout();
227+
if (timeout.isZero()) {
228+
return;
229+
}
230+
221231
messageDispatcher.stop();
222-
ackOperationsWaiter.waitComplete();
232+
if (timeout.isNegative()) {
233+
ackOperationsWaiter.waitComplete();
234+
} else {
235+
boolean completedWait = ackOperationsWaiter.tryWait(timeout.toMillis(), clock);
236+
if (!completedWait) {
237+
logger.log(
238+
Level.WARNING, "Timeout exceeded while waiting for ACK/NACK operations to complete.");
239+
}
240+
}
223241
}
224242

225243
private class StreamingPullResponseObserver implements ResponseObserver<StreamingPullResponse> {
@@ -554,9 +572,18 @@ public void onSuccess(Empty empty) {
554572
tracer.endSubscribeRpcSpan(rpcSpan);
555573

556574
for (AckRequestData ackRequestData : ackRequestDataList) {
557-
// This will check if a response is needed, and if it has already been set
558-
ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
559-
messageDispatcher.notifyAckSuccess(ackRequestData);
575+
// If we are in NACK_IMMEDIATELY shutdown mode, we will set failures on acks/nack so that
576+
// an error is surfaced if the user
577+
// manually acks or nacks in their callback.
578+
if (setResponseOnSuccess
579+
&& getExactlyOnceDeliveryEnabled()
580+
&& messageDispatcher.getNackImmediatelyShutdownInProgress()) {
581+
ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess);
582+
messageDispatcher.notifyAckFailed(ackRequestData);
583+
} else {
584+
ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
585+
messageDispatcher.notifyAckSuccess(ackRequestData);
586+
}
560587
// Remove from our pending operations
561588
pendingRequests.remove(ackRequestData);
562589
tracer.addEndRpcEvent(
@@ -751,6 +778,7 @@ public static final class Builder {
751778

752779
private boolean enableOpenTelemetryTracing;
753780
private OpenTelemetryPubsubTracer tracer;
781+
private SubscriberShutdownSettings subscriberShutdownSettings;
754782

755783
protected Builder(MessageReceiver receiver) {
756784
this.receiver = receiver;
@@ -852,6 +880,12 @@ public Builder setTracer(OpenTelemetryPubsubTracer tracer) {
852880
return this;
853881
}
854882

883+
public Builder setSubscriberShutdownSettings(
884+
SubscriberShutdownSettings subscriberShutdownSettings) {
885+
this.subscriberShutdownSettings = subscriberShutdownSettings;
886+
return this;
887+
}
888+
855889
public StreamingSubscriberConnection build() {
856890
return new StreamingSubscriberConnection(this);
857891
}

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
168168
private final boolean enableOpenTelemetryTracing;
169169
private final OpenTelemetry openTelemetry;
170170
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
171+
private final SubscriberShutdownSettings subscriberShutdownSettings;
171172

172173
private Subscriber(Builder builder) {
173174
receiver = builder.receiver;
@@ -223,6 +224,7 @@ private Subscriber(Builder builder) {
223224

224225
this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
225226
this.openTelemetry = builder.openTelemetry;
227+
this.subscriberShutdownSettings = builder.subscriberShutdownSettings;
226228
if (this.openTelemetry != null && this.enableOpenTelemetryTracing) {
227229
Tracer openTelemetryTracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME);
228230
if (openTelemetryTracer != null) {
@@ -366,7 +368,6 @@ protected void doStop() {
366368
@Override
367369
public void run() {
368370
try {
369-
// stop connection is no-op if connections haven't been started.
370371
runShutdown();
371372
notifyStopped();
372373
} catch (Exception e) {
@@ -378,7 +379,13 @@ public void run() {
378379
}
379380

380381
private void runShutdown() {
381-
stopAllStreamingConnections();
382+
java.time.Duration timeout = subscriberShutdownSettings.getTimeout();
383+
long deadlineMillis = -1;
384+
if (!timeout.isNegative()) {
385+
deadlineMillis = clock.millisTime() + timeout.toMillis();
386+
}
387+
388+
stopAllStreamingConnections(deadlineMillis);
382389
shutdownBackgroundResources();
383390
subscriberStub.shutdownNow();
384391
}
@@ -420,6 +427,7 @@ private void startStreamingConnections() {
420427
.setClock(clock)
421428
.setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
422429
.setTracer(tracer)
430+
.setSubscriberShutdownSettings(subscriberShutdownSettings)
423431
.build();
424432

425433
streamingSubscriberConnections.add(streamingSubscriberConnection);
@@ -445,8 +453,8 @@ public void failed(State from, Throwable failure) {
445453
}
446454
}
447455

448-
private void stopAllStreamingConnections() {
449-
stopConnections(streamingSubscriberConnections);
456+
private void stopAllStreamingConnections(long deadlineMillis) {
457+
stopConnections(streamingSubscriberConnections, deadlineMillis);
450458
}
451459

452460
private void shutdownBackgroundResources() {
@@ -466,7 +474,7 @@ private void startConnections(
466474
}
467475
}
468476

469-
private void stopConnections(List<? extends ApiService> connections) {
477+
private void stopConnections(List<? extends ApiService> connections, long deadlineMillis) {
470478
ArrayList<ApiService> liveConnections;
471479
synchronized (connections) {
472480
liveConnections = new ArrayList<ApiService>(connections);
@@ -477,11 +485,19 @@ private void stopConnections(List<? extends ApiService> connections) {
477485
}
478486
for (ApiService subscriber : liveConnections) {
479487
try {
480-
subscriber.awaitTerminated();
481-
} catch (IllegalStateException e) {
482-
// If the service fails, awaitTerminated will throw an exception.
483-
// However, we could be stopping services because at least one
484-
// has already failed, so we just ignore this exception.
488+
if (deadlineMillis < 0) {
489+
// Wait indefinitely
490+
subscriber.awaitTerminated();
491+
} else {
492+
long remaining = deadlineMillis - clock.millisTime();
493+
if (remaining < 0) {
494+
remaining = 0;
495+
}
496+
subscriber.awaitTerminated(remaining, java.util.concurrent.TimeUnit.MILLISECONDS);
497+
}
498+
} catch (Exception e) {
499+
logger.log(Level.FINE, "Exception while waiting for a connection to terminate", e);
500+
break; // Stop waiting for other connections.
485501
}
486502
}
487503
}
@@ -532,6 +548,9 @@ public static final class Builder {
532548
private boolean enableOpenTelemetryTracing = false;
533549
private OpenTelemetry openTelemetry = null;
534550

551+
private SubscriberShutdownSettings subscriberShutdownSettings =
552+
SubscriberShutdownSettings.newBuilder().build();
553+
535554
Builder(String subscription, MessageReceiver receiver) {
536555
this.subscription = subscription;
537556
this.receiver = receiver;
@@ -772,6 +791,18 @@ public Builder setOpenTelemetry(OpenTelemetry openTelemetry) {
772791
return this;
773792
}
774793

794+
/**
795+
* Sets the shutdown settings for the subscriber. Defaults to {@link
796+
* SubscriberShutdownSettings#newBuilder() default settings}.
797+
*/
798+
@BetaApi(
799+
"The surface for SubscriberShutdownSettings is not stable yet and may be changed in the future.")
800+
public Builder setSubscriberShutdownSettings(
801+
SubscriberShutdownSettings subscriberShutdownSettings) {
802+
this.subscriberShutdownSettings = Preconditions.checkNotNull(subscriberShutdownSettings);
803+
return this;
804+
}
805+
775806
/** Returns the default FlowControlSettings used by the client if settings are not provided. */
776807
public static FlowControlSettings getDefaultFlowControlSettings() {
777808
return DEFAULT_FLOW_CONTROL_SETTINGS;

0 commit comments

Comments
 (0)