Skip to content

Commit 81c9f99

Browse files
authored
fix(s3stream): fix compaction exit on empty stream metadata list (#2095)
Signed-off-by: Shichao Nie <[email protected]>
1 parent ccd04f3 commit 81c9f99

File tree

2 files changed

+81
-8
lines changed

2 files changed

+81
-8
lines changed

s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -231,10 +231,6 @@ public CompletableFuture<Void> compact() {
231231
return CompletableFuture.failedFuture(e);
232232
}
233233
return this.streamManager.getStreams(new ArrayList<>(streamIds)).thenAcceptAsync(streamMetadataList -> {
234-
if (streamMetadataList.isEmpty()) {
235-
logger.info("No stream metadata found for stream set objects");
236-
return;
237-
}
238234
filterInvalidStreamDataBlocks(streamMetadataList);
239235
this.compact(streamMetadataList, objectMetadataList);
240236
}, compactThreadPool);
@@ -425,10 +421,6 @@ public CompletableFuture<Void> forceSplitAll() {
425421
List<Long> streamIds = streamDataBlockMap.values().stream().flatMap(Collection::stream)
426422
.map(StreamDataBlock::getStreamId).distinct().collect(Collectors.toList());
427423
this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> {
428-
if (streamMetadataList.isEmpty()) {
429-
logger.info("No stream metadata found for stream set objects");
430-
return;
431-
}
432424
filterInvalidStreamDataBlocks(streamMetadataList);
433425
forceSplitObjects(streamMetadataList, objectMetadataList);
434426
cf.complete(null);

s3stream/src/test/java/com/automq/stream/s3/compact/CompactionManagerTest.java

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,72 @@ public void testForceSplitWithOutDatedObject() {
201201
Assertions.assertTrue(request.getStreamRanges().isEmpty());
202202
}
203203

204+
205+
@Test
206+
public void testForceSplitWithNonExistStream() {
207+
List<StreamMetadata> streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join();
208+
streamMetadataList = streamMetadataList.stream().filter(s -> s.streamId() != STREAM_0).collect(Collectors.toList());
209+
List<S3ObjectMetadata> s3ObjectMetadata = this.objectManager.getServerObjects().join();
210+
when(config.streamSetObjectCompactionForceSplitPeriod()).thenReturn(0);
211+
compactionManager = new CompactionManager(config, objectManager, streamManager, objectStorage);
212+
213+
compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(0)));
214+
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
215+
CommitStreamSetObjectRequest request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(0));
216+
Assertions.assertEquals(-1, request.getObjectId());
217+
Assertions.assertEquals(List.of(OBJECT_0), request.getCompactedObjectIds());
218+
Assertions.assertEquals(2, request.getStreamObjects().size());
219+
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(0)), request));
220+
221+
compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(1)));
222+
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
223+
request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(1));
224+
Assertions.assertEquals(-1, request.getObjectId());
225+
Assertions.assertEquals(List.of(OBJECT_1), request.getCompactedObjectIds());
226+
Assertions.assertEquals(1, request.getStreamObjects().size());
227+
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(1)), request));
228+
229+
compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(2)));
230+
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
231+
request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(2));
232+
Assertions.assertEquals(-1, request.getObjectId());
233+
Assertions.assertEquals(List.of(OBJECT_2), request.getCompactedObjectIds());
234+
Assertions.assertEquals(2, request.getStreamObjects().size());
235+
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(2)), request));
236+
}
237+
238+
@Test
239+
public void testForceSplitWithEmptyStreamList() {
240+
List<StreamMetadata> streamMetadataList = Collections.emptyList();
241+
List<S3ObjectMetadata> s3ObjectMetadata = this.objectManager.getServerObjects().join();
242+
when(config.streamSetObjectCompactionForceSplitPeriod()).thenReturn(0);
243+
compactionManager = new CompactionManager(config, objectManager, streamManager, objectStorage);
244+
245+
compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(0)));
246+
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
247+
CommitStreamSetObjectRequest request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(0));
248+
Assertions.assertEquals(-1, request.getObjectId());
249+
Assertions.assertEquals(List.of(OBJECT_0), request.getCompactedObjectIds());
250+
Assertions.assertTrue(request.getStreamObjects().isEmpty());
251+
Assertions.assertTrue(request.getStreamRanges().isEmpty());
252+
253+
compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(1)));
254+
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
255+
request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(1));
256+
Assertions.assertEquals(-1, request.getObjectId());
257+
Assertions.assertEquals(List.of(OBJECT_1), request.getCompactedObjectIds());
258+
Assertions.assertTrue(request.getStreamObjects().isEmpty());
259+
Assertions.assertTrue(request.getStreamRanges().isEmpty());
260+
261+
compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(2)));
262+
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
263+
request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(2));
264+
Assertions.assertEquals(-1, request.getObjectId());
265+
Assertions.assertEquals(List.of(OBJECT_2), request.getCompactedObjectIds());
266+
Assertions.assertTrue(request.getStreamObjects().isEmpty());
267+
Assertions.assertTrue(request.getStreamRanges().isEmpty());
268+
}
269+
204270
@Test
205271
public void testForceSplitWithException() {
206272
S3AsyncClient s3AsyncClient = Mockito.mock(S3AsyncClient.class);
@@ -465,6 +531,21 @@ public void testCompactWithNonExistStream() {
465531
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request));
466532
}
467533

534+
@Test
535+
public void testCompactWithEmptyStream() {
536+
compactionManager = new CompactionManager(config, objectManager, streamManager, objectStorage);
537+
List<StreamMetadata> streamMetadataList = Collections.emptyList();
538+
compactionManager.updateStreamDataBlockMap(S3_WAL_OBJECT_METADATA_LIST);
539+
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
540+
CommitStreamSetObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST);
541+
542+
Assertions.assertEquals(-1, request.getObjectId());
543+
Assertions.assertTrue(request.getStreamObjects().isEmpty());
544+
Assertions.assertTrue(request.getStreamRanges().isEmpty());
545+
assertEquals(List.of(OBJECT_0, OBJECT_1, OBJECT_2), request.getCompactedObjectIds());
546+
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request));
547+
}
548+
468549
@Test
469550
public void testCompactNoneExistObjects() {
470551
when(config.streamSetObjectCompactionStreamSplitSize()).thenReturn(100L);

0 commit comments

Comments
 (0)