Skip to content

Commit a8277fb

Browse files
[kv] support shared block cache for kv tablets
1 parent 942eaad commit a8277fb

File tree

6 files changed

+661
-2
lines changed

6 files changed

+661
-2
lines changed

fluss-common/src/main/java/com/alibaba/fluss/config/ConfigOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1535,6 +1535,19 @@ public class ConfigOptions {
15351535
"The amount of the cache for data blocks in RocksDB. "
15361536
+ "The default block-cache size is `8MB`.");
15371537

1538+
public static final ConfigOption<Boolean> KV_SHARED_BLOCK_CACHE_ENABLED =
1539+
key("kv.rocksdb.shared-block-cache.enabled")
1540+
.booleanType()
1541+
.defaultValue(false)
1542+
.withDescription(
1543+
"Whether to enable the shared block cache across all column families.");
1544+
1545+
public static final ConfigOption<MemorySize> KV_SHARED_BLOCK_CACHE_SIZE =
1546+
key("kv.rocksdb.shared-block-cache.size")
1547+
.memoryType()
1548+
.defaultValue(MemorySize.parse("4gb"))
1549+
.withDescription("The size of the shared block cache if enabled.");
1550+
15381551
public static final ConfigOption<Boolean> KV_USE_BLOOM_FILTER =
15391552
key("kv.rocksdb.use-bloom-filter")
15401553
.booleanType()

fluss-server/src/main/java/com/alibaba/fluss/server/kv/KvManager.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.alibaba.fluss.metadata.TableInfo;
3434
import com.alibaba.fluss.metadata.TablePath;
3535
import com.alibaba.fluss.server.TabletManagerBase;
36+
import com.alibaba.fluss.server.kv.rocksdb.RocksDBSharedResource;
3637
import com.alibaba.fluss.server.kv.rowmerger.RowMerger;
3738
import com.alibaba.fluss.server.log.LogManager;
3839
import com.alibaba.fluss.server.log.LogTablet;
@@ -86,6 +87,9 @@ public final class KvManager extends TabletManagerBase {
8687

8788
private final FileSystem remoteFileSystem;
8889

90+
/** Global shared RocksDB resource. */
91+
private final RocksDBSharedResource sharedResource;
92+
8993
private KvManager(
9094
File dataDir,
9195
Configuration conf,
@@ -100,6 +104,9 @@ private KvManager(
100104
this.zkClient = zkClient;
101105
this.remoteKvDir = FlussPaths.remoteKvDir(conf);
102106
this.remoteFileSystem = remoteKvDir.getFileSystem();
107+
108+
// Initialize global shared resource
109+
this.sharedResource = RocksDBSharedResource.getInstance(conf);
103110
}
104111

105112
public static KvManager create(
@@ -129,6 +136,16 @@ public void shutdown() {
129136
LOG.warn("Exception while closing kv tablet {}.", kvTablet.getTableBucket(), e);
130137
}
131138
}
139+
140+
// Clean up shared resources
141+
if (sharedResource != null) {
142+
try {
143+
sharedResource.close();
144+
} catch (Exception e) {
145+
LOG.warn("Exception while closing shared RocksDB resource.", e);
146+
}
147+
}
148+
132149
arrowBufferAllocator.close();
133150
memorySegmentPool.close();
134151
LOG.info("Shut down KvManager complete.");

fluss-server/src/main/java/com/alibaba/fluss/server/kv/rocksdb/RocksDBResourceContainer.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import org.rocksdb.BlockBasedTableConfig;
2828
import org.rocksdb.BloomFilter;
29+
import org.rocksdb.Cache;
2930
import org.rocksdb.ColumnFamilyOptions;
3031
import org.rocksdb.CompactionStyle;
3132
import org.rocksdb.CompressionType;
@@ -74,6 +75,9 @@ public class RocksDBResourceContainer implements AutoCloseable {
7475
/** The handles to be closed when the container is closed. */
7576
private final ArrayList<AutoCloseable> handlesToClose;
7677

78+
/** Global shared RocksDB resource. */
79+
private final RocksDBSharedResource sharedResource;
80+
7781
@VisibleForTesting
7882
RocksDBResourceContainer() {
7983
this(new Configuration(), null, false);
@@ -96,6 +100,10 @@ public RocksDBResourceContainer(
96100
this.enableStatistics = enableStatistics;
97101

98102
this.handlesToClose = new ArrayList<>();
103+
104+
// Get global shared resource and increase reference count
105+
this.sharedResource = RocksDBSharedResource.getInstance(configuration);
106+
this.sharedResource.acquire();
99107
}
100108

101109
/** Gets the RocksDB {@link DBOptions} to be used for RocksDB instances. */
@@ -153,10 +161,21 @@ public ReadOptions getReadOptions() {
153161
return opt;
154162
}
155163

164+
@VisibleForTesting
165+
@Nullable
166+
Cache getSharedBlockCache() {
167+
return sharedResource.getSharedBlockCache();
168+
}
169+
156170
@Override
157171
public void close() throws Exception {
158172
handlesToClose.forEach(IOUtils::closeQuietly);
159173
handlesToClose.clear();
174+
175+
// Release reference to shared resource
176+
if (sharedResource != null) {
177+
sharedResource.release();
178+
}
160179
}
161180

162181
/** Create a {@link DBOptions} for RocksDB, including some common settings. */
@@ -253,14 +272,22 @@ private ColumnFamilyOptions setColumnFamilyOptionsFromConfigurableOptions(
253272
}
254273
}
255274

275+
Cache sharedCache = getSharedBlockCache();
276+
if (sharedCache != null) {
277+
blockBasedTableConfig.setBlockCache(sharedCache);
278+
} else {
279+
// Original per-CF block cache settings
280+
blockBasedTableConfig.setBlockCacheSize(
281+
internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes());
282+
}
283+
256284
blockBasedTableConfig.setBlockSize(
257285
internalGetOption(ConfigOptions.KV_BLOCK_SIZE).getBytes());
258286

259287
blockBasedTableConfig.setMetadataBlockSize(
260288
internalGetOption(ConfigOptions.KV_METADATA_BLOCK_SIZE).getBytes());
261289

262-
blockBasedTableConfig.setBlockCacheSize(
263-
internalGetOption(ConfigOptions.KV_BLOCK_CACHE_SIZE).getBytes());
290+
blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(true);
264291

265292
if (internalGetOption(ConfigOptions.KV_USE_BLOOM_FILTER)) {
266293
final double bitsPerKey = internalGetOption(ConfigOptions.KV_BLOOM_FILTER_BITS_PER_KEY);

0 commit comments

Comments
 (0)