Skip to content

Commit f2138f3

Browse files
zhouxy0809 and eolivelli
authored and committed
Fix list offsets for times failure when ledgers are removed by a rollover operation (#1854)
### Motivation When the lac ledger is expired or removed, KafkaRequestHandler.fetchOffsetByTimestamp fails and `NonRecoverableLedgerException` will be thrown. - When the lac ledger is expired or removed, the first ledgerId in the managedLedger is greater than the lac ledgerId, so the `asyncFindNewestMatching.startPosition` is the lac's ledgerId:0. - `OpFindNewestEntry.find` will read `startPosition`'s entry, then the managedLedger will throw `NonRecoverableLedgerException` ### Modifications - Handle the NonRecoverableLedgerException in the findEntryFailed callback. In this case, return the LEO as the offset because we can treat the topic as empty. (cherry picked from commit c0ece19)
1 parent 2abc731 commit f2138f3

File tree

2 files changed

+23
-3
lines changed

2 files changed

+23
-3
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/KafkaRequestHandler.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1396,9 +1396,16 @@ public void findEntryComplete(Position position, Object ctx) {
13961396
@Override
13971397
public void findEntryFailed(ManagedLedgerException exception,
13981398
Optional<Position> position, Object ctx) {
1399-
log.warn("Unable to find position for topic {} time {}. Exception:",
1400-
topic, timestamp, exception);
1401-
partitionData.complete(Pair.of(Errors.UNKNOWN_SERVER_ERROR, null));
1399+
if (exception instanceof ManagedLedgerException.NonRecoverableLedgerException) {
1400+
// The position doesn't exist, it usually happens when the rollover of managed ledger leads to
1401+
// the deletion of all expired ledgers. In this case, there's only one empty ledger in the managed
1402+
// ledger. So here we complete it with the latest offset.
1403+
partitionData.complete(Pair.of(Errors.NONE, MessageMetadataUtils.getLogEndOffset(managedLedger)));
1404+
} else {
1405+
log.warn("Unable to find position for topic {} time {}. Exception:",
1406+
topic, timestamp, exception);
1407+
partitionData.complete(Pair.of(Errors.UNKNOWN_SERVER_ERROR, null));
1408+
}
14021409
}
14031410
});
14041411
}

tests/src/test/java/io/streamnative/pulsar/handlers/kop/MultiLedgerTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.kafka.clients.consumer.ConsumerRecord;
3838
import org.apache.kafka.clients.consumer.ConsumerRecords;
3939
import org.apache.kafka.clients.consumer.KafkaConsumer;
40+
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
4041
import org.apache.kafka.clients.producer.KafkaProducer;
4142
import org.apache.kafka.clients.producer.ProducerRecord;
4243
import org.apache.kafka.common.TopicPartition;
@@ -238,6 +239,18 @@ public void testListOffsetForEmptyRolloverLedger() throws Exception {
238239
fail(e.getMessage());
239240
}
240241

242+
// Verify listing offsets for timestamp return a correct offset
243+
try {
244+
final Map<TopicPartition, OffsetAndTimestamp> partitionToTimestamp =
245+
consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition(topic, 0), 0L),
246+
Duration.ofSeconds(2));
247+
assertTrue(partitionToTimestamp.containsKey(topicPartition));
248+
assertEquals(partitionToTimestamp.get(topicPartition).offset(), numMessages);
249+
} catch (Exception e) {
250+
log.error("Failed to get offsets for times: {}", e.getMessage());
251+
fail(e.getMessage());
252+
}
253+
241254
// Verify consumer can start consuming from the correct position
242255
consumer.subscribe(Collections.singleton(topic));
243256
try (final KafkaProducer<String, String> producer = new KafkaProducer<>(newKafkaProducerProperties());) {

0 commit comments

Comments
 (0)