Skip to content

Commit 254d13e

Browse files
zjxxzjwangzjxxzjwang
authored andcommitted
[fix][broker] Fix chunked message loss when no consumers are available (#25077)
Co-authored-by: zjxxzjwang <[email protected]>
1 parent b4dafdc commit 254d13e

File tree

4 files changed

+68
-4
lines changed

4 files changed

+68
-4
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
import java.util.function.Supplier;
2828
import lombok.Getter;
2929
import lombok.RequiredArgsConstructor;
30+
import lombok.extern.slf4j.Slf4j;
3031
import org.apache.pulsar.common.api.proto.MessageMetadata;
3132

3233
/**
3334
* The assigner to assign entries to the proper {@link Consumer} in the shared subscription.
3435
*/
36+
@Slf4j
3537
@RequiredArgsConstructor
3638
public class SharedConsumerAssignor {
3739

@@ -50,6 +52,8 @@ public class SharedConsumerAssignor {
5052
// Process the unassigned messages, e.g. adding them to the replay queue
5153
private final java.util.function.Consumer<EntryAndMetadata> unassignedMessageProcessor;
5254

55+
private final Subscription subscription;
56+
5357
public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata> entryAndMetadataList,
5458
final int numConsumers) {
5559
assert numConsumers >= 0;
@@ -58,7 +62,11 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>
5862

5963
Consumer consumer = getConsumer(numConsumers);
6064
if (consumer == null) {
61-
entryAndMetadataList.forEach(EntryAndMetadata::release);
65+
if (subscription != null) {
66+
log.info("No consumer found to assign in topic:{}, subscription:{}, redelivering {} messages.",
67+
subscription.getTopic().getName(), subscription.getName(), entryAndMetadataList.size());
68+
}
69+
entryAndMetadataList.forEach(unassignedMessageProcessor);
6270
return consumerToEntries;
6371
}
6472
// The actual available permits might change, here we use the permits at the moment to assign entries

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
163163
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
164164
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
165165
this.initializeDispatchRateLimiterIfNeeded();
166-
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addEntryToReplay);
166+
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addEntryToReplay, subscription);
167167
ServiceConfiguration serviceConfiguration = topic.getBrokerService().pulsar().getConfiguration();
168168
this.readFailureBackoff = new Backoff(
169169
serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(),

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ public PersistentDispatcherMultipleConsumersClassic(PersistentTopic topic, Manag
157157
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
158158
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
159159
this.initializeDispatchRateLimiterIfNeeded();
160-
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay);
160+
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay, subscription);
161161
this.readFailureBackoff = new Backoff(
162162
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
163163
TimeUnit.MILLISECONDS,

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void prepareData() {
5858
roundRobinConsumerSelector.clear();
5959
entryAndMetadataList.clear();
6060
replayQueue.clear();
61-
assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add);
61+
assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, null);
6262
final AtomicLong entryId = new AtomicLong(0L);
6363
final MockProducer producerA = new MockProducer("A", entryId, entryAndMetadataList);
6464
final MockProducer producerB = new MockProducer("B", entryId, entryAndMetadataList);
@@ -238,4 +238,60 @@ private static MessageMetadata createMetadata(final String producerName,
238238
}
239239
return metadata;
240240
}
241+
242+
/**
243+
* When there are no consumers online, chunk messages will not be directly lost.
244+
*/
245+
@Test
246+
public void testChunkMessagesNotBeLostNoConsumer() {
247+
// 1. No consumer initially
248+
Map<Consumer, List<EntryAndMetadata>> result = assignor.assign(entryAndMetadataList, 1);
249+
assertTrue(result.isEmpty());
250+
assertEquals(replayQueue.size(), entryAndMetadataList.size());
251+
assertEquals(toString(replayQueue), toString(entryAndMetadataList));
252+
253+
// 2. Two Consumers come online
254+
final Consumer consumerA = new Consumer("A", 100);
255+
final Consumer consumerB = new Consumer("B", 100);
256+
roundRobinConsumerSelector.addConsumers(consumerA, consumerB);
257+
258+
// 3. Retry messages from replay queue
259+
List<EntryAndMetadata> retryList = new ArrayList<>(replayQueue);
260+
replayQueue.clear();
261+
262+
// Use a larger batch size to ensure we can process enough messages
263+
result = assignor.assign(retryList, 10);
264+
265+
// 4. Verify consumer receives all messages
266+
int totalReceived = result.values().stream().mapToInt(List::size).sum();
267+
assertEquals(totalReceived, retryList.size());
268+
269+
// Verify that chunks are assigned to the same consumer
270+
List<String> entriesA = toString(result.getOrDefault(consumerA, Collections.emptyList()));
271+
List<String> entriesB = toString(result.getOrDefault(consumerB, Collections.emptyList()));
272+
273+
// Check A-1 chunks (0:1, 0:2, 0:5)
274+
boolean a1InA = entriesA.stream().anyMatch(s -> s.contains("A-1"));
275+
if (a1InA) {
276+
assertTrue(entriesA.containsAll(Arrays.asList("0:1@A-1-0-3", "0:2@A-1-1-3", "0:5@A-1-2-3")));
277+
assertTrue(entriesB.stream().noneMatch(s -> s.contains("A-1")));
278+
} else {
279+
assertTrue(entriesB.containsAll(Arrays.asList("0:1@A-1-0-3", "0:2@A-1-1-3", "0:5@A-1-2-3")));
280+
assertTrue(entriesA.stream().noneMatch(s -> s.contains("A-1")));
281+
}
282+
283+
// Check B-1 chunks (0:4, 0:6)
284+
boolean b1InA = entriesA.stream().anyMatch(s -> s.contains("B-1"));
285+
if (b1InA) {
286+
assertTrue(entriesA.containsAll(Arrays.asList("0:4@B-1-0-2", "0:6@B-1-1-2")));
287+
assertTrue(entriesB.stream().noneMatch(s -> s.contains("B-1")));
288+
} else {
289+
assertTrue(entriesB.containsAll(Arrays.asList("0:4@B-1-0-2", "0:6@B-1-1-2")));
290+
assertTrue(entriesA.stream().noneMatch(s -> s.contains("B-1")));
291+
}
292+
293+
// 5. Verify internal state is clean (since all chunks are completed)
294+
assertTrue(assignor.getUuidToConsumer().isEmpty());
295+
}
296+
241297
}

0 commit comments

Comments
 (0)