Skip to content

Commit e387577

Browse files
fix: kafka deduplicator cf setup (#52386)
1 parent c3c5b6b commit e387577

File tree

2 files changed

+53
-60
lines changed

2 files changed

+53
-60
lines changed

rust/kafka-deduplicator/src/rocksdb/store.rs

Lines changed: 45 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use std::{
66
use anyhow::{Context, Result};
77
use rocksdb::{
88
checkpoint::Checkpoint, BlockBasedOptions, BoundColumnFamily, Cache, ColumnFamilyDescriptor,
9-
DBCompressionType, DBWithThreadMode, MultiThreaded, Options, SliceTransform, WriteBatch,
10-
WriteBufferManager, WriteOptions,
9+
DBCompressionType, DBWithThreadMode, MultiThreaded, Options, WriteBatch, WriteBufferManager,
10+
WriteOptions,
1111
};
1212
use std::time::Instant;
1313
use tracing::error;
@@ -35,7 +35,7 @@ const DEFAULT_L0_STOP_WRITES_TRIGGER: i32 = 36;
3535
/// Bloom filter bits per key — 10 bits ≈ 1% false-positive rate.
3636
const BLOOM_FILTER_BITS_PER_KEY: f64 = 10.0;
3737
/// Timestamp key prefix length for SliceTransform (8-byte epoch seconds).
38-
const TIMESTAMP_PREFIX_LEN: usize = 8;
38+
pub const TIMESTAMP_PREFIX_LEN: usize = 8;
3939
/// 2 write buffers: one active for writes, one flushing to disk.
4040
const MAX_WRITE_BUFFER_NUMBER: i32 = 2;
4141
/// Merge every buffer immediately — avoids batching delay before flush.
@@ -153,7 +153,7 @@ pub fn init_shared_resources(config: &RocksDbConfig) {
153153
});
154154
}
155155

