Skip to content

Commit 39c0f5d

Browse files
authored
feat(metadata): limit sparse index cache update interval (#1847)
* feat(metadata): limit sparse index cache update interval Signed-off-by: Shichao Nie <[email protected]> * fix(metadata): clear lru cache with pop Signed-off-by: Shichao Nie <[email protected]> --------- Signed-off-by: Shichao Nie <[email protected]>
1 parent 38b5b6f commit 39c0f5d

File tree

4 files changed

+71
-24
lines changed

4 files changed

+71
-24
lines changed

metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,9 +394,11 @@ private CompletableFuture<Integer> getStartSearchIndex(NodeS3StreamSetObjectMeta
394394
// search in sparse index
395395
return getStartStreamSetObjectId(node.getNodeId(), startOffset, ctx)
396396
.thenApply(objectId -> {
397-
int startIndex = findStartSearchIndex(objectId, node.orderList());
397+
int startIndex = -1;
398+
if (objectId >= 0) {
399+
startIndex = findStartSearchIndex(objectId, node.orderList());
400+
}
398401
if (startIndex < 0 && ctx.indexCache.nodeId() != node.getNodeId()) {
399-
LOGGER.info("Stream set object {} not found in node {}, invalidate index cache", objectId, node.getNodeId());
400402
NodeRangeIndexCache.getInstance().invalidate(node.getNodeId());
401403
}
402404
return Math.max(0, startIndex);
@@ -407,7 +409,7 @@ private CompletableFuture<Integer> getStartSearchIndex(NodeS3StreamSetObjectMeta
407409
* Get the object id of the first stream set object to start search from.
408410
* Possible results:
409411
* <p>
410-
* a. -1, not stream set object can be found, this can happen when index cache is not exist or is invalidated,
412+
* a. -1, no stream set object can be found, this can happen when index cache is not exist or is invalidated,
411413
* searching should be done from the beginning of the objects in this case.
412414
* <p>
413415
* b. non-negative value:

s3stream/src/main/java/com/automq/stream/s3/cache/AsyncLRUCache.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,9 @@ public synchronized boolean containsKey(K key) {
134134
}
135135

136136
public synchronized void clear() {
137-
cache.clear();
137+
while (cache.size() > 0) {
138+
pop();
139+
}
138140
}
139141

140142
public long totalSize() {

s3stream/src/main/java/com/automq/stream/s3/index/NodeRangeIndexCache.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,23 @@
1313

1414
import com.automq.stream.s3.cache.AsyncMeasurable;
1515
import com.automq.stream.s3.cache.AsyncLRUCache;
16+
import com.automq.stream.utils.Time;
1617
import java.util.List;
1718
import java.util.Map;
1819
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.ConcurrentHashMap;
1921
import java.util.function.Supplier;
2022
import org.slf4j.Logger;
2123
import org.slf4j.LoggerFactory;
2224

2325
public class NodeRangeIndexCache {
2426
private static final int MAX_CACHE_SIZE = 100 * 1024 * 1024;
2527
public static final int ZGC_OBJECT_HEADER_SIZE_BYTES = 16;
28+
public static final int MIN_CACHE_UPDATE_INTERVAL_MS = 1000; // 1s
2629
private static final Logger LOGGER = LoggerFactory.getLogger(NodeRangeIndexCache.class);
2730
private volatile static NodeRangeIndexCache instance = null;
2831
private final LRUCache nodeRangeIndexMap = new LRUCache(MAX_CACHE_SIZE);
32+
private final Map<Long, Long> nodeCacheUpdateTimestamp = new ConcurrentHashMap<>();
2933

3034
private NodeRangeIndexCache() {
3135

@@ -44,6 +48,7 @@ public static NodeRangeIndexCache getInstance() {
4448

4549
void clear() {
4650
this.nodeRangeIndexMap.clear();
51+
this.nodeCacheUpdateTimestamp.clear();
4752
}
4853

4954
// fot test only
@@ -62,8 +67,21 @@ public synchronized void invalidate(long nodeId) {
6267

6368
public synchronized CompletableFuture<Long> searchObjectId(long nodeId, long streamId, long startOffset,
6469
Supplier<CompletableFuture<Map<Long, List<RangeIndex>>>> cacheSupplier) {
70+
return searchObjectId(nodeId, streamId, startOffset, cacheSupplier, Time.SYSTEM);
71+
}
72+
73+
public synchronized CompletableFuture<Long> searchObjectId(long nodeId, long streamId, long startOffset,
74+
Supplier<CompletableFuture<Map<Long, List<RangeIndex>>>> cacheSupplier, Time time) {
6575
StreamRangeIndexCache indexCache = this.nodeRangeIndexMap.get(nodeId);
6676
if (indexCache == null) {
77+
long now = time.milliseconds();
78+
long expect = this.nodeCacheUpdateTimestamp.getOrDefault(nodeId, 0L) + MIN_CACHE_UPDATE_INTERVAL_MS;
79+
if (expect <= now) {
80+
this.nodeCacheUpdateTimestamp.put(nodeId, now);
81+
} else {
82+
// Skip updating from remote
83+
return CompletableFuture.completedFuture(-1L);
84+
}
6785
indexCache = new StreamRangeIndexCache(cacheSupplier.get());
6886
this.nodeRangeIndexMap.put(nodeId, indexCache);
6987
LOGGER.info("Update stream range index for node {}", nodeId);

s3stream/src/test/java/com/automq/stream/s3/index/NodeRangeIndexCacheTest.java

Lines changed: 45 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
package com.automq.stream.s3.index;
1313

14+
import com.automq.stream.utils.MockTime;
1415
import java.util.ArrayList;
1516
import java.util.Collections;
1617
import java.util.List;
@@ -19,7 +20,6 @@
1920
import java.util.concurrent.CompletableFuture;
2021
import java.util.concurrent.CopyOnWriteArrayList;
2122
import java.util.concurrent.TimeUnit;
22-
import org.junit.jupiter.api.AfterEach;
2323
import org.junit.jupiter.api.Assertions;
2424
import org.junit.jupiter.api.BeforeEach;
2525
import org.junit.jupiter.api.Test;
@@ -31,11 +31,6 @@ public void setUp() {
3131
NodeRangeIndexCache.getInstance().clear();
3232
}
3333

34-
@AfterEach
35-
public void tearDown() {
36-
NodeRangeIndexCache.getInstance().clear();
37-
}
38-
3934
@Test
4035
public void testIndex() {
4136
int node0 = 32;
@@ -52,43 +47,47 @@ public void testIndex() {
5247
new RangeIndex(50, 100, object0),
5348
new RangeIndex(150, 250, object1),
5449
new RangeIndex(300, 400, object2)));
50+
51+
MockTime time = new MockTime();
5552
// refresh cache
56-
NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 50, () -> CompletableFuture.completedFuture(streamRangeMap0));
53+
NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 50,
54+
() -> CompletableFuture.completedFuture(streamRangeMap0), time);
5755

5856
Assertions.assertTrue(NodeRangeIndexCache.getInstance().isValid(node0));
5957
Assertions.assertFalse(NodeRangeIndexCache.getInstance().isValid(node1));
6058
Assertions.assertEquals(-1, NodeRangeIndexCache.getInstance().searchObjectId(node1, stream0, 50,
61-
() -> CompletableFuture.completedFuture(Collections.emptyMap())).join());
59+
() -> CompletableFuture.completedFuture(Collections.emptyMap()), time).join());
6260
Assertions.assertEquals(-1, NodeRangeIndexCache.getInstance().searchObjectId(node0, stream1, 50,
63-
() -> CompletableFuture.completedFuture(streamRangeMap0)).join());
61+
() -> CompletableFuture.completedFuture(streamRangeMap0), time).join());
6462
Assertions.assertEquals(-1, NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 0,
65-
() -> CompletableFuture.completedFuture(streamRangeMap0)).join());
63+
() -> CompletableFuture.completedFuture(streamRangeMap0), time).join());
6664
Assertions.assertEquals(object0, NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 50,
67-
() -> CompletableFuture.completedFuture(streamRangeMap0)).join());
65+
() -> CompletableFuture.completedFuture(streamRangeMap0), time).join());
6866
Assertions.assertEquals(object0, NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 100,
69-
() -> CompletableFuture.completedFuture(streamRangeMap0)).join());
67+
() -> CompletableFuture.completedFuture(streamRangeMap0), time).join());
7068
Assertions.assertEquals(object1, NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 200,
71-
() -> CompletableFuture.completedFuture(streamRangeMap0)).join());
69+
() -> CompletableFuture.completedFuture(streamRangeMap0), time).join());
7270
Assertions.assertEquals(object2, NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 300,
73-
() -> CompletableFuture.completedFuture(streamRangeMap0)).join());
71+
() -> CompletableFuture.completedFuture(streamRangeMap0), time).join());
7472
Assertions.assertEquals(object2, NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 500,
75-
() -> CompletableFuture.completedFuture(streamRangeMap0)).join());
73+
() -> CompletableFuture.completedFuture(streamRangeMap0), time).join());
7674

