
Commit f878fc6

gaoran10 authored and eolivelli committed
[fix] Fix read unstable messages (#1856)
Currently, the consumer can read unstable messages in `read_committed` mode.

1. If the fetch offset is equal to or greater than the LSO (last stable offset), skip reading entries to avoid a useless read.
2. If an entry's offset is equal to or greater than the LSO, discard it to avoid sending it to the consumer.

Add a new unit test to verify that consumers can't receive unstable messages.

(cherry picked from commit 0d2ed74)
1 parent 4adfdd7 commit f878fc6
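The two checks described in the commit message reduce to plain offset comparisons against the LSO. The sketch below illustrates that gating logic under the assumption that entries arrive in offset order; `StableOffsetFilter` and its method names are hypothetical, for illustration only, not KoP APIs (the real implementation is in `PartitionLog`, shown in the diff below).

```java
import java.util.ArrayList;
import java.util.List;

public class StableOffsetFilter {

    // Rule 1: a fetch at or beyond the LSO has no stable data to return,
    // so the read can be skipped entirely.
    static boolean shouldSkipRead(long fetchOffset, long lastStableOffset) {
        return fetchOffset >= lastStableOffset;
    }

    // Rule 2: keep only entries whose base offset is strictly below the LSO;
    // an entry at or beyond the LSO still belongs to an undecided transaction.
    static List<Long> committedBaseOffsets(List<Long> baseOffsets, long lso) {
        List<Long> committed = new ArrayList<>();
        for (long baseOffset : baseOffsets) {
            if (lso > baseOffset) {
                committed.add(baseOffset);
            } else {
                break; // entries are offset-ordered, so the rest are unstable too
            }
        }
        return committed;
    }

    public static void main(String[] args) {
        long lso = 5;
        System.out.println(shouldSkipRead(5, lso));                             // true: skip the read
        System.out.println(committedBaseOffsets(List.of(3L, 4L, 5L, 6L), lso)); // [3, 4]
    }
}
```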

File tree

2 files changed: 91 additions, 1 deletion

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/storage/PartitionLog.java

Lines changed: 29 additions & 1 deletion
```diff
@@ -348,6 +348,19 @@ public static ReadRecordsResult get(DecodeResult decodeResult,
         return readRecordsResult;
     }
 
+    public static ReadRecordsResult empty(long highWatermark,
+                                          long lastStableOffset,
+                                          Position lastPosition,
+                                          PartitionLog partitionLog) {
+        return ReadRecordsResult.get(
+                DecodeResult.get(MemoryRecords.EMPTY),
+                Collections.emptyList(),
+                highWatermark,
+                lastStableOffset,
+                lastPosition,
+                partitionLog);
+    }
+
     public static ReadRecordsResult error(Errors errors, PartitionLog partitionLog) {
         return ReadRecordsResult.error(PositionImpl.EARLIEST, errors, partitionLog);
     }
```
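The new `empty()` factory returns `MemoryRecords.EMPTY` together with the current high watermark and LSO, so a short-circuited fetch still reports up-to-date offsets to the client without delivering any records.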
```diff
@@ -587,6 +600,21 @@ public CompletableFuture<ReadRecordsResult> readRecords(final FetchRequestData.F
         requestStats.getPrepareMetadataStats().registerSuccessfulEvent(
                 MathUtils.elapsedNanos(startPrepareMetadataNanos), TimeUnit.NANOSECONDS);
         long adjustedMaxBytes = Math.min(partitionData.partitionMaxBytes(), limitBytes.get());
+        if (readCommitted) {
+            long firstUndecidedOffset = producerStateManager.firstUndecidedOffset().orElse(-1L);
+            if (firstUndecidedOffset >= 0 && firstUndecidedOffset <= offset) {
+                long highWaterMark = MessageMetadataUtils.getHighWatermark(cursor.getManagedLedger());
+                future.complete(
+                        ReadRecordsResult.empty(
+                                highWaterMark,
+                                firstUndecidedOffset,
+                                tcm.getManagedLedger().getLastConfirmedEntry(),
+                                this
+                        )
+                );
+                return;
+            }
+        }
         readEntries(cursor, topicPartition, cursorOffset, maxReadEntriesNum, adjustedMaxBytes,
                 fullPartitionName -> {
                     topicManager.invalidateCacheForFencedManagerLedgerOnTopic(fullPartitionName);
```
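Here `firstUndecidedOffset` plays the role of the LSO: once the fetch offset has reached it, every remaining message belongs to an open transaction, so the read completes immediately with `ReadRecordsResult.empty(...)` instead of scanning the ledger.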
```diff
@@ -743,7 +771,7 @@ private List<Entry> getCommittedEntries(List<Entry> entries, long lso) {
         committedEntries = new ArrayList<>();
         for (Entry entry : entries) {
             try {
-                if (lso >= MessageMetadataUtils.peekBaseOffsetFromEntry(entry)) {
+                if (lso > MessageMetadataUtils.peekBaseOffsetFromEntry(entry)) {
                     committedEntries.add(entry);
                 } else {
                     break;
```
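The change from `>=` to `>` fixes an off-by-one: since the LSO passed in is the first undecided offset, an entry whose base offset equals the LSO is still unstable. With LSO = 5, the old check `5 >= 5` wrongly kept the entry at offset 5; the new check `5 > 5` correctly excludes it.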

tests/src/test/java/io/streamnative/pulsar/handlers/kop/TransactionTest.java

Lines changed: 62 additions & 0 deletions
```diff
@@ -53,6 +53,7 @@
 import java.util.function.Function;
 import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.kafka.clients.admin.AbortTransactionSpec;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.DescribeProducersResult;
```
```diff
@@ -1206,6 +1207,7 @@ private KafkaConsumer<Integer, String> buildTransactionConsumer(String groupId,
         consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, isolation);
+        consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
         addCustomizeProps(consumerProps);
 
         return new KafkaConsumer<>(consumerProps);
```
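The new `MAX_POLL_RECORDS_CONFIG` setting caps each `poll()` at 10 records, presumably to keep poll batches small and predictable for the assertions in the new test.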
```diff
@@ -1746,4 +1748,64 @@ protected void addCustomizeProps(Properties producerProps) {
         // No-op
     }
 
+    @DataProvider(name = "isolationProvider")
+    protected Object[][] isolationProvider() {
+        return new Object[][]{
+                {"read_committed"},
+                {"read_uncommitted"},
+        };
+    }
+
+    @Test(dataProvider = "isolationProvider", timeOut = 1000 * 30)
+    public void readUnstableMessagesTest(String isolation) throws InterruptedException, ExecutionException {
+        String topic = "unstable-message-test-" + RandomStringUtils.randomAlphabetic(5);
+
+        KafkaConsumer<Integer, String> consumer = buildTransactionConsumer("unstable-read", isolation);
+        consumer.subscribe(Collections.singleton(topic));
+
+        String tnxId = "txn-" + RandomStringUtils.randomAlphabetic(5);
+        KafkaProducer<Integer, String> producer = buildTransactionProducer(tnxId);
+        producer.initTransactions();
+
+        String baseMsg = "test msg commit - ";
+        producer.beginTransaction();
+        producer.send(new ProducerRecord<>(topic, baseMsg + 0)).get();
+        producer.send(new ProducerRecord<>(topic, baseMsg + 1)).get();
+        producer.flush();
+
+        AtomicInteger messageCount = new AtomicInteger(0);
+        // make sure the consumer can't receive unstable messages in `read_committed` mode
+        readAndCheckMessages(consumer, baseMsg, messageCount, isolation.equals("read_committed") ? 0 : 2);
+
+        producer.commitTransaction();
+        producer.beginTransaction();
+        // these two unstable messages shouldn't be received in `read_committed` mode
+        producer.send(new ProducerRecord<>(topic, baseMsg + 2)).get();
+        producer.send(new ProducerRecord<>(topic, baseMsg + 3)).get();
+        producer.flush();
+
+        readAndCheckMessages(consumer, baseMsg, messageCount, isolation.equals("read_committed") ? 2 : 4);
+
+        consumer.close();
+        producer.close();
+    }
+
+    private void readAndCheckMessages(KafkaConsumer<Integer, String> consumer, String baseMsg,
+                                      AtomicInteger messageCount, int expectedMessageCount) {
+        while (true) {
+            ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(3));
+            if (records.isEmpty()) {
+                break;
+            }
+            for (ConsumerRecord<Integer, String> record : records) {
+                assertEquals(record.value(), baseMsg + messageCount.getAndIncrement());
+            }
+        }
+        // make sure no more messages can be received
+        ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(3));
+        assertTrue(records.isEmpty());
+        // make sure only the expected number of stable messages were received
+        assertEquals(messageCount.get(), expectedMessageCount);
+    }
+
 }
```
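The test first writes two messages inside an open transaction and checks that a `read_committed` consumer sees none of them while a `read_uncommitted` consumer sees both; after committing and opening a second transaction with two more pending messages, the expected totals become 2 and 4 respectively.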
