Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,73 +211,95 @@ private FindPendingReadOutcome findPendingRead(PendingReadKey key, ConcurrentMap
private class PendingRead {
final PendingReadKey key;
final ConcurrentMap<PendingReadKey, PendingRead> ledgerCache;
final List<ReadEntriesCallbackWithContext> callbacks = new ArrayList<>(1);
boolean completed = false;
final List<ReadEntriesCallbackWithContext> listeners = new ArrayList<>(1);
PendingReadState state = PendingReadState.INITIALISED;

enum PendingReadState {
INITIALISED,
ATTACHED,
COMPLETED
}

public PendingRead(PendingReadKey key,
ConcurrentMap<PendingReadKey, PendingRead> ledgerCache) {
this.key = key;
this.ledgerCache = ledgerCache;
}

public void attach(CompletableFuture<List<EntryImpl>> handle) {
public synchronized void attach(CompletableFuture<List<EntryImpl>> handle) {
if (state != PendingReadState.INITIALISED) {
// this shouldn't ever happen. this is here to prevent misuse in future changes
throw new IllegalStateException("Unexpected state " + state + " for PendingRead for key " + key);
}
state = PendingReadState.ATTACHED;
handle.whenComplete((entriesToReturn, error) -> {
// execute in the completing thread
completeAndRemoveFromCache();
// execute in the completing thread and return a copy of the listeners
List<ReadEntriesCallbackWithContext> callbacks = completeAndRemoveFromCache();
// execute the callbacks in the managed ledger executor
rangeEntryCache.getManagedLedger().getExecutor().execute(() -> {
if (error != null) {
readEntriesFailed(error);
readEntriesFailed(callbacks, error);
} else {
readEntriesComplete(entriesToReturn);
readEntriesComplete(callbacks, entriesToReturn);
}
});
});
}

private synchronized void completeAndRemoveFromCache() {
completed = true;
synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback,
Object ctx, long startEntry, long endEntry) {
if (state == PendingReadState.COMPLETED) {
return false;
}
listeners.add(new ReadEntriesCallbackWithContext(callback, ctx, startEntry, endEntry));
return true;
}

private synchronized List<ReadEntriesCallbackWithContext> completeAndRemoveFromCache() {
state = PendingReadState.COMPLETED;
// When the read has completed, remove the instance from the ledgerCache map
// so that new reads will go to a new instance.
// this is required because we are going to do refcount management
// on the results of the callback
ledgerCache.remove(key, this);
// return a copy of the listeners
return List.copyOf(listeners);
}

private synchronized void readEntriesComplete(List<EntryImpl> entriesToReturn) {
// this method isn't synchronized since that could lead to deadlocks
private void readEntriesComplete(List<ReadEntriesCallbackWithContext> callbacks,
List<EntryImpl> entriesToReturn) {
if (callbacks.size() == 1) {
ReadEntriesCallbackWithContext first = callbacks.get(0);
if (first.startEntry == key.startEntry
&& first.endEntry == key.endEntry) {
// perfect match, no copy, this is the most common case
first.callback.readEntriesComplete((List) entriesToReturn,
first.ctx);
first.callback.readEntriesComplete((List) entriesToReturn, first.ctx);
} else {
first.callback.readEntriesComplete(
keepEntries(entriesToReturn, first.startEntry, first.endEntry),
first.ctx);
keepEntries(entriesToReturn, first.startEntry, first.endEntry), first.ctx);
}
} else {
for (ReadEntriesCallbackWithContext callback : callbacks) {
callback.callback.readEntriesComplete(
copyEntries(entriesToReturn, callback.startEntry, callback.endEntry),
callback.ctx);
copyEntries(entriesToReturn, callback.startEntry, callback.endEntry), callback.ctx);
}
for (EntryImpl entry : entriesToReturn) {
entry.release();
}
}
}

private synchronized void readEntriesFailed(Throwable error) {
// this method isn't synchronized since that could lead to deadlocks
private void readEntriesFailed(List<ReadEntriesCallbackWithContext> callbacks, Throwable error) {
for (ReadEntriesCallbackWithContext callback : callbacks) {
ManagedLedgerException mlException = createManagedLedgerException(error);
callback.callback.readEntriesFailed(mlException, callback.ctx);
}
}

private List<Entry> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
List<Entry> result = new ArrayList<>((int) (endEntry - startEntry));
private static List<Entry> keepEntries(List<EntryImpl> list, long startEntry, long endEntry) {
List<Entry> result = new ArrayList<>((int) (endEntry - startEntry + 1));
for (EntryImpl entry : list) {
long entryId = entry.getEntryId();
if (startEntry <= entryId && entryId <= endEntry) {
Expand All @@ -289,7 +311,7 @@ private List<Entry> keepEntries(List<EntryImpl> list, long startEntry, long endE
return result;
}

private List<Entry> copyEntries(List<EntryImpl> entriesToReturn, long startEntry, long endEntry) {
private static List<Entry> copyEntries(List<EntryImpl> entriesToReturn, long startEntry, long endEntry) {
List<Entry> result = new ArrayList<>((int) (endEntry - startEntry + 1));
for (EntryImpl entry : entriesToReturn) {
long entryId = entry.getEntryId();
Expand All @@ -300,15 +322,6 @@ private List<Entry> copyEntries(List<EntryImpl> entriesToReturn, long startEntry
}
return result;
}

synchronized boolean addListener(AsyncCallbacks.ReadEntriesCallback callback,
Object ctx, long startEntry, long endEntry) {
if (completed) {
return false;
}
callbacks.add(new ReadEntriesCallbackWithContext(callback, ctx, startEntry, endEntry));
return true;
}
}


Expand Down
Loading