Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class MessageDispatcher {

@InternalApi static final double PERCENTILE_FOR_ACK_DEADLINE_UPDATES = 99.9;
@InternalApi static final Duration PENDING_ACKS_SEND_DELAY = Duration.ofMillis(100);
@InternalApi static final long FINAL_NACK_TIMEOUT = Duration.ofSeconds(1).toMillis();

private final Executor executor;
private final SequentialExecutorService.AutoExecutor sequentialExecutor;
Expand Down Expand Up @@ -108,6 +109,8 @@ class MessageDispatcher {
private final SubscriptionName subscriptionNameObject;
private final boolean enableOpenTelemetryTracing;
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
private final SubscriberShutdownSettings subscriberShutdownSettings;
private final AtomicBoolean nackImmediatelyShutdownInProgress = new AtomicBoolean(false);

/** Internal representation of a reply to a Pubsub message, to be sent back to the service. */
public enum AckReply {
Expand Down Expand Up @@ -170,12 +173,18 @@ public void onFailure(Throwable t) {
public void onSuccess(AckReply reply) {
switch (reply) {
case ACK:
pendingAcks.add(this.ackRequestData);
// Record the latency rounded to the next closest integer.
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack");
if (nackImmediatelyShutdownInProgress.get() && exactlyOnceDeliveryEnabled.get()) {
this.ackRequestData.setResponse(AckResponse.OTHER, true);
tracer.endSubscribeProcessSpan(
this.ackRequestData.getMessageWrapper(), "ack failed_with_nack_immediately");
} else {
pendingAcks.add(this.ackRequestData);
// Record the latency rounded to the next closest integer.
ackLatencyDistribution.record(
Ints.saturatedCast(
(long) Math.ceil((clock.millisTime() - receivedTimeMillis) / 1000D)));
tracer.endSubscribeProcessSpan(this.ackRequestData.getMessageWrapper(), "ack");
}
break;
case NACK:
pendingNacks.add(this.ackRequestData);
Expand Down Expand Up @@ -231,6 +240,7 @@ private MessageDispatcher(Builder builder) {
if (builder.tracer != null) {
tracer = builder.tracer;
}
this.subscriberShutdownSettings = builder.subscriberShutdownSettings;
}

private boolean shouldSetMessageFuture() {
Expand Down Expand Up @@ -294,8 +304,60 @@ public void run() {
}
}

private void nackAllOutstandingMessages() {
nackImmediatelyShutdownInProgress.set(true);
List<AckHandler> handlersToNack = new ArrayList<>(pendingMessages.values());
for (AckHandler ackHandler : handlersToNack) {
pendingNacks.add(ackHandler.getAckRequestData());
ackHandler.forget(); // This removes from pendingMessages, releases flow control, etc.
}
}

void stop() {
messagesWaiter.waitComplete();
switch (subscriberShutdownSettings.getMode()) {
case WAIT_FOR_PROCESSING:
logger.log(
Level.FINE,
"WAIT_FOR_PROCESSING shutdown mode: Waiting for outstanding messages to complete processing.");
java.time.Duration timeout = subscriberShutdownSettings.getTimeout();
if (timeout.isNegative()) {
// Indefinite wait use existing blocking wait
messagesWaiter.waitComplete();
} else {
// Wait for (timeout - 1 second) for messages to complete
long gracePeriodMillis = Math.max(0, timeout.toMillis() - FINAL_NACK_TIMEOUT);
boolean completedWait = messagesWaiter.tryWait(gracePeriodMillis, clock);
if (!completedWait) {
logger.log(
Level.WARNING,
"Grace period expired for WAIT_FOR_PROCESSING shutdown. Nacking remaining messages.");
// Switch to NACK_IMMEDIATELY behavior for remaining messages
nackAllOutstandingMessages();
}
}
cancelBackgroundJob();
processOutstandingOperations(); // Send any remaining acks/nacks.
break;

case NACK_IMMEDIATELY:
logger.log(Level.FINE, "NACK_IMMEDIATELY shutdown mode: Nacking all outstanding messages.");
// Stop extending deadlines immediately.
cancelBackgroundJob();
nackAllOutstandingMessages();
processOutstandingOperations(); // Send all pending nacks.
break;

default:
logger.log(Level.WARNING, "Unknown shutdown mode: " + subscriberShutdownSettings.getMode());
// Default to WAIT_FOR_PROCESSING behavior
messagesWaiter.waitComplete();
cancelBackgroundJob();
processOutstandingOperations();
break;
}
}

private void cancelBackgroundJob() {
jobLock.lock();
try {
if (backgroundJob != null) {
Expand All @@ -309,7 +371,6 @@ void stop() {
} finally {
jobLock.unlock();
}
processOutstandingOperations();
}

@InternalApi
Expand Down Expand Up @@ -364,6 +425,11 @@ void setMessageOrderingEnabled(boolean messageOrderingEnabled) {
this.messageOrderingEnabled.set(messageOrderingEnabled);
}

@InternalApi
boolean getNackImmediatelyShutdownInProgress() {
return nackImmediatelyShutdownInProgress.get();
}

private static class OutstandingMessage {
private final AckHandler ackHandler;

Expand Down Expand Up @@ -661,7 +727,7 @@ void processOutstandingOperations() {

List<AckRequestData> ackRequestDataReceipts = new ArrayList<AckRequestData>();
pendingReceipts.drainTo(ackRequestDataReceipts);
if (!ackRequestDataReceipts.isEmpty()) {
if (!ackRequestDataReceipts.isEmpty() && !getNackImmediatelyShutdownInProgress()) {
ModackRequestData receiptModack =
new ModackRequestData(this.getMessageDeadlineSeconds(), ackRequestDataReceipts);
receiptModack.setIsReceiptModack(true);
Expand Down Expand Up @@ -705,6 +771,7 @@ public static final class Builder {
private String subscriptionName;
private boolean enableOpenTelemetryTracing;
private OpenTelemetryPubsubTracer tracer;
private SubscriberShutdownSettings subscriberShutdownSettings;

protected Builder(MessageReceiver receiver) {
this.receiver = receiver;
Expand Down Expand Up @@ -791,6 +858,12 @@ public Builder setTracer(OpenTelemetryPubsubTracer tracer) {
return this;
}

public Builder setSubscriberShutdownSettings(
SubscriberShutdownSettings subscriberShutdownSettings) {
this.subscriberShutdownSettings = subscriberShutdownSettings;
return this;
}

public MessageDispatcher build() {
return new MessageDispatcher(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ final class StreamingSubscriberConnection extends AbstractApiService implements
private final String subscription;
private final SubscriptionName subscriptionNameObject;
private final ScheduledExecutorService systemExecutor;
private final ApiClock clock;
private final MessageDispatcher messageDispatcher;

private final FlowControlSettings flowControlSettings;
Expand All @@ -124,11 +125,13 @@ final class StreamingSubscriberConnection extends AbstractApiService implements

private final boolean enableOpenTelemetryTracing;
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
private final SubscriberShutdownSettings subscriberShutdownSettings;

private StreamingSubscriberConnection(Builder builder) {
subscription = builder.subscription;
subscriptionNameObject = SubscriptionName.parse(builder.subscription);
systemExecutor = builder.systemExecutor;
clock = builder.clock;

// We need to set the default stream ack deadline on the initial request, this will be
// updated by modack requests in the message dispatcher
Expand Down Expand Up @@ -163,6 +166,7 @@ private StreamingSubscriberConnection(Builder builder) {
if (builder.tracer != null) {
tracer = builder.tracer;
}
this.subscriberShutdownSettings = builder.subscriberShutdownSettings;

messageDispatcher =
messageDispatcherBuilder
Expand All @@ -181,6 +185,7 @@ private StreamingSubscriberConnection(Builder builder) {
.setSubscriptionName(subscription)
.setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
.setTracer(tracer)
.setSubscriberShutdownSettings(subscriberShutdownSettings)
.build();

flowControlSettings = builder.flowControlSettings;
Expand Down Expand Up @@ -218,8 +223,21 @@ protected void doStop() {
}

private void runShutdown() {
java.time.Duration timeout = subscriberShutdownSettings.getTimeout();
if (timeout.isZero()) {
return;
}

messageDispatcher.stop();
ackOperationsWaiter.waitComplete();
if (timeout.isNegative()) {
ackOperationsWaiter.waitComplete();
} else {
boolean completedWait = ackOperationsWaiter.tryWait(timeout.toMillis(), clock);
if (!completedWait) {
logger.log(
Level.WARNING, "Timeout exceeded while waiting for ACK/NACK operations to complete.");
}
}
}

private class StreamingPullResponseObserver implements ResponseObserver<StreamingPullResponse> {
Expand Down Expand Up @@ -554,9 +572,18 @@ public void onSuccess(Empty empty) {
tracer.endSubscribeRpcSpan(rpcSpan);

for (AckRequestData ackRequestData : ackRequestDataList) {
// This will check if a response is needed, and if it has already been set
ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
messageDispatcher.notifyAckSuccess(ackRequestData);
// If we are in NACK_IMMEDIATELY shutdown mode, we will set failures on acks/nack so that
// an error is surfaced if the user
// manually acks or nacks in their callback.
if (setResponseOnSuccess
&& getExactlyOnceDeliveryEnabled()
&& messageDispatcher.getNackImmediatelyShutdownInProgress()) {
ackRequestData.setResponse(AckResponse.OTHER, setResponseOnSuccess);
messageDispatcher.notifyAckFailed(ackRequestData);
} else {
ackRequestData.setResponse(AckResponse.SUCCESSFUL, setResponseOnSuccess);
messageDispatcher.notifyAckSuccess(ackRequestData);
}
// Remove from our pending operations
pendingRequests.remove(ackRequestData);
tracer.addEndRpcEvent(
Expand Down Expand Up @@ -751,6 +778,7 @@ public static final class Builder {

private boolean enableOpenTelemetryTracing;
private OpenTelemetryPubsubTracer tracer;
private SubscriberShutdownSettings subscriberShutdownSettings;

protected Builder(MessageReceiver receiver) {
this.receiver = receiver;
Expand Down Expand Up @@ -852,6 +880,12 @@ public Builder setTracer(OpenTelemetryPubsubTracer tracer) {
return this;
}

public Builder setSubscriberShutdownSettings(
SubscriberShutdownSettings subscriberShutdownSettings) {
this.subscriberShutdownSettings = subscriberShutdownSettings;
return this;
}

public StreamingSubscriberConnection build() {
return new StreamingSubscriberConnection(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public class Subscriber extends AbstractApiService implements SubscriberInterfac
private final boolean enableOpenTelemetryTracing;
private final OpenTelemetry openTelemetry;
private OpenTelemetryPubsubTracer tracer = new OpenTelemetryPubsubTracer(null, false);
private final SubscriberShutdownSettings subscriberShutdownSettings;

private Subscriber(Builder builder) {
receiver = builder.receiver;
Expand Down Expand Up @@ -223,6 +224,7 @@ private Subscriber(Builder builder) {

this.enableOpenTelemetryTracing = builder.enableOpenTelemetryTracing;
this.openTelemetry = builder.openTelemetry;
this.subscriberShutdownSettings = builder.subscriberShutdownSettings;
if (this.openTelemetry != null && this.enableOpenTelemetryTracing) {
Tracer openTelemetryTracer = builder.openTelemetry.getTracer(OPEN_TELEMETRY_TRACER_NAME);
if (openTelemetryTracer != null) {
Expand Down Expand Up @@ -366,7 +368,6 @@ protected void doStop() {
@Override
public void run() {
try {
// stop connection is no-op if connections haven't been started.
runShutdown();
notifyStopped();
} catch (Exception e) {
Expand All @@ -378,7 +379,13 @@ public void run() {
}

private void runShutdown() {
stopAllStreamingConnections();
java.time.Duration timeout = subscriberShutdownSettings.getTimeout();
long deadlineMillis = -1;
if (!timeout.isNegative()) {
deadlineMillis = clock.millisTime() + timeout.toMillis();
}

stopAllStreamingConnections(deadlineMillis);
shutdownBackgroundResources();
subscriberStub.shutdownNow();
}
Expand Down Expand Up @@ -420,6 +427,7 @@ private void startStreamingConnections() {
.setClock(clock)
.setEnableOpenTelemetryTracing(enableOpenTelemetryTracing)
.setTracer(tracer)
.setSubscriberShutdownSettings(subscriberShutdownSettings)
.build();

streamingSubscriberConnections.add(streamingSubscriberConnection);
Expand All @@ -445,8 +453,8 @@ public void failed(State from, Throwable failure) {
}
}

private void stopAllStreamingConnections() {
stopConnections(streamingSubscriberConnections);
private void stopAllStreamingConnections(long deadlineMillis) {
stopConnections(streamingSubscriberConnections, deadlineMillis);
}

private void shutdownBackgroundResources() {
Expand All @@ -466,7 +474,7 @@ private void startConnections(
}
}

private void stopConnections(List<? extends ApiService> connections) {
private void stopConnections(List<? extends ApiService> connections, long deadlineMillis) {
ArrayList<ApiService> liveConnections;
synchronized (connections) {
liveConnections = new ArrayList<ApiService>(connections);
Expand All @@ -477,11 +485,19 @@ private void stopConnections(List<? extends ApiService> connections) {
}
for (ApiService subscriber : liveConnections) {
try {
subscriber.awaitTerminated();
} catch (IllegalStateException e) {
// If the service fails, awaitTerminated will throw an exception.
// However, we could be stopping services because at least one
// has already failed, so we just ignore this exception.
if (deadlineMillis < 0) {
// Wait indefinitely
subscriber.awaitTerminated();
} else {
long remaining = deadlineMillis - clock.millisTime();
if (remaining < 0) {
remaining = 0;
}
subscriber.awaitTerminated(remaining, java.util.concurrent.TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
logger.log(Level.FINE, "Exception while waiting for a connection to terminate", e);
break; // Stop waiting for other connections.
}
}
}
Expand Down Expand Up @@ -532,6 +548,9 @@ public static final class Builder {
private boolean enableOpenTelemetryTracing = false;
private OpenTelemetry openTelemetry = null;

private SubscriberShutdownSettings subscriberShutdownSettings =
SubscriberShutdownSettings.newBuilder().build();

Builder(String subscription, MessageReceiver receiver) {
this.subscription = subscription;
this.receiver = receiver;
Expand Down Expand Up @@ -772,6 +791,18 @@ public Builder setOpenTelemetry(OpenTelemetry openTelemetry) {
return this;
}

/**
* Sets the shutdown settings for the subscriber. Defaults to {@link
* SubscriberShutdownSettings#newBuilder() default settings}.
*/
@BetaApi(
"The surface for SubscriberShutdownSettings is not stable yet and may be changed in the future.")
public Builder setSubscriberShutdownSettings(
SubscriberShutdownSettings subscriberShutdownSettings) {
this.subscriberShutdownSettings = Preconditions.checkNotNull(subscriberShutdownSettings);
return this;
}

/** Returns the default FlowControlSettings used by the client if settings are not provided. */
public static FlowControlSettings getDefaultFlowControlSettings() {
return DEFAULT_FLOW_CONTROL_SETTINGS;
Expand Down
Loading