Skip to content

Commit 3d0625b

Browse files
authored
[improve][broker] PIP-379: Key_Shared Draining Hashes for Improved Message Ordering (#23352)
1 parent 4efcc15 commit 3d0625b

File tree

47 files changed

+3324
-1838
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+3324
-1838
lines changed

distribution/server/src/assemble/LICENSE.bin.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,7 @@ The Apache Software License, Version 2.0
262262
- com.fasterxml.jackson.module-jackson-module-parameter-names-2.17.2.jar
263263
* Caffeine -- com.github.ben-manes.caffeine-caffeine-2.9.1.jar
264264
* Conscrypt -- org.conscrypt-conscrypt-openjdk-uber-2.5.2.jar
265+
* Fastutil -- it.unimi.dsi-fastutil-8.5.14.jar
265266
* Proto Google Common Protos -- com.google.api.grpc-proto-google-common-protos-2.17.0.jar
266267
* Bitbucket -- org.bitbucket.b_c-jose4j-0.9.4.jar
267268
* Gson

pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ flexible messaging model and an intuitive client API.</description>
162162
<bouncycastle.bcpkix-fips.version>1.0.7</bouncycastle.bcpkix-fips.version>
163163
<bouncycastle.bc-fips.version>1.0.2.5</bouncycastle.bc-fips.version>
164164
<jackson.version>2.17.2</jackson.version>
165+
<fastutil.version>8.5.14</fastutil.version>
165166
<reflections.version>0.10.2</reflections.version>
166167
<swagger.version>1.6.2</swagger.version>
167168
<puppycrawl.checkstyle.version>10.14.2</puppycrawl.checkstyle.version>
@@ -911,6 +912,12 @@ flexible messaging model and an intuitive client API.</description>
911912
<scope>import</scope>
912913
</dependency>
913914

915+
<dependency>
916+
<groupId>it.unimi.dsi</groupId>
917+
<artifactId>fastutil</artifactId>
918+
<version>${fastutil.version}</version>
919+
</dependency>
920+
914921
<dependency>
915922
<groupId>org.codehaus.jettison</groupId>
916923
<artifactId>jettison</artifactId>

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -979,6 +979,16 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
979979
)
980980
private int keySharedLookAheadMsgInReplayThresholdPerSubscription = 20000;
981981

