Skip to content

Commit 3197cc0

Browse files
authored
fix: avoid blocking alarm pool during stream reconnects (#1400)
* fix: avoid blocking alarm pool during stream reconnects Do not process background requests in alarm pool while publish and subscribe streams are reconnecting. Avoid grabbing the same lock as reinitialize, which will be held until the stream is successfully reconnected. Also reverts #1394, which can cause thread explosion. * Remove unintended change
1 parent dca41ba commit 3197cc0

File tree

3 files changed

+40
-9
lines changed

3 files changed

+40
-9
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/AlarmFactory.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,11 +47,4 @@ static AlarmFactory create(Duration duration) {
4747
duration.toNanos(),
4848
NANOSECONDS);
4949
}
50-
51-
/** Runnable is executed by an unbounded pool, although the alarm pool is bounded. */
52-
static AlarmFactory createUnbounded(Duration duration) {
53-
AlarmFactory underlying = create(duration);
54-
return runnable ->
55-
underlying.newAlarm(() -> SystemExecutors.getFuturesExecutor().execute(runnable));
56-
}
5750
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherImpl.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,12 @@ void failBatch(int startIdx, CheckedApiException e) {
117117
@GuardedBy("monitor.monitor")
118118
private final Queue<InFlightBatch> batchesInFlight = new ArrayDeque<>();
119119

120+
// reconnectingMonitor is always acquired after monitor.monitor when both are held.
121+
private final CloseableMonitor reconnectingMonitor = new CloseableMonitor();
122+
123+
@GuardedBy("reconnectingMonitor.monitor")
124+
private boolean reconnecting = false;
125+
120126
@VisibleForTesting
121127
PublisherImpl(
122128
PublishStreamFactory streamFactory,
@@ -146,7 +152,7 @@ public PublisherImpl(
146152
this(
147153
streamFactory,
148154
new BatchPublisherImpl.Factory(),
149-
AlarmFactory.createUnbounded(
155+
AlarmFactory.create(
150156
Duration.ofNanos(
151157
Objects.requireNonNull(batchingSettings.getDelayThreshold()).toNanos())),
152158
initialRequest,
@@ -189,6 +195,9 @@ private void rebatchForRestart() {
189195
@Override
190196
public void triggerReinitialize(CheckedApiException streamError) {
191197
try (CloseableMonitor.Hold h = monitor.enter()) {
198+
try (CloseableMonitor.Hold rh = reconnectingMonitor.enter()) {
199+
reconnecting = true;
200+
}
192201
connection.reinitialize(initialRequest);
193202
rebatchForRestart();
194203
Collection<InFlightBatch> batches = batchesInFlight;
@@ -201,6 +210,9 @@ public void triggerReinitialize(CheckedApiException streamError) {
201210
.get()
202211
.publish(batch.messagesToSend(), batch.firstSequenceNumber()));
203212
});
213+
try (CloseableMonitor.Hold rh = reconnectingMonitor.enter()) {
214+
reconnecting = false;
215+
}
204216
} catch (CheckedApiException e) {
205217
onPermanentError(e);
206218
}
@@ -219,7 +231,7 @@ protected void handlePermanentError(CheckedApiException error) {
219231
@Override
220232
protected void start() {
221233
try (CloseableMonitor.Hold h = monitor.enter()) {
222-
this.alarmFuture = Optional.of(alarmFactory.newAlarm(this::flushToStream));
234+
this.alarmFuture = Optional.of(alarmFactory.newAlarm(this::backgroundFlushToStream));
223235
}
224236
}
225237

@@ -276,6 +288,15 @@ public void cancelOutstandingPublishes() {
276288
}
277289
}
278290

291+
private void backgroundFlushToStream() {
292+
try (CloseableMonitor.Hold h = reconnectingMonitor.enter()) {
293+
if (reconnecting) {
294+
return;
295+
}
296+
}
297+
flushToStream();
298+
}
299+
279300
private void flushToStream() {
280301
try (CloseableMonitor.Hold h = monitor.enter()) {
281302
if (shutdown) return;

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ public class SubscriberImpl extends ProxyService
8282
@GuardedBy("monitor.monitor")
8383
private boolean shutdown = false;
8484

85+
// reconnectingMonitor is always acquired after monitor.monitor when both are held.
86+
private final CloseableMonitor reconnectingMonitor = new CloseableMonitor();
87+
88+
@GuardedBy("reconnectingMonitor.monitor")
89+
private boolean reconnecting = false;
90+
8591
@VisibleForTesting
8692
SubscriberImpl(
8793
SubscribeStreamFactory streamFactory,
@@ -205,6 +211,9 @@ public void triggerReinitialize(CheckedApiException streamError) {
205211

206212
try (CloseableMonitor.Hold h = monitor.enter()) {
207213
if (shutdown) return;
214+
try (CloseableMonitor.Hold rh = reconnectingMonitor.enter()) {
215+
reconnecting = true;
216+
}
208217
connection.reinitialize(getInitialRequest());
209218
connection.modifyConnection(
210219
connectedSubscriber -> {
@@ -214,6 +223,9 @@ public void triggerReinitialize(CheckedApiException streamError) {
214223
.requestForRestart()
215224
.ifPresent(request -> connectedSubscriber.get().allowFlow(request));
216225
});
226+
try (CloseableMonitor.Hold rh = reconnectingMonitor.enter()) {
227+
reconnecting = false;
228+
}
217229
} catch (CheckedApiException e) {
218230
onPermanentError(e);
219231
}
@@ -239,6 +251,11 @@ public void onClientResponse(List<SequencedMessage> messages) throws CheckedApiE
239251
}
240252

241253
private void processBatchFlowRequest() {
254+
try (CloseableMonitor.Hold h = reconnectingMonitor.enter()) {
255+
if (reconnecting) {
256+
return;
257+
}
258+
}
242259
try (CloseableMonitor.Hold h = monitor.enter()) {
243260
if (shutdown) return;
244261
connection.modifyConnection(

0 commit comments

Comments
 (0)