Commit 32c51bd

feat(log): enhance reading logic to handle offset gaps and add unit tests (#2699)
1 parent: a34c536

File tree: 4 files changed, +358 −7 lines

build.gradle

Lines changed: 4 additions & 3 deletions

@@ -1060,6 +1060,7 @@ project(':core') {
   testImplementation project(':storage:storage-api').sourceSets.test.output
   testImplementation project(':server').sourceSets.test.output
   testImplementation libs.bcpkix
+  testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
   testImplementation libs.mockitoCore
   testImplementation libs.guava
   testImplementation(libs.apacheda) {
@@ -2218,9 +2219,9 @@ project(':s3stream') {
   implementation 'com.ibm.async:asyncutil:0.1.0'

   testImplementation 'org.slf4j:slf4j-simple:2.0.9'
-  testImplementation 'org.junit.jupiter:junit-jupiter:5.10.0'
-  testImplementation 'org.mockito:mockito-core:5.5.0'
-  testImplementation 'org.mockito:mockito-junit-jupiter:5.5.0'
+  testImplementation libs.junitJupiter
+  testImplementation libs.mockitoCore
+  testImplementation libs.mockitoJunitJupiter // supports MockitoExtension
   testImplementation 'org.awaitility:awaitility:4.2.1'
 }

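Both blocks now resolve their test dependencies through the shared version catalog, and ':core' gains libs.mockitoJunitJupiter, the artifact that ships MockitoExtension for JUnit 5. A minimal sketch of the style of test this enables (the test class and mocked type below are illustrative, not part of this commit):

    import static org.mockito.Mockito.when;

    import java.util.Queue;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.extension.ExtendWith;
    import org.mockito.Mock;
    import org.mockito.junit.jupiter.MockitoExtension;

    // MockitoExtension (from mockito-junit-jupiter) creates and injects the
    // @Mock fields before each test, replacing manual openMocks() setup.
    @ExtendWith(MockitoExtension.class)
    class MockitoExtensionSketchTest {
        @Mock
        Queue<Long> offsets;

        @Test
        void mockIsInjected() {
            when(offsets.poll()).thenReturn(42L);
            Assertions.assertEquals(42L, offsets.poll());
        }
    }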
core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java

Lines changed: 4 additions & 2 deletions

@@ -46,6 +46,7 @@
 import com.automq.stream.s3.context.FetchContext;
 import com.automq.stream.s3.trace.TraceUtils;
 import com.automq.stream.utils.FutureUtil;
+import com.google.common.annotations.VisibleForTesting;

 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -406,7 +407,8 @@ public long lastOffset() {
     }

     static class StreamSegmentInputStream implements LogInputStream<RecordBatch> {
-        private static final int FETCH_BATCH_SIZE = 64 * 1024;
+        @VisibleForTesting
+        protected static final int FETCH_BATCH_SIZE = 64 * 1024;
         private final ElasticLogFileRecords elasticLogFileRecords;
         private final Queue<RecordBatch> remaining = new LinkedList<>();
         private final int maxSize;
@@ -446,9 +448,9 @@ public RecordBatch nextBatch() throws IOException {
                     buf = heapBuf;
                 }
                 readSize += buf.remaining();
+                nextFetchOffset = Math.max(streamRecord.lastOffset(), nextFetchOffset);
                 for (RecordBatch r : MemoryRecords.readableRecords(buf).batches()) {
                     remaining.offer(r);
-                    nextFetchOffset = r.lastOffset() - elasticLogFileRecords.baseOffset + 1;
                 }
             } catch (Throwable e) {
                 ElasticStreamSlice slice = elasticLogFileRecords.streamSlice;

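The substantive change is in nextBatch(): nextFetchOffset now jumps to the end of the fetched stream record rather than being recomputed from each decoded batch, so a fetch that ends in an offset gap (a range with no decodable batches) still makes forward progress. A minimal, self-contained sketch of the two policies follows; StreamRecord and the offsets here are illustrative stand-ins, not the project's real types:

    import java.util.List;

    public class OffsetGapSketch {
        // Illustrative stand-in for a fetched stream record that covers
        // stream offsets [baseOffset, lastOffset).
        record StreamRecord(long baseOffset, long lastOffset, List<Long> batchLastOffsets) { }

        public static void main(String[] args) {
            // The fetched record covers offsets [0, 100), but the last decodable
            // batch ends at offset 89, leaving a gap at offsets 90..99.
            StreamRecord rec = new StreamRecord(0, 100, List.of(49L, 89L));
            long nextFetchOffset = 0;

            // Old policy: advance past the last decoded batch only. This lands
            // at 90, inside the gap, so the next fetch covers the same record
            // again and the reader stalls.
            long oldNext = nextFetchOffset;
            for (long batchLast : rec.batchLastOffsets()) {
                oldNext = batchLast + 1;
            }

            // New policy: jump to the end of the fetched record; Math.max keeps
            // the cursor monotonic even if a record ends at or before it.
            long newNext = Math.max(rec.lastOffset(), nextFetchOffset);

            System.out.println("old nextFetchOffset = " + oldNext); // 90 (inside the gap)
            System.out.println("new nextFetchOffset = " + newNext); // 100 (past the gap)
        }
    }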
core/src/main/scala/kafka/log/streamaspect/MemoryClient.java

Lines changed: 18 additions & 2 deletions

@@ -80,7 +80,7 @@ public CompletableFuture<FailoverResponse> failover(FailoverRequest request) {
         return FutureUtil.failedFuture(new UnsupportedOperationException());
     }

-    static class StreamImpl implements Stream {
+    public static class StreamImpl implements Stream {
         private final AtomicLong nextOffsetAlloc = new AtomicLong();
         private NavigableMap<Long, RecordBatchWithContext> recordMap = new ConcurrentSkipListMap<>();
         private final long streamId;
@@ -132,10 +132,26 @@ public CompletableFuture<FetchResult> fetch(FetchContext context, long startOffs
             if (floorKey == null) {
                 return CompletableFuture.completedFuture(ArrayList::new);
             }
-            List<RecordBatchWithContext> records = new ArrayList<>(recordMap.subMap(floorKey, endOffset).values());
+            NavigableMap<Long, RecordBatchWithContext> subMap = recordMap.subMap(floorKey, true, endOffset, false);
+            List<RecordBatchWithContext> records = new ArrayList<>();
+            int accumulatedSize = 0;
+            for (Map.Entry<Long, RecordBatchWithContext> entry : subMap.entrySet()) {
+                RecordBatchWithContext batch = entry.getValue();
+                int batchSize = batch.rawPayload().remaining();
+                if (accumulatedSize + batchSize > maxSizeHint && !records.isEmpty()) {
+                    break;
+                }
+                records.add(batch);
+                accumulatedSize += batchSize;
+
+                if (accumulatedSize > maxSizeHint) {
+                    break;
+                }
+            }
             return CompletableFuture.completedFuture(() -> records);
         }

+
         @Override
         public CompletableFuture<Void> trim(long newStartOffset) {
             recordMap = new ConcurrentSkipListMap<>(recordMap.tailMap(newStartOffset));

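With this change the in-memory test client approximates a real fetch's size semantics: batches accumulate until the next one would push the total past maxSizeHint, but at least one batch is always returned so an oversized head batch cannot stall the reader. A standalone sketch of that accumulation rule (the method name and ByteBuffer stand-ins are illustrative):

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    public class SizeHintSketch {
        static List<ByteBuffer> takeUpToSizeHint(List<ByteBuffer> batches, int maxSizeHint) {
            List<ByteBuffer> out = new ArrayList<>();
            int accumulated = 0;
            for (ByteBuffer batch : batches) {
                int size = batch.remaining();
                // Stop before overshooting the hint, unless nothing was taken yet.
                if (accumulated + size > maxSizeHint && !out.isEmpty()) {
                    break;
                }
                out.add(batch);
                accumulated += size;
                // Stop once the hint is exceeded (the oversized batch is kept).
                if (accumulated > maxSizeHint) {
                    break;
                }
            }
            return out;
        }

        public static void main(String[] args) {
            List<ByteBuffer> batches = List.of(
                ByteBuffer.allocate(600), ByteBuffer.allocate(300), ByteBuffer.allocate(300));
            // 600 + 300 > 700, so only the first batch fits the hint.
            System.out.println(takeUpToSizeHint(batches, 700).size()); // 1
            // An oversized head batch is still returned.
            System.out.println(takeUpToSizeHint(batches, 100).size()); // 1
        }
    }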