Skip to content

Commit 73a1161

Browse files
authored
feat(metadata): prune invalid sparse index with image on startup (#1856)
* feat(metadata): prune invalid sparse index with image on startup Signed-off-by: Shichao Nie <[email protected]> * feat(metadata): address comments Signed-off-by: Shichao Nie <[email protected]> --------- Signed-off-by: Shichao Nie <[email protected]>
1 parent 83ddbe1 commit 73a1161

File tree

3 files changed

+144
-1
lines changed

3 files changed

+144
-1
lines changed

core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.LinkedList;
3030
import java.util.List;
3131
import java.util.Optional;
32+
import java.util.Set;
3233
import java.util.concurrent.CompletableFuture;
3334
import java.util.concurrent.ExecutorService;
3435
import java.util.concurrent.Executors;
@@ -45,6 +46,7 @@
4546
import org.apache.kafka.metadata.stream.S3Object;
4647
import org.apache.kafka.metadata.stream.S3ObjectState;
4748
import org.apache.kafka.metadata.stream.S3StreamObject;
49+
import org.apache.kafka.metadata.stream.S3StreamSetObject;
4850
import org.slf4j.Logger;
4951
import org.slf4j.LoggerFactory;
5052

@@ -84,6 +86,7 @@ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage, Loader
8486
}
8587
// retry all pending tasks
8688
retryPendingTasks();
89+
this.indexCache.asyncPrune(this::getStreamSetObjectIds);
8790
}
8891

8992
public CompletableFuture<List<S3ObjectMetadata>> getStreamSetObjects() {
@@ -103,6 +106,13 @@ public CompletableFuture<List<S3ObjectMetadata>> getStreamSetObjects() {
103106
}
104107
}
105108

109+
public Set<Long> getStreamSetObjectIds() {
110+
try (Image image = getImage()) {
111+
return image.streamsMetadata().getStreamSetObjects(nodeId).stream()
112+
.map(S3StreamSetObject::objectId).collect(Collectors.toSet());
113+
}
114+
}
115+
106116
@Override
107117
public CompletableFuture<InRangeObjects> fetch(long streamId, long startOffset, long endOffset, int limit) {
108118
// TODO: cache the object list for next search

s3stream/src/main/java/com/automq/stream/s3/index/LocalStreamRangeIndexCache.java

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,11 @@
3737
import java.util.concurrent.Executors;
3838
import java.util.concurrent.ScheduledExecutorService;
3939
import java.util.concurrent.TimeUnit;
40+
import java.util.concurrent.atomic.AtomicBoolean;
4041
import java.util.concurrent.locks.Lock;
4142
import java.util.concurrent.locks.ReadWriteLock;
4243
import java.util.concurrent.locks.ReentrantReadWriteLock;
44+
import java.util.function.Supplier;
4345
import org.slf4j.Logger;
4446
import org.slf4j.LoggerFactory;
4547

@@ -56,6 +58,7 @@ public class LocalStreamRangeIndexCache implements S3StreamClient.StreamLifeCycl
5658
ThreadUtils.createThreadFactory("upload-index", true));
5759
private final Queue<CompletableFuture<Void>> uploadQueue = new LinkedList<>();
5860
private final CompletableFuture<Void> initCf = new CompletableFuture<>();
61+
private final AtomicBoolean pruned = new AtomicBoolean(false);
5962
private long nodeId = -1;
6063
private ObjectStorage objectStorage;
6164
private int totalSize = 0;
@@ -65,7 +68,8 @@ public void start() {
6568
executorService.scheduleAtFixedRate(this::flush, 1, 1, TimeUnit.MINUTES);
6669
}
6770

68-
public int totalSize() {
71+
// test only
72+
int totalSize() {
6973
return totalSize;
7074
}
7175

@@ -181,10 +185,21 @@ private <T> CompletableFuture<T> exec(Callable<T> r) {
181185
});
182186
}
183187

