Skip to content

Commit a330c92

Browse files
authored
Merge branch 'master' into lh-broker-cache-expected-readcount-retention-rebase
2 parents 60e09b5 + d1e2fb7 commit a330c92

File tree

7 files changed

+344
-211
lines changed

7 files changed

+344
-211
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 255 additions & 180 deletions
Large diffs are not rendered by default.

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1064,23 +1064,36 @@ public synchronized void asyncDeleteCursor(final String consumerName, final Dele
10641064
callback.deleteCursorFailed(new ManagedLedgerException.CursorNotFoundException("ManagedCursor not found: "
10651065
+ consumerName), ctx);
10661066
return;
1067-
} else if (!cursor.isDurable()) {
1068-
cursor.setState(ManagedCursorImpl.State.Closed);
1069-
cursor.cancelPendingReadRequest();
1067+
}
1068+
1069+
// Non-durable cursors can be closed and removed immediately
1070+
if (!cursor.isDurable()) {
1071+
try {
1072+
cursor.close();
1073+
} catch (Exception e) {
1074+
log.warn("[{}] Failed to close non-durable cursor {}", name, consumerName, e);
1075+
}
10701076
cursors.removeCursor(consumerName);
1071-
deactivateCursorByName(consumerName);
10721077
callback.deleteCursorComplete(ctx);
10731078
return;
10741079
}
10751080

