
Commit 036839c

fix(storage): regard all produce requests as duplicate whose sequence number is less than the 5 retained batches (#1865)

* fix(storage): regard all produce requests as duplicate whose sequence number is less than the 5 retained batches
* test: disable an unstable test
* test: update kafka tests

Signed-off-by: Ning Yu <[email protected]>
1 parent 38618c0 commit 036839c
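Context for the change: the broker retains metadata for only the last 5 appended batches per producer. Previously, a retried batch whose sequence range had already fallen out of that window was rejected with OutOfOrderSequenceException; with this commit it is reported via DuplicateSequenceException, which the producer client generally treats as a benign already-acknowledged write rather than a fatal ordering error. Below is a minimal, self-contained sketch of the new classification rule, not the broker code itself: contains restates the helper added in the diff, decrementSequence is an assumed wrap-aware subtraction matching the semantics implied by the import of DefaultRecordBatch.decrementSequence, and the class and method names are illustrative.

// Sketch of the duplicate window this commit introduces (simplified).
// Producer sequence numbers live in [0, Integer.MAX_VALUE] and wrap around.
public final class DuplicateWindowSketch {

    // Assumed wrap-aware subtraction over the sequence space
    // (mirroring DefaultRecordBatch.decrementSequence).
    static int decrementSequence(int sequence, int decrement) {
        if (sequence < decrement) {
            return Integer.MAX_VALUE - (decrement - sequence) + 1;
        }
        return sequence - decrement;
    }

    // Inclusive range check; the range may wrap around the sequence space
    // (restates the private contains helper added in the diff below).
    static boolean contains(int start, int end, int seq) {
        if (start <= end) {
            return seq >= start && seq <= end;
        }
        return seq >= start || seq <= end; // wrapped range
    }

    // A retried batch is now a duplicate if its last sequence lands in the
    // 65536 sequences immediately before the oldest retained batch.
    static boolean isDuplicateBeforeRetainedWindow(int frontFirstSeq, int batchLastSeq) {
        return contains(decrementSequence(frontFirstSeq, 65536),
                        decrementSequence(frontFirstSeq, 1),
                        batchLastSeq);
    }

    public static void main(String[] args) {
        // Oldest retained batch starts at 1000; a retry ending at 999 is a duplicate.
        System.out.println(isDuplicateBeforeRetainedWindow(1000, 999));  // true
        // A batch ending beyond the retained window is not.
        System.out.println(isDuplicateBeforeRetainedWindow(1000, 1005)); // false
        // The window wraps through zero.
        System.out.println(isDuplicateBeforeRetainedWindow(10, Integer.MAX_VALUE - 5)); // true
    }
}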

File tree

3 files changed: +42 −4 lines

core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala

Lines changed: 5 additions & 2 deletions
@@ -1306,8 +1306,11 @@ class UnifiedLogTest {
     records = TestUtils.records(
       List(new SimpleRecord(mockTime.milliseconds, s"key-1".getBytes, s"value-1".getBytes)),
       producerId = pid, producerEpoch = epoch, sequence = 1)
-    assertThrows(classOf[OutOfOrderSequenceException], () => log.appendAsLeader(records, leaderEpoch = 0),
-      () => "Should have received an OutOfOrderSequenceException since we attempted to append a duplicate of a batch which is older than the last 5 appended batches.")
+    // AutoMQ for Kafka inject start
+    // We expect a DuplicateSequenceException since we attempted to append a duplicate of a batch which is older than the last 5 appended batches.
+    assertThrows(classOf[DuplicateSequenceException], () => log.appendAsLeader(records, leaderEpoch = 0),
+      () => "Should have received a DuplicateSequenceException since we attempted to append a duplicate of a batch which is older than the last 5 appended batches.")
+    // AutoMQ for Kafka inject end
 
     // Append a duplicate entry with a single record at the tail of the log. This should return the appendInfo of the original entry.
     def createRecordsWithDuplicate = TestUtils.records(List(new SimpleRecord(mockTime.milliseconds, "key".getBytes, "value".getBytes)),

s3stream/src/test/java/com/automq/stream/s3/index/LocalStreamRangeIndexCacheTest.java

Lines changed: 2 additions & 0 deletions
@@ -20,6 +20,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
@@ -141,6 +142,7 @@ public void testPrune() {
     }
 
     @Test
+    @Disabled("FIXME: This test is not stable")
     public void testEvict() {
        ObjectStorage objectStorage = new MemoryObjectStorage();
        LocalStreamRangeIndexCache cache = new LocalStreamRangeIndexCache();

storage/src/main/java/org/apache/kafka/storage/internals/log/ProducerStateEntry.java

Lines changed: 35 additions & 2 deletions
@@ -26,6 +26,8 @@
 import org.apache.kafka.common.errors.DuplicateSequenceException;
 import org.apache.kafka.common.record.RecordBatch;
 
+import static org.apache.kafka.common.record.DefaultRecordBatch.decrementSequence;
+
 /**
  * This class represents the state of a specific producer-id.
  * It contains batchMetadata queue which is ordered such that the batch with the lowest sequence is at the head of the
 
@@ -142,8 +144,24 @@ public Optional<BatchMetadata> findDuplicateBatch(RecordBatch batch) {
         }
         BatchMetadata front = batchMetadata.peek();
         if (front != null && front.recovered) {
-            // the batch metadata is recovered from snapshot
-            if (front.firstSeq() <= batch.baseSequence() && front.lastSeq >= batch.lastSequence()) {
+            // the batch metadata (`front`) is recovered from snapshot
+            boolean batchFallInFront = contains(front.firstSeq(), front.lastSeq, batch.baseSequence())
+                && contains(front.firstSeq(), front.lastSeq, batch.lastSequence());
+            if (batchFallInFront) {
+                throw new DuplicateSequenceException(
+                    String.format("The batch is duplicated (recovered from snapshot), broker cached metadata is %s, the record batch is [%s, %s]",
+                        this, batch.baseSequence(), batch.lastSequence())
+                );
+            }
+        }
+        if (front != null) {
+            // regard the batch as duplicated if it is before the first cached batch
+            boolean batchBeforeFront = contains(
+                decrementSequence(front.firstSeq(), 65536),
+                decrementSequence(front.firstSeq(), 1),
+                batch.lastSequence()
+            );
+            if (batchBeforeFront) {
                 throw new DuplicateSequenceException(
                     String.format("The batch is duplicated, broker cached metadata is %s, the record batch is [%s, %s]",
                         this, batch.baseSequence(), batch.lastSequence())
 
@@ -154,6 +172,21 @@ public Optional<BatchMetadata> findDuplicateBatch(RecordBatch batch) {
         // AutoMQ inject end
     }
 
+    // AutoMQ inject start
+    /**
+     * Check if the sequence number is in the range [start, end] (inclusive).
+     * The range may wrap around the sequence space.
+     */
+    private static boolean contains(int start, int end, int seq) {
+        if (start <= end) {
+            return seq >= start && seq <= end;
+        } else {
+            // wrap around
+            return seq >= start || seq <= end;
+        }
+    }
+    // AutoMQ inject end
+
     // Return the batch metadata of the cached batch having the exact sequence range, if any.
     Optional<BatchMetadata> batchWithSequenceRange(int firstSeq, int lastSeq) {
         Stream<BatchMetadata> duplicate = batchMetadata.stream().filter(metadata -> firstSeq == metadata.firstSeq() && lastSeq == metadata.lastSeq);
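A note on the snapshot-recovery hunk above: the old check compared sequence endpoints directly, which misses a duplicate whenever the recovered batch's range wraps past Integer.MAX_VALUE, and the new contains-based check handles that case (per the helper's Javadoc, "the range may wrap around the sequence space"). A small self-contained sketch of the difference; contains copies the helper from the diff, while the class and variable names are illustrative:

// Why the snapshot-recovery branch uses contains(): a recovered batch whose
// sequence range wraps past Integer.MAX_VALUE defeats the old endpoint comparison.
public final class WrapAroundCheckSketch {

    // Inclusive range check; the range may wrap around the sequence space.
    static boolean contains(int start, int end, int seq) {
        if (start <= end) {
            return seq >= start && seq <= end;
        }
        return seq >= start || seq <= end; // wrapped range
    }

    public static void main(String[] args) {
        int frontFirst = Integer.MAX_VALUE - 2; // recovered batch spans the wrap point
        int frontLast = 2;
        int batchFirst = 0, batchLast = 1;      // a retry fully inside that span

        // Old check: front.firstSeq() <= batch.baseSequence() && front.lastSeq >= batch.lastSequence()
        boolean oldCheck = frontFirst <= batchFirst && frontLast >= batchLast;
        // New check: both endpoints must fall inside the (possibly wrapped) range.
        boolean newCheck = contains(frontFirst, frontLast, batchFirst)
                && contains(frontFirst, frontLast, batchLast);

        System.out.println(oldCheck); // false -> the duplicate was missed
        System.out.println(newCheck); // true  -> correctly flagged as duplicate
    }
}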
