From 2e13e4070aa676461793bccb5456135131db3c5e Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Fri, 3 Mar 2023 15:03:20 -0500 Subject: [PATCH 1/4] chore: delete CloseableMonitor errorprone is removing support for the annotation this uses --- .../pubsublite/internal/CloseableMonitor.java | 82 ---------- .../pubsublite/internal/MoreApiFutures.java | 19 +++ .../pubsublite/internal/SerialExecutor.java | 33 ++-- .../internal/wire/AssignerImpl.java | 27 ++-- .../internal/wire/CommitterImpl.java | 46 ++++-- .../internal/wire/ConnectedAssignerImpl.java | 11 +- .../wire/PartitionCountWatchingPublisher.java | 76 ++++----- .../internal/wire/PublisherImpl.java | 75 +++++---- .../internal/wire/RetryingConnectionImpl.java | 83 +++++----- .../internal/wire/SerialBatcher.java | 6 +- .../internal/wire/SingleConnection.java | 93 ++++++----- .../internal/wire/SubscriberImpl.java | 148 ++++++++---------- .../internal/CloseableMonitorTest.java | 68 -------- 13 files changed, 314 insertions(+), 453 deletions(-) delete mode 100755 google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/CloseableMonitor.java delete mode 100755 google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/CloseableMonitorTest.java diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/CloseableMonitor.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/CloseableMonitor.java deleted file mode 100755 index ac8ba3ca1..000000000 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/CloseableMonitor.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.internal; - -import com.google.common.util.concurrent.Monitor; -import com.google.errorprone.annotations.concurrent.LockMethod; -import com.google.errorprone.annotations.concurrent.UnlockMethod; - -/** Wraps a Monitor with methods that can be used with try-with-resources. */ -public class CloseableMonitor { - public final Monitor monitor = new Monitor(); - - /** - * try-with-resources wrapper for enterWhenUninterruptibly. For example: - * - *
-   * final Monitor.Guard guard = new Monitor.Guard(monitor.monitor) {
-   *     @Override
-   *     public boolean isSatisfied() {
-   *       assertThat(monitor.monitor.isOccupied()).isTrue();
-   *       return state;
-   *     }
-   * };
-   *
-   * try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(guard)) {
-   *   // Do stuff
-   * }
-   * // Monitor is automatically released
-   * 
- */ - @LockMethod("monitor") - public Hold enterWhenUninterruptibly(Monitor.Guard condition) { - monitor.enterWhenUninterruptibly(condition); - return new Hold(); - } - - /** - * try-with-resources wrapper for enter. For example... - * - *
{@code
-   * try (CloseableMonitor.Hold h = monitor.enter()) {
-   *   // Do stuff
-   * }
-   * // Monitor is automatically released
-   * }
- */ - @LockMethod("monitor") - public Hold enter() { - monitor.enter(); - return new Hold(); - } - - /** - * This is meant for use in the try-with-resources pattern. It will call leave() on a Monitor when - * it goes out of scope. This is cannot be constructed directly, but through usage of the static - * utility methods above. - */ - public class Hold implements AutoCloseable { - - private Hold() {} - - @UnlockMethod("monitor") - @Override - public void close() { - monitor.leave(); - } - } -} diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java index 3869c06d2..2fbcc66c7 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java @@ -16,11 +16,16 @@ package com.google.cloud.pubsublite.internal; +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; + import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; import com.google.cloud.pubsublite.internal.wire.SystemExecutors; +import java.util.List; +import java.util.concurrent.Future; public final class MoreApiFutures { private MoreApiFutures() {} @@ -42,4 +47,18 @@ public void onSuccess(T t) { }, SystemExecutors.getFuturesExecutor()); } + + public static ApiFuture whenFirstDone(List> futures) { + SettableApiFuture anyDone = SettableApiFuture.create(); + futures.forEach(f -> f.addListener(() -> anyDone.set(null), directExecutor())); + return anyDone; + } + + public static T get(Future f) { + try { + return f.get(); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SerialExecutor.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SerialExecutor.java index 217b13406..a6856b54f 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SerialExecutor.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SerialExecutor.java @@ -16,6 +16,7 @@ package com.google.cloud.pubsublite.internal; +import com.google.common.util.concurrent.Monitor; import com.google.common.util.concurrent.Monitor.Guard; import java.util.ArrayDeque; import java.util.Queue; @@ -26,22 +27,22 @@ public final class SerialExecutor implements AutoCloseable, Executor { private final Executor executor; - private final CloseableMonitor monitor = new CloseableMonitor(); + private final Monitor monitor = new Monitor(); private final Guard isInactive = - new Guard(monitor.monitor) { + new Guard(monitor) { @Override public boolean isSatisfied() { return !isTaskActive; } }; - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private final Queue tasks; - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private boolean isTaskActive; - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private boolean isShutdown; public SerialExecutor(Executor executor) { @@ -53,7 +54,7 @@ public SerialExecutor(Executor executor) { /** Waits until there are no active tasks. */ public void waitUntilInactive() { - try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(isInactive)) {} + monitor.enterWhenUninterruptibly(isInactive); } /** @@ -62,14 +63,18 @@ public void waitUntilInactive() { */ @Override public void close() { - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { isShutdown = true; + } finally { + monitor.leave(); } } @Override public void execute(Runnable r) { - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { if (isShutdown) { return; } @@ -86,21 +91,29 @@ public void execute(Runnable r) { if (!isTaskActive) { scheduleNextTask(); } + } finally { + monitor.leave(); } } private boolean shouldExecuteTask() { - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { return !isShutdown; + } finally { + monitor.leave(); } } private void scheduleNextTask() { - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { isTaskActive = !tasks.isEmpty() && !isShutdown; if (isTaskActive) { executor.execute(tasks.poll()); } + } finally { + monitor.leave(); } } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java index 1f545dc4c..0da2db463 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java @@ -21,7 +21,6 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest; import com.google.cloud.pubsublite.proto.PartitionAssignment; @@ -41,12 +40,10 @@ public class AssignerImpl extends ProxyService private final PartitionAssignmentRequest initialRequest; private final String uuidHex; - private final CloseableMonitor monitor = new CloseableMonitor(); - - @GuardedBy("monitor.monitor") + @GuardedBy("this") private final RetryingConnection connection; - @GuardedBy("monitor.monitor") + @GuardedBy("this") private final PartitionAssignmentReceiver receiver; @VisibleForTesting @@ -80,10 +77,8 @@ public AssignerImpl( } @Override - public void triggerReinitialize(CheckedApiException streamError) { - try (CloseableMonitor.Hold h = monitor.enter()) { - connection.reinitialize(initialRequest); - } + public synchronized void triggerReinitialize(CheckedApiException streamError) { + connection.reinitialize(initialRequest); } private static Set toSet(PartitionAssignment assignment) throws ApiException { @@ -95,13 +90,11 @@ private static Set toSet(PartitionAssignment assignment) throws ApiEx } @Override - public void onClientResponse(PartitionAssignment value) throws CheckedApiException { - try (CloseableMonitor.Hold h = monitor.enter()) { - Set partitions = toSet(value); - logger.atFine().log("Subscriber with uuid %s received assignment: %s", uuidHex, partitions); - receiver.handleAssignment(partitions); - logger.atInfo().log("Subscriber with uuid %s handled assignment: %s", uuidHex, partitions); - connection.modifyConnection(connectionOr -> connectionOr.ifPresent(ConnectedAssigner::ack)); - } + public synchronized void onClientResponse(PartitionAssignment value) throws CheckedApiException { + Set partitions = toSet(value); + logger.atFine().log("Subscriber with uuid %s received assignment: %s", uuidHex, partitions); + receiver.handleAssignment(partitions); + logger.atInfo().log("Subscriber with uuid %s handled assignment: %s", uuidHex, partitions); + connection.modifyConnection(connectionOr -> connectionOr.ifPresent(ConnectedAssigner::ack)); } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java index c5fef43eb..fc57555ea 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java @@ -23,7 +23,6 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest; import com.google.cloud.pubsublite.proto.SequencedCommitCursorResponse; @@ -31,6 +30,7 @@ import com.google.cloud.pubsublite.proto.StreamingCommitCursorResponse; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.Monitor; import com.google.common.util.concurrent.Monitor.Guard; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.Optional; @@ -39,25 +39,25 @@ public class CommitterImpl extends ProxyService implements Committer, RetryingConnectionObserver { private final StreamingCommitCursorRequest initialRequest; - private final CloseableMonitor monitor = new CloseableMonitor(); + private final Monitor monitor = new Monitor(); private final Guard isEmptyOrError = - new Guard(monitor.monitor) { + new Guard(monitor) { public boolean isSatisfied() { // Wait until the state is empty or a permanent error occurred. return state.isEmpty() || permanentError.isPresent(); } }; - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private final RetryingConnection connection; - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private boolean shutdown = false; - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private Optional permanentError = Optional.empty(); - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private final CommitState state = new CommitState(); @VisibleForTesting @@ -83,25 +83,32 @@ public CommitterImpl( // ProxyService implementation. @Override protected void handlePermanentError(CheckedApiException error) { - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { permanentError = Optional.of(error); shutdown = true; state.abort(error); + } finally { + monitor.leave(); } } @Override protected void stop() { - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { shutdown = true; + monitor.waitForUninterruptibly(isEmptyOrError); + } finally { + monitor.leave(); } - try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(isEmptyOrError)) {} } // RetryingConnectionObserver implementation. @Override public void triggerReinitialize(CheckedApiException streamError) { - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { connection.reinitialize(initialRequest); Optional offsetOr = state.reinitializeAndReturnToSend(); if (!offsetOr.isPresent()) return; // There are no outstanding commit requests. @@ -112,21 +119,27 @@ public void triggerReinitialize(CheckedApiException streamError) { }); } catch (CheckedApiException e) { onPermanentError(e); + } finally { + monitor.leave(); } } @Override public void onClientResponse(SequencedCommitCursorResponse value) throws CheckedApiException { Preconditions.checkArgument(value.getAcknowledgedCommits() > 0); - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { state.complete(value.getAcknowledgedCommits()); + } finally { + monitor.leave(); } } // Committer implementation. @Override public ApiFuture commitOffset(Offset offset) { - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { checkState(!shutdown, "Committed after the stream shut down."); connection.modifyConnection( connectedCommitter -> @@ -135,15 +148,20 @@ public ApiFuture commitOffset(Offset offset) { } catch (CheckedApiException e) { onPermanentError(e); return ApiFutures.immediateFailedFuture(e); + } finally { + monitor.leave(); } } @Override public void waitUntilEmpty() throws CheckedApiException { - try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(isEmptyOrError)) { + monitor.enterWhenUninterruptibly(isEmptyOrError); + try { if (permanentError.isPresent()) { throw permanentError.get(); } + } finally { + monitor.leave(); } } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedAssignerImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedAssignerImpl.java index 3585116db..2f45b9f41 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedAssignerImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedAssignerImpl.java @@ -20,7 +20,6 @@ import com.google.api.gax.rpc.ResponseObserver; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.proto.PartitionAssignment; import com.google.cloud.pubsublite.proto.PartitionAssignmentAck; import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest; @@ -32,9 +31,7 @@ public class ConnectedAssignerImpl implements ConnectedAssigner { private static final Duration STREAM_IDLE_TIMEOUT = Duration.ofMinutes(10); - private final CloseableMonitor monitor = new CloseableMonitor(); - - @GuardedBy("monitor.monitor") + @GuardedBy("this") boolean outstanding = false; private ConnectedAssignerImpl( @@ -65,7 +62,7 @@ protected void handleInitialResponse(PartitionAssignment response) throws Checke @Override protected void handleStreamResponse(PartitionAssignment response) throws CheckedApiException { - try (CloseableMonitor.Hold h = monitor.enter()) { + synchronized (this) { checkState( !outstanding, "Received assignment from the server while there was an assignment outstanding."); @@ -76,8 +73,8 @@ protected void handleStreamResponse(PartitionAssignment response) throws Checked // ConnectedAssigner implementation. @Override - public void ack() { - try (CloseableMonitor.Hold h = monitor.enter()) { + public synchronized void ack() { + try { checkState(outstanding, "Client acknowledged when there was no request outstanding."); outstanding = false; } catch (CheckedApiException e) { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java index 77614d3e5..48aa07e64 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java @@ -27,7 +27,6 @@ import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.RoutingPolicy; @@ -89,12 +88,10 @@ public void stop() { } } - private final CloseableMonitor monitor = new CloseableMonitor(); - - @GuardedBy("monitor.monitor") + @GuardedBy("this") private boolean shutdown = false; - @GuardedBy("monitor.monitor") + @GuardedBy("this") private Optional partitionsWithRouting = Optional.empty(); PartitionCountWatchingPublisher( @@ -107,12 +104,13 @@ public void stop() { addServices(configWatcher, autoCloseableAsApiService(publisherFactory)); } + private synchronized Optional getPartitions() { + return partitionsWithRouting; + } + @Override public ApiFuture publish(PubSubMessage message) { - Optional partitions; - try (CloseableMonitor.Hold h = monitor.enter()) { - partitions = partitionsWithRouting; - } + Optional partitions = getPartitions(); if (!partitions.isPresent()) { throw new IllegalStateException("Publish called before start or after shutdown"); } @@ -126,10 +124,7 @@ public ApiFuture publish(PubSubMessage message) { @Override public void cancelOutstandingPublishes() { - Optional partitions; - try (CloseableMonitor.Hold h = monitor.enter()) { - partitions = partitionsWithRouting; - } + Optional partitions = getPartitions(); if (!partitions.isPresent()) { throw new IllegalStateException( "Cancel outstanding publishes called before start or after shutdown"); @@ -139,10 +134,7 @@ public void cancelOutstandingPublishes() { @Override public void flush() throws IOException { - Optional partitions; - try (CloseableMonitor.Hold h = monitor.enter()) { - partitions = partitionsWithRouting; - } + Optional partitions = getPartitions(); if (!partitions.isPresent()) { throw new IllegalStateException("Publish called before start or after shutdown"); } @@ -171,39 +163,35 @@ public void failed(State from, Throwable failure) { return partitions; } - private void handleConfig(long partitionCount) { - try (CloseableMonitor.Hold h = monitor.enter()) { - if (shutdown) { - return; - } - Optional current = partitionsWithRouting; - long currentSize = current.map(withRouting -> withRouting.publishers.size()).orElse(0); - if (partitionCount == currentSize) { - return; - } - if (partitionCount < currentSize) { - log.atWarning().log( - "Received an unexpected decrease in partition count. Previous partition count %s, new count %s", - currentSize, partitionCount); - return; - } - ImmutableMap.Builder> mapBuilder = - ImmutableMap.builder(); - current.ifPresent(p -> p.publishers.forEach(mapBuilder::put)); - getNewPartitionPublishers(LongStream.range(currentSize, partitionCount)) - .forEach(mapBuilder::put); - - partitionsWithRouting = - Optional.of( - new PartitionsWithRouting( - mapBuilder.build(), policyFactory.newPolicy(partitionCount))); + private synchronized void handleConfig(long partitionCount) { + if (shutdown) { + return; + } + Optional current = partitionsWithRouting; + long currentSize = current.map(withRouting -> withRouting.publishers.size()).orElse(0); + if (partitionCount == currentSize) { + return; } + if (partitionCount < currentSize) { + log.atWarning().log( + "Received an unexpected decrease in partition count. Previous partition count %s, new count %s", + currentSize, partitionCount); + return; + } + ImmutableMap.Builder> mapBuilder = ImmutableMap.builder(); + current.ifPresent(p -> p.publishers.forEach(mapBuilder::put)); + getNewPartitionPublishers(LongStream.range(currentSize, partitionCount)) + .forEach(mapBuilder::put); + + partitionsWithRouting = + Optional.of( + new PartitionsWithRouting(mapBuilder.build(), policyFactory.newPolicy(partitionCount))); } @Override protected void stop() { Optional current; - try (CloseableMonitor.Hold h = monitor.enter()) { + synchronized (this) { shutdown = true; current = partitionsWithRouting; partitionsWithRouting = Optional.empty(); diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java index f2600a5c1..e3253d1d6 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java @@ -31,7 +31,6 @@ import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.internal.AlarmFactory; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.PublishSequenceNumber; import com.google.cloud.pubsublite.internal.SequencedPublisher; @@ -64,31 +63,27 @@ public final class PublisherImpl extends ProxyService private final AlarmFactory alarmFactory; private final PublishRequest initialRequest; - private final CloseableMonitor monitor = new CloseableMonitor(); + private final Monitor monitor = new Monitor(); private final Monitor.Guard noneInFlight = - new Monitor.Guard(monitor.monitor) { + new Monitor.Guard(monitor) { @Override public boolean isSatisfied() { return batchesInFlight.isEmpty() || shutdown; } }; - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private Optional> alarmFuture = Optional.empty(); - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private final RetryingConnection connection; - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private boolean shutdown = false; - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private Offset lastSentOffset = Offset.of(-1); - // batcherMonitor is always acquired after monitor.monitor when both are held. - private final CloseableMonitor batcherMonitor = new CloseableMonitor(); - - @GuardedBy("batcherMonitor.monitor") private final SerialBatcher batcher; private static class InFlightBatch { @@ -114,7 +109,7 @@ void failBatch(int startIdx, CheckedApiException e) { } // An ordered list of batches in flight. - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private final Queue batchesInFlight = new ArrayDeque<>(); @VisibleForTesting @@ -153,7 +148,7 @@ public PublisherImpl( batchingSettings); } - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private void rebatchForRestart() { Queue messages = batchesInFlight.stream() @@ -188,7 +183,8 @@ private void rebatchForRestart() { @Override public void triggerReinitialize(CheckedApiException streamError) { - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { connection.reinitialize(initialRequest); rebatchForRestart(); Collection batches = batchesInFlight; @@ -203,50 +199,59 @@ public void triggerReinitialize(CheckedApiException streamError) { }); } catch (CheckedApiException e) { onPermanentError(e); + } finally { + monitor.leave(); } } @Override protected void handlePermanentError(CheckedApiException error) { - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { shutdown = true; this.alarmFuture.ifPresent(future -> future.cancel(false)); this.alarmFuture = Optional.empty(); terminateOutstandingPublishes(error); + } finally { + monitor.leave(); } } @Override protected void start() { - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { this.alarmFuture = Optional.of(alarmFactory.newAlarm(this::flushToStream)); + } finally { + monitor.leave(); } } @Override protected void stop() { flush(); // Flush any outstanding messages that were batched. - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { shutdown = true; this.alarmFuture.ifPresent(future -> future.cancel(false)); this.alarmFuture = Optional.empty(); + } finally { + monitor.leave(); } flush(); // Flush again in case messages were added since shutdown was set. } - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private void terminateOutstandingPublishes(CheckedApiException e) { batchesInFlight.forEach( batch -> batch.messages.forEach(message -> message.future().setException(e))); - try (CloseableMonitor.Hold h = batcherMonitor.enter()) { - batcher.flush().forEach(batch -> batch.forEach(m -> m.future().setException(e))); - } + batcher.flush().forEach(batch -> batch.forEach(m -> m.future().setException(e))); batchesInFlight.clear(); } @Override public ApiFuture publish(PubSubMessage message, PublishSequenceNumber sequenceNumber) { - try (CloseableMonitor.Hold h = batcherMonitor.enter()) { + try { ApiService.State currentState = state(); switch (currentState) { case FAILED: @@ -270,28 +275,30 @@ public ApiFuture publish(PubSubMessage message, PublishSequenceNumber se @Override public void cancelOutstandingPublishes() { - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { terminateOutstandingPublishes( new CheckedApiException("Cancelled by client.", Code.CANCELLED)); + } finally { + monitor.leave(); } } private void flushToStream() { - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { if (shutdown) return; - List> batches; - try (CloseableMonitor.Hold h2 = batcherMonitor.enter()) { - batches = batcher.flush(); - } - for (List batch : batches) { + for (List batch : batcher.flush()) { processBatch(batch); } } catch (CheckedApiException e) { onPermanentError(e); + } finally { + monitor.leave(); } } - @GuardedBy("monitor.monitor") + @GuardedBy("monitor") private void processBatch(List batch) throws CheckedApiException { if (batch.isEmpty()) return; InFlightBatch inFlightBatch = new InFlightBatch(batch); @@ -309,7 +316,8 @@ private void processBatch(List batch) throws CheckedApiExcepti @Override public void flush() { flushToStream(); - try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(noneInFlight)) {} + monitor.enterWhenUninterruptibly(noneInFlight); + monitor.leave(); } @Override @@ -318,7 +326,8 @@ public void onClientResponse(MessagePublishResponse publishResponse) throws Chec ImmutableList ranges = ImmutableList.sortedCopyOf( comparing(CursorRange::getStartIndex), publishResponse.getCursorRangesList()); - try (CloseableMonitor.Hold h = monitor.enter()) { + monitor.enter(); + try { checkState( !batchesInFlight.isEmpty(), "Received publish response with no batches in flight."); InFlightBatch batch = batchesInFlight.remove(); @@ -363,6 +372,8 @@ public void onClientResponse(MessagePublishResponse publishResponse) throws Chec throw e; } } + } finally { + monitor.leave(); } } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RetryingConnectionImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RetryingConnectionImpl.java index d1e87def0..cf2e97edf 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RetryingConnectionImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RetryingConnectionImpl.java @@ -24,7 +24,6 @@ import com.google.api.gax.rpc.StreamController; import com.google.cloud.pubsublite.ErrorCodes; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.common.flogger.GoogleLogger; import java.time.Duration; @@ -55,19 +54,18 @@ class RetryingConnectionImpl< connectionFactory; private final RetryingConnectionObserver observer; - // connectionMonitor will not be held in any upcalls. - private final CloseableMonitor connectionMonitor = new CloseableMonitor(); + // "this" will not be held in any upcalls. - @GuardedBy("connectionMonitor.monitor") + @GuardedBy("this") private long nextRetryBackoffDuration = INITIAL_RECONNECT_BACKOFF_TIME.toMillis(); - @GuardedBy("connectionMonitor.monitor") + @GuardedBy("this") private StreamRequestT lastInitialRequest; - @GuardedBy("connectionMonitor.monitor") + @GuardedBy("this") private ConnectionT currentConnection; - @GuardedBy("connectionMonitor.monitor") + @GuardedBy("this") private boolean completed = false; RetryingConnectionImpl( @@ -83,11 +81,8 @@ class RetryingConnectionImpl< } @Override - protected void doStart() { - StreamRequestT initialInitialRequest; - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { - initialInitialRequest = lastInitialRequest; - } + protected synchronized void doStart() { + StreamRequestT initialInitialRequest = lastInitialRequest; SystemExecutors.getFuturesExecutor() .execute( () -> { @@ -98,19 +93,17 @@ protected void doStart() { // Reinitialize the stream. Must be called in a downcall to prevent deadlock. @Override - public void reinitialize(StreamRequestT initialRequest) { - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { - if (completed) return; - lastInitialRequest = initialRequest; - logger.atFiner().log("Start initializing connection for %s", streamDescription()); - currentConnection = connectionFactory.New(streamFactory, this, lastInitialRequest); - logger.atFiner().log("Finished initializing connection for %s", streamDescription()); - } + public synchronized void reinitialize(StreamRequestT initialRequest) { + if (completed) return; + lastInitialRequest = initialRequest; + logger.atFiner().log("Start initializing connection for %s", streamDescription()); + currentConnection = connectionFactory.New(streamFactory, this, lastInitialRequest); + logger.atFiner().log("Finished initializing connection for %s", streamDescription()); } @Override - protected void doStop() { - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { + protected synchronized void doStop() { + try { if (completed) return; completed = true; logger.atFine().log("Terminating connection for %s", streamDescription()); @@ -128,21 +121,18 @@ protected void doStop() { // Run modification on the current connection or empty if not connected. @Override - public void modifyConnection(Modifier modifier) throws CheckedApiException { - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { - if (completed) { - modifier.modify(Optional.empty()); - } else { - modifier.modify(Optional.of(currentConnection)); - } + public synchronized void modifyConnection(Modifier modifier) + throws CheckedApiException { + if (completed) { + modifier.modify(Optional.empty()); + } else { + modifier.modify(Optional.of(currentConnection)); } } - void setPermanentError(Throwable error) { - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { - if (completed) return; - completed = true; - } + synchronized void setPermanentError(Throwable error) { + if (completed) return; + completed = true; logger.atInfo().withCause(error).log("Permanent error occurred for %s", streamDescription()); notifyFailed(error); } @@ -156,7 +146,7 @@ public void onStart(StreamController controller) { // ResponseObserver implementation @Override public final void onResponse(ClientResponseT value) { - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { + synchronized (this) { if (completed) return; nextRetryBackoffDuration = INITIAL_RECONNECT_BACKOFF_TIME.toMillis(); } @@ -181,16 +171,15 @@ public final void onError(Throwable t) { } Optional throwable = Optional.empty(); long backoffTime = 0; - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { - if (currentConnection != null) { - currentConnection.close(); + try { + synchronized (this) { + if (currentConnection != null) { + currentConnection.close(); + } + backoffTime = nextRetryBackoffDuration; + nextRetryBackoffDuration = Math.min(backoffTime * 2, MAX_RECONNECT_BACKOFF_TIME.toMillis()); } - backoffTime = nextRetryBackoffDuration; - nextRetryBackoffDuration = Math.min(backoffTime * 2, MAX_RECONNECT_BACKOFF_TIME.toMillis()); } catch (Throwable t2) { - throwable = Optional.of(t2); - } - if (throwable.isPresent()) { setPermanentError( new CheckedApiException( "Failed to close preexisting stream after error.", @@ -225,7 +214,7 @@ private void triggerReinitialize(CheckedApiException streamError) { public final void onComplete() { logger.atFine().log("Stream completed for %s", streamDescription()); boolean expectedCompletion; - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { + synchronized (this) { expectedCompletion = completed; } if (!expectedCompletion) { @@ -234,9 +223,7 @@ public final void onComplete() { } } - private String streamDescription() { - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { - return lastInitialRequest.getClass().getSimpleName() + ": " + lastInitialRequest.toString(); - } + private synchronized String streamDescription() { + return lastInitialRequest.getClass().getSimpleName() + ": " + lastInitialRequest.toString(); } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SerialBatcher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SerialBatcher.java index bbeb71d40..df3ee03d9 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SerialBatcher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SerialBatcher.java @@ -29,7 +29,7 @@ import java.util.Deque; import java.util.List; -// A thread compatible batcher which preserves message order. +// A thread safe batcher which preserves message order. class SerialBatcher { private final long byteLimit; private final long messageLimit; @@ -64,7 +64,7 @@ private static boolean hasSequenceDiscontinuity( this.messageLimit = messageLimit; } - ApiFuture add(PubSubMessage message, PublishSequenceNumber sequenceNumber) + synchronized ApiFuture add(PubSubMessage message, PublishSequenceNumber sequenceNumber) throws CheckedApiException { if (!messages.isEmpty() && hasSequenceDiscontinuity(messages.peekLast().sequenceNumber(), sequenceNumber)) { @@ -79,7 +79,7 @@ && hasSequenceDiscontinuity(messages.peekLast().sequenceNumber(), sequenceNumber return future; } - List> flush() { + synchronized List> flush() { List> toReturn = new ArrayList<>(); List currentBatch = new ArrayList<>(); toReturn.add(currentBatch); diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SingleConnection.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SingleConnection.java index 1b181c1c1..5b321c526 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SingleConnection.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SingleConnection.java @@ -16,17 +16,21 @@ package com.google.cloud.pubsublite.internal.wire; +import static com.google.cloud.pubsublite.internal.MoreApiFutures.whenFirstDone; +import static javax.swing.UIManager.get; + +import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.ClientStream; import com.google.api.gax.rpc.ResponseObserver; import com.google.api.gax.rpc.StatusCode.Code; import com.google.api.gax.rpc.StreamController; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; import com.google.common.flogger.GoogleLogger; -import com.google.common.util.concurrent.Monitor.Guard; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.time.Duration; +import java.util.concurrent.Future; /** * A SingleConnection handles the state for a stream with an initial connection request that may @@ -47,13 +51,11 @@ public abstract class SingleConnection receivedInitial = SettableApiFuture.create(); - @GuardedBy("connectionMonitor.monitor") - private boolean completed = false; + @GuardedBy("this") + private final SettableApiFuture completed = SettableApiFuture.create(); protected abstract void handleInitialResponse(StreamResponseT response) throws CheckedApiException; @@ -82,39 +84,32 @@ protected void initialize(StreamRequestT initialRequest) { if (!expectInitial) { return; } - try (CloseableMonitor.Hold h = - connectionMonitor.enterWhenUninterruptibly( - new Guard(connectionMonitor.monitor) { - @Override - public boolean isSatisfied() { - return receivedInitial || completed; - } - })) {} - } - - protected void sendToStream(StreamRequestT request) { - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { - if (completed) { - log.atFine().log("Sent request after stream completion: %s", request); - return; - } - // This should be impossible to not have received the initial request, or be completed, and - // the caller has access to this object. - Preconditions.checkState(receivedInitial); - requestStream.send(request); + get(receivedInitialOrDone()); + } + + private synchronized Future receivedInitialOrDone() { + return whenFirstDone(ImmutableList.of(receivedInitial, completed)); + } + + protected synchronized void sendToStream(StreamRequestT request) { + if (isCompleted()) { + log.atFine().log("Sent request after stream completion: %s", request); + return; } + // This should be impossible to not have received the initial request, or be completed, and + // the caller has access to this object. + Preconditions.checkState(didReceiveInitial()); + requestStream.send(request); } protected void sendToClient(ClientResponseT response) { - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { - if (completed) { - log.atFine().log("Sent response after stream completion: %s", response); - return; - } - // This should be impossible to not have received the initial request, or be completed, and - // the caller has access to this object. - Preconditions.checkState(receivedInitial); + if (isCompleted()) { + log.atFine().log("Sent response after stream completion: %s", response); + return; } + // This should be impossible to not have received the initial request, or be completed, and + // the caller has access to this object. + Preconditions.checkState(didReceiveInitial()); // The upcall may be reentrant, possibly on another thread while this thread is blocked. clientStream.onResponse(response); } @@ -123,20 +118,22 @@ protected void setError(CheckedApiException error) { abort(error); } - protected boolean isCompleted() { - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { - return completed; - } + protected synchronized boolean isCompleted() { + return completed.isDone(); + } + + private synchronized boolean didReceiveInitial() { + return receivedInitial.isDone(); } // Records the connection as completed and performs tear down, if not already completed. Returns // whether the connection was already complete. - private boolean completeStream() { - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { - if (completed) { + private synchronized boolean completeStream() { + try { + if (isCompleted()) { return true; } - completed = true; + completed.set(null); streamIdleTimer.close(); } catch (Exception e) { log.atSevere().withCause(e).log("Error occurred while shutting down connection."); @@ -168,14 +165,14 @@ public void onStart(StreamController streamController) {} @Override public void onResponse(StreamResponseT response) { boolean isFirst; - try (CloseableMonitor.Hold h = connectionMonitor.enter()) { + synchronized (this) { streamIdleTimer.restart(); - if (completed) { + if (isCompleted()) { log.atFine().log("Received response on stream after completion: %s", response); return; } - isFirst = !receivedInitial; - receivedInitial = true; + isFirst = !didReceiveInitial(); + receivedInitial.set(null); } try { if (isFirst) { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java index f0d320504..af3d8c066 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java @@ -22,7 +22,6 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.internal.AlarmFactory; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.SerialExecutor; @@ -62,24 +61,22 @@ public class SubscriberImpl extends ProxyService // Used to ensure messages are delivered to consumers in order. private final SerialExecutor messageDeliveryExecutor; - private final CloseableMonitor monitor = new CloseableMonitor(); - - @GuardedBy("monitor.monitor") + @GuardedBy("this") private Optional> alarmFuture = Optional.empty(); - @GuardedBy("monitor.monitor") + @GuardedBy("this") private final RetryingConnection connection; - @GuardedBy("monitor.monitor") + @GuardedBy("this") private final NextOffsetTracker nextOffsetTracker = new NextOffsetTracker(); - @GuardedBy("monitor.monitor") + @GuardedBy("this") private final FlowControlBatcher flowControlBatcher = new FlowControlBatcher(); - @GuardedBy("monitor.monitor") + @GuardedBy("this") private SeekRequest initialLocation; - @GuardedBy("monitor.monitor") + @GuardedBy("this") private boolean shutdown = false; @VisibleForTesting @@ -126,20 +123,16 @@ public SubscriberImpl( // ProxyService implementation. @Override - protected void start() { - try (CloseableMonitor.Hold h = monitor.enter()) { - alarmFuture = Optional.of(alarmFactory.newAlarm(this::processBatchFlowRequest)); - } + protected synchronized void start() { + alarmFuture = Optional.of(alarmFactory.newAlarm(this::processBatchFlowRequest)); } @Override - protected void stop() { - try (CloseableMonitor.Hold h = monitor.enter()) { - shutdown = true; - this.alarmFuture.ifPresent(future -> future.cancel(false)); - this.alarmFuture = Optional.empty(); - messageDeliveryExecutor.close(); - } + protected synchronized void stop() { + shutdown = true; + this.alarmFuture.ifPresent(future -> future.cancel(false)); + this.alarmFuture = Optional.empty(); + messageDeliveryExecutor.close(); } @Override @@ -148,36 +141,28 @@ protected void handlePermanentError(CheckedApiException error) { } @Override - public void allowFlow(FlowControlRequest clientRequest) throws CheckedApiException { - try (CloseableMonitor.Hold h = monitor.enter()) { - if (shutdown) return; - flowControlBatcher.onClientFlowRequest(clientRequest); - if (flowControlBatcher.shouldExpediteBatchRequest()) { - connection.modifyConnection( - connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest)); - } + public synchronized void allowFlow(FlowControlRequest clientRequest) throws CheckedApiException { + if (shutdown) return; + flowControlBatcher.onClientFlowRequest(clientRequest); + if (flowControlBatcher.shouldExpediteBatchRequest()) { + connection.modifyConnection( + connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest)); } } - private SubscribeRequest getInitialRequest() { - try (CloseableMonitor.Hold h = monitor.enter()) { - return SubscribeRequest.newBuilder() - .setInitial( - baseInitialRequest - .toBuilder() - .setInitialLocation( - nextOffsetTracker.requestForRestart().orElse(initialLocation))) - .build(); - } + private synchronized SubscribeRequest getInitialRequest() { + return SubscribeRequest.newBuilder() + .setInitial( + baseInitialRequest + .toBuilder() + .setInitialLocation(nextOffsetTracker.requestForRestart().orElse(initialLocation))) + .build(); } - public void reset() { - try (CloseableMonitor.Hold h = monitor.enter()) { - if (shutdown) return; - nextOffsetTracker.reset(); - initialLocation = - SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build(); - } + public synchronized void reset() { + if (shutdown) return; + nextOffsetTracker.reset(); + initialLocation = SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build(); } @Override @@ -203,43 +188,48 @@ public void triggerReinitialize(CheckedApiException streamError) { } } - try (CloseableMonitor.Hold h = monitor.enter()) { - if (shutdown) return; - connection.reinitialize(getInitialRequest()); - connection.modifyConnection( - connectedSubscriber -> { - checkArgument(monitor.monitor.isOccupiedByCurrentThread()); - checkArgument(connectedSubscriber.isPresent()); - flowControlBatcher - .requestForRestart() - .ifPresent(request -> connectedSubscriber.get().allowFlow(request)); - }); - } catch (CheckedApiException e) { - onPermanentError(e); + try { + doReinitialize(); + } catch (Throwable t) { + onPermanentError(toCanonical(t)); } } + /* GuardedBy can't handle this function. */ + @SuppressWarnings("GuardedBy") + private synchronized void doReinitialize() throws CheckedApiException { + if (shutdown) return; + connection.reinitialize(getInitialRequest()); + connection.modifyConnection( + connectedSubscriber -> { + checkArgument(Thread.holdsLock(this)); + checkArgument(connectedSubscriber.isPresent()); + flowControlBatcher + .requestForRestart() + .ifPresent(request -> connectedSubscriber.get().allowFlow(request)); + }); + } + @Override - public void onClientResponse(List messages) throws CheckedApiException { - try (CloseableMonitor.Hold h = monitor.enter()) { - if (shutdown) return; - nextOffsetTracker.onMessages(messages); - flowControlBatcher.onMessages(messages); - messageDeliveryExecutor.execute( - () -> { - try { - messageConsumer.accept(messages); - } catch (Throwable t) { - logger.atWarning().withCause(t).log( - "Consumer threw an exception- failing subscriber. %s", baseInitialRequest); - onPermanentError(toCanonical(t)); - } - }); - } + public synchronized void onClientResponse(List messages) + throws CheckedApiException { + if (shutdown) return; + nextOffsetTracker.onMessages(messages); + flowControlBatcher.onMessages(messages); + messageDeliveryExecutor.execute( + () -> { + try { + messageConsumer.accept(messages); + } catch (Throwable t) { + logger.atWarning().withCause(t).log( + "Consumer threw an exception- failing subscriber. %s", baseInitialRequest); + onPermanentError(toCanonical(t)); + } + }); } - private void processBatchFlowRequest() { - try (CloseableMonitor.Hold h = monitor.enter()) { + private synchronized void processBatchFlowRequest() { + try { if (shutdown) return; connection.modifyConnection( connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest)); @@ -248,9 +238,7 @@ private void processBatchFlowRequest() { } } - private void flushBatchFlowRequest(ConnectedSubscriber subscriber) { - try (CloseableMonitor.Hold h = monitor.enter()) { - flowControlBatcher.releasePendingRequest().ifPresent(subscriber::allowFlow); - } + private synchronized void flushBatchFlowRequest(ConnectedSubscriber subscriber) { + flowControlBatcher.releasePendingRequest().ifPresent(subscriber::allowFlow); } } diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/CloseableMonitorTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/CloseableMonitorTest.java deleted file mode 100755 index eef11f394..000000000 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/CloseableMonitorTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.internal; - -import static com.google.common.truth.Truth.assertThat; - -import com.google.common.util.concurrent.Monitor; -import com.google.errorprone.annotations.concurrent.GuardedBy; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -@RunWith(JUnit4.class) -public final class CloseableMonitorTest { - private final CloseableMonitor monitor = new CloseableMonitor(); - - @GuardedBy("monitor.monitor") - boolean state = false; - - @Test - public void enter() { - assertThat(monitor.monitor.isOccupied()).isFalse(); - try (CloseableMonitor.Hold h = monitor.enter()) { - assertThat(monitor.monitor.isOccupied()).isTrue(); - } - assertThat(monitor.monitor.isOccupied()).isFalse(); - } - - @Test - public void enterWhenUninterruptibly() { - ExecutorService executorService = Executors.newCachedThreadPool(); - executorService.execute( - () -> { - try (CloseableMonitor.Hold h = monitor.enter()) { - state = true; - } - }); - - try (CloseableMonitor.Hold h = - monitor.enterWhenUninterruptibly( - new Monitor.Guard(monitor.monitor) { - @Override - public boolean isSatisfied() { - assertThat(monitor.monitor.isOccupied()).isTrue(); - return state; - } - })) { - assertThat(monitor.monitor.isOccupied()).isTrue(); - assertThat(state).isTrue(); - } - } -} From 5a05c3fa55195d134142e00933b43d6f418fb981 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Fri, 3 Mar 2023 15:24:59 -0500 Subject: [PATCH 2/4] chore: delete CloseableMonitor errorprone is removing support for the annotation this uses --- google-cloud-pubsublite/clirr-ignored-differences.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/google-cloud-pubsublite/clirr-ignored-differences.xml b/google-cloud-pubsublite/clirr-ignored-differences.xml index 5db425733..3feb22082 100644 --- a/google-cloud-pubsublite/clirr-ignored-differences.xml +++ b/google-cloud-pubsublite/clirr-ignored-differences.xml @@ -33,4 +33,9 @@ com/google/cloud/pubsublite/internal/** * + + 8001 + com/google/cloud/pubsublite/internal/** + * + From 66a9df8fec7287cf7908ab6ffafec60913d16b96 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Fri, 3 Mar 2023 16:38:29 -0500 Subject: [PATCH 3/4] chore: delete CloseableMonitor errorprone is removing support for the annotation this uses --- .../pubsublite/internal/wire/SingleConnection.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SingleConnection.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SingleConnection.java index 5b321c526..26bf888f0 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SingleConnection.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SingleConnection.java @@ -103,13 +103,15 @@ protected synchronized void sendToStream(StreamRequestT request) { } protected void sendToClient(ClientResponseT response) { - if (isCompleted()) { - log.atFine().log("Sent response after stream completion: %s", response); - return; + synchronized (this) { + if (isCompleted()) { + log.atFine().log("Sent response after stream completion: %s", response); + return; + } + // This should be impossible to not have received the initial request, or be completed, and + // the caller has access to this object. + Preconditions.checkState(didReceiveInitial()); } - // This should be impossible to not have received the initial request, or be completed, and - // the caller has access to this object. - Preconditions.checkState(didReceiveInitial()); // The upcall may be reentrant, possibly on another thread while this thread is blocked. clientStream.onResponse(response); } From aacd4558a6da0a585718f817dcb35805bb6b6107 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Fri, 3 Mar 2023 21:40:26 +0000 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index fab235b97..ec48c59df 100644 --- a/README.md +++ b/README.md @@ -32,13 +32,13 @@ If you are using Maven, add this to your pom.xml file: If you are using Gradle without BOM, add this to your dependencies: ```Groovy -implementation 'com.google.cloud:google-cloud-pubsublite:1.11.0' +implementation 'com.google.cloud:google-cloud-pubsublite:1.11.1' ``` If you are using SBT, add this to your dependencies: ```Scala -libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.11.0" +libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.11.1" ``` ## Authentication