7775
NodeRangeIndexCache.getInstance().invalidate(node0);
76+
time.setCurrentTimeMs(time.milliseconds() + NodeRangeIndexCache.MIN_CACHE_UPDATE_INTERVAL_MS);
7877
Map<Long, List<RangeIndex>> streamRangeMap1 = Map.of(stream0, List.of(
7978
new RangeIndex(50, 300, object3),
8079
new RangeIndex(500, 600, object4)));
8180
Assertions.assertFalse(NodeRangeIndexCache.getInstance().isValid(node0));
8281
Assertions.assertEquals(-1, NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 0,
83-
() -> CompletableFuture.completedFuture(streamRangeMap1)).join());
82+
() -> CompletableFuture.completedFuture(streamRangeMap1), time).join());
8483
Assertions.assertEquals(object3, NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 50,
85-
() -> CompletableFuture.completedFuture(streamRangeMap1)).join());
84+
() -> CompletableFuture.completedFuture(streamRangeMap1), time).join());
8685
Assertions.assertEquals(object3, NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 400,
87-
() -> CompletableFuture.completedFuture(streamRangeMap1)).join());
86+
() -> CompletableFuture.completedFuture(streamRangeMap1), time).join());
8887
Assertions.assertEquals(object4, NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 500,
89-
() -> CompletableFuture.completedFuture(streamRangeMap1)).join());
88+
() -> CompletableFuture.completedFuture(streamRangeMap1), time).join());
9089
Assertions.assertEquals(object4, NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 1000,
91-
() -> CompletableFuture.completedFuture(streamRangeMap1)).join());
90+
() -> CompletableFuture.completedFuture(streamRangeMap1), time).join());
9291
}
9392

