Skip to content

Commit c21efa4

Browse files
committed
server-side implementation
1 parent a950a30 commit c21efa4

File tree

5 files changed

+344
-1
lines changed

5 files changed

+344
-1
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,6 +1120,13 @@ public class ConfigOptions {
11201120
+ CLIENT_SCANNER_LOG_FETCH_WAIT_MAX_TIME.key()
11211121
+ " time to return.");
11221122

1123+
public static final ConfigOption<Integer> CLIENT_SCANNER_KV_FETCH_MAX_BYTES =
1124+
key("client.scanner.kv.fetch.max-bytes")
1125+
.intType()
1126+
.defaultValue(4 * 1024 * 1024)
1127+
.withDescription(
1128+
"Max bytes per streaming KV scan batch (ScanKvRequest.batch_size_bytes).");
1129+
11231130
public static final ConfigOption<Integer> CLIENT_LOOKUP_QUEUE_SIZE =
11241131
key("client.lookup.queue-size")
11251132
.intType()

fluss-common/src/main/java/org/apache/fluss/record/DefaultValueRecordBatch.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,10 @@ public void append(byte[] valueBytes) throws IOException {
272272
currentRecordNumber++;
273273
}
274274

275+
public int sizeInBytes() {
276+
return sizeInBytes;
277+
}
278+
275279
public DefaultValueRecordBatch build() throws IOException {
276280
writeBatchHeader();
277281
MemorySegment segment = outputView.getMemorySegment();
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.kv.scan;
19+
20+
import org.rocksdb.ReadOptions;
21+
import org.rocksdb.RocksDB;
22+
import org.rocksdb.RocksIterator;
23+
import org.rocksdb.Snapshot;
24+
25+
/**
26+
* Holds server-side state for a single streaming KV scan.
27+
*
28+
* <p>A scanner is bound to exactly one bucket (table_id, partition_id, bucket_id) and owns:
29+
*
30+
* <ul>
31+
* <li>a RocksDB {@link Snapshot} providing a consistent point-in-time view
32+
* <li>a {@link RocksIterator} iterating over that snapshot
33+
* <li>sequencing state (call_seq_id) to detect missed/out-of-order batches
34+
* <li>a scan boundary {@code logOffset} captured at scanner creation
35+
* </ul>
36+
*
37+
* <p>Lifecycle: created on first ScanKvRequest; closed when exhausted, explicitly closed, or
38+
* expired by {@link ScannerManager}.
39+
*/
40+
public class ScannerContext implements AutoCloseable {
41+
private String scannerId;
42+
private RocksDB db;
43+
private Snapshot snapshot;
44+
private ReadOptions readOptions;
45+
private RocksIterator iterator;
46+
47+
private long logOffset;
48+
private long rowLimit; // -1 means unlimited
49+
50+
private long emittedRows;
51+
private int expectedCallSeqId;
52+
private volatile long lastAccessNanos;
53+
54+
ScannerContext(String scannerId, RocksDB db, long logOffset, long rowLimit, long nowNanos) {
55+
this.scannerId = scannerId;
56+
this.db = db;
57+
this.logOffset = logOffset;
58+
this.rowLimit = rowLimit;
59+
60+
this.snapshot = db.getSnapshot();
61+
this.readOptions = new ReadOptions().setSnapshot(snapshot);
62+
this.iterator = db.newIterator(readOptions);
63+
this.iterator.seekToFirst();
64+
65+
this.lastAccessNanos = nowNanos;
66+
this.emittedRows = 0L;
67+
this.expectedCallSeqId = 0;
68+
}
69+
70+
public String getScannerId() {
71+
return scannerId;
72+
}
73+
74+
public int getExpectedCallSeqId() {
75+
return expectedCallSeqId;
76+
}
77+
78+
public RocksIterator getIterator() {
79+
return iterator;
80+
}
81+
82+
public void advanceSeq() {
83+
expectedCallSeqId++;
84+
}
85+
86+
public void touch(long nowNanos) {
87+
lastAccessNanos = nowNanos;
88+
}
89+
90+
public boolean limitReached() {
91+
return rowLimit >= 0 && emittedRows >= rowLimit;
92+
}
93+
94+
public void incrementEmitted() {
95+
emittedRows++;
96+
}
97+
98+
public long getLogOffset() {
99+
return logOffset;
100+
}
101+
102+
public long getLastAccessNanos() {
103+
return lastAccessNanos;
104+
}
105+
106+
@Override
107+
public void close() {
108+
iterator.close();
109+
readOptions.close();
110+
db.releaseSnapshot(snapshot);
111+
}
112+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.kv.scan;
19+
20+
import org.apache.fluss.utils.MapUtils;
21+
22+
import org.rocksdb.RocksDB;
23+
24+
import java.time.Duration;
25+
import java.util.Map;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.TimeUnit;
29+
30+
/**
31+
* Manages server-side KV scan scanners.
32+
*
33+
* <p>Responsibilities:
34+
*
35+
* <ul>
36+
* <li>create & lookup scanners by scannerId
37+
* <li>expire idle scanners based on {@code server.scanner.ttl}
38+
* <li>release RocksDB snapshot resources on removal
39+
* </ul>
40+
*
41+
* <p>Thread-safety: all public methods are thread-safe. Each scanner itself is single-threaded by
42+
* protocol (call_seq_id ordering), enforced at service layer.
43+
*/
44+
public class ScannerManager implements AutoCloseable {
45+
private final Map<String, ScannerContext> scanners = MapUtils.newConcurrentHashMap();
46+
private final long ttlNanos;
47+
private final ScheduledExecutorService cleanerThread;
48+
49+
public ScannerManager() {
50+
this.ttlNanos = Duration.ofMinutes(5).toNanos();
51+
this.cleanerThread =
52+
Executors.newSingleThreadScheduledExecutor(
53+
r -> {
54+
Thread t = new Thread(r, "scanner-cleaner");
55+
t.setDaemon(true);
56+
return t;
57+
});
58+
59+
// expires scanners periodically
60+
cleanerThread.scheduleAtFixedRate(this::cleanerExpired, 30, 30, TimeUnit.SECONDS);
61+
}
62+
63+
public ScannerContext create(String id, RocksDB db, long logOffset, long rowLimit) {
64+
long now = System.nanoTime();
65+
ScannerContext ctx = new ScannerContext(id, db, logOffset, rowLimit, now);
66+
scanners.put(id, ctx);
67+
return ctx;
68+
}
69+
70+
public ScannerContext get(String id) {
71+
return scanners.get(id);
72+
}
73+
74+
public void remove(String id) {
75+
ScannerContext ctx = scanners.remove(id);
76+
if (ctx != null) {
77+
ctx.close();
78+
}
79+
}
80+
81+
void cleanerExpired() {
82+
long now = System.nanoTime();
83+
for (var entry : scanners.entrySet()) {
84+
if (now - entry.getValue().getLastAccessNanos() > ttlNanos) {
85+
remove(entry.getKey());
86+
}
87+
}
88+
}
89+
90+
@Override
91+
public void close() {
92+
cleanerThread.shutdownNow();
93+
scanners.forEach(
94+
(id, ctx) -> {
95+
try {
96+
ctx.close();
97+
} catch (Throwable ignored) {
98+
// best-effort shutdown
99+
}
100+
});
101+
scanners.clear();
102+
}
103+
}

0 commit comments

Comments
 (0)