diff --git a/README.md b/README.md
index fab235b97..ec48c59df 100644
--- a/README.md
+++ b/README.md
@@ -32,13 +32,13 @@ If you are using Maven, add this to your pom.xml file:
If you are using Gradle without BOM, add this to your dependencies:
```Groovy
-implementation 'com.google.cloud:google-cloud-pubsublite:1.11.0'
+implementation 'com.google.cloud:google-cloud-pubsublite:1.11.1'
```
If you are using SBT, add this to your dependencies:
```Scala
-libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.11.0"
+libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.11.1"
```
## Authentication
diff --git a/google-cloud-pubsublite/clirr-ignored-differences.xml b/google-cloud-pubsublite/clirr-ignored-differences.xml
index 5db425733..3feb22082 100644
--- a/google-cloud-pubsublite/clirr-ignored-differences.xml
+++ b/google-cloud-pubsublite/clirr-ignored-differences.xml
@@ -33,4 +33,9 @@
com/google/cloud/pubsublite/internal/**
*
+
+ 8001
+ com/google/cloud/pubsublite/internal/**
+ *
+
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/CloseableMonitor.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/CloseableMonitor.java
deleted file mode 100755
index ac8ba3ca1..000000000
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/CloseableMonitor.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsublite.internal;
-
-import com.google.common.util.concurrent.Monitor;
-import com.google.errorprone.annotations.concurrent.LockMethod;
-import com.google.errorprone.annotations.concurrent.UnlockMethod;
-
-/** Wraps a Monitor with methods that can be used with try-with-resources. */
-public class CloseableMonitor {
- public final Monitor monitor = new Monitor();
-
- /**
- * try-with-resources wrapper for enterWhenUninterruptibly. For example:
- *
- *
- * final Monitor.Guard guard = new Monitor.Guard(monitor.monitor) {
- * @Override
- * public boolean isSatisfied() {
- * assertThat(monitor.monitor.isOccupied()).isTrue();
- * return state;
- * }
- * };
- *
- * try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(guard)) {
- * // Do stuff
- * }
- * // Monitor is automatically released
- *
- */
- @LockMethod("monitor")
- public Hold enterWhenUninterruptibly(Monitor.Guard condition) {
- monitor.enterWhenUninterruptibly(condition);
- return new Hold();
- }
-
- /**
- * try-with-resources wrapper for enter. For example...
- *
- * {@code
- * try (CloseableMonitor.Hold h = monitor.enter()) {
- * // Do stuff
- * }
- * // Monitor is automatically released
- * }
- */
- @LockMethod("monitor")
- public Hold enter() {
- monitor.enter();
- return new Hold();
- }
-
- /**
- * This is meant for use in the try-with-resources pattern. It will call leave() on a Monitor when
- * it goes out of scope. This is cannot be constructed directly, but through usage of the static
- * utility methods above.
- */
- public class Hold implements AutoCloseable {
-
- private Hold() {}
-
- @UnlockMethod("monitor")
- @Override
- public void close() {
- monitor.leave();
- }
- }
-}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java
index 3869c06d2..2fbcc66c7 100644
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java
@@ -16,11 +16,16 @@
package com.google.cloud.pubsublite.internal;
+import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
+import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
+
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
+import java.util.List;
+import java.util.concurrent.Future;
public final class MoreApiFutures {
private MoreApiFutures() {}
@@ -42,4 +47,18 @@ public void onSuccess(T t) {
},
SystemExecutors.getFuturesExecutor());
}
+
+ public static ApiFuture whenFirstDone(List> futures) {
+ SettableApiFuture anyDone = SettableApiFuture.create();
+ futures.forEach(f -> f.addListener(() -> anyDone.set(null), directExecutor()));
+ return anyDone;
+ }
+
+ public static T get(Future f) {
+ try {
+ return f.get();
+ } catch (Throwable t) {
+ throw toCanonical(t).underlying;
+ }
+ }
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SerialExecutor.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SerialExecutor.java
index 217b13406..a6856b54f 100644
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SerialExecutor.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SerialExecutor.java
@@ -16,6 +16,7 @@
package com.google.cloud.pubsublite.internal;
+import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.Monitor.Guard;
import java.util.ArrayDeque;
import java.util.Queue;
@@ -26,22 +27,22 @@
public final class SerialExecutor implements AutoCloseable, Executor {
private final Executor executor;
- private final CloseableMonitor monitor = new CloseableMonitor();
+ private final Monitor monitor = new Monitor();
private final Guard isInactive =
- new Guard(monitor.monitor) {
+ new Guard(monitor) {
@Override
public boolean isSatisfied() {
return !isTaskActive;
}
};
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private final Queue tasks;
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private boolean isTaskActive;
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private boolean isShutdown;
public SerialExecutor(Executor executor) {
@@ -53,7 +54,7 @@ public SerialExecutor(Executor executor) {
/** Waits until there are no active tasks. */
public void waitUntilInactive() {
- try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(isInactive)) {}
+ monitor.enterWhenUninterruptibly(isInactive);
}
/**
@@ -62,14 +63,18 @@ public void waitUntilInactive() {
*/
@Override
public void close() {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
isShutdown = true;
+ } finally {
+ monitor.leave();
}
}
@Override
public void execute(Runnable r) {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
if (isShutdown) {
return;
}
@@ -86,21 +91,29 @@ public void execute(Runnable r) {
if (!isTaskActive) {
scheduleNextTask();
}
+ } finally {
+ monitor.leave();
}
}
private boolean shouldExecuteTask() {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
return !isShutdown;
+ } finally {
+ monitor.leave();
}
}
private void scheduleNextTask() {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
isTaskActive = !tasks.isEmpty() && !isShutdown;
if (isTaskActive) {
executor.execute(tasks.poll());
}
+ } finally {
+ monitor.leave();
}
}
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java
index 1f545dc4c..0da2db463 100644
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java
@@ -21,7 +21,6 @@
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
import com.google.cloud.pubsublite.proto.PartitionAssignment;
@@ -41,12 +40,10 @@ public class AssignerImpl extends ProxyService
private final PartitionAssignmentRequest initialRequest;
private final String uuidHex;
- private final CloseableMonitor monitor = new CloseableMonitor();
-
- @GuardedBy("monitor.monitor")
+ @GuardedBy("this")
private final RetryingConnection connection;
- @GuardedBy("monitor.monitor")
+ @GuardedBy("this")
private final PartitionAssignmentReceiver receiver;
@VisibleForTesting
@@ -80,10 +77,8 @@ public AssignerImpl(
}
@Override
- public void triggerReinitialize(CheckedApiException streamError) {
- try (CloseableMonitor.Hold h = monitor.enter()) {
- connection.reinitialize(initialRequest);
- }
+ public synchronized void triggerReinitialize(CheckedApiException streamError) {
+ connection.reinitialize(initialRequest);
}
private static Set toSet(PartitionAssignment assignment) throws ApiException {
@@ -95,13 +90,11 @@ private static Set toSet(PartitionAssignment assignment) throws ApiEx
}
@Override
- public void onClientResponse(PartitionAssignment value) throws CheckedApiException {
- try (CloseableMonitor.Hold h = monitor.enter()) {
- Set partitions = toSet(value);
- logger.atFine().log("Subscriber with uuid %s received assignment: %s", uuidHex, partitions);
- receiver.handleAssignment(partitions);
- logger.atInfo().log("Subscriber with uuid %s handled assignment: %s", uuidHex, partitions);
- connection.modifyConnection(connectionOr -> connectionOr.ifPresent(ConnectedAssigner::ack));
- }
+ public synchronized void onClientResponse(PartitionAssignment value) throws CheckedApiException {
+ Set partitions = toSet(value);
+ logger.atFine().log("Subscriber with uuid %s received assignment: %s", uuidHex, partitions);
+ receiver.handleAssignment(partitions);
+ logger.atInfo().log("Subscriber with uuid %s handled assignment: %s", uuidHex, partitions);
+ connection.modifyConnection(connectionOr -> connectionOr.ifPresent(ConnectedAssigner::ack));
}
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java
index c5fef43eb..fc57555ea 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterImpl.java
@@ -23,7 +23,6 @@
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest;
import com.google.cloud.pubsublite.proto.SequencedCommitCursorResponse;
@@ -31,6 +30,7 @@
import com.google.cloud.pubsublite.proto.StreamingCommitCursorResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Monitor;
import com.google.common.util.concurrent.Monitor.Guard;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.Optional;
@@ -39,25 +39,25 @@ public class CommitterImpl extends ProxyService
implements Committer, RetryingConnectionObserver {
private final StreamingCommitCursorRequest initialRequest;
- private final CloseableMonitor monitor = new CloseableMonitor();
+ private final Monitor monitor = new Monitor();
private final Guard isEmptyOrError =
- new Guard(monitor.monitor) {
+ new Guard(monitor) {
public boolean isSatisfied() {
// Wait until the state is empty or a permanent error occurred.
return state.isEmpty() || permanentError.isPresent();
}
};
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private final RetryingConnection connection;
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private boolean shutdown = false;
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private Optional permanentError = Optional.empty();
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private final CommitState state = new CommitState();
@VisibleForTesting
@@ -83,25 +83,32 @@ public CommitterImpl(
// ProxyService implementation.
@Override
protected void handlePermanentError(CheckedApiException error) {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
permanentError = Optional.of(error);
shutdown = true;
state.abort(error);
+ } finally {
+ monitor.leave();
}
}
@Override
protected void stop() {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
shutdown = true;
+ monitor.waitForUninterruptibly(isEmptyOrError);
+ } finally {
+ monitor.leave();
}
- try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(isEmptyOrError)) {}
}
// RetryingConnectionObserver implementation.
@Override
public void triggerReinitialize(CheckedApiException streamError) {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
connection.reinitialize(initialRequest);
Optional offsetOr = state.reinitializeAndReturnToSend();
if (!offsetOr.isPresent()) return; // There are no outstanding commit requests.
@@ -112,21 +119,27 @@ public void triggerReinitialize(CheckedApiException streamError) {
});
} catch (CheckedApiException e) {
onPermanentError(e);
+ } finally {
+ monitor.leave();
}
}
@Override
public void onClientResponse(SequencedCommitCursorResponse value) throws CheckedApiException {
Preconditions.checkArgument(value.getAcknowledgedCommits() > 0);
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
state.complete(value.getAcknowledgedCommits());
+ } finally {
+ monitor.leave();
}
}
// Committer implementation.
@Override
public ApiFuture commitOffset(Offset offset) {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
checkState(!shutdown, "Committed after the stream shut down.");
connection.modifyConnection(
connectedCommitter ->
@@ -135,15 +148,20 @@ public ApiFuture commitOffset(Offset offset) {
} catch (CheckedApiException e) {
onPermanentError(e);
return ApiFutures.immediateFailedFuture(e);
+ } finally {
+ monitor.leave();
}
}
@Override
public void waitUntilEmpty() throws CheckedApiException {
- try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(isEmptyOrError)) {
+ monitor.enterWhenUninterruptibly(isEmptyOrError);
+ try {
if (permanentError.isPresent()) {
throw permanentError.get();
}
+ } finally {
+ monitor.leave();
}
}
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedAssignerImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedAssignerImpl.java
index 3585116db..2f45b9f41 100644
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedAssignerImpl.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/ConnectedAssignerImpl.java
@@ -20,7 +20,6 @@
import com.google.api.gax.rpc.ResponseObserver;
import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.proto.PartitionAssignment;
import com.google.cloud.pubsublite.proto.PartitionAssignmentAck;
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest;
@@ -32,9 +31,7 @@ public class ConnectedAssignerImpl
implements ConnectedAssigner {
private static final Duration STREAM_IDLE_TIMEOUT = Duration.ofMinutes(10);
- private final CloseableMonitor monitor = new CloseableMonitor();
-
- @GuardedBy("monitor.monitor")
+ @GuardedBy("this")
boolean outstanding = false;
private ConnectedAssignerImpl(
@@ -65,7 +62,7 @@ protected void handleInitialResponse(PartitionAssignment response) throws Checke
@Override
protected void handleStreamResponse(PartitionAssignment response) throws CheckedApiException {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ synchronized (this) {
checkState(
!outstanding,
"Received assignment from the server while there was an assignment outstanding.");
@@ -76,8 +73,8 @@ protected void handleStreamResponse(PartitionAssignment response) throws Checked
// ConnectedAssigner implementation.
@Override
- public void ack() {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ public synchronized void ack() {
+ try {
checkState(outstanding, "Client acknowledged when there was no request outstanding.");
outstanding = false;
} catch (CheckedApiException e) {
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java
index 77614d3e5..48aa07e64 100644
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java
@@ -27,7 +27,6 @@
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.RoutingPolicy;
@@ -89,12 +88,10 @@ public void stop() {
}
}
- private final CloseableMonitor monitor = new CloseableMonitor();
-
- @GuardedBy("monitor.monitor")
+ @GuardedBy("this")
private boolean shutdown = false;
- @GuardedBy("monitor.monitor")
+ @GuardedBy("this")
private Optional partitionsWithRouting = Optional.empty();
PartitionCountWatchingPublisher(
@@ -107,12 +104,13 @@ public void stop() {
addServices(configWatcher, autoCloseableAsApiService(publisherFactory));
}
+ private synchronized Optional getPartitions() {
+ return partitionsWithRouting;
+ }
+
@Override
public ApiFuture publish(PubSubMessage message) {
- Optional partitions;
- try (CloseableMonitor.Hold h = monitor.enter()) {
- partitions = partitionsWithRouting;
- }
+ Optional partitions = getPartitions();
if (!partitions.isPresent()) {
throw new IllegalStateException("Publish called before start or after shutdown");
}
@@ -126,10 +124,7 @@ public ApiFuture publish(PubSubMessage message) {
@Override
public void cancelOutstandingPublishes() {
- Optional partitions;
- try (CloseableMonitor.Hold h = monitor.enter()) {
- partitions = partitionsWithRouting;
- }
+ Optional partitions = getPartitions();
if (!partitions.isPresent()) {
throw new IllegalStateException(
"Cancel outstanding publishes called before start or after shutdown");
@@ -139,10 +134,7 @@ public void cancelOutstandingPublishes() {
@Override
public void flush() throws IOException {
- Optional partitions;
- try (CloseableMonitor.Hold h = monitor.enter()) {
- partitions = partitionsWithRouting;
- }
+ Optional partitions = getPartitions();
if (!partitions.isPresent()) {
throw new IllegalStateException("Publish called before start or after shutdown");
}
@@ -171,39 +163,35 @@ public void failed(State from, Throwable failure) {
return partitions;
}
- private void handleConfig(long partitionCount) {
- try (CloseableMonitor.Hold h = monitor.enter()) {
- if (shutdown) {
- return;
- }
- Optional current = partitionsWithRouting;
- long currentSize = current.map(withRouting -> withRouting.publishers.size()).orElse(0);
- if (partitionCount == currentSize) {
- return;
- }
- if (partitionCount < currentSize) {
- log.atWarning().log(
- "Received an unexpected decrease in partition count. Previous partition count %s, new count %s",
- currentSize, partitionCount);
- return;
- }
- ImmutableMap.Builder> mapBuilder =
- ImmutableMap.builder();
- current.ifPresent(p -> p.publishers.forEach(mapBuilder::put));
- getNewPartitionPublishers(LongStream.range(currentSize, partitionCount))
- .forEach(mapBuilder::put);
-
- partitionsWithRouting =
- Optional.of(
- new PartitionsWithRouting(
- mapBuilder.build(), policyFactory.newPolicy(partitionCount)));
+ private synchronized void handleConfig(long partitionCount) {
+ if (shutdown) {
+ return;
+ }
+ Optional current = partitionsWithRouting;
+ long currentSize = current.map(withRouting -> withRouting.publishers.size()).orElse(0);
+ if (partitionCount == currentSize) {
+ return;
}
+ if (partitionCount < currentSize) {
+ log.atWarning().log(
+ "Received an unexpected decrease in partition count. Previous partition count %s, new count %s",
+ currentSize, partitionCount);
+ return;
+ }
+ ImmutableMap.Builder> mapBuilder = ImmutableMap.builder();
+ current.ifPresent(p -> p.publishers.forEach(mapBuilder::put));
+ getNewPartitionPublishers(LongStream.range(currentSize, partitionCount))
+ .forEach(mapBuilder::put);
+
+ partitionsWithRouting =
+ Optional.of(
+ new PartitionsWithRouting(mapBuilder.build(), policyFactory.newPolicy(partitionCount)));
}
@Override
protected void stop() {
Optional current;
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ synchronized (this) {
shutdown = true;
current = partitionsWithRouting;
partitionsWithRouting = Optional.empty();
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java
index f2600a5c1..e3253d1d6 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java
@@ -31,7 +31,6 @@
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.internal.SequencedPublisher;
@@ -64,31 +63,27 @@ public final class PublisherImpl extends ProxyService
private final AlarmFactory alarmFactory;
private final PublishRequest initialRequest;
- private final CloseableMonitor monitor = new CloseableMonitor();
+ private final Monitor monitor = new Monitor();
private final Monitor.Guard noneInFlight =
- new Monitor.Guard(monitor.monitor) {
+ new Monitor.Guard(monitor) {
@Override
public boolean isSatisfied() {
return batchesInFlight.isEmpty() || shutdown;
}
};
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private Optional> alarmFuture = Optional.empty();
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private final RetryingConnection connection;
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private boolean shutdown = false;
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private Offset lastSentOffset = Offset.of(-1);
- // batcherMonitor is always acquired after monitor.monitor when both are held.
- private final CloseableMonitor batcherMonitor = new CloseableMonitor();
-
- @GuardedBy("batcherMonitor.monitor")
private final SerialBatcher batcher;
private static class InFlightBatch {
@@ -114,7 +109,7 @@ void failBatch(int startIdx, CheckedApiException e) {
}
// An ordered list of batches in flight.
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private final Queue batchesInFlight = new ArrayDeque<>();
@VisibleForTesting
@@ -153,7 +148,7 @@ public PublisherImpl(
batchingSettings);
}
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private void rebatchForRestart() {
Queue messages =
batchesInFlight.stream()
@@ -188,7 +183,8 @@ private void rebatchForRestart() {
@Override
public void triggerReinitialize(CheckedApiException streamError) {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
connection.reinitialize(initialRequest);
rebatchForRestart();
Collection batches = batchesInFlight;
@@ -203,50 +199,59 @@ public void triggerReinitialize(CheckedApiException streamError) {
});
} catch (CheckedApiException e) {
onPermanentError(e);
+ } finally {
+ monitor.leave();
}
}
@Override
protected void handlePermanentError(CheckedApiException error) {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
shutdown = true;
this.alarmFuture.ifPresent(future -> future.cancel(false));
this.alarmFuture = Optional.empty();
terminateOutstandingPublishes(error);
+ } finally {
+ monitor.leave();
}
}
@Override
protected void start() {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
this.alarmFuture = Optional.of(alarmFactory.newAlarm(this::flushToStream));
+ } finally {
+ monitor.leave();
}
}
@Override
protected void stop() {
flush(); // Flush any outstanding messages that were batched.
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
shutdown = true;
this.alarmFuture.ifPresent(future -> future.cancel(false));
this.alarmFuture = Optional.empty();
+ } finally {
+ monitor.leave();
}
flush(); // Flush again in case messages were added since shutdown was set.
}
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private void terminateOutstandingPublishes(CheckedApiException e) {
batchesInFlight.forEach(
batch -> batch.messages.forEach(message -> message.future().setException(e)));
- try (CloseableMonitor.Hold h = batcherMonitor.enter()) {
- batcher.flush().forEach(batch -> batch.forEach(m -> m.future().setException(e)));
- }
+ batcher.flush().forEach(batch -> batch.forEach(m -> m.future().setException(e)));
batchesInFlight.clear();
}
@Override
public ApiFuture publish(PubSubMessage message, PublishSequenceNumber sequenceNumber) {
- try (CloseableMonitor.Hold h = batcherMonitor.enter()) {
+ try {
ApiService.State currentState = state();
switch (currentState) {
case FAILED:
@@ -270,28 +275,30 @@ public ApiFuture publish(PubSubMessage message, PublishSequenceNumber se
@Override
public void cancelOutstandingPublishes() {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
terminateOutstandingPublishes(
new CheckedApiException("Cancelled by client.", Code.CANCELLED));
+ } finally {
+ monitor.leave();
}
}
private void flushToStream() {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
if (shutdown) return;
- List> batches;
- try (CloseableMonitor.Hold h2 = batcherMonitor.enter()) {
- batches = batcher.flush();
- }
- for (List batch : batches) {
+ for (List batch : batcher.flush()) {
processBatch(batch);
}
} catch (CheckedApiException e) {
onPermanentError(e);
+ } finally {
+ monitor.leave();
}
}
- @GuardedBy("monitor.monitor")
+ @GuardedBy("monitor")
private void processBatch(List batch) throws CheckedApiException {
if (batch.isEmpty()) return;
InFlightBatch inFlightBatch = new InFlightBatch(batch);
@@ -309,7 +316,8 @@ private void processBatch(List batch) throws CheckedApiExcepti
@Override
public void flush() {
flushToStream();
- try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(noneInFlight)) {}
+ monitor.enterWhenUninterruptibly(noneInFlight);
+ monitor.leave();
}
@Override
@@ -318,7 +326,8 @@ public void onClientResponse(MessagePublishResponse publishResponse) throws Chec
ImmutableList ranges =
ImmutableList.sortedCopyOf(
comparing(CursorRange::getStartIndex), publishResponse.getCursorRangesList());
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ monitor.enter();
+ try {
checkState(
!batchesInFlight.isEmpty(), "Received publish response with no batches in flight.");
InFlightBatch batch = batchesInFlight.remove();
@@ -363,6 +372,8 @@ public void onClientResponse(MessagePublishResponse publishResponse) throws Chec
throw e;
}
}
+ } finally {
+ monitor.leave();
}
}
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RetryingConnectionImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RetryingConnectionImpl.java
index d1e87def0..cf2e97edf 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RetryingConnectionImpl.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/RetryingConnectionImpl.java
@@ -24,7 +24,6 @@
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.pubsublite.ErrorCodes;
import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.common.flogger.GoogleLogger;
import java.time.Duration;
@@ -55,19 +54,18 @@ class RetryingConnectionImpl<
connectionFactory;
private final RetryingConnectionObserver observer;
- // connectionMonitor will not be held in any upcalls.
- private final CloseableMonitor connectionMonitor = new CloseableMonitor();
+ // "this" will not be held in any upcalls.
- @GuardedBy("connectionMonitor.monitor")
+ @GuardedBy("this")
private long nextRetryBackoffDuration = INITIAL_RECONNECT_BACKOFF_TIME.toMillis();
- @GuardedBy("connectionMonitor.monitor")
+ @GuardedBy("this")
private StreamRequestT lastInitialRequest;
- @GuardedBy("connectionMonitor.monitor")
+ @GuardedBy("this")
private ConnectionT currentConnection;
- @GuardedBy("connectionMonitor.monitor")
+ @GuardedBy("this")
private boolean completed = false;
RetryingConnectionImpl(
@@ -83,11 +81,8 @@ class RetryingConnectionImpl<
}
@Override
- protected void doStart() {
- StreamRequestT initialInitialRequest;
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
- initialInitialRequest = lastInitialRequest;
- }
+ protected synchronized void doStart() {
+ StreamRequestT initialInitialRequest = lastInitialRequest;
SystemExecutors.getFuturesExecutor()
.execute(
() -> {
@@ -98,19 +93,17 @@ protected void doStart() {
// Reinitialize the stream. Must be called in a downcall to prevent deadlock.
@Override
- public void reinitialize(StreamRequestT initialRequest) {
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
- if (completed) return;
- lastInitialRequest = initialRequest;
- logger.atFiner().log("Start initializing connection for %s", streamDescription());
- currentConnection = connectionFactory.New(streamFactory, this, lastInitialRequest);
- logger.atFiner().log("Finished initializing connection for %s", streamDescription());
- }
+ public synchronized void reinitialize(StreamRequestT initialRequest) {
+ if (completed) return;
+ lastInitialRequest = initialRequest;
+ logger.atFiner().log("Start initializing connection for %s", streamDescription());
+ currentConnection = connectionFactory.New(streamFactory, this, lastInitialRequest);
+ logger.atFiner().log("Finished initializing connection for %s", streamDescription());
}
@Override
- protected void doStop() {
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
+ protected synchronized void doStop() {
+ try {
if (completed) return;
completed = true;
logger.atFine().log("Terminating connection for %s", streamDescription());
@@ -128,21 +121,18 @@ protected void doStop() {
// Run modification on the current connection or empty if not connected.
@Override
- public void modifyConnection(Modifier modifier) throws CheckedApiException {
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
- if (completed) {
- modifier.modify(Optional.empty());
- } else {
- modifier.modify(Optional.of(currentConnection));
- }
+ public synchronized void modifyConnection(Modifier modifier)
+ throws CheckedApiException {
+ if (completed) {
+ modifier.modify(Optional.empty());
+ } else {
+ modifier.modify(Optional.of(currentConnection));
}
}
- void setPermanentError(Throwable error) {
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
- if (completed) return;
- completed = true;
- }
+ synchronized void setPermanentError(Throwable error) {
+ if (completed) return;
+ completed = true;
logger.atInfo().withCause(error).log("Permanent error occurred for %s", streamDescription());
notifyFailed(error);
}
@@ -156,7 +146,7 @@ public void onStart(StreamController controller) {
// ResponseObserver implementation
@Override
public final void onResponse(ClientResponseT value) {
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
+ synchronized (this) {
if (completed) return;
nextRetryBackoffDuration = INITIAL_RECONNECT_BACKOFF_TIME.toMillis();
}
@@ -181,16 +171,15 @@ public final void onError(Throwable t) {
}
Optional throwable = Optional.empty();
long backoffTime = 0;
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
- if (currentConnection != null) {
- currentConnection.close();
+ try {
+ synchronized (this) {
+ if (currentConnection != null) {
+ currentConnection.close();
+ }
+ backoffTime = nextRetryBackoffDuration;
+ nextRetryBackoffDuration = Math.min(backoffTime * 2, MAX_RECONNECT_BACKOFF_TIME.toMillis());
}
- backoffTime = nextRetryBackoffDuration;
- nextRetryBackoffDuration = Math.min(backoffTime * 2, MAX_RECONNECT_BACKOFF_TIME.toMillis());
} catch (Throwable t2) {
- throwable = Optional.of(t2);
- }
- if (throwable.isPresent()) {
setPermanentError(
new CheckedApiException(
"Failed to close preexisting stream after error.",
@@ -225,7 +214,7 @@ private void triggerReinitialize(CheckedApiException streamError) {
public final void onComplete() {
logger.atFine().log("Stream completed for %s", streamDescription());
boolean expectedCompletion;
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
+ synchronized (this) {
expectedCompletion = completed;
}
if (!expectedCompletion) {
@@ -234,9 +223,7 @@ public final void onComplete() {
}
}
- private String streamDescription() {
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
- return lastInitialRequest.getClass().getSimpleName() + ": " + lastInitialRequest.toString();
- }
+ private synchronized String streamDescription() {
+ return lastInitialRequest.getClass().getSimpleName() + ": " + lastInitialRequest.toString();
}
}
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SerialBatcher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SerialBatcher.java
index bbeb71d40..df3ee03d9 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SerialBatcher.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SerialBatcher.java
@@ -29,7 +29,7 @@
import java.util.Deque;
import java.util.List;
-// A thread compatible batcher which preserves message order.
+// A thread safe batcher which preserves message order.
class SerialBatcher {
private final long byteLimit;
private final long messageLimit;
@@ -64,7 +64,7 @@ private static boolean hasSequenceDiscontinuity(
this.messageLimit = messageLimit;
}
- ApiFuture add(PubSubMessage message, PublishSequenceNumber sequenceNumber)
+ synchronized ApiFuture add(PubSubMessage message, PublishSequenceNumber sequenceNumber)
throws CheckedApiException {
if (!messages.isEmpty()
&& hasSequenceDiscontinuity(messages.peekLast().sequenceNumber(), sequenceNumber)) {
@@ -79,7 +79,7 @@ && hasSequenceDiscontinuity(messages.peekLast().sequenceNumber(), sequenceNumber
return future;
}
- List> flush() {
+ synchronized List> flush() {
List> toReturn = new ArrayList<>();
List currentBatch = new ArrayList<>();
toReturn.add(currentBatch);
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SingleConnection.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SingleConnection.java
index 1b181c1c1..26bf888f0 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SingleConnection.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SingleConnection.java
@@ -16,17 +16,21 @@
package com.google.cloud.pubsublite.internal.wire;
+import static com.google.cloud.pubsublite.internal.MoreApiFutures.whenFirstDone;
+import static javax.swing.UIManager.get;
+
+import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.ClientStream;
import com.google.api.gax.rpc.ResponseObserver;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
-import com.google.common.util.concurrent.Monitor.Guard;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.time.Duration;
+import java.util.concurrent.Future;
/**
* A SingleConnection handles the state for a stream with an initial connection request that may
@@ -47,13 +51,11 @@ public abstract class SingleConnection receivedInitial = SettableApiFuture.create();
- @GuardedBy("connectionMonitor.monitor")
- private boolean completed = false;
+ @GuardedBy("this")
+ private final SettableApiFuture completed = SettableApiFuture.create();
protected abstract void handleInitialResponse(StreamResponseT response)
throws CheckedApiException;
@@ -82,38 +84,33 @@ protected void initialize(StreamRequestT initialRequest) {
if (!expectInitial) {
return;
}
- try (CloseableMonitor.Hold h =
- connectionMonitor.enterWhenUninterruptibly(
- new Guard(connectionMonitor.monitor) {
- @Override
- public boolean isSatisfied() {
- return receivedInitial || completed;
- }
- })) {}
- }
-
- protected void sendToStream(StreamRequestT request) {
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
- if (completed) {
- log.atFine().log("Sent request after stream completion: %s", request);
- return;
- }
- // This should be impossible to not have received the initial request, or be completed, and
- // the caller has access to this object.
- Preconditions.checkState(receivedInitial);
- requestStream.send(request);
+ get(receivedInitialOrDone());
+ }
+
+ private synchronized Future receivedInitialOrDone() {
+ return whenFirstDone(ImmutableList.of(receivedInitial, completed));
+ }
+
+ protected synchronized void sendToStream(StreamRequestT request) {
+ if (isCompleted()) {
+ log.atFine().log("Sent request after stream completion: %s", request);
+ return;
}
+ // This should be impossible to not have received the initial request, or be completed, and
+ // the caller has access to this object.
+ Preconditions.checkState(didReceiveInitial());
+ requestStream.send(request);
}
protected void sendToClient(ClientResponseT response) {
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
- if (completed) {
+ synchronized (this) {
+ if (isCompleted()) {
log.atFine().log("Sent response after stream completion: %s", response);
return;
}
// This should be impossible to not have received the initial request, or be completed, and
// the caller has access to this object.
- Preconditions.checkState(receivedInitial);
+ Preconditions.checkState(didReceiveInitial());
}
// The upcall may be reentrant, possibly on another thread while this thread is blocked.
clientStream.onResponse(response);
@@ -123,20 +120,22 @@ protected void setError(CheckedApiException error) {
abort(error);
}
- protected boolean isCompleted() {
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
- return completed;
- }
+ protected synchronized boolean isCompleted() {
+ return completed.isDone();
+ }
+
+ private synchronized boolean didReceiveInitial() {
+ return receivedInitial.isDone();
}
// Records the connection as completed and performs tear down, if not already completed. Returns
// whether the connection was already complete.
- private boolean completeStream() {
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
- if (completed) {
+ private synchronized boolean completeStream() {
+ try {
+ if (isCompleted()) {
return true;
}
- completed = true;
+ completed.set(null);
streamIdleTimer.close();
} catch (Exception e) {
log.atSevere().withCause(e).log("Error occurred while shutting down connection.");
@@ -168,14 +167,14 @@ public void onStart(StreamController streamController) {}
@Override
public void onResponse(StreamResponseT response) {
boolean isFirst;
- try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
+ synchronized (this) {
streamIdleTimer.restart();
- if (completed) {
+ if (isCompleted()) {
log.atFine().log("Received response on stream after completion: %s", response);
return;
}
- isFirst = !receivedInitial;
- receivedInitial = true;
+ isFirst = !didReceiveInitial();
+ receivedInitial.set(null);
}
try {
if (isFirst) {
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java
index f0d320504..af3d8c066 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java
@@ -22,7 +22,6 @@
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
-import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.SerialExecutor;
@@ -62,24 +61,22 @@ public class SubscriberImpl extends ProxyService
// Used to ensure messages are delivered to consumers in order.
private final SerialExecutor messageDeliveryExecutor;
- private final CloseableMonitor monitor = new CloseableMonitor();
-
- @GuardedBy("monitor.monitor")
+ @GuardedBy("this")
private Optional> alarmFuture = Optional.empty();
- @GuardedBy("monitor.monitor")
+ @GuardedBy("this")
private final RetryingConnection connection;
- @GuardedBy("monitor.monitor")
+ @GuardedBy("this")
private final NextOffsetTracker nextOffsetTracker = new NextOffsetTracker();
- @GuardedBy("monitor.monitor")
+ @GuardedBy("this")
private final FlowControlBatcher flowControlBatcher = new FlowControlBatcher();
- @GuardedBy("monitor.monitor")
+ @GuardedBy("this")
private SeekRequest initialLocation;
- @GuardedBy("monitor.monitor")
+ @GuardedBy("this")
private boolean shutdown = false;
@VisibleForTesting
@@ -126,20 +123,16 @@ public SubscriberImpl(
// ProxyService implementation.
@Override
- protected void start() {
- try (CloseableMonitor.Hold h = monitor.enter()) {
- alarmFuture = Optional.of(alarmFactory.newAlarm(this::processBatchFlowRequest));
- }
+ protected synchronized void start() {
+ alarmFuture = Optional.of(alarmFactory.newAlarm(this::processBatchFlowRequest));
}
@Override
- protected void stop() {
- try (CloseableMonitor.Hold h = monitor.enter()) {
- shutdown = true;
- this.alarmFuture.ifPresent(future -> future.cancel(false));
- this.alarmFuture = Optional.empty();
- messageDeliveryExecutor.close();
- }
+ protected synchronized void stop() {
+ shutdown = true;
+ this.alarmFuture.ifPresent(future -> future.cancel(false));
+ this.alarmFuture = Optional.empty();
+ messageDeliveryExecutor.close();
}
@Override
@@ -148,36 +141,28 @@ protected void handlePermanentError(CheckedApiException error) {
}
@Override
- public void allowFlow(FlowControlRequest clientRequest) throws CheckedApiException {
- try (CloseableMonitor.Hold h = monitor.enter()) {
- if (shutdown) return;
- flowControlBatcher.onClientFlowRequest(clientRequest);
- if (flowControlBatcher.shouldExpediteBatchRequest()) {
- connection.modifyConnection(
- connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest));
- }
+ public synchronized void allowFlow(FlowControlRequest clientRequest) throws CheckedApiException {
+ if (shutdown) return;
+ flowControlBatcher.onClientFlowRequest(clientRequest);
+ if (flowControlBatcher.shouldExpediteBatchRequest()) {
+ connection.modifyConnection(
+ connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest));
}
}
- private SubscribeRequest getInitialRequest() {
- try (CloseableMonitor.Hold h = monitor.enter()) {
- return SubscribeRequest.newBuilder()
- .setInitial(
- baseInitialRequest
- .toBuilder()
- .setInitialLocation(
- nextOffsetTracker.requestForRestart().orElse(initialLocation)))
- .build();
- }
+ private synchronized SubscribeRequest getInitialRequest() {
+ return SubscribeRequest.newBuilder()
+ .setInitial(
+ baseInitialRequest
+ .toBuilder()
+ .setInitialLocation(nextOffsetTracker.requestForRestart().orElse(initialLocation)))
+ .build();
}
- public void reset() {
- try (CloseableMonitor.Hold h = monitor.enter()) {
- if (shutdown) return;
- nextOffsetTracker.reset();
- initialLocation =
- SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build();
- }
+ public synchronized void reset() {
+ if (shutdown) return;
+ nextOffsetTracker.reset();
+ initialLocation = SeekRequest.newBuilder().setNamedTarget(NamedTarget.COMMITTED_CURSOR).build();
}
@Override
@@ -203,43 +188,48 @@ public void triggerReinitialize(CheckedApiException streamError) {
}
}
- try (CloseableMonitor.Hold h = monitor.enter()) {
- if (shutdown) return;
- connection.reinitialize(getInitialRequest());
- connection.modifyConnection(
- connectedSubscriber -> {
- checkArgument(monitor.monitor.isOccupiedByCurrentThread());
- checkArgument(connectedSubscriber.isPresent());
- flowControlBatcher
- .requestForRestart()
- .ifPresent(request -> connectedSubscriber.get().allowFlow(request));
- });
- } catch (CheckedApiException e) {
- onPermanentError(e);
+ try {
+ doReinitialize();
+ } catch (Throwable t) {
+ onPermanentError(toCanonical(t));
}
}
+ /* GuardedBy can't handle this function. */
+ @SuppressWarnings("GuardedBy")
+ private synchronized void doReinitialize() throws CheckedApiException {
+ if (shutdown) return;
+ connection.reinitialize(getInitialRequest());
+ connection.modifyConnection(
+ connectedSubscriber -> {
+ checkArgument(Thread.holdsLock(this));
+ checkArgument(connectedSubscriber.isPresent());
+ flowControlBatcher
+ .requestForRestart()
+ .ifPresent(request -> connectedSubscriber.get().allowFlow(request));
+ });
+ }
+
@Override
- public void onClientResponse(List messages) throws CheckedApiException {
- try (CloseableMonitor.Hold h = monitor.enter()) {
- if (shutdown) return;
- nextOffsetTracker.onMessages(messages);
- flowControlBatcher.onMessages(messages);
- messageDeliveryExecutor.execute(
- () -> {
- try {
- messageConsumer.accept(messages);
- } catch (Throwable t) {
- logger.atWarning().withCause(t).log(
- "Consumer threw an exception- failing subscriber. %s", baseInitialRequest);
- onPermanentError(toCanonical(t));
- }
- });
- }
+ public synchronized void onClientResponse(List messages)
+ throws CheckedApiException {
+ if (shutdown) return;
+ nextOffsetTracker.onMessages(messages);
+ flowControlBatcher.onMessages(messages);
+ messageDeliveryExecutor.execute(
+ () -> {
+ try {
+ messageConsumer.accept(messages);
+ } catch (Throwable t) {
+ logger.atWarning().withCause(t).log(
+ "Consumer threw an exception- failing subscriber. %s", baseInitialRequest);
+ onPermanentError(toCanonical(t));
+ }
+ });
}
- private void processBatchFlowRequest() {
- try (CloseableMonitor.Hold h = monitor.enter()) {
+ private synchronized void processBatchFlowRequest() {
+ try {
if (shutdown) return;
connection.modifyConnection(
connectedSubscriber -> connectedSubscriber.ifPresent(this::flushBatchFlowRequest));
@@ -248,9 +238,7 @@ private void processBatchFlowRequest() {
}
}
- private void flushBatchFlowRequest(ConnectedSubscriber subscriber) {
- try (CloseableMonitor.Hold h = monitor.enter()) {
- flowControlBatcher.releasePendingRequest().ifPresent(subscriber::allowFlow);
- }
+ private synchronized void flushBatchFlowRequest(ConnectedSubscriber subscriber) {
+ flowControlBatcher.releasePendingRequest().ifPresent(subscriber::allowFlow);
}
}
diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/CloseableMonitorTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/CloseableMonitorTest.java
deleted file mode 100755
index eef11f394..000000000
--- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/CloseableMonitorTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright 2020 Google LLC
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.google.cloud.pubsublite.internal;
-
-import static com.google.common.truth.Truth.assertThat;
-
-import com.google.common.util.concurrent.Monitor;
-import com.google.errorprone.annotations.concurrent.GuardedBy;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public final class CloseableMonitorTest {
- private final CloseableMonitor monitor = new CloseableMonitor();
-
- @GuardedBy("monitor.monitor")
- boolean state = false;
-
- @Test
- public void enter() {
- assertThat(monitor.monitor.isOccupied()).isFalse();
- try (CloseableMonitor.Hold h = monitor.enter()) {
- assertThat(monitor.monitor.isOccupied()).isTrue();
- }
- assertThat(monitor.monitor.isOccupied()).isFalse();
- }
-
- @Test
- public void enterWhenUninterruptibly() {
- ExecutorService executorService = Executors.newCachedThreadPool();
- executorService.execute(
- () -> {
- try (CloseableMonitor.Hold h = monitor.enter()) {
- state = true;
- }
- });
-
- try (CloseableMonitor.Hold h =
- monitor.enterWhenUninterruptibly(
- new Monitor.Guard(monitor.monitor) {
- @Override
- public boolean isSatisfied() {
- assertThat(monitor.monitor.isOccupied()).isTrue();
- return state;
- }
- })) {
- assertThat(monitor.monitor.isOccupied()).isTrue();
- assertThat(state).isTrue();
- }
- }
-}