Skip to content

Commit 22e9245

Browse files
lhotarisrinath-ctds
authored and committed
[fix][broker] Fix ManagedCursor state management race conditions and lifecycle issues (apache#24569)
(cherry picked from commit c96f27a) (cherry picked from commit ec56ca5)
1 parent dd6ba36 commit 22e9245

File tree

7 files changed

+343
-210
lines changed

7 files changed

+343
-210
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 254 additions & 180 deletions
Large diffs are not rendered by default.

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1059,23 +1059,36 @@ public synchronized void asyncDeleteCursor(final String consumerName, final Dele
10591059
callback.deleteCursorFailed(new ManagedLedgerException.CursorNotFoundException("ManagedCursor not found: "
10601060
+ consumerName), ctx);
10611061
return;
1062-
} else if (!cursor.isDurable()) {
1063-
cursor.setState(ManagedCursorImpl.State.Closed);
1064-
cursor.cancelPendingReadRequest();
1062+
}
1063+
1064+
// Non-durable cursors can be closed and removed immediately
1065+
if (!cursor.isDurable()) {
1066+
try {
1067+
cursor.close();
1068+
} catch (Exception e) {
1069+
log.warn("[{}] Failed to close non-durable cursor {}", name, consumerName, e);
1070+
}
10651071
cursors.removeCursor(consumerName);
1066-
deactivateCursorByName(consumerName);
10671072
callback.deleteCursorComplete(ctx);
10681073
return;
10691074
}
10701075

1076+
// If the cursor is active, we need to deactivate it first
1077+
cursor.setInactive();
1078+
// Set the state to deleting (which is a closed state) to avoid any new writes
1079+
ManagedCursorImpl.State beforeChangingState = cursor.changeStateToDeletingIfNotDeleted();
1080+
if (beforeChangingState.isDeletingOrDeleted()) {
1081+
log.warn("[{}] [{}] Cursor is already being deleted or has been deleted.", name, consumerName);
1082+
return;
1083+
}
1084+
10711085
// First remove the consumer from the MetaStore. If this operation succeeds and the next one (removing the
10721086
// ledger from BK) doesn't, we end up having a loose ledger leaked but the state will be consistent.
10731087
store.asyncRemoveCursor(ManagedLedgerImpl.this.name, consumerName, new MetaStoreCallback<Void>() {
10741088
@Override
10751089
public void operationComplete(Void result, Stat stat) {
10761090
cursor.asyncDeleteCursorLedger();
10771091
cursors.removeCursor(consumerName);
1078-
deactivateCursorByName(consumerName);
10791092

10801093
trimConsumedLedgersInBackground();
10811094

@@ -1085,7 +1098,7 @@ public void operationComplete(Void result, Stat stat) {
10851098

10861099
@Override
10871100
public void operationFailed(MetaStoreException e) {
1088-
handleBadVersion(e);
1101+
cursor.getAndSetState(ManagedCursorImpl.State.DeletingFailed);
10891102
callback.deleteCursorFailed(e, ctx);
10901103
}
10911104

@@ -2437,7 +2450,7 @@ void notifyCursors() {
24372450
if (waitingCursor == null) {
24382451
break;
24392452
}
2440-
2453+
waitingCursor.notifyWaitingCursorDequeued();
24412454
executor.execute(waitingCursor::notifyEntriesAvailable);
24422455
}
24432456
}
@@ -3917,11 +3930,17 @@ private void deactivateCursorByName(String cursorName) {
39173930

39183931

39193932
public void removeWaitingCursor(ManagedCursor cursor) {
3920-
this.waitingCursors.remove(cursor);
3933+
((ManagedCursorImpl) cursor).removeWaitingCursorRequested(() -> {
3934+
// remove only if the cursor has been registered
3935+
this.waitingCursors.remove(cursor);
3936+
});
39213937
}
39223938

39233939
public void addWaitingCursor(ManagedCursorImpl cursor) {
3924-
this.waitingCursors.add(cursor);
3940+
cursor.addWaitingCursorRequested(() -> {
3941+
// add only if the cursor has not been registered
3942+
this.waitingCursors.add(cursor);
3943+
});
39253944
}
39263945

39273946
public boolean isCursorActive(ManagedCursor cursor) {

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ protected void internalAsyncMarkDelete(final Position newPosition, Map<String, L
113113
@Override
114114
public void asyncClose(CloseCallback callback, Object ctx) {
115115
STATE_UPDATER.set(this, State.Closed);
116+
closeWaitingCursor();
117+
setInactive();
116118
callback.closeComplete(ctx);
117119
}
118120

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.netty.util.Recycler.Handle;
2323
import java.util.ArrayList;
2424
import java.util.List;
25+
import java.util.concurrent.atomic.AtomicInteger;
2526
import java.util.function.Predicate;
2627
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
2728
import org.apache.bookkeeper.mledger.Entry;
@@ -34,7 +35,14 @@
3435
import org.slf4j.LoggerFactory;
3536

3637
class OpReadEntry implements ReadEntriesCallback {
37-
38+
static final OpReadEntry WAITING_READ_OP_FOR_CLOSED_CURSOR = new OpReadEntry();
39+
private static final AtomicInteger opReadIdGenerator = new AtomicInteger(1);
40+
/**
41+
* id for this read operation. Value can be negative when integer value overflow happens.
42+
* Used for waitingReadOp consistency so that the correct instance is handled after the instance has already been
43+
* recycled.
44+
*/
45+
int id;
3846
ManagedCursorImpl cursor;
3947
Position readPosition;
4048
private int count;
@@ -51,6 +59,7 @@ class OpReadEntry implements ReadEntriesCallback {
5159
public static OpReadEntry create(ManagedCursorImpl cursor, Position readPositionRef, int count,
5260
ReadEntriesCallback callback, Object ctx, Position maxPosition, Predicate<Position> skipCondition) {
5361
OpReadEntry op = RECYCLER.get();
62+
op.id = opReadIdGenerator.getAndIncrement();
5463
op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
5564
op.cursor = cursor;
5665
op.count = count;
@@ -123,7 +132,7 @@ private void internalReadEntriesFailed(ManagedLedgerException exception, Object
123132
if (!entries.isEmpty()) {
124133
// There were already some entries that were read before, we can return them
125134
complete(ctx);
126-
} else if (cursor.getConfig().isAutoSkipNonRecoverableData()
135+
} else if (!cursor.isClosed() && cursor.getConfig().isAutoSkipNonRecoverableData()
127136
&& exception instanceof NonRecoverableLedgerException) {
128137
log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(),
129138
readPosition, exception.getMessage());
@@ -200,6 +209,22 @@ private OpReadEntry(Handle<OpReadEntry> recyclerHandle) {
200209
this.recyclerHandle = recyclerHandle;
201210
}
202211

212+
// no-op constructor for the WAITING_READ_OP_FOR_CLOSED_CURSOR sentinel instance (no recycler handle, no-op callback)
213+
private OpReadEntry() {
214+
this.recyclerHandle = null;
215+
this.callback = new ReadEntriesCallback() {
216+
@Override
217+
public void readEntriesComplete(List<Entry> entries, Object ctx) {
218+
// no-op
219+
}
220+
221+
@Override
222+
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
223+
// no-op
224+
}
225+
};
226+
}
227+
203228
private static final Recycler<OpReadEntry> RECYCLER = new Recycler<>() {
204229
@Override
205230
protected OpReadEntry newObject(Recycler.Handle<OpReadEntry> recyclerHandle) {
@@ -208,6 +233,11 @@ protected OpReadEntry newObject(Recycler.Handle<OpReadEntry> recyclerHandle) {
208233
};
209234

210235
public void recycle() {
236+
if (recyclerHandle == null) {
237+
// This is the no-op instance, do not recycle
238+
return;
239+
}
240+
id = -1;
211241
count = 0;
212242
cursor = null;
213243
readPosition = null;

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public void skipEntries(int numEntriesToSkip) {
6060
@Override
6161
public void asyncClose(final AsyncCallbacks.CloseCallback callback, final Object ctx) {
6262
state = State.Closed;
63+
closeWaitingCursor();
64+
setInactive();
6365
callback.closeComplete(ctx);
6466
}
6567

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,16 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor
350350

351351
if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
352352
deactivateCursor();
353+
// Remove the cursor from the waiting cursors list.
354+
// For durable cursors, we should *not* cancel the pending read with cursor.cancelPendingReadRequest.
355+
// This is because internally, in the dispatcher implementations, there is a "havePendingRead" flag
356+
// that is not reset. If the pending read is cancelled, the dispatcher will not continue reading from
357+
// the managed ledger when a new consumer is added to the dispatcher since based on the "havePendingRead"
358+
// state, it will continue to expect that a read is pending and will not submit a new read.
359+
// For non-durable cursors, there's no difference since the cursor is not expected to be used again.
360+
361+
// remove waiting cursor from the managed ledger, this applies to both durable and non-durable cursors.
362+
topic.getManagedLedger().removeWaitingCursor(cursor);
353363

354364
if (!cursor.isDurable()) {
355365
// If cursor is not durable, we need to clean up the subscription as well. No need to check for active

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
import org.apache.bookkeeper.mledger.AsyncCallbacks;
6060
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
6161
import org.apache.bookkeeper.mledger.Entry;
62-
import org.apache.bookkeeper.mledger.ManagedCursor;
6362
import org.apache.bookkeeper.mledger.ManagedLedgerException;
6463
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
6564
import org.apache.bookkeeper.mledger.Position;
@@ -1046,7 +1045,7 @@ public void testResumptionAfterBacklogRelaxed() throws Exception {
10461045
*
10471046
* @throws Exception
10481047
*/
1049-
@Test(timeOut = 15000)
1048+
@Test(timeOut = 30000)
10501049
public void testCloseReplicatorStartProducer() throws Exception {
10511050
TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/closeCursor"));
10521051
// Producer on r1
@@ -1063,33 +1062,30 @@ public void testCloseReplicatorStartProducer() throws Exception {
10631062
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
10641063
PersistentReplicator replicator = (PersistentReplicator) topic.getPersistentReplicator("r2");
10651064

1065+
// check that the replicator producer is not null
1066+
Awaitility.await().untilAsserted(() -> {
1067+
assertNotNull(replicator.getProducer());
1068+
});
1069+
10661070
// close the cursor
1067-
Field cursorField = PersistentReplicator.class.getDeclaredField("cursor");
1068-
cursorField.setAccessible(true);
1069-
ManagedCursor cursor = (ManagedCursor) cursorField.get(replicator);
1070-
cursor.close();
1071-
// try to read entries
1071+
replicator.getCursor().close();
1072+
1073+
// try to produce entries
10721074
producer1.produce(10);
10731075

1076+
// attempt to read entries directly from replicator cursor
10741077
try {
1075-
cursor.readEntriesOrWait(10);
1078+
replicator.getCursor().readEntriesOrWait(10);
10761079
fail("It should have failed");
10771080
} catch (Exception e) {
10781081
assertEquals(e.getClass(), CursorAlreadyClosedException.class);
10791082
}
10801083

1081-
// replicator-readException: cursorAlreadyClosed
1082-
replicator.readEntriesFailed(new CursorAlreadyClosedException("Cursor already closed exception"), null);
1083-
10841084
// wait for the replicator producer to be closed
1085-
Thread.sleep(100);
1086-
1087-
// Replicator producer must be closed
1088-
Field producerField = AbstractReplicator.class.getDeclaredField("producer");
1089-
producerField.setAccessible(true);
1090-
@SuppressWarnings("unchecked")
1091-
ProducerImpl<byte[]> replicatorProducer = (ProducerImpl<byte[]>) producerField.get(replicator);
1092-
assertNull(replicatorProducer);
1085+
// Replicator producer must be null after the producer has been closed
1086+
Awaitility.await().untilAsserted(() -> {
1087+
assertNull(replicator.getProducer());
1088+
});
10931089
}
10941090

10951091
@Test(timeOut = 30000)

0 commit comments

Comments
 (0)