Skip to content

Commit 0a44d3b

Browse files
lhotari authored and srinath-ctds committed
[fix][ml] Fix deadlock in PendingReadsManager (apache#23958)
(cherry picked from commit 367faef) (cherry picked from commit f2aa71b)
1 parent 3ba8a00 commit 0a44d3b

File tree

1 file changed

+42
-29
lines changed

1 file changed

+42
-29
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java

Lines changed: 42 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -211,73 +211,95 @@ private FindPendingReadOutcome findPendingRead(PendingReadKey key, ConcurrentMap
211211
private class PendingRead {
212212
final PendingReadKey key;
213213
final ConcurrentMap<PendingReadKey, PendingRead> ledgerCache;
214-
final List<ReadEntriesCallbackWithContext> callbacks = new ArrayList<>(1);
215-
boolean completed = false;
214+
final List<ReadEntriesCallbackWithContext> listeners = new ArrayList<>(1);
215+
PendingReadState state = PendingReadState.INITIALISED;
216+
217+
enum PendingReadState {
218+
INITIALISED,
219+
ATTACHED,
220+
COMPLETED
221+
}
216222

217223
public PendingRead(PendingReadKey key,
218224
ConcurrentMap<PendingReadKey, PendingRead> ledgerCache) {
219225
this.key = key;
220226
this.ledgerCache = ledgerCache;
221227
}
222228

223-
public void attach(CompletableFuture<List<EntryImpl>> handle) {
229+
public synchronized void attach(CompletableFuture<List<EntryImpl>> handle) {
230+
if (state != PendingReadState.INITIALISED) {
231+
// this shouldn't ever happen. this is here to prevent misuse in future changes
232+
throw new IllegalStateException("Unexpected state " + state + " for PendingRead for key " + key);
233+
}
234+
state = PendingReadState.ATTACHED;
224235
handle.whenComplete((entriesToReturn, error) -> {
225-
// execute in the completing thread
226-
completeAndRemoveFromCache();
236+
// execute in the completing thread and return a copy of the listeners
237+
List<ReadEntriesCallbackWithContext> callbacks = completeAndRemoveFromCache();
227238
// execute the callbacks in the managed ledger executor
228239
rangeEntryCache.getManagedLedger().getExecutor().execute(() -> {
229240
if (error != null) {
230-
readEntriesFailed(error);
241+
readEntriesFailed(callbacks, error);
231242
} else {
232-
readEntriesComplete(entriesToReturn);
243+
readEntriesComplete(callbacks, entriesToReturn);
233244
}
234245
});
235246
});
236247
}
237248

238-
private synchronized void completeAndRemoveFromCache() {
239-
completed = true;
249+
synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback,
250+
Object ctx, long startEntry, long endEntry) {
251+
if (state == PendingReadState.COMPLETED) {
252+
return false;
253+
}
254+
listeners.add(new ReadEntriesCallbackWithContext(callback, ctx, startEntry, endEntry));
255+
return true;
256+
}
257+
258+
private synchronized List<ReadEntriesCallbackWithContext> completeAndRemoveFromCache() {
259+
state = PendingReadState.COMPLETED;
240260
// When the read has completed, remove the instance from the ledgerCache map
241261
// so that new reads will go to a new instance.
242262
// this is required because we are going to do refcount management
243263
// on the results of the callback
244264
ledgerCache.remove(key, this);
265+
// return a copy of the listeners
266+
return List.copyOf(listeners);
245267
}
246268

247-
private synchronized void readEntriesComplete(List<EntryImpl> entriesToReturn) {
269+
// this method isn't synchronized since that could lead to deadlocks
270+
private void readEntriesComplete(List<ReadEntriesCallbackWithContext> callbacks,
271+
List<EntryImpl> entriesToReturn) {
248272
if (callbacks.size() == 1) {
249273
ReadEntriesCallbackWithContext first = callbacks.get(0);
250274
if (first.startEntry == key.startEntry
251275
&& first.endEntry == key.endEntry) {
252276
// perfect match, no copy, this is the most common case
253-
first.callback.readEntriesComplete((List) entriesToReturn,
254-
first.ctx);
277+
first.callback.readEntriesComplete((List) entriesToReturn, first.ctx);
255278
} else {
256279
first.callback.readEntriesComplete(
257-
keepEntries(entriesToReturn, first.startEntry, first.endEntry),
258-
first.ctx);
280+
keepEntries(entriesToReturn, first.startEntry, first.endEntry), first.ctx);
259281
}
260282
} else {
261283
for (ReadEntriesCallbackWithContext callback : callbacks) {
262284
callback.callback.readEntriesComplete(
263-
copyEntries(entriesToReturn, callback.startEntry, callback.endEntry),
264-
callback.ctx);
285+
copyEntries(entriesToReturn, callback.startEntry, callback.endEntry), callback.ctx);
265286
}
266287
for (EntryImpl entry : entriesToReturn) {
267288
entry.release();
268289
}
269290
}
270291
}
271292

272-
private synchronized void readEntriesFailed(Throwable error) {
293+
// this method isn't synchronized since that could lead to deadlocks
294+
private void readEntriesFailed(List<ReadEntriesCallbackWithContext> callbacks, Throwable error) {
273295
for (ReadEntriesCallbackWithContext callback : callbacks) {
274296
ManagedLedgerException mlException = createManagedLedgerException(error);
275297
callback.callback.readEntriesFailed(mlException, callback.ctx);
276298
}
277299
}
278300

279-
private List<Entry> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
280-
List<Entry> result = new ArrayList<>((int) (endEntry - startEntry));
301+
private static List<Entry> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
302+
List<Entry> result = new ArrayList<>((int) (endEntry - startEntry + 1));
281303
for (EntryImpl entry : list) {
282304
long entryId = entry.getEntryId();
283305
if (startEntry <= entryId && entryId <= endEntry) {
@@ -289,7 +311,7 @@ private List<Entry> keepEntries(List<EntryImpl> list, long startEntry, long endE
289311
return result;
290312
}
291313

292-
private List<Entry> copyEntries(List<EntryImpl> entriesToReturn, long startEntry, long endEntry) {
314+
private static List<Entry> copyEntries(List<EntryImpl> entriesToReturn, long startEntry, long endEntry) {
293315
List<Entry> result = new ArrayList<>((int) (endEntry - startEntry + 1));
294316
for (EntryImpl entry : entriesToReturn) {
295317
long entryId = entry.getEntryId();
@@ -300,15 +322,6 @@ private List<Entry> copyEntries(List<EntryImpl> entriesToReturn, long startEntry
300322
}
301323
return result;
302324
}
303-
304-
synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback,
305-
Object ctx, long startEntry, long endEntry) {
306-
if (completed) {
307-
return false;
308-
}
309-
callbacks.add(new ReadEntriesCallbackWithContext(callback, ctx, startEntry, endEntry));
310-
return true;
311-
}
312325
}
313326

314327

0 commit comments

Comments
 (0)