1081+
// If the cursor is active, we need to deactivate it first
1082+
cursor.setInactive();
1083+
// Set the state to deleting (which is a closed state) to avoid any new writes
1084+
ManagedCursorImpl.State beforeChangingState = cursor.changeStateToDeletingIfNotDeleted();
1085+
if (beforeChangingState.isDeletingOrDeleted()) {
1086+
log.warn("[{}] [{}] Cursor is already being deleted or has been deleted.", name, consumerName);
1087+
return;
1088+
}
1089+
10761090
// First remove the consumer form the MetaStore. If this operation succeeds and the next one (removing the
10771091
// ledger from BK) don't, we end up having a loose ledger leaked but the state will be consistent.
10781092
store.asyncRemoveCursor(ManagedLedgerImpl.this.name, consumerName, new MetaStoreCallback<Void>() {
10791093
@Override
10801094
public void operationComplete(Void result, Stat stat) {
10811095
cursor.asyncDeleteCursorLedger();
10821096
cursors.removeCursor(consumerName);
1083-
deactivateCursorByName(consumerName);
10841097

10851098
trimConsumedLedgersInBackground();
10861099

@@ -1090,7 +1103,7 @@ public void operationComplete(Void result, Stat stat) {
10901103

10911104
@Override
10921105
public void operationFailed(MetaStoreException e) {
1093-
handleBadVersion(e);
1106+
cursor.getAndSetState(ManagedCursorImpl.State.DeletingFailed);
10941107
callback.deleteCursorFailed(e, ctx);
10951108
}
10961109

@@ -2545,7 +2558,7 @@ void notifyCursors() {
25452558
if (waitingCursor == null) {
25462559
break;
25472560
}
2548-
2561+
waitingCursor.notifyWaitingCursorDequeued();
25492562
executor.execute(waitingCursor::notifyEntriesAvailable);
25502563
}
25512564
}
@@ -4040,13 +4053,18 @@ public int getNumberOfCursorsAtSamePositionOrBefore(ManagedCursor cursor) {
40404053
return activeCursors.getNumberOfCursorsAtSamePositionOrBefore(cursor);
40414054
}
40424055

4043-
40444056
public void removeWaitingCursor(ManagedCursor cursor) {
4045-
this.waitingCursors.remove(cursor);
4057+
((ManagedCursorImpl) cursor).removeWaitingCursorRequested(() -> {
4058+
// remove only if the cursor has been registered
4059+
this.waitingCursors.remove(cursor);
4060+
});
40464061
}
40474062

40484063
public void addWaitingCursor(ManagedCursorImpl cursor) {
4049-
this.waitingCursors.add(cursor);
4064+
cursor.addWaitingCursorRequested(() -> {
4065+
// add only if the cursor has not been registered
4066+
this.waitingCursors.add(cursor);
4067+
});
40504068
}
40514069

40524070
public boolean isCursorActive(ManagedCursor cursor) {

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ protected void internalAsyncMarkDelete(final Position newPosition, Map<String, L
113113
@Override
114114
public void asyncClose(CloseCallback callback, Object ctx) {
115115
STATE_UPDATER.set(this, State.Closed);
116+
closeWaitingCursor();
117+
setInactive();
116118
callback.closeComplete(ctx);
117119
}
118120

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.netty.util.Recycler.Handle;
2323
import java.util.ArrayList;
2424
import java.util.List;
25+
import java.util.concurrent.atomic.AtomicInteger;
2526
import java.util.function.Predicate;
2627
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
2728
import org.apache.bookkeeper.mledger.Entry;
@@ -34,7 +35,14 @@
3435
import org.slf4j.LoggerFactory;
3536

3637
class OpReadEntry implements ReadEntriesCallback {
37-
38+
static final OpReadEntry WAITING_READ_OP_FOR_CLOSED_CURSOR = new OpReadEntry();
39+
private static final AtomicInteger opReadIdGenerator = new AtomicInteger(1);
40+
/**
41+
* id for this read operation. Value can be negative when integer value overflow happens.
42+
* Used for waitingReadOp consistency so the the correct instance is handled after the instance has already been
43+
* recycled.
44+
*/
45+
int id;
3846
ManagedCursorImpl cursor;
3947
Position readPosition;
4048
private int count;
@@ -51,6 +59,7 @@ class OpReadEntry implements ReadEntriesCallback {
5159
public static OpReadEntry create(ManagedCursorImpl cursor, Position readPositionRef, int count,
5260
ReadEntriesCallback callback, Object ctx, Position maxPosition, Predicate<Position> skipCondition) {
5361
OpReadEntry op = RECYCLER.get();
62+
op.id = opReadIdGenerator.getAndIncrement();
5463
op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
5564
op.cursor = cursor;
5665
op.count = count;
@@ -123,7 +132,7 @@ private void internalReadEntriesFailed(ManagedLedgerException exception, Object
123132
if (!entries.isEmpty()) {
124133
// There were already some entries that were read before, we can return them
125134
complete(ctx);
126-
} else if (cursor.getConfig().isAutoSkipNonRecoverableData()
135+
} else if (!cursor.isClosed() && cursor.getConfig().isAutoSkipNonRecoverableData()
127136
&& exception instanceof NonRecoverableLedgerException) {
128137
log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(),
129138
readPosition, exception.getMessage());
@@ -200,6 +209,22 @@ private OpReadEntry(Handle<OpReadEntry> recyclerHandle) {
200209
this.recyclerHandle = recyclerHandle;
201210
}
202211

212+
// no-op constructor for EMPTY instance
213+
private OpReadEntry() {
214+
this.recyclerHandle = null;
215+
this.callback = new ReadEntriesCallback() {
216+
@Override
217+
public void readEntriesComplete(List<Entry> entries, Object ctx) {
218+
// no-op
219+
}
220+
221+
@Override
222+
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
223+
// no-op
224+
}
225+
};
226+
}
227+
203228
private static final Recycler<OpReadEntry> RECYCLER = new Recycler<>() {
204229
@Override
205230
protected OpReadEntry newObject(Recycler.Handle<OpReadEntry> recyclerHandle) {
@@ -208,6 +233,11 @@ protected OpReadEntry newObject(Recycler.Handle<OpReadEntry> recyclerHandle) {
208233
};
209234

210235
public void recycle() {
236+
if (recyclerHandle == null) {
237+
// This is the no-op instance, do not recycle
238+
return;
239+
}
240+
id = -1;
211241
count = 0;
212242
cursor = null;
213243
readPosition = null;

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyCursorImpl.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public void skipEntries(int numEntriesToSkip) {
6060
@Override
6161
public void asyncClose(final AsyncCallbacks.CloseCallback callback, final Object ctx) {
6262
state = State.Closed;
63+
closeWaitingCursor();
64+
setInactive();
6365
callback.closeComplete(ctx);
6466
}
6567

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,16 @@ public synchronized void removeConsumer(Consumer consumer, boolean isResetCursor
360360

361361
if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
362362
deactivateCursor();
363+
// Remove the cursor from the waiting cursors list.
364+
// For durable cursors, we should *not* cancel the pending read with cursor.cancelPendingReadRequest.
365+
// This is because internally, in the dispatcher implementations, there is a "havePendingRead" flag
366+
// that is not reset. If the pending read is cancelled, the dispatcher will not continue reading from
367+
// the managed ledger when a new consumer is added to the dispatcher since based on the "havePendingRead"
368+
// state, it will continue to expect that a read is pending and will not submit a new read.
369+
// For non-durable cursors, there's no difference since the cursor is not expected to be used again.
370+
371+
// remove waiting cursor from the managed ledger, this applies to both durable and non-durable cursors.
372+
topic.getManagedLedger().removeWaitingCursor(cursor);
363373

364374
if (!cursor.isDurable()) {
365375
// If cursor is not durable, we need to clean up the subscription as well. No need to check for active

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
import org.apache.bookkeeper.mledger.AsyncCallbacks;
6060
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
6161
import org.apache.bookkeeper.mledger.Entry;
62-
import org.apache.bookkeeper.mledger.ManagedCursor;
6362
import org.apache.bookkeeper.mledger.ManagedLedgerException;
6463
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
6564
import org.apache.bookkeeper.mledger.Position;
@@ -1046,7 +1045,7 @@ public void testResumptionAfterBacklogRelaxed() throws Exception {
10461045
*
10471046
* @throws Exception
10481047
*/
1049-
@Test(timeOut = 15000)
1048+
@Test(timeOut = 30000)
10501049
public void testCloseReplicatorStartProducer() throws Exception {
10511050
TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/closeCursor"));
10521051
// Producer on r1
@@ -1063,33 +1062,30 @@ public void testCloseReplicatorStartProducer() throws Exception {
10631062
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get();
10641063
PersistentReplicator replicator = (PersistentReplicator) topic.getPersistentReplicator("r2");
10651064

1065+
// check that the replicator producer is not null
1066+
Awaitility.await().untilAsserted(() -> {
1067+
assertNotNull(replicator.getProducer());
1068+
});
1069+
10661070
// close the cursor
1067-
Field cursorField = PersistentReplicator.class.getDeclaredField("cursor");
1068-
cursorField.setAccessible(true);
1069-
ManagedCursor cursor = (ManagedCursor) cursorField.get(replicator);
1070-
cursor.close();
1071-
// try to read entries
1071+
replicator.getCursor().close();
1072+
1073+
// try to produce entries
10721074
producer1.produce(10);
10731075

1076+
// attempt to read entries directly from replicator cursor
10741077
try {
1075-
cursor.readEntriesOrWait(10);
1078+
replicator.getCursor().readEntriesOrWait(10);
10761079
fail("It should have failed");
10771080
} catch (Exception e) {
10781081
assertEquals(e.getClass(), CursorAlreadyClosedException.class);
10791082
}
10801083

1081-
// replicator-readException: cursorAlreadyClosed
1082-
replicator.readEntriesFailed(new CursorAlreadyClosedException("Cursor already closed exception"), null);
1083-
10841084
// wait replicator producer to be closed
1085-
Thread.sleep(100);
1086-
1087-
// Replicator producer must be closed
1088-
Field producerField = AbstractReplicator.class.getDeclaredField("producer");
1089-
producerField.setAccessible(true);
1090-
@SuppressWarnings("unchecked")
1091-
ProducerImpl<byte[]> replicatorProducer = (ProducerImpl<byte[]>) producerField.get(replicator);
1092-
assertNull(replicatorProducer);
1085+
// Replicator producer must be null after the producer has been closed
1086+
Awaitility.await().untilAsserted(() -> {
1087+
assertNull(replicator.getProducer());
1088+
});
10931089
}
10941090

10951091
@Test(timeOut = 30000)

0 commit comments

Comments
 (0)