156-
pub fn block_based_table_factory() -> BlockBasedOptions {
156+
fn block_based_table_options() -> BlockBasedOptions {
157157
let mut block_opts = BlockBasedOptions::default();
158158
// Bloom filter reduces full-key lookups during dedup checks
159159
block_opts.set_bloom_filter(BLOOM_FILTER_BITS_PER_KEY, false);
@@ -162,20 +162,34 @@ pub fn block_based_table_factory() -> BlockBasedOptions {
162162
block_opts.set_whole_key_filtering(true);
163163
block_opts.set_partition_filters(true);
164164
block_opts.set_pin_top_level_index_and_filter(true);
165+
// Use shared block cache across all stores and column families
166+
block_opts.set_block_cache(
167+
SHARED_BLOCK_CACHE
168+
.get()
169+
.expect("shared block cache not initialized"),
170+
);
165171
block_opts
166172
}
167173

168-
fn rocksdb_options(config: &RocksDbConfig) -> Options {
174+
/// Build column family options with all tuning from `RocksDbConfig`.
175+
/// CF options don't inherit from DB options, so this must explicitly set
176+
/// every option that matters for the CF's performance and compression.
177+
pub fn column_family_options(config: &RocksDbConfig) -> Options {
169178
// Ensure shared resources are initialized (idempotent fallback for tests)
170179
init_shared_resources(config);
171180

172181
let mut opts = Options::default();
173-
opts.create_if_missing(true);
174-
opts.set_atomic_flush(true);
175-
opts.create_missing_column_families(true);
182+
183+
opts.set_block_based_table_factory(&block_based_table_options());
184+
185+
// Write buffer tuning — larger buffers = fewer flushes = less I/O on PVC storage.
186+
opts.set_write_buffer_size(config.write_buffer_size_bytes);
187+
opts.set_max_write_buffer_number(MAX_WRITE_BUFFER_NUMBER);
188+
opts.set_min_write_buffer_number_to_merge(MIN_WRITE_BUFFER_NUMBER_TO_MERGE);
189+
190+
opts.set_target_file_size_base(config.target_file_size_base_bytes);
176191

177192
// Universal compaction: lower write amplification for write-heavy workloads
178-
// at the cost of higher space amplification — good for batch writes on slow PVC storage
179193
opts.set_compaction_style(rocksdb::DBCompactionStyle::Universal);
180194
let mut universal_opts = rocksdb::UniversalCompactOptions::default();
181195
universal_opts.set_size_ratio(UNIVERSAL_SIZE_RATIO);
@@ -185,40 +199,12 @@ fn rocksdb_options(config: &RocksDbConfig) -> Options {
185199
universal_opts.set_compression_size_percent(config.universal_compression_size_percent);
186200
opts.set_universal_compaction_options(&universal_opts);
187201

188-
let mut block_opts = block_based_table_factory();
189-
190-
// Timestamp column family uses prefix extractor for 8-byte epoch-second keys
191-
let mut ts_cf = Options::default();
192-
ts_cf.set_block_based_table_factory(&block_opts);
193-
ts_cf.set_prefix_extractor(SliceTransform::create_fixed_prefix(TIMESTAMP_PREFIX_LEN));
194-
195-
// CRITICAL: Use shared block cache across all stores
196-
block_opts.set_block_cache(
197-
SHARED_BLOCK_CACHE
198-
.get()
199-
.expect("shared block cache not initialized"),
200-
);
201-
opts.set_block_based_table_factory(&block_opts);
202-
203-
// CRITICAL: Use shared write buffer manager to limit total memory
204-
opts.set_write_buffer_manager(
205-
SHARED_WRITE_BUFFER_MANAGER
206-
.get()
207-
.expect("shared write buffer manager not initialized"),
208-
);
209-
210-
// Write buffer tuning — larger buffers = fewer flushes = less I/O on PVC storage.
211-
// The shared write buffer manager caps total memory across all partition stores.
212-
opts.set_write_buffer_size(config.write_buffer_size_bytes);
213-
opts.set_max_write_buffer_number(MAX_WRITE_BUFFER_NUMBER);
214-
opts.set_min_write_buffer_number_to_merge(MIN_WRITE_BUFFER_NUMBER_TO_MERGE);
215-
216-
opts.set_target_file_size_base(config.target_file_size_base_bytes);
217-
202+
// L0 compaction triggers
218203
opts.set_level_zero_file_num_compaction_trigger(config.l0_compaction_trigger);
219204
opts.set_level_zero_slowdown_writes_trigger(config.l0_slowdown_writes_trigger);
220205
opts.set_level_zero_stop_writes_trigger(config.l0_stop_writes_trigger);
221206

207+
// Compression
222208
if let Some(ref per_level) = config.compression_per_level {
223209
opts.set_compression_per_level(per_level);
224210
} else {
@@ -231,6 +217,26 @@ fn rocksdb_options(config: &RocksDbConfig) -> Options {
231217
}
232218
}
233219

220+
opts
221+
}
222+
223+
fn rocksdb_options(config: &RocksDbConfig) -> Options {
224+
// Start with column_family_options() as the base — these settings apply to the
225+
// default CF and are shared with custom CFs via column_family_options().
226+
let mut opts = column_family_options(config);
227+
228+
// DB-level settings (not per-CF)
229+
opts.create_if_missing(true);
230+
opts.set_atomic_flush(true);
231+
opts.create_missing_column_families(true);
232+
233+
// CRITICAL: Use shared write buffer manager to limit total memory across all stores
234+
opts.set_write_buffer_manager(
235+
SHARED_WRITE_BUFFER_MANAGER
236+
.get()
237+
.expect("shared write buffer manager not initialized"),
238+
);
239+
234240
// Limit background jobs to reduce I/O contention when many partitions share a disk
235241
opts.increase_parallelism(config.max_background_jobs);
236242
opts.set_max_background_jobs(config.max_background_jobs);

rust/kafka-deduplicator/src/store/deduplication_store.rs

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@ use std::sync::Arc;
33
use std::time::Instant;
44

55
use anyhow::{Context, Result};
6-
use rocksdb::{ColumnFamilyDescriptor, Options, SliceTransform};
6+
use rocksdb::{ColumnFamilyDescriptor, SliceTransform};
77
use tracing::info;
88

99
use crate::metrics::MetricsHelper;
10-
use crate::rocksdb::store::{block_based_table_factory, RocksDbConfig, RocksDbStore};
10+
use crate::rocksdb::store::{
11+
column_family_options, RocksDbConfig, RocksDbStore, TIMESTAMP_PREFIX_LEN,
12+
};
1113

1214
use super::keys::TimestampKey;
1315
use crate::pipelines::ingestion_events::TimestampMetadata;
@@ -54,23 +56,10 @@ impl DeduplicationStore {
5456
}
5557

5658
fn get_cf_descriptors(rocksdb_config: &RocksDbConfig) -> Vec<ColumnFamilyDescriptor> {
57-
let block_opts = block_based_table_factory();
58-
59-
// ----- CF: TimestampKey (prefix = 8-byte BE timestamp)
60-
let mut ts_cf_opts = Options::default();
61-
ts_cf_opts.set_block_based_table_factory(&block_opts);
62-
ts_cf_opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(8)); // <- per-CF
63-
ts_cf_opts.set_write_buffer_size(8 * 1024 * 1024);
64-
ts_cf_opts.set_max_write_buffer_number(3);
65-
// IMPORTANT: CF options don't inherit from DB options, must set compression explicitly
66-
if let Some(ref per_level) = rocksdb_config.compression_per_level {
67-
ts_cf_opts.set_compression_per_level(per_level);
68-
} else {
69-
ts_cf_opts.set_compression_type(rocksdb_config.compression_type);
70-
}
71-
if let Some(bottommost) = rocksdb_config.bottommost_compression_type {
72-
ts_cf_opts.set_bottommost_compression_type(bottommost);
73-
}
59+
// column_family_options() provides all tuning (write buffers, compaction, compression,
60+
// shared block cache) since CF options don't inherit from DB options.
61+
let mut ts_cf_opts = column_family_options(rocksdb_config);
62+
ts_cf_opts.set_prefix_extractor(SliceTransform::create_fixed_prefix(TIMESTAMP_PREFIX_LEN));
7463

7564
vec![ColumnFamilyDescriptor::new(Self::TIMESTAMP_CF, ts_cf_opts)]
7665
}
@@ -349,8 +338,6 @@ impl DeduplicationStore {
349338
&self,
350339
checkpoint_path: P,
351340
) -> Result<LocalCheckpointInfo> {
352-
self.store.flush_wal(true)?;
353-
354341
let sequence = self.store.latest_sequence_number();
355342

356343
self.store.create_checkpoint(checkpoint_path)?;

0 commit comments

Comments
 (0)