Skip to content

Commit 7682d1a

Browse files
[kv] Implement KvTablet RocksDB Lazy Open
Defer RocksDB open to first access and auto-release idle tablets to reduce memory usage on tablet servers with many KV buckets.

Key design:
- Sentinel pattern: lightweight KvTablet shell with RocksDBState hot-swapped via a single volatile write.
- 6-state lifecycle (LAZY/OPENING/OPEN/RELEASING/FAILED/CLOSED) with generation+epoch fencing.
- Guard/Pin mechanism: lock-free fast path for the OPEN state; the blocking slow path triggers an on-demand open.
- Idle release: a periodic scan releases tablets idle beyond the timeout, preserving local SST files for fast reopen.
- Cold-to-hot reopen loads the local RocksDB and replays incremental logs, falling back to a full snapshot download on failure.

Configs: kv.lazy-open.enabled (default false), kv.lazy-open.idle-timeout (default 24h).

Metrics: kvTabletOpenCount, kvTabletLazyCount, kvTabletFailedCount (Gauge).
1 parent b03cec4 commit 7682d1a

File tree

15 files changed

+2904
-265
lines changed

15 files changed

+2904
-265
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1919,6 +1919,23 @@ public class ConfigOptions {
19191919
.withDescription(
19201920
"The max fetch size for fetching log to apply to kv during recovering kv.");
19211921

1922+
// ------------------------------------------------------------------------
1923+
// ConfigOptions for KV lazy open
1924+
// ------------------------------------------------------------------------
1925+
1926+
/**
 * Boolean switch for KvTablet lazy open, exposed under the key {@code kv.lazy-open.enabled}.
 *
 * <p>Defaults to {@code false}, so lazy open is opt-in. Per the accompanying commit description,
 * enabling it defers opening RocksDB until a tablet is first accessed.
 */
public static final ConfigOption<Boolean> KV_LAZY_OPEN_ENABLED =
1927+
key("kv.lazy-open.enabled")
1928+
.booleanType()
1929+
.defaultValue(false)
1930+
.withDescription("Whether to enable KvTablet lazy open.");
1931+
1932+
/**
 * Duration option, exposed under the key {@code kv.lazy-open.idle-timeout}, that controls how
 * long an open KvTablet may stay idle before it becomes eligible for release back to the lazy
 * state.
 *
 * <p>Defaults to 24 hours.
 * NOTE(review): presumably only takes effect when {@code kv.lazy-open.enabled} is true; confirm
 * against the code that wires up the idle-release check.
 */
public static final ConfigOption<Duration> KV_LAZY_OPEN_IDLE_TIMEOUT =
1933+
key("kv.lazy-open.idle-timeout")
1934+
.durationType()
1935+
.defaultValue(Duration.ofHours(24))
1936+
.withDescription(
1937+
"Idle time before an open KvTablet is eligible for release back to lazy state.");
1938+
19221939
// ------------------------------------------------------------------------
19231940
// ConfigOptions for metrics
19241941
// ------------------------------------------------------------------------

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,9 @@ public class MetricNames {
147147
"preWriteBufferTruncateAsDuplicatedPerSecond";
148148
public static final String KV_PRE_WRITE_BUFFER_TRUNCATE_AS_ERROR_RATE =
149149
"preWriteBufferTruncateAsErrorPerSecond";
150+
// Metric names for KvTablet lazy-open bookkeeping. The accompanying commit description lists
// all three as gauges.
// NOTE(review): presumably each one reports the number of KvTablets currently in the OPEN, LAZY
// and FAILED lifecycle state respectively; confirm against the metric registration site.
public static final String KV_TABLET_OPEN_COUNT = "kvTabletOpenCount";
151+
public static final String KV_TABLET_LAZY_COUNT = "kvTabletLazyCount";
152+
public static final String KV_TABLET_FAILED_COUNT = "kvTabletFailedCount";
150153

151154
// --------------------------------------------------------------------------------------------
152155
// RocksDB metrics
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.fluss.server.kv;
19+
20+
import org.apache.fluss.annotation.VisibleForTesting;
21+
import org.apache.fluss.utils.clock.Clock;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.io.Closeable;
27+
import java.util.Collection;
28+
import java.util.Comparator;
29+
import java.util.List;
30+
import java.util.concurrent.ScheduledExecutorService;
31+
import java.util.concurrent.ScheduledFuture;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.function.Supplier;
34+
import java.util.stream.Collectors;
35+
36+
/**
37+
* Periodically checks OPEN KvTablets and releases idle ones back to LAZY state. Tablets are sorted
38+
* by last access time (LRU). Operates directly on {@link KvTablet} — no dependency on Replica
39+
* layer.
40+
*/
41+
public class KvIdleReleaseController implements Closeable {
42+
43+
private static final Logger LOG = LoggerFactory.getLogger(KvIdleReleaseController.class);
44+
45+
private final ScheduledExecutorService scheduler;
46+
private final Supplier<Collection<KvTablet>> tabletSupplier;
47+
private final Clock clock;
48+
49+
private final long checkIntervalMs;
50+
private final long idleIntervalMs;
51+
52+
private volatile ScheduledFuture<?> scheduledTask;
53+
54+
public KvIdleReleaseController(
55+
ScheduledExecutorService scheduler,
56+
Supplier<Collection<KvTablet>> tabletSupplier,
57+
Clock clock,
58+
long checkIntervalMs,
59+
long idleIntervalMs) {
60+
this.scheduler = scheduler;
61+
this.tabletSupplier = tabletSupplier;
62+
this.clock = clock;
63+
this.checkIntervalMs = checkIntervalMs;
64+
this.idleIntervalMs = idleIntervalMs;
65+
}
66+
67+
public void start() {
68+
scheduledTask =
69+
scheduler.scheduleWithFixedDelay(
70+
this::checkAndRelease,
71+
checkIntervalMs,
72+
checkIntervalMs,
73+
TimeUnit.MILLISECONDS);
74+
LOG.info(
75+
"KvIdleReleaseController started: checkInterval={}ms, idleInterval={}ms",
76+
checkIntervalMs,
77+
idleIntervalMs);
78+
}
79+
80+
@VisibleForTesting
81+
void checkAndRelease() {
82+
try {
83+
Collection<KvTablet> tablets = tabletSupplier.get();
84+
85+
long now = clock.milliseconds();
86+
87+
// Quick check: scan for any idle tablet before sorting
88+
boolean hasIdle = false;
89+
for (KvTablet t : tablets) {
90+
if (now - t.getLastAccessTimestamp() > idleIntervalMs) {
91+
hasIdle = true;
92+
break;
93+
}
94+
}
95+
if (!hasIdle) {
96+
return;
97+
}
98+
99+
// Sort by coldest first (LRU)
100+
List<ReleaseCandidate> candidates =
101+
tablets.stream()
102+
.map(t -> new ReleaseCandidate(t, t.getLastAccessTimestamp()))
103+
.sorted(Comparator.comparingLong(c -> c.lastAccessTimestamp))
104+
.collect(Collectors.toList());
105+
106+
int idleCount = 0;
107+
for (ReleaseCandidate c : candidates) {
108+
if (now - c.lastAccessTimestamp > idleIntervalMs) {
109+
idleCount++;
110+
} else {
111+
break;
112+
}
113+
}
114+
115+
LOG.info("Idle release round: open={}, idle={}", tablets.size(), idleCount);
116+
117+
int released = 0;
118+
for (int i = 0; i < idleCount; i++) {
119+
ReleaseCandidate candidate = candidates.get(i);
120+
KvTablet tablet = candidate.tablet;
121+
122+
if (tablet.canRelease(idleIntervalMs, now)) {
123+
try {
124+
boolean success = tablet.releaseKv();
125+
if (success) {
126+
released++;
127+
LOG.debug(
128+
"Released KvTablet for {} (idle {}ms)",
129+
tablet.getTableBucket(),
130+
now - candidate.lastAccessTimestamp);
131+
}
132+
} catch (Exception e) {
133+
LOG.warn("Failed to release KvTablet for {}", tablet.getTableBucket(), e);
134+
}
135+
}
136+
}
137+
138+
LOG.info("Idle release round complete: released={}/{}", released, idleCount);
139+
} catch (Exception e) {
140+
LOG.error("Error during idle release check", e);
141+
}
142+
}
143+
144+
@Override
145+
public void close() {
146+
if (scheduledTask != null) {
147+
scheduledTask.cancel(false);
148+
}
149+
}
150+
151+
private static class ReleaseCandidate {
152+
final KvTablet tablet;
153+
final long lastAccessTimestamp;
154+
155+
ReleaseCandidate(KvTablet tablet, long lastAccessTimestamp) {
156+
this.tablet = tablet;
157+
this.lastAccessTimestamp = lastAccessTimestamp;
158+
}
159+
}
160+
}

0 commit comments

Comments
 (0)