188+
private <T> CompletableFuture<T> execCompose(Callable<CompletableFuture<T>> r) {
189+
return initCf.thenCompose(v -> {
190+
try {
191+
return r.call();
192+
} catch (Exception e) {
193+
throw new RuntimeException(e);
194+
}
195+
});
196+
}
197+
184198
public void clear() {
185199
writeLock.lock();
186200
try {
187201
streamRangeIndexMap.clear();
202+
totalSize = 0;
188203
} finally {
189204
writeLock.unlock();
190205
}
@@ -379,6 +394,60 @@ public CompletableFuture<Long> searchObjectId(long streamId, long startOffset) {
379394
});
380395
}
381396

397+
public CompletableFuture<Void> asyncPrune(Supplier<Set<Long>> initStreamSetObjectIdsSupplier) {
398+
if (pruned.compareAndSet(false, true)) {
399+
CompletableFuture<Void> cf = new CompletableFuture<>();
400+
executorService.execute(() -> prune(initStreamSetObjectIdsSupplier).whenComplete((v, ex) -> {
401+
if (ex != null) {
402+
cf.completeExceptionally(ex);
403+
} else {
404+
cf.complete(null);
405+
}
406+
}));
407+
return cf;
408+
}
409+
return CompletableFuture.completedFuture(null);
410+
}
411+
412+
CompletableFuture<Void> prune(Supplier<Set<Long>> initStreamSetObjectIdsSupplier) {
413+
if (initStreamSetObjectIdsSupplier == null) {
414+
return CompletableFuture.failedFuture(new IllegalStateException("initStreamSetObjectIdsSupplier cannot be null"));
415+
}
416+
return execCompose(() -> {
417+
writeLock.lock();
418+
try {
419+
Set<Long> streamSetObjectIds = initStreamSetObjectIdsSupplier.get();
420+
Iterator<Map.Entry<Long, SparseRangeIndex>> iterator = streamRangeIndexMap.entrySet().iterator();
421+
boolean pruned = false;
422+
while (iterator.hasNext()) {
423+
Map.Entry<Long, SparseRangeIndex> entry = iterator.next();
424+
SparseRangeIndex sparseRangeIndex = entry.getValue();
425+
Set<Long> invalidateObjectIds = new HashSet<>();
426+
for (RangeIndex rangeIndex : sparseRangeIndex.getRangeIndexList()) {
427+
if (!streamSetObjectIds.contains(rangeIndex.getObjectId())) {
428+
invalidateObjectIds.add(rangeIndex.getObjectId());
429+
}
430+
}
431+
if (invalidateObjectIds.isEmpty()) {
432+
continue;
433+
}
434+
totalSize += sparseRangeIndex.compact(null, invalidateObjectIds);
435+
if (sparseRangeIndex.length() == 0) {
436+
iterator.remove();
437+
}
438+
pruned = true;
439+
}
440+
return pruned ? upload() : CompletableFuture.completedFuture(null);
441+
} catch (Throwable t) {
442+
LOGGER.error("Failed to prune local sparse index, clear all", t);
443+
clear();
444+
return CompletableFuture.failedFuture(t);
445+
} finally {
446+
writeLock.unlock();
447+
}
448+
});
449+
}
450+
382451
public static long binarySearchObjectId(long startOffset, List<RangeIndex> rangeIndexList) {
383452
if (rangeIndexList == null || rangeIndexList.isEmpty()) {
384453
return -1L;

s3stream/src/test/java/com/automq/stream/s3/index/LocalStreamRangeIndexCacheTest.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import com.automq.stream.s3.operator.ObjectStorage;
1818
import java.util.Collections;
1919
import java.util.List;
20+
import java.util.Set;
21+
import java.util.concurrent.CompletableFuture;
2022
import org.junit.jupiter.api.Assertions;
2123
import org.junit.jupiter.api.Test;
2224
import org.junit.jupiter.api.Timeout;
@@ -76,6 +78,68 @@ public void testAppend() {
7678
Assertions.assertEquals(97, cache.searchObjectId(STREAM_0, 1500).join());
7779
}
7880

81+
@Test
82+
public void testPrune() {
83+
ObjectStorage objectStorage = new MemoryObjectStorage();
84+
LocalStreamRangeIndexCache cache = new LocalStreamRangeIndexCache();
85+
cache.start();
86+
cache.init(NODE_0, objectStorage);
87+
CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest();
88+
long startOffset = 50;
89+
for (int i = 0; i < 10; i++) {
90+
request.setObjectId(88 + i);
91+
request.setStreamRanges(List.of(new ObjectStreamRange(STREAM_0, 0, startOffset, startOffset + 100, 100)));
92+
cache.updateIndexFromRequest(request).join();
93+
startOffset += 100;
94+
}
95+
Assertions.assertEquals(10, cache.getStreamRangeIndexMap().get(STREAM_0).length());
96+
Assertions.assertEquals(400, cache.totalSize());
97+
Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 0).join());
98+
Assertions.assertEquals(88, cache.searchObjectId(STREAM_0, 50).join());
99+
Assertions.assertEquals(88, cache.searchObjectId(STREAM_0, 100).join());
100+
Assertions.assertEquals(89, cache.searchObjectId(STREAM_0, 150).join());
101+
Assertions.assertEquals(93, cache.searchObjectId(STREAM_0, 600).join());
102+
Assertions.assertEquals(97, cache.searchObjectId(STREAM_0, 950).join());
103+
Assertions.assertEquals(97, cache.searchObjectId(STREAM_0, 1500).join());
104+
105+
CompletableFuture<Void> cf = cache.asyncPrune(() -> Set.of(94L, 95L, 96L, 97L));
106+
Assertions.assertDoesNotThrow(cf::join);
107+
Assertions.assertEquals(4, cache.getStreamRangeIndexMap().get(STREAM_0).length());
108+
Assertions.assertEquals(160, cache.totalSize());
109+
Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 0).join());
110+
Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 50).join());
111+
Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 100).join());
112+
Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 150).join());
113+
Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 600).join());
114+
Assertions.assertEquals(94, cache.searchObjectId(STREAM_0, 700).join());
115+
Assertions.assertEquals(95, cache.searchObjectId(STREAM_0, 800).join());
116+
Assertions.assertEquals(96, cache.searchObjectId(STREAM_0, 900).join());
117+
Assertions.assertEquals(97, cache.searchObjectId(STREAM_0, 950).join());
118+
Assertions.assertEquals(97, cache.searchObjectId(STREAM_0, 1500).join());
119+
120+
// test load from object storage
121+
cache = new LocalStreamRangeIndexCache();
122+
cache.start();
123+
cache.init(NODE_0, objectStorage);
124+
cache.initCf().join();
125+
Assertions.assertEquals(4, cache.getStreamRangeIndexMap().get(STREAM_0).length());
126+
Assertions.assertEquals(160, cache.totalSize());
127+
Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 0).join());
128+
Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 50).join());
129+
Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 100).join());
130+
Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 150).join());
131+
Assertions.assertEquals(-1, cache.searchObjectId(STREAM_0, 600).join());
132+
Assertions.assertEquals(94, cache.searchObjectId(STREAM_0, 700).join());
133+
Assertions.assertEquals(95, cache.searchObjectId(STREAM_0, 800).join());
134+
Assertions.assertEquals(96, cache.searchObjectId(STREAM_0, 900).join());
135+
Assertions.assertEquals(97, cache.searchObjectId(STREAM_0, 950).join());
136+
Assertions.assertEquals(97, cache.searchObjectId(STREAM_0, 1500).join());
137+
138+
cache.asyncPrune(Collections::emptySet).join();
139+
Assertions.assertEquals(0, cache.getStreamRangeIndexMap().size());
140+
Assertions.assertEquals(0, cache.totalSize());
141+
}
142+
79143
@Test
80144
public void testEvict() {
81145
ObjectStorage objectStorage = new MemoryObjectStorage();

0 commit comments

Comments
 (0)