Skip to content

Commit e9b6a94

Browse files
committed
Refactor PendingRead
1 parent c4dda85 commit e9b6a94

File tree

1 file changed

+69
-57
lines changed

1 file changed

+69
-57
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java

Lines changed: 69 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,66 @@ public PendingRead(PendingReadKey key,
232232
this.ledgerCache = ledgerCache;
233233
}
234234

235-
private List<EntryImpl> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
236-
List<EntryImpl> result = new ArrayList<>((int) (endEntry - startEntry));
235+
public void attach(CompletableFuture<List<EntryImpl>> handle) {
236+
handle.whenComplete((entriesToReturn, error) -> {
237+
// execute in the completing thread
238+
completeAndRemoveFromCache();
239+
// execute the callbacks in the managed ledger executor
240+
rangeEntryCache.getManagedLedger().getExecutor().execute(() -> {
241+
if (error != null) {
242+
readEntriesFailed(error);
243+
} else {
244+
readEntriesComplete(entriesToReturn);
245+
}
246+
});
247+
});
248+
}
249+
250+
private synchronized void completeAndRemoveFromCache() {
251+
completed = true;
252+
// When the read has completed, remove the instance from the ledgerCache map
253+
// so that new reads will go to a new instance.
254+
// this is required because we are going to do refcount management
255+
// on the results of the callback
256+
synchronized (ledgerCache) {
257+
ledgerCache.remove(key, this);
258+
}
259+
}
260+
261+
private synchronized void readEntriesComplete(List<EntryImpl> entriesToReturn) {
262+
if (callbacks.size() == 1) {
263+
ReadEntriesCallbackWithContext first = callbacks.get(0);
264+
if (first.startEntry == key.startEntry
265+
&& first.endEntry == key.endEntry) {
266+
// perfect match, no copy, this is the most common case
267+
first.callback.readEntriesComplete((List) entriesToReturn,
268+
first.ctx);
269+
} else {
270+
first.callback.readEntriesComplete(
271+
keepEntries(entriesToReturn, first.startEntry, first.endEntry),
272+
first.ctx);
273+
}
274+
} else {
275+
for (ReadEntriesCallbackWithContext callback : callbacks) {
276+
callback.callback.readEntriesComplete(
277+
copyEntries(entriesToReturn, callback.startEntry, callback.endEntry),
278+
callback.ctx);
279+
}
280+
for (EntryImpl entry : entriesToReturn) {
281+
entry.release();
282+
}
283+
}
284+
}
285+
286+
private synchronized void readEntriesFailed(Throwable error) {
287+
for (ReadEntriesCallbackWithContext callback : callbacks) {
288+
ManagedLedgerException mlException = createManagedLedgerException(error);
289+
callback.callback.readEntriesFailed(mlException, callback.ctx);
290+
}
291+
}
292+
293+
private List<Entry> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
294+
List<Entry> result = new ArrayList<>((int) (endEntry - startEntry));
237295
for (EntryImpl entry : list) {
238296
long entryId = entry.getEntryId();
239297
if (startEntry <= entryId && entryId <= endEntry) {
@@ -245,62 +303,16 @@ private List<EntryImpl> keepEntries(List<EntryImpl> list, long startEntry, long
245303
return result;
246304
}
247305

248-
public void attach(CompletableFuture<List<EntryImpl>> handle) {
249-
// when the future is done remove this from the map
250-
// new reads will go to a new instance
251-
// this is required because we are going to do refcount management
252-
// on the results of the callback
253-
handle.whenComplete((___, error) -> {
254-
synchronized (PendingRead.this) {
255-
completed = true;
256-
synchronized (ledgerCache) {
257-
ledgerCache.remove(key, this);
258-
}
259-
}
260-
});
261-
262-
handle.thenAcceptAsync(entriesToReturn -> {
263-
synchronized (PendingRead.this) {
264-
if (callbacks.size() == 1) {
265-
ReadEntriesCallbackWithContext first = callbacks.get(0);
266-
if (first.startEntry == key.startEntry
267-
&& first.endEntry == key.endEntry) {
268-
// perfect match, no copy, this is the most common case
269-
first.callback.readEntriesComplete((List) entriesToReturn,
270-
first.ctx);
271-
} else {
272-
first.callback.readEntriesComplete(
273-
(List) keepEntries(entriesToReturn, first.startEntry, first.endEntry),
274-
first.ctx);
275-
}
276-
} else {
277-
for (ReadEntriesCallbackWithContext callback : callbacks) {
278-
long callbackStartEntry = callback.startEntry;
279-
long callbackEndEntry = callback.endEntry;
280-
List<EntryImpl> copy = new ArrayList<>((int) (callbackEndEntry - callbackStartEntry + 1));
281-
for (EntryImpl entry : entriesToReturn) {
282-
long entryId = entry.getEntryId();
283-
if (callbackStartEntry <= entryId && entryId <= callbackEndEntry) {
284-
EntryImpl entryCopy = EntryImpl.create(entry);
285-
copy.add(entryCopy);
286-
}
287-
}
288-
callback.callback.readEntriesComplete((List) copy, callback.ctx);
289-
}
290-
for (EntryImpl entry : entriesToReturn) {
291-
entry.release();
292-
}
293-
}
294-
}
295-
}, rangeEntryCache.getManagedLedger().getExecutor()).exceptionally(exception -> {
296-
synchronized (PendingRead.this) {
297-
for (ReadEntriesCallbackWithContext callback : callbacks) {
298-
ManagedLedgerException mlException = createManagedLedgerException(exception);
299-
callback.callback.readEntriesFailed(mlException, callback.ctx);
300-
}
306+
private List<Entry> copyEntries(List<EntryImpl> entriesToReturn, long startEntry, long endEntry) {
307+
List<Entry> result = new ArrayList<>((int) (endEntry - startEntry + 1));
308+
for (EntryImpl entry : entriesToReturn) {
309+
long entryId = entry.getEntryId();
310+
if (startEntry <= entryId && entryId <= endEntry) {
311+
EntryImpl entryCopy = EntryImpl.create(entry);
312+
result.add(entryCopy);
301313
}
302-
return null;
303-
});
314+
}
315+
return result;
304316
}
305317

306318
synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback,

0 commit comments

Comments
 (0)