982+
@FieldContext(
983+
category = CATEGORY_POLICIES,
984+
doc = "For Key_Shared subscriptions, when a blocked key hash gets unblocked,"
985+
+ " a redelivery will be attempted after a delay. This setting controls the delay."
986+
+ " The reason to have the delay is to batch multiple unblocking events instead of triggering"
987+
+ " redelivery for each unblocking event.",
988+
dynamic = true
989+
)
990+
private long keySharedUnblockingIntervalMs = 10L;
991+
982992
@FieldContext(
983993
category = CATEGORY_POLICIES,
984994
doc = "Once broker reaches maxUnackedMessagesPerBroker limit, it blocks subscriptions which has higher "

pulsar-broker/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@
6363
<artifactId>protobuf-java</artifactId>
6464
</dependency>
6565

66+
<dependency>
67+
<groupId>it.unimi.dsi</groupId>
68+
<artifactId>fastutil</artifactId>
69+
</dependency>
70+
6671
<dependency>
6772
<groupId>${project.groupId}</groupId>
6873
<artifactId>pulsar-client-original</artifactId>

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public void run(Timeout timeout) throws Exception {
146146
lastTickRun = clock.millis();
147147
currentTimeoutTarget = -1;
148148
this.timeout = null;
149-
dispatcher.readMoreEntries();
149+
dispatcher.readMoreEntriesAsync();
150150
}
151151
}
152152

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3416,7 +3416,7 @@ public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleC
34163416
try {
34173417
dispatcherList.forEach(dispatcher -> {
34183418
dispatcher.unBlockDispatcherOnUnackedMsgs();
3419-
executor().execute(() -> dispatcher.readMoreEntries());
3419+
dispatcher.readMoreEntriesAsync();
34203420
log.info("[{}] Dispatcher is unblocked", dispatcher.getName());
34213421
blockedDispatchers.remove(dispatcher);
34223422
});

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java

Lines changed: 72 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.pulsar.broker.service;
2020

2121
import java.util.ArrayList;
22-
import java.util.IdentityHashMap;
2322
import java.util.List;
2423
import java.util.Map;
2524
import java.util.NavigableMap;
@@ -28,13 +27,10 @@
2827
import java.util.concurrent.locks.ReadWriteLock;
2928
import java.util.concurrent.locks.ReentrantReadWriteLock;
3029
import org.apache.pulsar.client.api.Range;
31-
import org.apache.pulsar.common.util.Murmur3_32Hash;
3230

3331
/**
34-
* This is a consumer selector based fixed hash range.
35-
*
36-
* The implementation uses consistent hashing to evenly split, the
37-
* number of keys assigned to each consumer.
32+
* This is a consumer selector using consistent hashing to evenly split
33+
* the number of keys assigned to each consumer.
3834
*/
3935
public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyConsumerSelector {
4036
// use NUL character as field separator for hash key calculation
@@ -47,14 +43,22 @@ public class ConsistentHashingStickyKeyConsumerSelector implements StickyKeyCons
4743
private final ConsumerNameIndexTracker consumerNameIndexTracker = new ConsumerNameIndexTracker();
4844

4945
private final int numberOfPoints;
46+
private final Range keyHashRange;
47+
private ConsumerHashAssignmentsSnapshot consumerHashAssignmentsSnapshot;
5048

5149
public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints) {
50+
this(numberOfPoints, DEFAULT_RANGE_SIZE - 1);
51+
}
52+
53+
public ConsistentHashingStickyKeyConsumerSelector(int numberOfPoints, int rangeMaxValue) {
5254
this.hashRing = new TreeMap<>();
5355
this.numberOfPoints = numberOfPoints;
56+
this.keyHashRange = Range.of(STICKY_KEY_HASH_NOT_SET + 1, rangeMaxValue);
57+
this.consumerHashAssignmentsSnapshot = ConsumerHashAssignmentsSnapshot.empty();
5458
}
5559

5660
@Override
57-
public CompletableFuture<Void> addConsumer(Consumer consumer) {
61+
public CompletableFuture<ImpactedConsumersResult> addConsumer(Consumer consumer) {
5862
rwLock.writeLock().lock();
5963
try {
6064
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
@@ -72,7 +76,11 @@ public CompletableFuture<Void> addConsumer(Consumer consumer) {
7276
consumerNameIndexTracker.decreaseConsumerRefCount(removed);
7377
}
7478
}
75-
return CompletableFuture.completedFuture(null);
79+
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
80+
ImpactedConsumersResult impactedConsumers =
81+
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
82+
consumerHashAssignmentsSnapshot = assignmentsAfter;
83+
return CompletableFuture.completedFuture(impactedConsumers);
7684
} finally {
7785
rwLock.writeLock().unlock();
7886
}
@@ -88,14 +96,14 @@ public CompletableFuture<Void> addConsumer(Consumer consumer) {
8896
* @param hashRingPointIndex the index of the hash ring point
8997
* @return the hash value
9098
*/
91-
private static int calculateHashForConsumerAndIndex(Consumer consumer, int consumerNameIndex,
99+
private int calculateHashForConsumerAndIndex(Consumer consumer, int consumerNameIndex,
92100
int hashRingPointIndex) {
93101
String key = consumer.consumerName() + KEY_SEPARATOR + consumerNameIndex + KEY_SEPARATOR + hashRingPointIndex;
94-
return Murmur3_32Hash.getInstance().makeHash(key.getBytes());
102+
return makeStickyKeyHash(key.getBytes());
95103
}
96104

97105
@Override
98-
public void removeConsumer(Consumer consumer) {
106+
public ImpactedConsumersResult removeConsumer(Consumer consumer) {
99107
rwLock.writeLock().lock();
100108
try {
101109
ConsumerIdentityWrapper consumerIdentityWrapper = new ConsumerIdentityWrapper(consumer);
@@ -109,6 +117,11 @@ public void removeConsumer(Consumer consumer) {
109117
}
110118
}
111119
}
120+
ConsumerHashAssignmentsSnapshot assignmentsAfter = internalGetConsumerHashAssignmentsSnapshot();
121+
ImpactedConsumersResult impactedConsumers =
122+
consumerHashAssignmentsSnapshot.resolveImpactedConsumers(assignmentsAfter);
123+
consumerHashAssignmentsSnapshot = assignmentsAfter;
124+
return impactedConsumers;
112125
} finally {
113126
rwLock.writeLock().unlock();
114127
}
@@ -134,32 +147,58 @@ public Consumer select(int hash) {
134147
}
135148

136149
@Override
137-
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
138-
Map<Consumer, List<Range>> result = new IdentityHashMap<>();
150+
public Range getKeyHashRange() {
151+
return keyHashRange;
152+
}
153+
154+
@Override
155+
public ConsumerHashAssignmentsSnapshot getConsumerHashAssignmentsSnapshot() {
139156
rwLock.readLock().lock();
140157
try {
141-
if (hashRing.isEmpty()) {
142-
return result;
143-
}
144-
int start = 0;
145-
int lastKey = 0;
146-
for (Map.Entry<Integer, ConsumerIdentityWrapper> entry: hashRing.entrySet()) {
147-
Consumer consumer = entry.getValue().consumer;
148-
result.computeIfAbsent(consumer, key -> new ArrayList<>())
149-
.add(Range.of(start, entry.getKey()));
150-
lastKey = entry.getKey();
151-
start = lastKey + 1;
152-
}
153-
// Handle wrap-around in the hash ring, the first consumer will also contain the range from the last key
154-
// to the maximum value of the hash range
155-
Consumer firstConsumer = hashRing.firstEntry().getValue().consumer;
156-
List<Range> ranges = result.get(firstConsumer);
157-
if (lastKey != Integer.MAX_VALUE - 1) {
158-
ranges.add(Range.of(lastKey + 1, Integer.MAX_VALUE - 1));
159-
}
158+
return consumerHashAssignmentsSnapshot;
160159
} finally {
161160
rwLock.readLock().unlock();
162161
}
163-
return result;
162+
}
163+
164+
private ConsumerHashAssignmentsSnapshot internalGetConsumerHashAssignmentsSnapshot() {
165+
if (hashRing.isEmpty()) {
166+
return ConsumerHashAssignmentsSnapshot.empty();
167+
}
168+
List<HashRangeAssignment> result = new ArrayList<>();
169+
int start = getKeyHashRange().getStart();
170+
int lastKey = -1;
171+
Consumer previousConsumer = null;
172+
Range previousRange = null;
173+
for (Map.Entry<Integer, ConsumerIdentityWrapper> entry: hashRing.entrySet()) {
174+
Consumer consumer = entry.getValue().consumer;
175+
Range range;
176+
if (consumer == previousConsumer) {
177+
// join ranges
178+
result.remove(result.size() - 1);
179+
range = Range.of(previousRange.getStart(), entry.getKey());
180+
} else {
181+
range = Range.of(start, entry.getKey());
182+
}
183+
result.add(new HashRangeAssignment(range, consumer));
184+
lastKey = entry.getKey();
185+
start = lastKey + 1;
186+
previousConsumer = consumer;
187+
previousRange = range;
188+
}
189+
// Handle wrap-around
190+
Consumer firstConsumer = hashRing.firstEntry().getValue().consumer;
191+
if (lastKey != getKeyHashRange().getEnd()) {
192+
Range range;
193+
if (firstConsumer == previousConsumer && previousRange.getEnd() == lastKey) {
194+
// join ranges
195+
result.remove(result.size() - 1);
196+
range = Range.of(previousRange.getStart(), getKeyHashRange().getEnd());
197+
} else {
198+
range = Range.of(lastKey + 1, getKeyHashRange().getEnd());
199+
}
200+
result.add(new HashRangeAssignment(range, firstConsumer));
201+
}
202+
return ConsumerHashAssignmentsSnapshot.of(result);
164203
}
165204
}

0 commit comments

Comments
 (0)