Skip to content

Commit fb9fcdd

Browse files
authored
perf(s3stream): async heavy log cache operation (#3017)
Signed-off-by: Robin Han <[email protected]>
1 parent f817095 commit fb9fcdd

File tree

2 files changed

+22
-7
lines changed

2 files changed

+22
-7
lines changed

s3stream/src/main/java/com/automq/stream/s3/cache/LogCache.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.automq.stream.s3.model.StreamRecordBatch;
2525
import com.automq.stream.s3.trace.context.TraceContext;
2626
import com.automq.stream.s3.wal.RecordOffset;
27+
import com.automq.stream.utils.Threads;
2728
import com.automq.stream.utils.biniarysearch.StreamRecordBatchList;
2829

2930
import org.slf4j.Logger;
@@ -37,7 +38,9 @@
3738
import java.util.List;
3839
import java.util.Map;
3940
import java.util.Optional;
41+
import java.util.concurrent.CompletableFuture;
4042
import java.util.concurrent.ConcurrentHashMap;
43+
import java.util.concurrent.ExecutorService;
4144
import java.util.concurrent.TimeUnit;
4245
import java.util.concurrent.atomic.AtomicInteger;
4346
import java.util.concurrent.atomic.AtomicLong;
@@ -59,6 +62,8 @@ public class LogCache {
5962
private static final Consumer<LogCacheBlock> DEFAULT_BLOCK_FREE_LISTENER = block -> {
6063
};
6164
private static final int MAX_BLOCKS_COUNT = 64;
65+
private static final ExecutorService LOG_CACHE_ASYNC_EXECUTOR = Threads.newFixedFastThreadLocalThreadPoolWithMonitor(
66+
1, "LOG_CACHE_ASYNC", true, LOGGER);
6267
static final int MERGE_BLOCK_THRESHOLD = 8;
6368
final List<LogCacheBlock> blocks = new ArrayList<>();
6469
final AtomicInteger blockCount = new AtomicInteger(1);
@@ -259,10 +264,19 @@ Optional<LogCacheBlock> archiveCurrentBlockIfContains0(long streamId) {
259264

260265
}
261266

262-
public void markFree(LogCacheBlock block) {
267+
public CompletableFuture<Void> markFree(LogCacheBlock block) {
263268
block.free = true;
264269
tryRealFree();
265-
tryMerge();
270+
CompletableFuture<Void> cf = new CompletableFuture<>();
271+
LOG_CACHE_ASYNC_EXECUTOR.execute(() -> {
272+
try {
273+
tryMerge();
274+
cf.complete(null);
275+
} catch (Throwable t) {
276+
cf.completeExceptionally(t);
277+
}
278+
});
279+
return cf;
266280
}
267281

268282
private void tryRealFree() {
@@ -295,10 +309,10 @@ private void tryRealFree() {
295309
writeLock.unlock();
296310
}
297311
size.addAndGet(-freeSize);
298-
removed.forEach(b -> {
312+
LOG_CACHE_ASYNC_EXECUTOR.execute(() -> removed.forEach(b -> {
299313
blockFreeListener.accept(b);
300314
b.free();
301-
});
315+
}));
302316
}
303317

304318
private void tryMerge() {

s3stream/src/test/java/com/automq/stream/s3/cache/LogCacheTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.util.List;
3030
import java.util.Map;
31+
import java.util.concurrent.ExecutionException;
3132

3233
import static org.junit.jupiter.api.Assertions.assertEquals;
3334
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -164,7 +165,7 @@ public void testMergeBlock() {
164165
}
165166

166167
@Test
167-
public void testTryMergeLogic() {
168+
public void testTryMergeLogic() throws ExecutionException, InterruptedException {
168169
LogCache logCache = new LogCache(Long.MAX_VALUE, 10_000L);
169170
final long streamId = 233L;
170171
final int blocksToCreate = LogCache.MERGE_BLOCK_THRESHOLD + 2;
@@ -187,8 +188,8 @@ public void testTryMergeLogic() {
187188
assertEquals(leftCache.endOffset(), rightCache.startOffset());
188189

189190
// mark both blocks free to trigger tryMerge (called inside markFree)
190-
logCache.markFree(left);
191-
logCache.markFree(right);
191+
logCache.markFree(left).get();
192+
logCache.markFree(right).get();
192193

193194
int after = logCache.blocks.size();
194195
assertEquals(before - 1, after, "two adjacent free contiguous blocks should be merged into one");

0 commit comments

Comments
 (0)