Skip to content

Commit d64581a

Browse files
lhotari authored and hanmz committed
[fix][broker] PIP-379 Key_Shared implementation race condition causing out-of-order message delivery (apache#23874)
1 parent 572d9ae commit d64581a

File tree

4 files changed

+399
-17
lines changed

4 files changed

+399
-17
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1422,6 +1422,9 @@ public void cursorIsReset() {
14221422

14231423
protected boolean addMessageToReplay(long ledgerId, long entryId, long stickyKeyHash) {
14241424
if (checkIfMessageIsUnacked(ledgerId, entryId)) {
1425+
if (log.isDebugEnabled()) {
1426+
log.debug("[{}] Adding message to replay for {}:{} hash: {}", name, ledgerId, entryId, stickyKeyHash);
1427+
}
14251428
redeliveryMessages.add(ledgerId, entryId, stickyKeyHash);
14261429
return true;
14271430
} else {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java

Lines changed: 41 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,8 @@
2020

2121
import static org.apache.pulsar.broker.service.StickyKeyConsumerSelector.STICKY_KEY_HASH_NOT_SET;
2222
import com.google.common.annotations.VisibleForTesting;
23+
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
24+
import it.unimi.dsi.fastutil.ints.IntSet;
2325
import java.util.ArrayList;
2426
import java.util.HashMap;
2527
import java.util.HashSet;
@@ -407,6 +409,8 @@ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entr
407409
Set<Consumer> blockedByHashConsumers = lookAheadAllowed && readType == ReadType.Normal ? new HashSet<>() : null;
408410
// in replay read mode, keep track of consumers for entries, used for look-ahead check
409411
Set<Consumer> consumersForEntriesForLookaheadCheck = lookAheadAllowed ? new HashSet<>() : null;
412+
// track already blocked hashes to block any further messages with the same hash
413+
IntSet alreadyBlockedHashes = new IntOpenHashSet();
410414

411415
for (Entry inputEntry : entries) {
412416
EntryAndMetadata entry;
@@ -419,24 +423,29 @@ private Map<Consumer, List<Entry>> filterAndGroupEntriesForDispatching(List<Entr
419423
Commands.peekAndCopyMessageMetadata(inputEntry.getDataBuffer(), getSubscriptionName(), -1));
420424
}
421425
int stickyKeyHash = getStickyKeyHash(entry);
422-
Consumer consumer = selector.select(stickyKeyHash);
426+
Consumer consumer = null;
423427
MutableBoolean blockedByHash = null;
424428
boolean dispatchEntry = false;
425-
if (consumer != null) {
426-
if (lookAheadAllowed) {
427-
consumersForEntriesForLookaheadCheck.add(consumer);
428-
}
429-
blockedByHash = lookAheadAllowed && readType == ReadType.Normal ? new MutableBoolean(false) : null;
430-
MutableInt permits =
431-
permitsForConsumer.computeIfAbsent(consumer,
432-
k -> new MutableInt(getAvailablePermits(consumer)));
433-
// a consumer was found for the sticky key hash and the entry can be dispatched
434-
if (permits.intValue() > 0
435-
&& canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
436-
// decrement the permits for the consumer
437-
permits.decrement();
438-
// allow the entry to be dispatched
439-
dispatchEntry = true;
429+
// check if the hash is already blocked
430+
boolean hashIsAlreadyBlocked = alreadyBlockedHashes.contains(stickyKeyHash);
431+
if (!hashIsAlreadyBlocked) {
432+
consumer = selector.select(stickyKeyHash);
433+
if (consumer != null) {
434+
if (lookAheadAllowed) {
435+
consumersForEntriesForLookaheadCheck.add(consumer);
436+
}
437+
blockedByHash = lookAheadAllowed && readType == ReadType.Normal ? new MutableBoolean(false) : null;
438+
MutableInt permits =
439+
permitsForConsumer.computeIfAbsent(consumer,
440+
k -> new MutableInt(getAvailablePermits(k)));
441+
// a consumer was found for the sticky key hash and the entry can be dispatched
442+
if (permits.intValue() > 0
443+
&& canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
444+
// decrement the permits for the consumer
445+
permits.decrement();
446+
// allow the entry to be dispatched
447+
dispatchEntry = true;
448+
}
440449
}
441450
}
442451
if (dispatchEntry) {
@@ -445,6 +454,10 @@ && canDispatchEntry(consumer, entry, readType, stickyKeyHash, blockedByHash)) {
445454
entriesGroupedByConsumer.computeIfAbsent(consumer, k -> new ArrayList<>());
446455
consumerEntries.add(entry);
447456
} else {
457+
if (!hashIsAlreadyBlocked) {
458+
// the hash is blocked, add it to the set of blocked hashes
459+
alreadyBlockedHashes.add(stickyKeyHash);
460+
}
448461
if (blockedByHash != null && blockedByHash.isTrue()) {
449462
// the entry is blocked by hash, add the consumer to the blocked set
450463
blockedByHashConsumers.add(consumer);
@@ -536,6 +549,9 @@ private class ReplayPositionFilter implements Predicate<Position> {
536549
// tracks the available permits for each consumer for the duration of the filter usage
537550
// the filter is stateful and shouldn't be shared or reused later
538551
private final Map<Consumer, MutableInt> availablePermitsMap = new HashMap<>();
552+
// tracks the hashes that have been blocked during the filtering
553+
// it is necessary to block all later messages after a hash gets blocked so that ordering is preserved
554+
private final Set<Long> alreadyBlockedHashes = new HashSet<>();
539555

540556
@Override
541557
public boolean test(Position position) {
@@ -553,25 +569,34 @@ public boolean test(Position position) {
553569
}
554570
return true;
555571
}
572+
// check if the hash is already blocked, if so, then replaying of the position should be skipped
573+
// to preserve ordering
574+
if (alreadyBlockedHashes.contains(stickyKeyHash)) {
575+
return false;
576+
}
556577

557578
// find the consumer for the sticky key hash
558579
Consumer consumer = selector.select(stickyKeyHash.intValue());
559580
// skip replaying the message position if there's no assigned consumer
560581
if (consumer == null) {
582+
alreadyBlockedHashes.add(stickyKeyHash);
561583
return false;
562584
}
585+
563586
// lookup the available permits for the consumer
564587
MutableInt availablePermits =
565588
availablePermitsMap.computeIfAbsent(consumer,
566589
k -> new MutableInt(getAvailablePermits(consumer)));
567590
// skip replaying the message position if the consumer has no available permits
568591
if (availablePermits.intValue() <= 0) {
592+
alreadyBlockedHashes.add(stickyKeyHash);
569593
return false;
570594
}
571595

572596
if (drainingHashesRequired
573597
&& drainingHashesTracker.shouldBlockStickyKeyHash(consumer, stickyKeyHash.intValue())) {
574598
// the hash is draining and the consumer is not the draining consumer
599+
alreadyBlockedHashes.add(stickyKeyHash);
575600
return false;
576601
}
577602

0 commit comments

Comments
 (0)