Skip to content

Commit 2e13e40

Browse files
chore: delete CloseableMonitor
errorprone is removing support for the annotation this uses
1 parent 0c01d49 commit 2e13e40

File tree

13 files changed

+314
-453
lines changed

13 files changed

+314
-453
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/CloseableMonitor.java

Lines changed: 0 additions & 82 deletions
This file was deleted.

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/MoreApiFutures.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,16 @@
1616

1717
package com.google.cloud.pubsublite.internal;
1818

19+
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
20+
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
21+
1922
import com.google.api.core.ApiFuture;
2023
import com.google.api.core.ApiFutureCallback;
2124
import com.google.api.core.ApiFutures;
2225
import com.google.api.core.SettableApiFuture;
2326
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
27+
import java.util.List;
28+
import java.util.concurrent.Future;
2429

2530
public final class MoreApiFutures {
2631
private MoreApiFutures() {}
@@ -42,4 +47,18 @@ public void onSuccess(T t) {
4247
},
4348
SystemExecutors.getFuturesExecutor());
4449
}
50+
51+
public static ApiFuture<Void> whenFirstDone(List<ApiFuture<?>> futures) {
52+
SettableApiFuture<Void> anyDone = SettableApiFuture.create();
53+
futures.forEach(f -> f.addListener(() -> anyDone.set(null), directExecutor()));
54+
return anyDone;
55+
}
56+
57+
public static <T> T get(Future<T> f) {
58+
try {
59+
return f.get();
60+
} catch (Throwable t) {
61+
throw toCanonical(t).underlying;
62+
}
63+
}
4564
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/SerialExecutor.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.pubsublite.internal;
1818

19+
import com.google.common.util.concurrent.Monitor;
1920
import com.google.common.util.concurrent.Monitor.Guard;
2021
import java.util.ArrayDeque;
2122
import java.util.Queue;
@@ -26,22 +27,22 @@
2627
public final class SerialExecutor implements AutoCloseable, Executor {
2728
private final Executor executor;
2829

29-
private final CloseableMonitor monitor = new CloseableMonitor();
30+
private final Monitor monitor = new Monitor();
3031
private final Guard isInactive =
31-
new Guard(monitor.monitor) {
32+
new Guard(monitor) {
3233
@Override
3334
public boolean isSatisfied() {
3435
return !isTaskActive;
3536
}
3637
};
3738

38-
@GuardedBy("monitor.monitor")
39+
@GuardedBy("monitor")
3940
private final Queue<Runnable> tasks;
4041

41-
@GuardedBy("monitor.monitor")
42+
@GuardedBy("monitor")
4243
private boolean isTaskActive;
4344

44-
@GuardedBy("monitor.monitor")
45+
@GuardedBy("monitor")
4546
private boolean isShutdown;
4647

4748
public SerialExecutor(Executor executor) {
@@ -53,7 +54,7 @@ public SerialExecutor(Executor executor) {
5354

5455
/** Waits until there are no active tasks. */
5556
public void waitUntilInactive() {
56-
try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(isInactive)) {}
57+
monitor.enterWhenUninterruptibly(isInactive);
5758
}
5859

5960
/**
@@ -62,14 +63,18 @@ public void waitUntilInactive() {
6263
*/
6364
@Override
6465
public void close() {
65-
try (CloseableMonitor.Hold h = monitor.enter()) {
66+
monitor.enter();
67+
try {
6668
isShutdown = true;
69+
} finally {
70+
monitor.leave();
6771
}
6872
}
6973

7074
@Override
7175
public void execute(Runnable r) {
72-
try (CloseableMonitor.Hold h = monitor.enter()) {
76+
monitor.enter();
77+
try {
7378
if (isShutdown) {
7479
return;
7580
}
@@ -86,21 +91,29 @@ public void execute(Runnable r) {
8691
if (!isTaskActive) {
8792
scheduleNextTask();
8893
}
94+
} finally {
95+
monitor.leave();
8996
}
9097
}
9198

9299
private boolean shouldExecuteTask() {
93-
try (CloseableMonitor.Hold h = monitor.enter()) {
100+
monitor.enter();
101+
try {
94102
return !isShutdown;
103+
} finally {
104+
monitor.leave();
95105
}
96106
}
97107

98108
private void scheduleNextTask() {
99-
try (CloseableMonitor.Hold h = monitor.enter()) {
109+
monitor.enter();
110+
try {
100111
isTaskActive = !tasks.isEmpty() && !isShutdown;
101112
if (isTaskActive) {
102113
executor.execute(tasks.poll());
103114
}
115+
} finally {
116+
monitor.leave();
104117
}
105118
}
106119
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerImpl.java

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.google.api.gax.rpc.ApiException;
2222
import com.google.cloud.pubsublite.Partition;
2323
import com.google.cloud.pubsublite.internal.CheckedApiException;
24-
import com.google.cloud.pubsublite.internal.CloseableMonitor;
2524
import com.google.cloud.pubsublite.internal.ProxyService;
2625
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
2726
import com.google.cloud.pubsublite.proto.PartitionAssignment;
@@ -41,12 +40,10 @@ public class AssignerImpl extends ProxyService
4140
private final PartitionAssignmentRequest initialRequest;
4241
private final String uuidHex;
4342

44-
private final CloseableMonitor monitor = new CloseableMonitor();
45-
46-
@GuardedBy("monitor.monitor")
43+
@GuardedBy("this")
4744
private final RetryingConnection<PartitionAssignmentRequest, ConnectedAssigner> connection;
4845

49-
@GuardedBy("monitor.monitor")
46+
@GuardedBy("this")
5047
private final PartitionAssignmentReceiver receiver;
5148

5249
@VisibleForTesting
@@ -80,10 +77,8 @@ public AssignerImpl(
8077
}
8178

8279
@Override
83-
public void triggerReinitialize(CheckedApiException streamError) {
84-
try (CloseableMonitor.Hold h = monitor.enter()) {
85-
connection.reinitialize(initialRequest);
86-
}
80+
public synchronized void triggerReinitialize(CheckedApiException streamError) {
81+
connection.reinitialize(initialRequest);
8782
}
8883

8984
private static Set<Partition> toSet(PartitionAssignment assignment) throws ApiException {
@@ -95,13 +90,11 @@ private static Set<Partition> toSet(PartitionAssignment assignment) throws ApiEx
9590
}
9691

9792
@Override
98-
public void onClientResponse(PartitionAssignment value) throws CheckedApiException {
99-
try (CloseableMonitor.Hold h = monitor.enter()) {
100-
Set<Partition> partitions = toSet(value);
101-
logger.atFine().log("Subscriber with uuid %s received assignment: %s", uuidHex, partitions);
102-
receiver.handleAssignment(partitions);
103-
logger.atInfo().log("Subscriber with uuid %s handled assignment: %s", uuidHex, partitions);
104-
connection.modifyConnection(connectionOr -> connectionOr.ifPresent(ConnectedAssigner::ack));
105-
}
93+
public synchronized void onClientResponse(PartitionAssignment value) throws CheckedApiException {
94+
Set<Partition> partitions = toSet(value);
95+
logger.atFine().log("Subscriber with uuid %s received assignment: %s", uuidHex, partitions);
96+
receiver.handleAssignment(partitions);
97+
logger.atInfo().log("Subscriber with uuid %s handled assignment: %s", uuidHex, partitions);
98+
connection.modifyConnection(connectionOr -> connectionOr.ifPresent(ConnectedAssigner::ack));
10699
}
107100
}

0 commit comments

Comments
 (0)