Skip to content

Commit 9b740d2

Browse files
authored
feat(metadata): limit the size of sparse index cache (#1832)
Signed-off-by: Shichao Nie <[email protected]>
1 parent 660d582 commit 9b740d2

File tree

8 files changed

+294
-96
lines changed

8 files changed

+294
-96
lines changed

metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,13 +183,13 @@ public CompletableFuture<ByteBuf> readNodeRangeIndex(long nodeId) {
183183
Map<Long, SparseRangeIndex> streamRangeIndexMap;
184184
if (nodeId == BROKER0) {
185185
streamRangeIndexMap = Map.of(
186-
STREAM0, new SparseRangeIndex(2, 1, List.of(
186+
STREAM0, new SparseRangeIndex(2, List.of(
187187
new RangeIndex(100, 120, 0),
188188
new RangeIndex(180, 200, 2),
189189
new RangeIndex(520, 600, 4))));
190190
} else {
191191
streamRangeIndexMap = Map.of(
192-
STREAM0, new SparseRangeIndex(2, 1, List.of(
192+
STREAM0, new SparseRangeIndex(2, List.of(
193193
new RangeIndex(140, 160, 5),
194194
new RangeIndex(420, 520, 7),
195195
// objectId 8 does not exist (compacted)

s3stream/src/main/java/com/automq/stream/s3/compact/CompactionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ private void compactObjects(List<StreamMetadata> streamMetadataList, List<S3Obje
373373
logger.info("No stream set objects to compact");
374374
return;
375375
}
376-
logger.info("Build compact request for {} stream set objects complete, stream set object id: {}, stresam set object size: {}, stream object num: {}, time cost: {}, start committing objects",
376+
logger.info("Build compact request for {} stream set objects complete, stream set object id: {}, stream set object size: {}, stream object num: {}, time cost: {}, start committing objects",
377377
request.getCompactedObjectIds().size(), request.getObjectId(), request.getObjectSize(), request.getStreamObjects().size(), timerUtil.elapsedAs(TimeUnit.MILLISECONDS));
378378
timerUtil.reset();
379379
objectManager.commitStreamSetObject(request)

s3stream/src/main/java/com/automq/stream/s3/index/LocalStreamRangeIndexCache.java

Lines changed: 56 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -46,33 +46,31 @@
4646
public class LocalStreamRangeIndexCache implements S3StreamClient.StreamLifeCycleListener {
4747
private static final short VERSION = 0;
4848
private static final Logger LOGGER = LoggerFactory.getLogger(LocalStreamRangeIndexCache.class);
49-
private static final int COMPACT_NUM = Systems.getEnvInt("AUTOMQ_STREAM_RANGE_INDEX_COMPACT_NUM", 5);
50-
private static final int SPARSE_PADDING = Systems.getEnvInt("AUTOMQ_STREAM_RANGE_INDEX_SPARSE_PADDING", 1);
49+
private static final int COMPACT_NUM = Systems.getEnvInt("AUTOMQ_STREAM_RANGE_INDEX_COMPACT_NUM", 3);
50+
public static final int MAX_INDEX_SIZE = Systems.getEnvInt("AUTOMQ_STREAM_RANGE_INDEX_MAX_SIZE", 5 * 1024 * 1024);
5151
private final Map<Long, SparseRangeIndex> streamRangeIndexMap = new HashMap<>();
5252
private final ReadWriteLock lock = new ReentrantReadWriteLock();
5353
private final Lock readLock = lock.readLock();
5454
private final Lock writeLock = lock.writeLock();
5555
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(
5656
ThreadUtils.createThreadFactory("upload-index", true));
5757
private final Queue<CompletableFuture<Void>> uploadQueue = new LinkedList<>();
58+
private final CompletableFuture<Void> initCf = new CompletableFuture<>();
5859
private long nodeId = -1;
5960
private ObjectStorage objectStorage;
60-
private CompletableFuture<Void> initCf = new CompletableFuture<>();
61+
private int totalSize = 0;
6162

6263
public void start() {
6364
executorService.scheduleAtFixedRate(this::batchUpload, 0, 10, TimeUnit.MILLISECONDS);
6465
executorService.scheduleAtFixedRate(this::flush, 1, 1, TimeUnit.MINUTES);
6566
}
6667

67-
// for test
68-
void reset() {
69-
writeLock.lock();
70-
try {
71-
streamRangeIndexMap.clear();
72-
initCf = new CompletableFuture<>();
73-
} finally {
74-
writeLock.unlock();
75-
}
68+
public int totalSize() {
69+
return totalSize;
70+
}
71+
72+
CompletableFuture<Void> initCf() {
73+
return initCf;
7674
}
7775

7876
// test only
@@ -158,7 +156,8 @@ public void init(int nodeId, ObjectStorage objectStorage) {
158156
writeLock.lock();
159157
try {
160158
for (Map.Entry<Long, List<RangeIndex>> entry : LocalStreamRangeIndexCache.fromBuffer(data).entrySet()) {
161-
this.streamRangeIndexMap.put(entry.getKey(), new SparseRangeIndex(COMPACT_NUM, SPARSE_PADDING, entry.getValue()));
159+
this.streamRangeIndexMap.put(entry.getKey(), new SparseRangeIndex(COMPACT_NUM, entry.getValue()));
160+
this.totalSize += entry.getValue().size() * RangeIndex.OBJECT_SIZE;
162161
}
163162
} finally {
164163
writeLock.unlock();
@@ -207,16 +206,44 @@ public CompletableFuture<Void> append(Map<Long, RangeIndex> rangeIndexMap) {
207206
for (Map.Entry<Long, RangeIndex> entry : rangeIndexMap.entrySet()) {
208207
long streamId = entry.getKey();
209208
RangeIndex rangeIndex = entry.getValue();
210-
streamRangeIndexMap.computeIfAbsent(streamId,
211-
k -> new SparseRangeIndex(COMPACT_NUM, SPARSE_PADDING)).append(rangeIndex);
209+
totalSize += streamRangeIndexMap.computeIfAbsent(streamId,
210+
k -> new SparseRangeIndex(COMPACT_NUM)).append(rangeIndex);
212211
}
212+
evictIfNecessary();
213213
} finally {
214214
writeLock.unlock();
215215
}
216216
return null;
217217
});
218218
}
219219

220+
private void evictIfNecessary() {
221+
if (totalSize <= MAX_INDEX_SIZE) {
222+
return;
223+
}
224+
boolean evicted = false;
225+
boolean hasSufficientIndex = true;
226+
List<SparseRangeIndex> streamRangeIndexList = new ArrayList<>(streamRangeIndexMap.values());
227+
Collections.shuffle(streamRangeIndexList);
228+
while (totalSize > MAX_INDEX_SIZE) {
229+
// try to evict from each stream in round-robin manner
230+
for (SparseRangeIndex sparseRangeIndex : streamRangeIndexList) {
231+
if (sparseRangeIndex.length() <= 1 + COMPACT_NUM && hasSufficientIndex) {
232+
// skip evict if there is still sufficient stream to be evicted
233+
continue;
234+
}
235+
totalSize -= sparseRangeIndex.evictOnce();
236+
evicted = true;
237+
if (totalSize <= MAX_INDEX_SIZE) {
238+
break;
239+
}
240+
}
241+
if (!evicted) {
242+
hasSufficientIndex = false;
243+
}
244+
}
245+
}
246+
220247
public CompletableFuture<Void> compact(Map<Long, RangeIndex> rangeIndexMap, Set<Long> compactedObjectIds) {
221248
return exec(() -> {
222249
writeLock.lock();
@@ -225,8 +252,8 @@ public CompletableFuture<Void> compact(Map<Long, RangeIndex> rangeIndexMap, Set<
225252
Iterator<Map.Entry<Long, SparseRangeIndex>> iterator = streamRangeIndexMap.entrySet().iterator();
226253
while (iterator.hasNext()) {
227254
Map.Entry<Long, SparseRangeIndex> entry = iterator.next();
228-
entry.getValue().compact(null, compactedObjectIds);
229-
if (entry.getValue().size() == 0) {
255+
totalSize += entry.getValue().compact(null, compactedObjectIds);
256+
if (entry.getValue().length() == 0) {
230257
iterator.remove();
231258
}
232259
}
@@ -237,10 +264,10 @@ public CompletableFuture<Void> compact(Map<Long, RangeIndex> rangeIndexMap, Set<
237264
RangeIndex rangeIndex = entry.getValue();
238265
streamRangeIndexMap.compute(streamId, (k, v) -> {
239266
if (v == null) {
240-
v = new SparseRangeIndex(COMPACT_NUM, SPARSE_PADDING);
267+
v = new SparseRangeIndex(COMPACT_NUM);
241268
}
242-
v.compact(rangeIndex, compactedObjectIds);
243-
if (v.size() == 0) {
269+
totalSize += v.compact(rangeIndex, compactedObjectIds);
270+
if (v.length() == 0) {
244271
// remove stream with empty index
245272
return null;
246273
}
@@ -270,11 +297,7 @@ public CompletableFuture<Void> updateIndexFromRequest(CommitStreamSetObjectReque
270297
}
271298

272299
public static ByteBuf toBuffer(Map<Long, SparseRangeIndex> streamRangeIndexMap) {
273-
int capacity = Short.BYTES // version
274-
+ Integer.BYTES // stream num
275-
+ streamRangeIndexMap.values().stream().mapToInt(index -> Long.BYTES // stream id
276-
+ Integer.BYTES // range index num
277-
+ index.getRangeIndexList().size() * (3 * Long.BYTES)).sum();
300+
int capacity = bufferSize(streamRangeIndexMap);
278301
ByteBuf buffer = ByteBufAlloc.byteBuffer(capacity);
279302
try {
280303
buffer.writeShort(VERSION);
@@ -295,6 +318,14 @@ public static ByteBuf toBuffer(Map<Long, SparseRangeIndex> streamRangeIndexMap)
295318
return buffer;
296319
}
297320

321+
private static int bufferSize(Map<Long, SparseRangeIndex> streamRangeIndexMap) {
322+
return Short.BYTES // version
323+
+ Integer.BYTES // stream num
324+
+ streamRangeIndexMap.values().stream().mapToInt(index -> Long.BYTES // stream id
325+
+ Integer.BYTES // range index num
326+
+ index.getRangeIndexList().size() * (3 * Long.BYTES)).sum();
327+
}
328+
298329
public static Map<Long, List<RangeIndex>> fromBuffer(ByteBuf data) {
299330
Map<Long, List<RangeIndex>> rangeIndexMap = new HashMap<>();
300331
short version = data.readShort();

s3stream/src/main/java/com/automq/stream/s3/index/SparseRangeIndex.java

Lines changed: 81 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,44 +22,64 @@
2222
public class SparseRangeIndex {
2323
private static final Logger LOGGER = LoggerFactory.getLogger(SparseRangeIndex.class);
2424
private final int compactNum;
25-
private final int sparsePadding;
2625
// sorted by startOffset in descending order
2726
private List<RangeIndex> sortedRangeIndexList;
28-
private long evictIndex = 0;
27+
private int size = 0;
28+
private int evictIndex = 0;
2929

30-
public SparseRangeIndex(int compactNum, int sparsePadding) {
31-
this(compactNum, sparsePadding, new ArrayList<>());
30+
public SparseRangeIndex(int compactNum) {
31+
this(compactNum, new ArrayList<>());
3232
}
3333

34-
public SparseRangeIndex(int compactNum, int sparsePadding, List<RangeIndex> sortedRangeIndexList) {
34+
public SparseRangeIndex(int compactNum, List<RangeIndex> sortedRangeIndexList) {
3535
this.compactNum = compactNum;
36-
this.sparsePadding = sparsePadding;
36+
init(sortedRangeIndexList);
37+
}
38+
39+
private void init(List<RangeIndex> sortedRangeIndexList) {
40+
if (sortedRangeIndexList == null) {
41+
sortedRangeIndexList = new ArrayList<>();
42+
}
3743
this.sortedRangeIndexList = sortedRangeIndexList;
44+
this.size = sortedRangeIndexList.size() * RangeIndex.OBJECT_SIZE;
3845
}
3946

40-
public void append(RangeIndex newRangeIndex) {
47+
/**
48+
* Append new range index to the list.
49+
* @param newRangeIndex the range index to append
50+
* @return the change of size after appending
51+
*/
52+
public int append(RangeIndex newRangeIndex) {
53+
int delta = 0;
4154
if (newRangeIndex == null) {
42-
return;
55+
return delta;
4356
}
4457
if (!this.sortedRangeIndexList.isEmpty()
4558
&& newRangeIndex.compareTo(this.sortedRangeIndexList.get(this.sortedRangeIndexList.size() - 1)) <= 0) {
4659
LOGGER.error("Unexpected new range index {}, last: {}, maybe initialized with outdated index file, " +
4760
"reset local cache", newRangeIndex, this.sortedRangeIndexList.get(this.sortedRangeIndexList.size() - 1));
61+
delta -= size;
4862
reset();
4963
}
5064
this.sortedRangeIndexList.add(newRangeIndex);
51-
evict();
65+
size += RangeIndex.OBJECT_SIZE;
66+
return delta + RangeIndex.OBJECT_SIZE;
5267
}
5368

5469
public void reset() {
55-
this.sortedRangeIndexList.clear();
56-
evictIndex = 0;
70+
init(new ArrayList<>());
5771
}
5872

59-
public void compact(RangeIndex newRangeIndex, Set<Long> compactedObjectIds) {
73+
/**
74+
* Compact the list by removing the compacted object ids and add the new range index if not null.
75+
*
76+
* @param newRangeIndex the new range index to add
77+
* @param compactedObjectIds the object ids to compact
78+
* @return the change of size after compacting
79+
*/
80+
public int compact(RangeIndex newRangeIndex, Set<Long> compactedObjectIds) {
6081
if (compactedObjectIds.isEmpty()) {
61-
append(newRangeIndex);
62-
return;
82+
return append(newRangeIndex);
6383
}
6484
List<RangeIndex> newRangeIndexList = new ArrayList<>();
6585
boolean found = false;
@@ -68,29 +88,70 @@ public void compact(RangeIndex newRangeIndex, Set<Long> compactedObjectIds) {
6888
continue;
6989
}
7090
if (newRangeIndex != null && !found && rangeIndex.compareTo(newRangeIndex) > 0) {
91+
// insert new range index into the list
7192
newRangeIndexList.add(newRangeIndex);
7293
found = true;
7394
}
7495
newRangeIndexList.add(rangeIndex);
7596
}
7697
if (newRangeIndex != null && !found) {
98+
// insert new range index into the end of the list
7799
newRangeIndexList.add(newRangeIndex);
78100
}
79-
this.sortedRangeIndexList = newRangeIndexList;
101+
int oldSize = size;
102+
init(newRangeIndexList);
103+
return size - oldSize;
80104
}
81105

82-
private void evict() {
83-
if (this.sortedRangeIndexList.size() > this.compactNum) {
84-
if (evictIndex++ % (sparsePadding + 1) == 0) {
85-
this.sortedRangeIndexList.remove(this.sortedRangeIndexList.size() - this.compactNum - 1);
106+
/**
107+
* Try to evict one range index from the list, the eviction priority for each element is:
108+
* 1. any element that's not the first and last N compacted elements
109+
* 2. the last N compacted elements
110+
* 3. the first element
111+
* <p>
112+
* For example for a list of [0, 1, 2, 3, 4, 5], compact number is 2, the eviction result will be:
113+
* <ul>
114+
* <li><code>1st: [0, 2, 3, 4, 5]</code></li>
115+
* <li><code>2nd: [0, 2, 4, 5]</code></li>
116+
* <li><code>3rd: [0, 4, 5]</code></li>
117+
* <li><code>4th: [0, 5]</code></li>
118+
* <li><code>5th: [0]</code></li>
119+
* <li><code>6th: []</code></li>
120+
* </ul>
121+
*
122+
* @return evicted size
123+
*/
124+
public int evictOnce() {
125+
int indexToEvict = -1;
126+
if (this.sortedRangeIndexList.isEmpty()) {
127+
return 0;
128+
} else if (this.sortedRangeIndexList.size() == 1) {
129+
// evict the only element
130+
indexToEvict = 0;
131+
} else if (this.sortedRangeIndexList.size() <= (1 + compactNum)) {
132+
indexToEvict = 1;
133+
}
134+
135+
if (indexToEvict == -1) {
136+
if (evictIndex % this.sortedRangeIndexList.size() == 0
137+
|| this.evictIndex >= this.sortedRangeIndexList.size() - compactNum) {
138+
this.evictIndex = 1;
86139
}
140+
indexToEvict = evictIndex++;
87141
}
142+
this.sortedRangeIndexList.remove(indexToEvict);
143+
size -= RangeIndex.OBJECT_SIZE;
144+
return RangeIndex.OBJECT_SIZE;
88145
}
89146

90-
public int size() {
147+
public int length() {
91148
return this.sortedRangeIndexList.size();
92149
}
93150

151+
public int size() {
152+
return size;
153+
}
154+
94155
List<RangeIndex> getRangeIndexList() {
95156
return this.sortedRangeIndexList;
96157
}

s3stream/src/main/java/com/automq/stream/s3/operator/ProxyWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ protected void newLargeObjectWriter(WriteOptions writeOptions, AbstractObjectSto
120120
}
121121

122122
class ObjectWriter implements Writer {
123-
// max upload size, when object data size is larger MAX_UPLOAD_SIZE, we should use multi-part upload to upload it.
123+
// max upload size, when object data size is larger than MAX_UPLOAD_SIZE, we should use multi-part upload to upload it.
124124
static final long MAX_UPLOAD_SIZE = 32L * 1024 * 1024;
125125
CompletableFuture<Void> cf = new CompletableFuture<>();
126126
CompositeByteBuf data = ByteBufAlloc.compositeByteBuffer();

0 commit comments

Comments
 (0)