Commit 59a92fa

fix(streamReader): implement scheduled cleanup for expired stream readers (#2719)
* fix(streamReader): implement scheduled cleanup for expired stream readers

* fix(streamReader): implement scheduled cleanup for expired stream readers

* fix(streamReader): add missing import statements in StreamReaders and StreamReadersTest

* fix(StreamReadersTest): improve test setup and cleanup logic for stream readers

* test(StreamReadersTest): update expired stream reader cleanup test for manual trigger and faster execution

* style(StreamReadersTest): remove extra blank line in import statements

* test(StreamReadersTest): use reflection to simulate expired stream readers for faster cleanup testing

Signed-off-by: Gezi-lzq <[email protected]>

* refactor(StreamReader, StreamReaders): inject Time for testable time control and remove reflection from tests

  - Add Time dependency to StreamReader and StreamReaders for time-related operations
  - Update constructors to accept Time, defaulting to Time.SYSTEM
  - Replace System.currentTimeMillis() with time.milliseconds() throughout
  - Refactor StreamReadersTest to use MockTime for simulating time passage
  - Remove reflection-based time manipulation in tests for cleaner and safer testing

---------

Signed-off-by: Gezi-lzq <[email protected]>
1 parent 06eef9a commit 59a92fa

File tree

3 files changed: +256 −8 lines changed

s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java

Lines changed: 13 additions & 4 deletions
@@ -34,6 +34,7 @@
 import com.automq.stream.s3.objects.ObjectManager;
 import com.automq.stream.utils.FutureUtil;
 import com.automq.stream.utils.LogSuppressor;
+import com.automq.stream.utils.Time;
 import com.automq.stream.utils.threads.EventLoop;
 import com.google.common.annotations.VisibleForTesting;
 
@@ -80,19 +81,27 @@
     private final ObjectManager objectManager;
     private final ObjectReaderFactory objectReaderFactory;
     private final DataBlockCache dataBlockCache;
+    private final Time time;
     long nextReadOffset;
     private CompletableFuture<Void> inflightLoadIndexCf;
     private volatile CompletableFuture<Void> afterReadTryReadaheadCf;
-    private long lastAccessTimestamp = System.currentTimeMillis();
+    private long lastAccessTimestamp;
     private boolean reading = false;
 
     private boolean closed = false;
 
     public StreamReader(long streamId, long nextReadOffset, EventLoop eventLoop, ObjectManager objectManager,
         ObjectReaderFactory objectReaderFactory, DataBlockCache dataBlockCache) {
+        this(streamId, nextReadOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache, Time.SYSTEM);
+    }
+
+    public StreamReader(long streamId, long nextReadOffset, EventLoop eventLoop, ObjectManager objectManager,
+        ObjectReaderFactory objectReaderFactory, DataBlockCache dataBlockCache, Time time) {
         this.streamId = streamId;
         this.nextReadOffset = nextReadOffset;
         this.readahead = new Readahead();
+        this.time = time;
+        this.lastAccessTimestamp = time.milliseconds();
 
         this.eventLoop = eventLoop;
         this.objectManager = objectManager;
@@ -117,7 +126,7 @@ public CompletableFuture<ReadDataBlock> read(long startOffset, long endOffset, i
     }
 
     CompletableFuture<ReadDataBlock> read(long startOffset, long endOffset, int maxBytes, int leftRetries) {
-        lastAccessTimestamp = System.currentTimeMillis();
+        lastAccessTimestamp = time.milliseconds();
         ReadContext readContext = new ReadContext();
         read0(readContext, startOffset, endOffset, maxBytes);
         CompletableFuture<ReadDataBlock> retCf = new CompletableFuture<>();
@@ -617,7 +626,7 @@ class Readahead {
         private int cacheMissCount;
 
         public void tryReadahead(boolean cacheMiss) {
-            if (System.currentTimeMillis() - resetTimestamp < READAHEAD_RESET_COLD_DOWN_MILLS) {
+            if (time.milliseconds() - resetTimestamp < READAHEAD_RESET_COLD_DOWN_MILLS) {
                 // skip readahead when readahead is in cold down
                 return;
             }
@@ -660,7 +669,7 @@ public void tryReadahead(boolean cacheMiss) {
 
         public void reset() {
             requireReset = true;
-            resetTimestamp = System.currentTimeMillis();
+            resetTimestamp = time.milliseconds();
         }
     }
 
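The core of this refactor is replacing direct System.currentTimeMillis() calls with an injected Time so that tests can advance the clock instantly instead of sleeping through the one-minute expiration window. Below is a minimal, self-contained Java sketch of the same pattern; Clock, ManualClock, and ExpiringReader are hypothetical stand-ins for illustration, not the AutoMQ Time/MockTime/StreamReader classes shown in the diff above.

import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins used only to illustrate the injection pattern.
interface Clock {
    long milliseconds();
}

final class SystemClock implements Clock {
    @Override
    public long milliseconds() {
        return System.currentTimeMillis();   // production behavior
    }
}

final class ManualClock implements Clock {
    private long nowMs;

    @Override
    public long milliseconds() {
        return nowMs;
    }

    void sleep(long ms) {                    // advance time without blocking
        nowMs += ms;
    }
}

final class ExpiringReader {
    static final long EXPIRED_MILLS = TimeUnit.MINUTES.toMillis(1);

    private final Clock clock;
    private long lastAccessTimestamp;

    ExpiringReader(Clock clock) {
        this.clock = clock;
        this.lastAccessTimestamp = clock.milliseconds();
    }

    void read() {
        lastAccessTimestamp = clock.milliseconds();   // refresh on every access
    }

    boolean isExpired() {
        return clock.milliseconds() - lastAccessTimestamp > EXPIRED_MILLS;
    }

    public static void main(String[] args) {
        ManualClock clock = new ManualClock();
        ExpiringReader reader = new ExpiringReader(clock);
        reader.read();
        clock.sleep(TimeUnit.MINUTES.toMillis(2));     // two minutes pass instantly
        System.out.println(reader.isExpired());        // true, with no real waiting
    }
}

Production code keeps the system-backed clock (the diff's Time.SYSTEM default constructor leaves existing call sites unchanged), while tests pass the manual clock and drive it forward explicitly.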

s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReaders.java

Lines changed: 53 additions & 4 deletions
@@ -26,7 +26,10 @@
 import com.automq.stream.s3.trace.context.TraceContext;
 import com.automq.stream.utils.FutureUtil;
 import com.automq.stream.utils.Systems;
+import com.automq.stream.utils.Threads;
+import com.automq.stream.utils.Time;
 import com.automq.stream.utils.threads.EventLoop;
+import com.google.common.annotations.VisibleForTesting;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +46,7 @@ public class StreamReaders implements S3BlockCache {
     private static final long STREAM_READER_EXPIRED_MILLS = TimeUnit.MINUTES.toMillis(1);
     private static final long STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS = TimeUnit.MINUTES.toMillis(1);
     private final Cache[] caches;
+    private final Time time;
     private final DataBlockCache dataBlockCache;
     private final ObjectReaderFactory objectReaderFactory;
 
@@ -51,11 +55,17 @@ public class StreamReaders implements S3BlockCache {
 
     public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objectStorage,
         ObjectReaderFactory objectReaderFactory) {
-        this(size, objectManager, objectStorage, objectReaderFactory, Systems.CPU_CORES);
+        this(size, objectManager, objectStorage, objectReaderFactory, Systems.CPU_CORES, Time.SYSTEM);
     }
 
     public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objectStorage,
         ObjectReaderFactory objectReaderFactory, int concurrency) {
+        this(size, objectManager, objectStorage, objectReaderFactory, concurrency, Time.SYSTEM);
+    }
+
+    public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objectStorage,
+        ObjectReaderFactory objectReaderFactory, int concurrency, Time time) {
+        this.time = time;
         EventLoop[] eventLoops = new EventLoop[concurrency];
         for (int i = 0; i < concurrency; i++) {
             eventLoops[i] = new EventLoop("stream-reader-" + i);
@@ -69,6 +79,11 @@ public StreamReaders(long size, ObjectManager objectManager, ObjectStorage objec
         this.objectReaderFactory = objectReaderFactory;
         this.objectManager = objectManager;
         this.objectStorage = objectStorage;
+
+        Threads.COMMON_SCHEDULER.scheduleAtFixedRate(this::triggerExpiredStreamReaderCleanup,
+            STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS,
+            STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS,
+            TimeUnit.MILLISECONDS);
     }
 
     @Override
@@ -78,6 +93,26 @@ public CompletableFuture<ReadDataBlock> read(TraceContext context, long streamId
         return cache.read(streamId, startOffset, endOffset, maxBytes);
     }
 
+    /**
+     * Get the total number of active StreamReaders across all caches.
+     * This method is intended for testing purposes only.
+     */
+    @VisibleForTesting
+    int getActiveStreamReaderCount() {
+        int total = 0;
+        for (Cache cache : caches) {
+            total += cache.getStreamReaderCount();
+        }
+        return total;
+    }
+
+    @VisibleForTesting
+    void triggerExpiredStreamReaderCleanup() {
+        for (Cache cache : caches) {
+            cache.submitCleanupExpiredStreamReader();
+        }
+    }
+
     static class StreamReaderKey {
         final long streamId;
         final long startOffset;
@@ -114,10 +149,11 @@ public String toString() {
     class Cache {
         private final EventLoop eventLoop;
         private final Map<StreamReaderKey, StreamReader> streamReaders = new HashMap<>();
-        private long lastStreamReaderExpiredCheckTime = System.currentTimeMillis();
+        private long lastStreamReaderExpiredCheckTime;
 
         public Cache(EventLoop eventLoop) {
            this.eventLoop = eventLoop;
+            this.lastStreamReaderExpiredCheckTime = time.milliseconds();
         }
 
         public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
@@ -129,7 +165,7 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
             StreamReaderKey key = new StreamReaderKey(streamId, startOffset);
             StreamReader streamReader = streamReaders.remove(key);
             if (streamReader == null) {
-                streamReader = new StreamReader(streamId, startOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache);
+                streamReader = new StreamReader(streamId, startOffset, eventLoop, objectManager, objectReaderFactory, dataBlockCache, time);
             }
             StreamReader finalStreamReader = streamReader;
             CompletableFuture<ReadDataBlock> streamReadCf = streamReader.read(startOffset, endOffset, maxBytes)
@@ -150,8 +186,21 @@ public CompletableFuture<ReadDataBlock> read(long streamId, long startOffset,
             return cf;
         }
 
+        private void submitCleanupExpiredStreamReader() {
+            eventLoop.execute(this::cleanupExpiredStreamReader);
+        }
+
+        /**
+         * Get the number of StreamReaders in this cache.
+         * This method is intended for testing purposes only.
+         */
+        @VisibleForTesting
+        int getStreamReaderCount() {
+            return streamReaders.size();
+        }
+
         private void cleanupExpiredStreamReader() {
-            long now = System.currentTimeMillis();
+            long now = time.milliseconds();
             if (now > lastStreamReaderExpiredCheckTime + STREAM_READER_EXPIRED_CHECK_INTERVAL_MILLS) {
                 lastStreamReaderExpiredCheckTime = now;
                 Iterator<Map.Entry<StreamReaderKey, StreamReader>> it = streamReaders.entrySet().iterator();
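The cleanup added above is two-stage: a shared scheduler periodically invokes triggerExpiredStreamReaderCleanup(), and each Cache hops onto its own EventLoop before scanning and evicting readers, so the streamReaders map is only ever mutated from the thread that owns it. A simplified, self-contained analogue using only java.util.concurrent is sketched below; ReaderCache and its method names are illustrative assumptions, not the project's API.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified analogue of the scheduled expired-reader cleanup: a shared scheduler
// fires at a fixed interval and hands the actual eviction to the single thread
// that owns the map, so no extra locking is needed.
final class ReaderCache {
    static final long EXPIRED_MILLS = TimeUnit.MINUTES.toMillis(1);
    static final long CHECK_INTERVAL_MILLS = TimeUnit.MINUTES.toMillis(1);

    // Stand-ins for the per-Cache EventLoop and the shared scheduler.
    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<Long, Long> lastAccessByReaderId = new HashMap<>();

    ReaderCache() {
        scheduler.scheduleAtFixedRate(this::submitCleanup,
            CHECK_INTERVAL_MILLS, CHECK_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
    }

    void touch(long readerId) {
        // Record the last access time on the owning thread.
        eventLoop.execute(() -> lastAccessByReaderId.put(readerId, System.currentTimeMillis()));
    }

    private void submitCleanup() {
        // Hop onto the owning thread before mutating the map.
        eventLoop.execute(this::cleanupExpired);
    }

    private void cleanupExpired() {
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<Long, Long>> it = lastAccessByReaderId.entrySet().iterator();
        while (it.hasNext()) {
            if (now - it.next().getValue() > EXPIRED_MILLS) {
                it.remove();   // evict readers idle longer than the expiration window
            }
        }
    }

    void shutdown() {
        scheduler.shutdownNow();
        eventLoop.shutdownNow();
    }
}

Keeping the eviction on the owning thread is what makes the unsynchronized map safe here; the scheduler thread never touches the map directly, it only submits work to it.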
s3stream/src/test/java/com/automq/stream/s3/cache/blockcache/StreamReadersTest.java

Lines changed: 190 additions & 0 deletions

@@ -0,0 +1,190 @@
+/*
+ * Copyright 2024, AutoMQ HK Limited.
+ *
+ * The use of this file is governed by the Business Source License,
+ * as detailed in the file "/LICENSE.S3Stream" included in this repository.
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+package com.automq.stream.s3.cache.blockcache;
+
+import com.automq.stream.s3.ObjectReader;
+import com.automq.stream.s3.TestUtils;
+import com.automq.stream.s3.cache.ReadDataBlock;
+import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.model.StreamRecordBatch;
+import com.automq.stream.s3.objects.ObjectManager;
+import com.automq.stream.s3.operator.MemoryObjectStorage;
+import com.automq.stream.s3.operator.ObjectStorage;
+import com.automq.stream.s3.trace.context.TraceContext;
+import com.automq.stream.utils.MockTime;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Tag("S3Unit")
+public class StreamReadersTest {
+    private static final long STREAM_ID_1 = 100L;
+    private static final long STREAM_ID_2 = 200L;
+    private static final int BLOCK_SIZE_THRESHOLD = 1024;
+
+    private Map<Long, MockObject> objects;
+    private ObjectManager objectManager;
+    private ObjectStorage objectStorage;
+    private ObjectReaderFactory objectReaderFactory;
+    private StreamReaders streamReaders;
+    private MockTime mockTime;
+
+    @BeforeEach
+    void setup() {
+        objects = new HashMap<>();
+
+        // Create mock objects for testing with different offset ranges
+        // Object 1: STREAM_ID_1 offset 0-2
+        objects.put(1L, MockObject.builder(1L, BLOCK_SIZE_THRESHOLD).write(STREAM_ID_1, List.of(
+            new StreamRecordBatch(STREAM_ID_1, 0, 0, 2, TestUtils.random(100))
+        )).build());
+        // Object 2: STREAM_ID_2 offset 0-1
+        objects.put(2L, MockObject.builder(2L, BLOCK_SIZE_THRESHOLD).write(STREAM_ID_2, List.of(
+            new StreamRecordBatch(STREAM_ID_2, 0, 0, 1, TestUtils.random(100))
+        )).build());
+
+        objectManager = mock(ObjectManager.class);
+
+        when(objectManager.isObjectExist(anyLong())).thenReturn(true);
+        // Mock getObjects method to return appropriate objects based on offset ranges
+        // For STREAM_ID_1, use the combined object that covers 0-2 range
+        when(objectManager.getObjects(eq(STREAM_ID_1), anyLong(), anyLong(), anyInt()))
+            .thenReturn(CompletableFuture.completedFuture(List.of(objects.get(1L).metadata)));
+        // STREAM_ID_2 offset 0-1 -> object 2
+        when(objectManager.getObjects(eq(STREAM_ID_2), anyLong(), anyLong(), anyInt()))
+            .thenReturn(CompletableFuture.completedFuture(List.of(objects.get(2L).metadata)));
+
+        objectStorage = new MemoryObjectStorage();
+
+        objectReaderFactory = new ObjectReaderFactory() {
+            @Override
+            public ObjectReader get(S3ObjectMetadata metadata) {
+                return objects.get(metadata.objectId()).objectReader();
+            }
+
+            @Override
+            public ObjectStorage getObjectStorage() {
+                return objectStorage;
+            }
+        };
+
+        mockTime = new MockTime();
+        streamReaders = new StreamReaders(Long.MAX_VALUE, objectManager, objectStorage, objectReaderFactory, 2, mockTime);
+    }
+
+    @AfterEach
+    void tearDown() {
+        if (streamReaders != null) {
+            // Clean up resources
+            streamReaders = null;
+        }
+    }
+
+    @Test
+    public void testStreamReaderCreationAndReuse() throws Exception {
+        TraceContext context = TraceContext.DEFAULT;
+
+        // Initially no StreamReaders
+        assertEquals(0, streamReaders.getActiveStreamReaderCount());
+
+        // Create first StreamReader
+        CompletableFuture<ReadDataBlock> readFuture1 = streamReaders.read(context, STREAM_ID_1, 0, 1, Integer.MAX_VALUE);
+        ReadDataBlock result1 = readFuture1.get(5, TimeUnit.SECONDS);
+        result1.getRecords().forEach(StreamRecordBatch::release);
+
+        assertEquals(1, streamReaders.getActiveStreamReaderCount());
+
+        // Read from same stream again - should reuse existing StreamReader
+        CompletableFuture<ReadDataBlock> readFuture2 = streamReaders.read(context, STREAM_ID_1, 1, 2, Integer.MAX_VALUE);
+        ReadDataBlock result2 = readFuture2.get(5, TimeUnit.SECONDS);
+        result2.getRecords().forEach(StreamRecordBatch::release);
+
+        // Should still have 1 StreamReader (reused)
+        assertEquals(1, streamReaders.getActiveStreamReaderCount());
+    }
+
+    @Test
+    public void testCleanupTrigger() throws Exception {
+        TraceContext context = TraceContext.DEFAULT;
+
+        // Create some StreamReaders
+        CompletableFuture<ReadDataBlock> readFuture1 = streamReaders.read(context, STREAM_ID_1, 0, 1, Integer.MAX_VALUE);
+        ReadDataBlock result1 = readFuture1.get(5, TimeUnit.SECONDS);
+        result1.getRecords().forEach(StreamRecordBatch::release);
+
+        CompletableFuture<ReadDataBlock> readFuture2 = streamReaders.read(context, STREAM_ID_2, 0, 1, Integer.MAX_VALUE);
+        ReadDataBlock result2 = readFuture2.get(5, TimeUnit.SECONDS);
+        result2.getRecords().forEach(StreamRecordBatch::release);
+
+        assertEquals(2, streamReaders.getActiveStreamReaderCount());
+
+        // Trigger cleanup - should not affect non-expired readers
+        streamReaders.triggerExpiredStreamReaderCleanup();
+
+        // Wait for async cleanup to complete
+        await().atMost(1, TimeUnit.SECONDS)
+            .pollInterval(100, TimeUnit.MILLISECONDS)
+            .until(() -> streamReaders.getActiveStreamReaderCount() == 2);
+
+        // StreamReaders should still be there (not expired yet)
+        assertEquals(2, streamReaders.getActiveStreamReaderCount());
+    }
+
+    @Test
+    public void testExpiredStreamReaderCleanupExecution() throws Exception {
+        TraceContext context = TraceContext.DEFAULT;
+
+        // Create a StreamReader
+        CompletableFuture<ReadDataBlock> readFuture = streamReaders.read(context, STREAM_ID_1, 0, 1, Integer.MAX_VALUE);
+        ReadDataBlock result = readFuture.get(5, TimeUnit.SECONDS);
+        result.getRecords().forEach(StreamRecordBatch::release);
+
+        assertEquals(1, streamReaders.getActiveStreamReaderCount());
+
+        // Advance mock time to simulate expiration (advance by 2 minutes, expiration is 1 minute)
+        mockTime.sleep(TimeUnit.MINUTES.toMillis(2));
+
+        // Trigger cleanup - should now clean up expired StreamReaders
+        streamReaders.triggerExpiredStreamReaderCleanup();
+
+        // Wait for async cleanup to complete
+        await().atMost(5, TimeUnit.SECONDS)
+            .pollInterval(100, TimeUnit.MILLISECONDS)
+            .until(() -> streamReaders.getActiveStreamReaderCount() == 0);
+
+        // Verify system still works after cleanup
+        CompletableFuture<ReadDataBlock> readFuture2 = streamReaders.read(context, STREAM_ID_2, 0, 1, Integer.MAX_VALUE);
+        ReadDataBlock result2 = readFuture2.get(5, TimeUnit.SECONDS);
+        result2.getRecords().forEach(StreamRecordBatch::release);
+
+        assertEquals(1, streamReaders.getActiveStreamReaderCount());
+    }
+
+
+
+}