9493
@Test
@@ -106,6 +105,32 @@ public void testLRUCache() throws InterruptedException {
106105
Assertions.assertTrue(NodeRangeIndexCache.getInstance().cache().totalSize() - 100 * 1024 * 1024 <= 1000 * Long.BYTES);
107106
}
108107

108+
@Test
109+
public void testUpdateCacheFrequency() {
110+
int node0 = 32;
111+
long stream0 = 0;
112+
int object0 = 99;
113+
Map<Long, List<RangeIndex>> streamRangeMap0 = Map.of(stream0, List.of(
114+
new RangeIndex(50, 100, object0)));
115+
116+
MockTime mockTime = new MockTime();
117+
// refresh cache
118+
Assertions.assertEquals(object0, NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 50,
119+
() -> CompletableFuture.completedFuture(streamRangeMap0), mockTime).join());
120+
NodeRangeIndexCache.getInstance().invalidate(node0);
121+
122+
CompletableFuture<Long> cf = NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 50,
123+
() -> CompletableFuture.completedFuture(streamRangeMap0), mockTime);
124+
Assertions.assertTrue(cf.isDone());
125+
Assertions.assertEquals(-1L, cf.join());
126+
127+
mockTime.setCurrentTimeMs(mockTime.milliseconds() + NodeRangeIndexCache.MIN_CACHE_UPDATE_INTERVAL_MS);
128+
cf = NodeRangeIndexCache.getInstance().searchObjectId(node0, stream0, 50,
129+
() -> CompletableFuture.completedFuture(streamRangeMap0), mockTime);
130+
Assertions.assertTrue(cf.isDone());
131+
Assertions.assertEquals(object0, cf.join());
132+
}
133+
109134
private List<RangeIndex> createRangeIndex(int size) {
110135
List<RangeIndex> index = new ArrayList<>();
111136
int curr = 0;

0 commit comments

Comments
 (0)