From a9785b1462ffc185cab8025f3e08ab4b841e7ae7 Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Tue, 30 Sep 2025 20:48:46 +0000 Subject: [PATCH 1/2] feat: Implement SubscriberShutdownSettings chore: Format and cleanup sample chore: Remove hardcoded version from samples pom.xml --- .../cloud/pubsub/v1/MessageDispatcher.java | 91 ++++++- .../v1/StreamingSubscriberConnection.java | 42 +++- .../google/cloud/pubsub/v1/Subscriber.java | 53 +++- .../pubsub/v1/SubscriberShutdownSettings.java | 102 ++++++++ .../com/google/cloud/pubsub/v1/Waiter.java | 27 ++ .../pubsub/v1/FakeSubscriberServiceImpl.java | 23 +- .../pubsub/v1/MessageDispatcherTest.java | 232 ++++++++++++++++++ .../v1/StreamingSubscriberConnectionTest.java | 129 ++++++++++ .../v1/SubscriberShutdownSettingsTest.java | 100 ++++++++ .../cloud/pubsub/v1/SubscriberTest.java | 98 ++++++++ .../google/cloud/pubsub/v1/WaiterTest.java | 46 ++++ 11 files changed, 917 insertions(+), 26 deletions(-) create mode 100644 google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SubscriberShutdownSettings.java create mode 100644 google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberShutdownSettingsTest.java diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java index 983844a62..1a6dab2b2 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java @@ -62,6 +62,7 @@ class MessageDispatcher { @InternalApi static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9; @InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100); + @InternalApi static final long FINAL_NACK_TIMEOUT = Duration.ofSeconds(1).toMillis(); private final Executor executor; private final SequentialExecutorService.AutoExecutor sequentialExecutor; @@ -108,6 +109,8 @@ class MessageDispatcher { private final SubscriptionName subscriptionNameObject; private final boolean enableOpenTelemetryTracing; private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); + private final SubscriberShutdownSettings subscriberShutdownSettings; + private final AtomicBoolean nackImmediatelyShutdownInProgress = new AtomicBoolean(false); /** Internal representation of a reply to a Pubsub message, to be sent back to the service. */ public enum AckReply { @@ -170,12 +173,18 @@ public void onFailure(Throwable t) { public void onSuccess(AckReply reply) { switch (reply) { case ACK: - pendingAcks.add(this.ackRequestData); - // Record the latency rounded to the next closest integer. - ackLatencyDistribution.record( - Ints.saturatedCast( - (long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D))); - tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack"); + if (nackImmediatelyShutdownInProgress.get() && exactlyOnceDeliveryEnabled.get()) { + this.ackRequestData.setResponse(AckResponse.OTHER, true); + tracer.endSubscribeProcessSpan( + this.ackRequestData.getMessageWrapper(), "ack failed_with_nack_immediately"); + } else { + pendingAcks.add(this.ackRequestData); + // Record the latency rounded to the next closest integer. + ackLatencyDistribution.record( + Ints.saturatedCast( + (long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D))); + tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack"); + } break; case NACK: pendingNacks.add(this.ackRequestData); @@ -231,6 +240,7 @@ private MessageDispatcher(Builder builder) { if (builder.tracer != null) { tracer = builder.tracer; } + this.subscriberShutdownSettings = builder.subscriberShutdownSettings; } private boolean shouldSetMessageFuture() { @@ -294,8 +304,60 @@ public void run() { } } + private void nackAllOutstandingMessages() { + nackImmediatelyShutdownInProgress.set(true); + List handlersToNack = new ArrayList<>(pendingMessages.values()); + for (AckHandler ackHandler : handlersToNack) { + pendingNacks.add(ackHandler.getAckRequestData()); + ackHandler.forget(); // This removes from pendingMessages, releases flow control, etc. + } + } + void stop() { - messagesWaiter.waitComplete(); + switch (subscriberShutdownSettings.getMode()) { + case WAIT_FOR_PROCESSING: + logger.log( + Level.FINE, + "WAIT_FOR_PROCESSING shutdown mode: Waiting for outstanding messages to complete processing."); + java.time.Duration timeout = subscriberShutdownSettings.getTimeout(); + if (timeout.isNegative()) { + // Indefinite wait use existing blocking wait + messagesWaiter.waitComplete(); + } else { + // Wait for (timeout - 1 second) for messages to complete + long gracePeriodMillis = Math.max(0, timeout.toMillis() - FINAL_NACK_TIMEOUT); + boolean completedWait = messagesWaiter.tryWait(gracePeriodMillis, clock); + if (!completedWait) { + logger.log( + Level.WARNING, + "Grace period expired for WAIT_FOR_PROCESSING shutdown. Nacking remaining messages."); + // Switch to NACK_IMMEDIATELY behavior for remaining messages + nackAllOutstandingMessages(); + } + } + cancelBackgroundJob(); + processOutstandingOperations(); // Send any remaining acks/nacks. + break; + + case NACK_IMMEDIATELY: + logger.log(Level.FINE, "NACK_IMMEDIATELY shutdown mode: Nacking all outstanding messages."); + // Stop extending deadlines immediately. + cancelBackgroundJob(); + nackAllOutstandingMessages(); + processOutstandingOperations(); // Send all pending nacks. + break; + + default: + logger.log(Level.WARNING, "Unknown shutdown mode: " + subscriberShutdownSettings.getMode()); + // Default to WAIT_FOR_PROCESSING behavior + messagesWaiter.waitComplete(); + cancelBackgroundJob(); + processOutstandingOperations(); + break; + } + } + + private void cancelBackgroundJob() { jobLock.lock(); try { if (backgroundJob != null) { @@ -309,7 +371,6 @@ void stop() { } finally { jobLock.unlock(); } - processOutstandingOperations(); } @InternalApi @@ -364,6 +425,11 @@ void setMessageOrderingEnabled(boolean messageOrderingEnabled) { this.messageOrderingEnabled.set(messageOrderingEnabled); } + @InternalApi + boolean getNackImmediatelyShutdownInProgress() { + return nackImmediatelyShutdownInProgress.get(); + } + private static class OutstandingMessage { private final AckHandler ackHandler; @@ -661,7 +727,7 @@ void processOutstandingOperations() { List ackRequestDataReceipts = new ArrayList(); pendingReceipts.drainTo(ackRequestDataReceipts); - if (!ackRequestDataReceipts.isEmpty()) { + if (!ackRequestDataReceipts.isEmpty() && !getNackImmediatelyShutdownInProgress()) { ModackRequestData receiptModack = new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts); receiptModack.setIsReceiptModack(true); @@ -705,6 +771,7 @@ public static final class Builder { private String subscriptionName; private boolean enableOpenTelemetryTracing; private OpenTelemetryPubsubTracer tracer; + private SubscriberShutdownSettings subscriberShutdownSettings; protected Builder(MessageReceiver receiver) { this.receiver = receiver; @@ -791,6 +858,12 @@ public Builder setTracer(OpenTelemetryPubsubTracer tracer) { return this; } + public Builder setSubscriberShutdownSettings( + SubscriberShutdownSettings subscriberShutdownSettings) { + this.subscriberShutdownSettings = subscriberShutdownSettings; + return this; + } + public MessageDispatcher build() { return new MessageDispatcher(this); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java index baf96f385..319dd31f5 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnection.java @@ -98,6 +98,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final String subscription; private final SubscriptionName subscriptionNameObject; private final ScheduledExecutorService systemExecutor; + private final ApiClock clock; private final MessageDispatcher messageDispatcher; private final FlowControlSettings flowControlSettings; @@ -124,11 +125,13 @@ final class StreamingSubscriberConnection extends AbstractApiService implements private final boolean enableOpenTelemetryTracing; private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); + private final SubscriberShutdownSettings subscriberShutdownSettings; private StreamingSubscriberConnection(Builder builder) { subscription = builder.subscription; subscriptionNameObject = SubscriptionName.parse(builder.subscription); systemExecutor = builder.systemExecutor; + clock = builder.clock; // We need to set the default stream ack deadline on the initial request, this will be // updated by modack requests in the message dispatcher @@ -163,6 +166,7 @@ private StreamingSubscriberConnection(Builder builder) { if (builder.tracer != null) { tracer = builder.tracer; } + this.subscriberShutdownSettings = builder.subscriberShutdownSettings; messageDispatcher = messageDispatcherBuilder @@ -181,6 +185,7 @@ private StreamingSubscriberConnection(Builder builder) { .setSubscriptionName(subscription) .setEnableOpenTelemetryTracing(enableOpenTelemetryTracing) .setTracer(tracer) + .setSubscriberShutdownSettings(subscriberShutdownSettings) .build(); flowControlSettings = builder.flowControlSettings; @@ -218,8 +223,21 @@ protected void doStop() { } private void runShutdown() { + java.time.Duration timeout = subscriberShutdownSettings.getTimeout(); + if (timeout.isZero()) { + return; + } + messageDispatcher.stop(); - ackOperationsWaiter.waitComplete(); + if (timeout.isNegative()) { + ackOperationsWaiter.waitComplete(); + } else { + boolean completedWait = ackOperationsWaiter.tryWait(timeout.toMillis(), clock); + if (!completedWait) { + logger.log( + Level.WARNING, "Timeout exceeded while waiting for ACK/NACK operations to complete."); + } + } } private class StreamingPullResponseObserver implements ResponseObserver { @@ -554,9 +572,18 @@ public void onSuccess(Empty empty) { tracer.endSubscribeRpcSpan(rpcSpan); for (AckRequestData ackRequestData : ackRequestDataList) { - // This will check if a response is needed, and if it has already been set - ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess); - messageDispatcher.notifyAckSuccess(ackRequestData); + // If we are in NACK_IMMEDIATELY shutdown mode, we will set failures on acks/nack so that + // an error is surfaced if the user + // manually acks or nacks in their callback. + if (setResponseOnSuccess + && getExactlyOnceDeliveryEnabled() + && messageDispatcher.getNackImmediatelyShutdownInProgress()) { + ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess); + messageDispatcher.notifyAckFailed(ackRequestData); + } else { + ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess); + messageDispatcher.notifyAckSuccess(ackRequestData); + } // Remove from our pending operations pendingRequests.remove(ackRequestData); tracer.addEndRpcEvent( @@ -751,6 +778,7 @@ public static final class Builder { private boolean enableOpenTelemetryTracing; private OpenTelemetryPubsubTracer tracer; + private SubscriberShutdownSettings subscriberShutdownSettings; protected Builder(MessageReceiver receiver) { this.receiver = receiver; @@ -852,6 +880,12 @@ public Builder setTracer(OpenTelemetryPubsubTracer tracer) { return this; } + public Builder setSubscriberShutdownSettings( + SubscriberShutdownSettings subscriberShutdownSettings) { + this.subscriberShutdownSettings = subscriberShutdownSettings; + return this; + } + public StreamingSubscriberConnection build() { return new StreamingSubscriberConnection(this); } diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index b149bed37..580cc0632 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -168,6 +168,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac private final boolean enableOpenTelemetryTracing; private final OpenTelemetry openTelemetry; private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false); + private final SubscriberShutdownSettings subscriberShutdownSettings; private Subscriber(Builder builder) { receiver = builder.receiver; @@ -223,6 +224,7 @@ private Subscriber(Builder builder) { this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing; this.openTelemetry = builder.openTelemetry; + this.subscriberShutdownSettings = builder.subscriberShutdownSettings; if (this.openTelemetry != null && this.enableOpenTelemetryTracing) { Tracer openTelemetryTracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME); if (openTelemetryTracer != null) { @@ -366,7 +368,6 @@ protected void doStop() { @Override public void run() { try { - // stop connection is no-op if connections haven't been started. runShutdown(); notifyStopped(); } catch (Exception e) { @@ -378,7 +379,13 @@ public void run() { } private void runShutdown() { - stopAllStreamingConnections(); + java.time.Duration timeout = subscriberShutdownSettings.getTimeout(); + long deadlineMillis = -1; + if (!timeout.isNegative()) { + deadlineMillis = clock.millisTime() + timeout.toMillis(); + } + + stopAllStreamingConnections(deadlineMillis); shutdownBackgroundResources(); subscriberStub.shutdownNow(); } @@ -420,6 +427,7 @@ private void startStreamingConnections() { .setClock(clock) .setEnableOpenTelemetryTracing(enableOpenTelemetryTracing) .setTracer(tracer) + .setSubscriberShutdownSettings(subscriberShutdownSettings) .build(); streamingSubscriberConnections.add(streamingSubscriberConnection); @@ -445,8 +453,8 @@ public void failed(State from, Throwable failure) { } } - private void stopAllStreamingConnections() { - stopConnections(streamingSubscriberConnections); + private void stopAllStreamingConnections(long deadlineMillis) { + stopConnections(streamingSubscriberConnections, deadlineMillis); } private void shutdownBackgroundResources() { @@ -466,7 +474,7 @@ private void startConnections( } } - private void stopConnections(List connections) { + private void stopConnections(List connections, long deadlineMillis) { ArrayList liveConnections; synchronized (connections) { liveConnections = new ArrayList(connections); @@ -477,11 +485,19 @@ private void stopConnections(List connections) { } for (ApiService subscriber : liveConnections) { try { - subscriber.awaitTerminated(); - } catch (IllegalStateException e) { - // If the service fails, awaitTerminated will throw an exception. - // However, we could be stopping services because at least one - // has already failed, so we just ignore this exception. + if (deadlineMillis < 0) { + // Wait indefinitely + subscriber.awaitTerminated(); + } else { + long remaining = deadlineMillis - clock.millisTime(); + if (remaining < 0) { + remaining = 0; + } + subscriber.awaitTerminated(remaining, java.util.concurrent.TimeUnit.MILLISECONDS); + } + } catch (Exception e) { + logger.log(Level.FINE, "Exception while waiting for a connection to terminate", e); + break; // Stop waiting for other connections. } } } @@ -532,6 +548,9 @@ public static final class Builder { private boolean enableOpenTelemetryTracing = false; private OpenTelemetry openTelemetry = null; + private SubscriberShutdownSettings subscriberShutdownSettings = + SubscriberShutdownSettings.newBuilder().build(); + Builder(String subscription, MessageReceiver receiver) { this.subscription = subscription; this.receiver = receiver; @@ -546,7 +565,7 @@ public static final class Builder { * {@code ChannelProvider} to use to create Channels, which must point at Cloud Pub/Sub * endpoint. * - *

For performance, this client benefits from having multiple channels open at once. Users + *

For performance, this client bene/fits from having multiple channels open at once. Users * are encouraged to provide instances of {@code ChannelProvider} that creates new channels * instead of returning pre-initialized ones. */ @@ -772,6 +791,18 @@ public Builder setOpenTelemetry(OpenTelemetry openTelemetry) { return this; } + /** + * Sets the shutdown settings for the subscriber. Defaults to {@link + * SubscriberShutdownSettings#newBuilder() default settings}. + */ + @BetaApi( + "The surface for SubscriberShutdownSettings is not stable yet and may be changed in the future.") + public Builder setSubscriberShutdownSettings( + SubscriberShutdownSettings subscriberShutdownSettings) { + this.subscriberShutdownSettings = Preconditions.checkNotNull(subscriberShutdownSettings); + return this; + } + /** Returns the default FlowControlSettings used by the client if settings are not provided. */ public static FlowControlSettings getDefaultFlowControlSettings() { return DEFAULT_FLOW_CONTROL_SETTINGS; diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SubscriberShutdownSettings.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SubscriberShutdownSettings.java new file mode 100644 index 000000000..efd8e10db --- /dev/null +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/SubscriberShutdownSettings.java @@ -0,0 +1,102 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import com.google.common.base.Preconditions; +import java.time.Duration; + +/** + * Settings for configuring the shutdown behavior of a {@link Subscriber}. + * + *

This class allows customization of how the subscriber handles outstanding messages during + * shutdown, including whether to wait for processing to complete or to immediately nack messages, + * and an optional timeout for the shutdown process. + */ +public final class SubscriberShutdownSettings { + + /** Defines the behavior for handling outstanding messages during subscriber shutdown. */ + public enum ShutdownMode { + /** + * The subscriber will wait for all outstanding messages to be processed (acked or nacked by the + * user's message receiver) before completing the shutdown. + */ + WAIT_FOR_PROCESSING, + /** + * The subscriber will immediately nack all outstanding messages and attempt to shut down as + * quickly as possible. Messages delivered to the user callback but not yet acked/nacked will + * also be nacked. + */ + NACK_IMMEDIATELY + } + + private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(-1); // Indicates no timeout + private static final ShutdownMode DEFAULT_MODE = ShutdownMode.WAIT_FOR_PROCESSING; + + private final ShutdownMode mode; + private final Duration timeout; + + private SubscriberShutdownSettings(Builder builder) { + this.mode = builder.mode; + this.timeout = builder.timeout; + } + + /** Returns the configured shutdown mode. */ + public ShutdownMode getMode() { + return mode; + } + + /** Returns the configured shutdown timeout. A negative duration indicates no timeout. */ + public Duration getTimeout() { + return timeout; + } + + /** Returns a new builder for {@code SubscriberShutdownSettings}. */ + public static Builder newBuilder() { + return new Builder(); + } + + /** Builder for {@code SubscriberShutdownSettings}. */ + public static final class Builder { + private ShutdownMode mode = DEFAULT_MODE; + private Duration timeout = DEFAULT_TIMEOUT; + + private Builder() {} + + /** Sets the shutdown mode. Defaults to {@link ShutdownMode#WAIT_FOR_PROCESSING}. */ + public Builder setMode(ShutdownMode mode) { + this.mode = Preconditions.checkNotNull(mode); + return this; + } + + /** + * Sets the shutdown timeout. Defaults to a negative duration, indicating no timeout. + * + *

A positive duration specifies the maximum time to wait for shutdown to complete. A + * duration of zero indicates an immediate, forceful shutdown. A negative duration indicates an + * indefinite wait. + */ + public Builder setTimeout(Duration timeout) { + this.timeout = Preconditions.checkNotNull(timeout); + return this; + } + + /** Builds an instance of {@code SubscriberShutdownSettings}. */ + public SubscriberShutdownSettings build() { + return new SubscriberShutdownSettings(this); + } + } +} diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java index e22125fee..7221d5144 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Waiter.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsub.v1; +import com.google.api.core.ApiClock; import com.google.api.core.InternalApi; /** @@ -54,6 +55,32 @@ public synchronized void waitComplete() { } } + public synchronized boolean tryWait(long timeoutMilliseconds, ApiClock clock) { + long startTime = clock.millisTime(); + long remainingMilliseconds = timeoutMilliseconds; + boolean interrupted = false; + boolean completedWait = true; + try { + while (pendingCount > 0) { + if (remainingMilliseconds <= 0) { + completedWait = false; + break; + } + try { + wait(remainingMilliseconds); + } catch (InterruptedException e) { + interrupted = true; + } + remainingMilliseconds = timeoutMilliseconds - (clock.millisTime() - startTime); + } + } finally { + if (interrupted) { + Thread.currentThread().interrupt(); + } + } + return completedWait; + } + @InternalApi public int pendingCount() { return pendingCount; diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java index 173248041..3b2bd2f5d 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/FakeSubscriberServiceImpl.java @@ -24,6 +24,7 @@ import com.google.pubsub.v1.PublisherGrpc.PublisherImplBase; import com.google.pubsub.v1.PullRequest; import com.google.pubsub.v1.PullResponse; +import com.google.pubsub.v1.ReceivedMessage; import com.google.pubsub.v1.StreamingPullRequest; import com.google.pubsub.v1.StreamingPullResponse; import com.google.pubsub.v1.SubscriberGrpc.SubscriberImplBase; @@ -247,8 +248,26 @@ public void modifyAckDeadline( responseObserver.onCompleted(); } + public void sendMessages(int numMessages) throws InterruptedException { + waitForRegisteredSubscription(); + synchronized (openedStreams) { + waitForOpenedStreams(1); + Stream stream = openedStreams.get(getAndAdvanceCurrentStream()); + StreamingPullResponse.Builder response = StreamingPullResponse.newBuilder(); + for (int i = 0; i < numMessages; i++) { + response.addReceivedMessages( + ReceivedMessage.newBuilder() + .setAckId("ackid" + i) + .setMessage( + com.google.pubsub.v1.PubsubMessage.newBuilder().setMessageId("id" + i).build()) + .build()); + } + stream.responseObserver.onNext(response.build()); + } + } + public void sendError(Throwable error) throws InterruptedException { - waitForRegistedSubscription(); + waitForRegisteredSubscription(); synchronized (openedStreams) { waitForOpenedStreams(1); Stream stream = openedStreams.get(getAndAdvanceCurrentStream()); @@ -257,7 +276,7 @@ public void sendError(Throwable error) throws InterruptedException { } } - public String waitForRegistedSubscription() throws InterruptedException { + public String waitForRegisteredSubscription() throws InterruptedException { synchronized (subscriptionInitialized) { while (!subscriptionInitialized.get()) { subscriptionInitialized.wait(); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java index 1de156939..1285fadd5 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/MessageDispatcherTest.java @@ -18,6 +18,9 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; import com.google.api.gax.batching.FlowController; @@ -713,9 +716,238 @@ private MessageDispatcher getMessageDispatcherFromBuilder( .setSubscriptionName(MOCK_SUBSCRIPTION_NAME) .setSystemExecutor(systemExecutor) .setApiClock(clock) + .setSubscriberShutdownSettings(SubscriberShutdownSettings.newBuilder().build()) .build(); messageDispatcher.setMessageDeadlineSeconds(MIN_ACK_DEADLINE_SECONDS); return messageDispatcher; } + + private MessageDispatcher getMessageDispatcherFromBuilder( + MessageDispatcher.Builder builder, SubscriberShutdownSettings shutdownSettings) { + MessageDispatcher messageDispatcher = + builder + .setAckProcessor(mockAckProcessor) + .setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT) + .setMaxAckExtensionPeriod(MAX_ACK_EXTENSION_PERIOD) + .setMinDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION) + .setMinDurationPerAckExtensionDefaultUsed(true) + .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION) + .setMaxDurationPerAckExtensionDefaultUsed(true) + .setAckLatencyDistribution(mock(Distribution.class)) + .setFlowController(mock(FlowController.class)) + .setExecutor(MoreExecutors.newDirectExecutorService()) + .setSubscriptionName(MOCK_SUBSCRIPTION_NAME) + .setSystemExecutor(systemExecutor) + .setApiClock(clock) + .setSubscriberShutdownSettings(shutdownSettings) + .build(); + + messageDispatcher.setMessageDeadlineSeconds(MIN_ACK_DEADLINE_SECONDS); + return messageDispatcher; + } + + @Test + public void testStop_waitForProcessing_indefinite() throws Exception { + SubscriberShutdownSettings shutdownSettings = + SubscriberShutdownSettings.newBuilder() + .setMode(SubscriberShutdownSettings.ShutdownMode.WAIT_FOR_PROCESSING) + .setTimeout(Duration.ofSeconds(-1)) + .build(); + MessageDispatcher dispatcher = + getMessageDispatcherFromBuilder( + MessageDispatcher.newBuilder(messageReceiver), shutdownSettings); + + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + + Thread stopThread = new Thread(dispatcher::stop); + stopThread.start(); + + // Wait for the stop thread to block on the waiter. + Thread.sleep(100); + assertTrue(stopThread.isAlive()); + + // Ack the message, which should allow the stop thread to complete. + consumers.take().ack(); + + List ackRequestDataList = new ArrayList(); + AckRequestData ackRequestData = AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build(); + ackRequestDataList.add(ackRequestData); + + stopThread.join(); + assertFalse(stopThread.isAlive()); + + verify(mockAckProcessor, times(1)) + .sendAckOperations( + argThat(new CustomArgumentMatchers.AckRequestDataListMatcher(ackRequestDataList))); + } + + @Test + public void testStop_waitForProcessing_withTimeout_success() { + SubscriberShutdownSettings shutdownSettings = + SubscriberShutdownSettings.newBuilder() + .setMode(SubscriberShutdownSettings.ShutdownMode.WAIT_FOR_PROCESSING) + .setTimeout(Duration.ofSeconds(5)) + .build(); + MessageDispatcher dispatcher = + getMessageDispatcherFromBuilder( + MessageDispatcher.newBuilder(messageReceiver), shutdownSettings); + + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + + Thread stopThread = new Thread(dispatcher::stop); + stopThread.start(); + + // Ack the message before the timeout expires. + try { + consumers.take().ack(); + } catch (InterruptedException e) { + fail("Interrupted while taking consumer"); + } + + try { + stopThread.join(1000); + } catch (InterruptedException e) { + fail("Interrupted while joining stop thread"); + } + + List ackRequestDataList = new ArrayList(); + AckRequestData ackRequestData = AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build(); + ackRequestDataList.add(ackRequestData); + + verify(mockAckProcessor, times(1)) + .sendAckOperations( + argThat(new CustomArgumentMatchers.AckRequestDataListMatcher(ackRequestDataList))); + assertFalse(stopThread.isAlive()); + } + + @Test + public void testStop_waitForProcessing_withTimeout_nackWithOneSecondLeft() { + SubscriberShutdownSettings shutdownSettings = + SubscriberShutdownSettings.newBuilder() + .setMode(SubscriberShutdownSettings.ShutdownMode.WAIT_FOR_PROCESSING) + .setTimeout(Duration.ofSeconds(2)) + .build(); + MessageDispatcher dispatcher = + getMessageDispatcherFromBuilder( + MessageDispatcher.newBuilder(messageReceiver), shutdownSettings); + + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + Thread stopThread = new Thread(dispatcher::stop); + stopThread.start(); + + // Wait for the stop thread to block on the waiter. + try { + Thread.sleep(100); + } catch (InterruptedException e) { + fail("Interrupted while sleeping"); + } + + clock.advance(1500, TimeUnit.MILLISECONDS); + + try { + stopThread.join(); + } catch (InterruptedException e) { + fail("Interrupted while joining stop thread"); + } + // Assert expected behavior + List modackRequestDataList = new ArrayList(); + modackRequestDataList.add( + new ModackRequestData(0, AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build())); + + verify(mockAckProcessor, times(1)) + .sendModackOperations( + argThat( + new CustomArgumentMatchers.ModackRequestDataListMatcher(modackRequestDataList))); + verify(mockAckProcessor, times(1)) + .sendAckOperations( + argThat(new CustomArgumentMatchers.AckRequestDataListMatcher(Collections.emptyList()))); + } + + @Test + public void testStop_nackImmediately() { + SubscriberShutdownSettings shutdownSettings = + SubscriberShutdownSettings.newBuilder() + .setMode(SubscriberShutdownSettings.ShutdownMode.NACK_IMMEDIATELY) + .build(); + MessageDispatcher dispatcher = + getMessageDispatcherFromBuilder( + MessageDispatcher.newBuilder(messageReceiver), shutdownSettings); + + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + dispatcher.stop(); + + // Assert expected behavior + List modackRequestDataList = new ArrayList(); + modackRequestDataList.add( + new ModackRequestData(0, AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build())); + + verify(mockAckProcessor, times(1)) + .sendModackOperations( + argThat( + new CustomArgumentMatchers.ModackRequestDataListMatcher(modackRequestDataList))); + verify(mockAckProcessor, times(1)) + .sendAckOperations( + argThat(new CustomArgumentMatchers.AckRequestDataListMatcher(Collections.emptyList()))); + } + + @Test + public void testAckDuringNackImmediatelyShutdown() throws Exception { + SubscriberShutdownSettings shutdownSettings = + SubscriberShutdownSettings.newBuilder() + .setMode(SubscriberShutdownSettings.ShutdownMode.NACK_IMMEDIATELY) + .build(); + MessageDispatcher dispatcher = + getMessageDispatcherFromBuilder( + MessageDispatcher.newBuilder(messageReceiverWithAckResponse), shutdownSettings); + dispatcher.setExactlyOnceDeliveryEnabled(true); + + dispatcher.processReceivedMessages(Collections.singletonList(TEST_MESSAGE)); + dispatcher.processOutstandingOperations(); + + List receiptModackRequestDataList = new ArrayList(); + AckRequestData receiptModackRequestData = + AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build(); + receiptModackRequestDataList.add( + new ModackRequestData(MIN_ACK_DEADLINE_SECONDS, receiptModackRequestData)); + dispatcher.notifyAckSuccess(receiptModackRequestData); + + verify(mockAckProcessor, times(1)) + .sendModackOperations( + argThat( + new CustomArgumentMatchers.ModackRequestDataListMatcher( + receiptModackRequestDataList))); + + Thread stopThread = + new Thread( + () -> { + dispatcher.stop(); + }); + stopThread.start(); + + // Wait for the stop process to start. + while (!dispatcher.getNackImmediatelyShutdownInProgress()) { + Thread.sleep(1); + } + + // Try to ack the message. + AckResponse reply = consumersWithResponse.take().ack().get(); + assertThat(reply.equals(AckResponse.OTHER)); + + stopThread.join(); + + // Assert expected behavior + List modackRequestDataList = new ArrayList(); + modackRequestDataList.add( + new ModackRequestData(0, AckRequestData.newBuilder(TEST_MESSAGE.getAckId()).build())); + + // The message should have been nacked, not acked. + verify(mockAckProcessor, times(1)) + .sendModackOperations( + argThat( + new CustomArgumentMatchers.ModackRequestDataListMatcher(modackRequestDataList))); + verify(mockAckProcessor, times(2)) + .sendAckOperations( + argThat(new CustomArgumentMatchers.AckRequestDataListMatcher(Collections.emptyList()))); + } } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java index 335ccbdc3..f79825d85 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/StreamingSubscriberConnectionTest.java @@ -17,6 +17,7 @@ package com.google.cloud.pubsub.v1; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.*; @@ -41,6 +42,8 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -106,6 +109,110 @@ public void testSetupAndTeardown() { streamingSubscriberConnection.awaitTerminated(); } + @Test + public void testRunShutdown_TimeoutMet() throws Exception { + SubscriberShutdownSettings shutdownSettings = + SubscriberShutdownSettings.newBuilder().setTimeout(Duration.ofSeconds(10)).build(); + StreamingSubscriberConnection.Builder builder = + StreamingSubscriberConnection.newBuilder(mock(MessageReceiverWithAckResponse.class)); + builder.setSubscriberShutdownSettings(shutdownSettings); + StreamingSubscriberConnection streamingSubscriberConnection = + getStreamingSubscriberConnectionFromBuilder(builder); + + streamingSubscriberConnection.startAsync().awaitRunning(); + streamingSubscriberConnection.stopAsync(); + + // Should terminate quickly as there are no outstanding messages. + streamingSubscriberConnection.awaitTerminated(1, TimeUnit.SECONDS); + } + + @Test + public void testRunShutdown_TimeoutExceeded() throws Exception { + final SettableApiFuture ackFuture = SettableApiFuture.create(); + when(mockSubscriberStub.acknowledgeCallable().futureCall(any(AcknowledgeRequest.class))) + .thenReturn(ackFuture); + + SubscriberShutdownSettings shutdownSettings = + SubscriberShutdownSettings.newBuilder().setTimeout(Duration.ofSeconds(2)).build(); + StreamingSubscriberConnection.Builder builder = + StreamingSubscriberConnection.newBuilder(mock(MessageReceiverWithAckResponse.class)); + StreamingSubscriberConnection streamingSubscriberConnection = + getStreamingSubscriberConnectionFromBuilder(builder, shutdownSettings); + streamingSubscriberConnection.setExactlyOnceDeliveryEnabled(true); + + streamingSubscriberConnection.startAsync().awaitRunning(); + + // Send an ACK that will not complete. + SettableApiFuture messageFuture = SettableApiFuture.create(); + streamingSubscriberConnection.sendAckOperations( + Collections.singletonList( + AckRequestData.newBuilder("ack1").setMessageFuture(messageFuture).build())); + + Thread t = + new Thread( + () -> { + streamingSubscriberConnection.stopAsync(); + }); + t.start(); + + Thread t2 = + new Thread( + () -> { + try { + streamingSubscriberConnection.awaitTerminated(1, TimeUnit.SECONDS); + fail("Should have timed out"); + } catch (TimeoutException e) { + // expected + } + }); + t2.start(); + t2.join(); + + // Advance the clock past the shutdown timeout. + clock.advance(3, TimeUnit.SECONDS); + t.join(); + + // Now it should terminate. + streamingSubscriberConnection.awaitTerminated(); + assertFalse(streamingSubscriberConnection.isRunning()); + assertFalse(messageFuture.isDone()); + } + + @Test + public void testAckDuringNackImmediatelyShutdown() throws Exception { + SubscriberShutdownSettings shutdownSettings = + SubscriberShutdownSettings.newBuilder() + .setMode(SubscriberShutdownSettings.ShutdownMode.NACK_IMMEDIATELY) + .build(); + + MessageDispatcher mockMessageDispatcher = mock(MessageDispatcher.class); + when(mockMessageDispatcher.getNackImmediatelyShutdownInProgress()).thenReturn(true); + + StreamingSubscriberConnection.Builder builder = + StreamingSubscriberConnection.newBuilder(mock(MessageReceiverWithAckResponse.class)); + StreamingSubscriberConnection streamingSubscriberConnection = + getStreamingSubscriberConnectionFromBuilder(builder, shutdownSettings); + + // Use reflection to set the mock message dispatcher + java.lang.reflect.Field dispatcherField = + StreamingSubscriberConnection.class.getDeclaredField("messageDispatcher"); + dispatcherField.setAccessible(true); + dispatcherField.set(streamingSubscriberConnection, mockMessageDispatcher); + + streamingSubscriberConnection.setExactlyOnceDeliveryEnabled(true); + + SettableApiFuture messageFuture = SettableApiFuture.create(); + AckRequestData ackRequestData = + AckRequestData.newBuilder("ack1").setMessageFuture(messageFuture).build(); + + when(mockSubscriberStub.acknowledgeCallable().futureCall(any())) + .thenReturn(ApiFutures.immediateFuture(null)); + streamingSubscriberConnection.sendAckOperations(Collections.singletonList(ackRequestData)); + + verify(mockMessageDispatcher, times(1)).notifyAckFailed(ackRequestData); + assertEquals(AckResponse.OTHER, messageFuture.get()); + } + @Test public void testSendAckOperationsExactlyOnceDisabledNoMessageFutures() { // Setup mocks @@ -592,6 +699,28 @@ private StreamingSubscriberConnection getStreamingSubscriberConnectionFromBuilde .setMinDurationPerAckExtensionDefaultUsed(true) .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION) .setMaxDurationPerAckExtensionDefaultUsed(true) + .setSubscriberShutdownSettings(SubscriberShutdownSettings.newBuilder().build()) + .build(); + } + + private StreamingSubscriberConnection getStreamingSubscriberConnectionFromBuilder( + StreamingSubscriberConnection.Builder builder, SubscriberShutdownSettings shutdownSettings) { + return builder + .setSubscription(MOCK_SUBSCRIPTION_NAME) + .setAckExpirationPadding(ACK_EXPIRATION_PADDING_DEFAULT_DURATION) + .setAckLatencyDistribution(mock(Distribution.class)) + .setSubscriberStub(mockSubscriberStub) + .setChannelAffinity(0) + .setFlowControlSettings(mock(FlowControlSettings.class)) + .setFlowController(mock(FlowController.class)) + .setExecutor(executor) + .setSystemExecutor(systemExecutor) + .setClock(clock) + .setMinDurationPerAckExtension(Subscriber.DEFAULT_MIN_ACK_DEADLINE_EXTENSION) + .setMinDurationPerAckExtensionDefaultUsed(true) + .setMaxDurationPerAckExtension(Subscriber.DEFAULT_MAX_ACK_DEADLINE_EXTENSION) + .setMaxDurationPerAckExtensionDefaultUsed(true) + .setSubscriberShutdownSettings(shutdownSettings) .build(); } diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberShutdownSettingsTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberShutdownSettingsTest.java new file mode 100644 index 000000000..f82937582 --- /dev/null +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberShutdownSettingsTest.java @@ -0,0 +1,100 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsub.v1; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.pubsub.v1.SubscriberShutdownSettings.ShutdownMode; +import java.time.Duration; +import org.junit.Test; + +public class SubscriberShutdownSettingsTest { + + @Test + public void testDefaultSettings() { + SubscriberShutdownSettings settings = SubscriberShutdownSettings.newBuilder().build(); + + assertNotNull(settings); + assertEquals(ShutdownMode.WAIT_FOR_PROCESSING, settings.getMode()); + assertTrue(settings.getTimeout().isNegative()); // Indefinite timeout + } + + @Test + public void testWaitForProcessingWithCustomTimeout() { + Duration customTimeout = Duration.ofSeconds(30); + SubscriberShutdownSettings settings = + SubscriberShutdownSettings.newBuilder() + .setMode(ShutdownMode.WAIT_FOR_PROCESSING) + .setTimeout(customTimeout) + .build(); + + assertNotNull(settings); + assertEquals(ShutdownMode.WAIT_FOR_PROCESSING, settings.getMode()); + assertEquals(customTimeout, settings.getTimeout()); + } + + @Test + public void testNackImmediatelyWithDefaultTimeout() { + SubscriberShutdownSettings settings = + SubscriberShutdownSettings.newBuilder().setMode(ShutdownMode.NACK_IMMEDIATELY).build(); + + assertNotNull(settings); + assertEquals(ShutdownMode.NACK_IMMEDIATELY, settings.getMode()); + assertTrue(settings.getTimeout().isNegative()); // Indefinite timeout + } + + @Test + public void testNackImmediatelyWithCustomTimeout() { + Duration customTimeout = Duration.ofSeconds(10); + SubscriberShutdownSettings settings = + SubscriberShutdownSettings.newBuilder() + .setMode(ShutdownMode.NACK_IMMEDIATELY) + .setTimeout(customTimeout) + .build(); + + assertNotNull(settings); + assertEquals(ShutdownMode.NACK_IMMEDIATELY, settings.getMode()); + assertEquals(customTimeout, settings.getTimeout()); + } + + @Test + public void testZeroTimeout() { + Duration zeroTimeout = Duration.ZERO; + SubscriberShutdownSettings settings = + SubscriberShutdownSettings.newBuilder() + .setMode(ShutdownMode.WAIT_FOR_PROCESSING) + .setTimeout(zeroTimeout) + .build(); + + assertNotNull(settings); + assertEquals(ShutdownMode.WAIT_FOR_PROCESSING, settings.getMode()); + assertEquals(zeroTimeout, settings.getTimeout()); + assertTrue(settings.getTimeout().isZero()); + } + + @Test(expected = NullPointerException.class) + public void testNullModeThrowsException() { + SubscriberShutdownSettings.newBuilder().setMode(null).build(); + } + + @Test(expected = NullPointerException.class) + public void testNullTimeoutThrowsException() { + SubscriberShutdownSettings.newBuilder().setTimeout(null).build(); + } +} diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java index 253a91f73..649769d1c 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/SubscriberTest.java @@ -320,6 +320,104 @@ public void testPartialFlowControlSettings() throws Exception { Subscriber.Builder.DEFAULT_FLOW_CONTROL_SETTINGS.getMaxOutstandingElementCount()); } + @Test + public void testShutdown_waitForProcessing_indefinite() throws Exception { + final CountDownLatch messageReceived = new CountDownLatch(1); + final AckReplyConsumer[] consumer = new AckReplyConsumer[1]; + + MessageReceiver receiver = + new MessageReceiver() { + @Override + public void receiveMessage(PubsubMessage message, AckReplyConsumer c) { + consumer[0] = c; + messageReceived.countDown(); + } + }; + + Subscriber subscriber = + startSubscriber( + getTestSubscriberBuilder(receiver) + .setSubscriberShutdownSettings( + SubscriberShutdownSettings.newBuilder() + .setMode(SubscriberShutdownSettings.ShutdownMode.WAIT_FOR_PROCESSING) + .setTimeout(Duration.ofSeconds(-1)) + .build())); + + // Send a message and wait for it to be received. + fakeSubscriberServiceImpl.sendMessages(1); + messageReceived.await(10, TimeUnit.SECONDS); + + subscriber.stopAsync(); + + try { + subscriber.awaitTerminated(1, TimeUnit.SECONDS); + fail("Subscriber should not have terminated yet."); + } catch (TimeoutException e) { + // Expected + } + + // Now, ack the message, which should allow the subscriber to terminate. + consumer[0].ack(); + subscriber.awaitTerminated(10, TimeUnit.SECONDS); + } + + @Test + public void testShutdown_waitForProcessing_withTimeout_success() throws Exception { + Subscriber subscriber = + startSubscriber( + getTestSubscriberBuilder(testReceiver) + .setSubscriberShutdownSettings( + SubscriberShutdownSettings.newBuilder() + .setMode(SubscriberShutdownSettings.ShutdownMode.WAIT_FOR_PROCESSING) + .setTimeout(Duration.ofSeconds(10)) + .build())); + subscriber.stopAsync(); + fakeExecutor.advanceTime(Duration.ofSeconds(5)); + subscriber.awaitTerminated(1, TimeUnit.SECONDS); // Should terminate quickly now + } + + @Test + public void testShutdown_waitForProcessing_withTimeout_failure() throws Exception { + Subscriber subscriber = + startSubscriber( + getTestSubscriberBuilder(testReceiver) + .setSubscriberShutdownSettings( + SubscriberShutdownSettings.newBuilder() + .setMode(SubscriberShutdownSettings.ShutdownMode.WAIT_FOR_PROCESSING) + .setTimeout(Duration.ofSeconds(5)) + .build())); + subscriber.stopAsync(); + fakeExecutor.advanceTime(Duration.ofSeconds(6)); + subscriber.awaitTerminated(1, TimeUnit.SECONDS); // Should have timed out and terminated + } + + @Test + public void testShutdown_waitForProcessing_zeroTimeout() throws Exception { + Subscriber subscriber = + startSubscriber( + getTestSubscriberBuilder(testReceiver) + .setSubscriberShutdownSettings( + SubscriberShutdownSettings.newBuilder() + .setMode(SubscriberShutdownSettings.ShutdownMode.WAIT_FOR_PROCESSING) + .setTimeout(Duration.ZERO) + .build())); + subscriber.stopAsync(); + subscriber.awaitTerminated(1, TimeUnit.SECONDS); // Should terminate almost immediately + } + + @Test + public void testShutdown_nackImmediately() throws Exception { + Subscriber subscriber = + startSubscriber( + getTestSubscriberBuilder(testReceiver) + .setSubscriberShutdownSettings( + SubscriberShutdownSettings.newBuilder() + .setMode(SubscriberShutdownSettings.ShutdownMode.NACK_IMMEDIATELY) + .build())); + subscriber.stopAsync(); + subscriber.awaitTerminated(1, TimeUnit.SECONDS); // Should terminate almost immediately + } + private Subscriber startSubscriber(Builder testSubscriberBuilder) { Subscriber subscriber = testSubscriberBuilder.build(); subscriber.startAsync().awaitRunning(); diff --git a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/WaiterTest.java b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/WaiterTest.java index ca8618378..c9eed7e16 100644 --- a/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/WaiterTest.java +++ b/google-cloud-pubsub/src/test/java/com/google/cloud/pubsub/v1/WaiterTest.java @@ -17,7 +17,10 @@ package com.google.cloud.pubsub.v1; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -50,4 +53,47 @@ public void run() { assertEquals(0, waiter.pendingCount()); } + + @Test + public void testTryWait_Completes() { + final Waiter waiter = new Waiter(); + waiter.incrementPendingCount(1); + final FakeClock clock = new FakeClock(); + + Thread t = + new Thread( + () -> { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + waiter.incrementPendingCount(-1); + }); + t.start(); + + assertTrue(waiter.tryWait(500, clock)); + } + + @Test + public void testTryWait_TimesOut() { + final Waiter waiter = new Waiter(); + waiter.incrementPendingCount(1); + final FakeClock clock = new FakeClock(); + + Thread t = + new Thread( + () -> { + clock.advance(100, TimeUnit.MILLISECONDS); + }); + t.start(); + + assertFalse(waiter.tryWait(100, clock)); + } + + @Test + public void testTryWait_NoPending() { + final Waiter waiter = new Waiter(); + final FakeClock clock = new FakeClock(); + assertTrue(waiter.tryWait(100, clock)); + } } From cac469e6aa7135e7b5806401eb141082822d205f Mon Sep 17 00:00:00 2001 From: Mike Prieto Date: Mon, 6 Oct 2025 15:52:00 +0000 Subject: [PATCH 2/2] fix: Fix typo in Subscriber class comments --- .../src/main/java/com/google/cloud/pubsub/v1/Subscriber.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java index 580cc0632..28ba62d8e 100644 --- a/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java +++ b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Subscriber.java @@ -565,7 +565,7 @@ public static final class Builder { * {@code ChannelProvider} to use to create Channels, which must point at Cloud Pub/Sub * endpoint. * - *

For performance, this client bene/fits from having multiple channels open at once. Users + *

For performance, this client benefits from having multiple channels open at once. Users * are encouraged to provide instances of {@code ChannelProvider} that creates new channels * instead of returning pre-initialized ones. */