Commit eace9db

chore(cubestore): Rockstore - use named threads per RWLoop/Store (#9906)
1 parent 26c3465 commit eace9db

6 files changed
+109 -70 lines changed

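Background for the change named in the commit title: each RWLoop worker is now spawned through std::thread::Builder with a name derived from its store and column family, so the thread can be told apart in thread listings, profilers, and panic backtraces. A minimal standalone sketch of that pattern, assuming nothing from the Cube Store codebase (the thread name, channel, and job type below are illustrative only):

use std::sync::mpsc;
use std::thread;

fn main() {
    // A tiny work queue: the worker exits once every sender has been dropped.
    let (tx, rx) = mpsc::channel::<Box<dyn FnOnce() + Send>>();

    let worker = thread::Builder::new()
        .name("metastore-default-rwloop".to_string()) // illustrative name only
        .spawn(move || {
            while let Ok(job) = rx.recv() {
                job();
            }
        })
        .expect("failed to spawn named worker thread");

    tx.send(Box::new(|| {
        // The configured name is visible from inside the thread as well as in
        // debuggers, thread listings, and panic messages.
        println!("running on {:?}", thread::current().name());
    }))
    .unwrap();

    drop(tx); // no senders left: the worker loop ends on its own
    worker.join().unwrap();
}

On Linux the OS-visible name (ps -T, /proc/&lt;pid&gt;/task/*/comm) is typically truncated to 15 characters, while thread::current().name() still returns the full string.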

rust/cubestore/cubestore/benches/cachestore_queue.rs

Lines changed: 46 additions & 24 deletions
@@ -43,12 +43,18 @@ fn generate_queue_path(queue_path: &str, queue_id: usize) -> String {
 }

 async fn do_insert(
+    cachestore_name: &str,
     cachestore: &Arc<RocksCacheStore>,
     total: usize,
     size_kb: usize,
     queue_path: &str,
     insert_id_padding: usize,
 ) {
+    println!(
+        "[Preparing] {}: Inserting {} items into queue: {}",
+        cachestore_name, total, queue_path
+    );
+
     for i in 0..total {
         let fut = cachestore.queue_add(QueueAddPayload {
             path: generate_queue_path(queue_path, i + insert_id_padding),
@@ -60,12 +66,13 @@ async fn do_insert(
         let res = fut.await;
         assert!(res.is_ok());
     }
+
+    println!("[Preparing] {}: Done", cachestore_name);
 }

 fn do_insert_bench(c: &mut Criterion, runtime: &Runtime, total: usize, size_kb: usize) {
-    let cachestore = runtime.block_on(async {
-        prepare_cachestore(&format!("cachestore_queue_add_{}", size_kb)).unwrap()
-    });
+    let cachestore_name = format!("cachestore_queue_add_{}", size_kb);
+    let cachestore = runtime.block_on(async { prepare_cachestore(&cachestore_name).unwrap() });

     c.bench_with_input(
         BenchmarkId::new(format!("queue_add queues:1, size:{} kb", size_kb), total),
@@ -78,6 +85,7 @@ fn do_insert_bench(c: &mut Criterion, runtime: &Runtime, total: usize, size_kb:
                 insert_id_padding += total;

                 do_insert(
+                    &cachestore_name,
                     &cachestore,
                     *total,
                     *size_kb,
@@ -116,18 +124,24 @@ fn do_list_bench(
     total: usize,
 ) {
     let cachestore = runtime.block_on(async {
-        let cachestore = prepare_cachestore(&format!(
+        let cachestore_name = format!(
             "cachestore_queue_list_{}_{}",
             format!("{:?}", status_filter).to_ascii_lowercase(),
             size_kb
-        ))
-        .unwrap();
-
-        do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await;
-        do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await;
-        do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await;
-        do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await;
-        do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await;
+        );
+        let cachestore = prepare_cachestore(&cachestore_name).unwrap();
+
+        for idx in 0..5 {
+            do_insert(
+                &cachestore_name,
+                &cachestore,
+                per_queue,
+                size_kb,
+                &format!("STANDALONE#queue{}", idx + 1),
+                0,
+            )
+            .await;
+        }

         cachestore
     });
@@ -148,11 +162,11 @@ fn do_list_bench(
     );
 }

-async fn do_get(cachestore: &Arc<RocksCacheStore>, total: usize) {
+async fn do_get(cachestore: &Arc<RocksCacheStore>, total: usize, total_queues: usize) {
     for i in 0..total {
         let fut = cachestore.queue_get(QueueKey::ByPath(generate_queue_path(
-            "STANDALONE#queue",
-            i + ((i - 1) * 5),
+            &format!("STANDALONE#queue{}", (i % total_queues) + 1),
+            i,
         )));

         let res = fut.await;
@@ -166,15 +180,23 @@ fn do_get_bench(
     per_queue: usize,
     size_kb: usize,
     total: usize,
+    queues: usize,
 ) {
     let cachestore = runtime.block_on(async {
-        let cachestore = prepare_cachestore(&format!("cachestore_queue_get_{}", size_kb)).unwrap();
-
-        do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await;
-        do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await;
-        do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await;
-        do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await;
-        do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue", 0).await;
+        let cachestore_name = format!("cachestore_queue_get_{}", size_kb);
+        let cachestore = prepare_cachestore(&cachestore_name).unwrap();
+
+        for idx in 0..queues {
+            do_insert(
+                &cachestore_name,
+                &cachestore,
+                per_queue,
+                size_kb,
+                &format!("STANDALONE#queue{}", idx + 1),
+                0,
+            )
+            .await;
+        }

         cachestore
     });
@@ -189,7 +211,7 @@ fn do_get_bench(
         ),
         &total,
         |b, total| {
-            b.to_async(runtime).iter(|| do_get(&cachestore, *total));
+            b.to_async(runtime).iter(|| do_get(&cachestore, *total, 3));
         },
     );
 }
@@ -205,7 +227,7 @@ fn do_benches(c: &mut Criterion) {
     do_list_bench(c, &runtime, Some(QueueItemStatus::Pending), 1_000, 128, 128);
     do_list_bench(c, &runtime, Some(QueueItemStatus::Active), 1_000, 128, 128);

-    do_get_bench(c, &runtime, 10_000, 128, 128);
+    do_get_bench(c, &runtime, 2_500, 128, 128, 4);
 }

 criterion_group!(benches, do_benches);
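
A note on the to_async calls used throughout this bench: Criterion can drive async routines on an explicit Tokio runtime when its async_tokio feature is enabled, which is how the queue_add/queue_get futures above are timed. A minimal sketch of that setup, with a stand-in async function instead of the cachestore calls (assumes criterion with the "async_tokio" feature and tokio with a runtime feature in dev-dependencies):

use criterion::{criterion_group, criterion_main, Criterion};
use tokio::runtime::Runtime;

// Stand-in for an async store call such as cachestore.queue_get(...).
async fn fetch(i: u64) -> u64 {
    tokio::task::yield_now().await;
    i * 2
}

fn bench_async_fetch(c: &mut Criterion) {
    let runtime = Runtime::new().expect("failed to build Tokio runtime");

    c.bench_function("async_fetch", |b| {
        // to_async() drives one future per iteration on the given runtime.
        b.to_async(&runtime).iter(|| fetch(42));
    });
}

criterion_group!(benches, bench_async_fetch);
criterion_main!(benches);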

rust/cubestore/cubestore/benches/metastore.rs

Lines changed: 22 additions & 19 deletions
@@ -140,41 +140,44 @@ fn do_get_tables_with_path_bench(
     );
 }

-async fn do_cold_cache_test(num_schemas: usize, tables_per_schema: usize) {
-    let fresh_metastore = prepare_metastore("cold_cache_fresh").unwrap();
-    populate_metastore(&fresh_metastore, num_schemas, tables_per_schema)
-        .await
-        .unwrap();
-    let result = fresh_metastore.get_tables_with_path(false).await;
-    assert!(result.is_ok());
-}
-
-async fn do_warm_cache_test(metastore: &Arc<RocksMetaStore>) {
-    let result = metastore.get_tables_with_path(false).await;
-    assert!(result.is_ok());
-}
-
 fn do_cold_vs_warm_cache_bench(
     c: &mut Criterion,
     runtime: &Runtime,
     num_schemas: usize,
     tables_per_schema: usize,
 ) {
-    let metastore = runtime.block_on(async {
-        let metastore = prepare_metastore("cold_warm_cache").unwrap();
+    let cold_metastore = runtime.block_on(async {
+        let metastore = prepare_metastore("warm_cache").unwrap();
+        populate_metastore(&metastore, num_schemas, tables_per_schema)
+            .await
+            .unwrap();
+        metastore
+    });
+
+    let warm_metastore = runtime.block_on(async {
+        let metastore = prepare_metastore("cold_cache").unwrap();
         populate_metastore(&metastore, num_schemas, tables_per_schema)
             .await
             .unwrap();
         metastore
     });

     c.bench_function("get_tables_with_path_cold_cache", |b| {
-        b.to_async(runtime)
-            .iter(|| do_cold_cache_test(num_schemas, tables_per_schema));
+        b.to_async(runtime).iter_batched(
+            || cold_metastore.reset_cached_tables(),
+            async |_| {
+                let result = cold_metastore.get_tables_with_path(false).await;
+                assert!(result.is_ok());
+            },
+            criterion::BatchSize::SmallInput,
+        );
     });

     c.bench_function("get_tables_with_path_warm_cache", |b| {
-        b.to_async(runtime).iter(|| do_warm_cache_test(&metastore));
+        b.to_async(runtime).iter(async || {
+            let result = warm_metastore.get_tables_with_path(false).await;
+            assert!(result.is_ok());
+        });
     });
 }

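The cold-cache benchmark above relies on Criterion's iter_batched: the setup closure (the reset_cached_tables call) runs outside the timed section, so every measured iteration starts from an empty cache, while the warm variant keeps reusing the populated one. A small self-contained sketch of the same split, using a hypothetical in-memory cache instead of RocksMetaStore:

use std::cell::RefCell;

use criterion::{criterion_group, criterion_main, BatchSize, Criterion};

// Hypothetical stand-in for a store with an internal cache of derived data.
struct CachedStore {
    cache: RefCell<Option<Vec<u64>>>,
}

impl CachedStore {
    fn reset_cache(&self) {
        *self.cache.borrow_mut() = None;
    }

    fn read_all(&self) -> usize {
        let mut cache = self.cache.borrow_mut();
        // Populate the cache on a miss; later calls hit the warm path.
        let data = cache.get_or_insert_with(|| (0..10_000).collect());
        data.len()
    }
}

fn bench_cold_vs_warm(c: &mut Criterion) {
    let store = CachedStore {
        cache: RefCell::new(None),
    };

    c.bench_function("read_all_cold", |b| {
        b.iter_batched(
            || store.reset_cache(), // setup: runs outside the measurement
            |_| store.read_all(),   // routine: only this closure is timed
            BatchSize::SmallInput,
        )
    });

    c.bench_function("read_all_warm", |b| b.iter(|| store.read_all()));
}

criterion_group!(benches, bench_cold_vs_warm);
criterion_main!(benches);

BatchSize::SmallInput only tunes how many setup/measurement pairs Criterion prepares per batch; it affects measurement overhead, not what is measured.
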
rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 1 addition & 1 deletion
@@ -223,7 +223,7 @@ impl RocksCacheStore {
             cache_eviction_manager,
             upload_loop: Arc::new(WorkerLoop::new("Cachestore upload")),
             metrics_loop: Arc::new(WorkerLoop::new("Cachestore metrics")),
-            rw_loop_queue_cf: RocksStoreRWLoop::new("queue"),
+            rw_loop_queue_cf: RocksStoreRWLoop::new("cachestore", "queue"),
         }))
     }

rust/cubestore/cubestore/src/metastore/mod.rs

Lines changed: 4 additions & 0 deletions
@@ -1388,6 +1388,10 @@ impl RocksMetaStore {
         })
     }

+    pub fn reset_cached_tables(&self) {
+        *self.store.cached_tables.lock().unwrap() = None;
+    }
+
     pub async fn load_from_dump(
         path: &Path,
         dump_path: &Path,

rust/cubestore/cubestore/src/metastore/rocks_store.rs

Lines changed: 28 additions & 23 deletions
@@ -1,7 +1,6 @@
 use crate::config::ConfigObj;
 use crate::metastore::table::TablePath;
 use crate::metastore::{MetaStoreEvent, MetaStoreFs};
-use crate::util::aborting_join_handle::AbortingJoinHandle;
 use crate::util::time_span::warn_long;

 use crate::CubeError;
@@ -813,36 +812,42 @@ pub type RocksStoreRWLoopFn = Box<dyn FnOnce() -> Result<(), CubeError> + Send +
 pub struct RocksStoreRWLoop {
     name: &'static str,
     tx: tokio::sync::mpsc::Sender<RocksStoreRWLoopFn>,
-    _join_handle: Arc<AbortingJoinHandle<()>>,
 }

 impl RocksStoreRWLoop {
-    pub fn new(name: &'static str) -> Self {
+    pub fn new(store_name: &'static str, name: &'static str) -> Self {
         let (tx, mut rx) = tokio::sync::mpsc::channel::<RocksStoreRWLoopFn>(32_768);

-        let join_handle = cube_ext::spawn_blocking(move || loop {
-            if let Some(fun) = rx.blocking_recv() {
-                match std::panic::catch_unwind(std::panic::AssertUnwindSafe(fun)) {
-                    Err(panic_payload) => {
-                        let restore_error = CubeError::from_panic_payload(panic_payload);
-                        log::error!("Panic during read write loop execution: {}", restore_error);
-                    }
-                    Ok(res) => {
-                        if let Err(e) = res {
-                            log::error!("Error during read write loop execution: {}", e);
+        let thread_name = format!("{}-{}-rwloop", store_name, name);
+        std::thread::Builder::new()
+            .name(thread_name)
+            .spawn(move || loop {
+                if let Some(fun) = rx.blocking_recv() {
+                    match std::panic::catch_unwind(std::panic::AssertUnwindSafe(fun)) {
+                        Err(panic_payload) => {
+                            let restore_error = CubeError::from_panic_payload(panic_payload);
+                            log::error!(
+                                "Panic during read write loop execution: {}",
+                                restore_error
+                            );
+                        }
+                        Ok(res) => {
+                            if let Err(e) = res {
+                                log::error!("Error during read write loop execution: {}", e);
+                            }
                         }
                     }
+                } else {
+                    return;
                 }
-            } else {
-                return;
-            }
-        });
+            })
+            .expect(&format!(
+                "Failed to spawn RWLoop thread for store '{}', name '{}'",
+                store_name, name
+            ));

-        Self {
-            name,
-            tx,
-            _join_handle: Arc::new(AbortingJoinHandle::new(join_handle)),
-        }
+        // Thread handle is intentionally dropped - thread will exit when tx is dropped
+        Self { name, tx }
     }

     pub async fn schedule(&self, fun: RocksStoreRWLoopFn) -> Result<(), CubeError> {
@@ -928,7 +933,7 @@ impl RocksStore {
             snapshots_upload_stopped: Arc::new(AsyncMutex::new(false)),
             config,
             cached_tables: Arc::new(Mutex::new(None)),
-            rw_loop_default_cf: RocksStoreRWLoop::new("default"),
+            rw_loop_default_cf: RocksStoreRWLoop::new("metastore", "default"),
             details,
         };

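Besides the thread naming, the loop body keeps the existing guard: each scheduled closure runs under std::panic::catch_unwind wrapped in AssertUnwindSafe, so a panicking job is logged and the worker keeps draining the channel instead of unwinding and killing the loop. A rough standalone sketch of that pattern, with a simplified job signature and plain stderr logging standing in for RocksStoreRWLoopFn and CubeError:

use std::panic::{catch_unwind, AssertUnwindSafe};

fn run_job(job: impl FnOnce() -> Result<(), String>) {
    // AssertUnwindSafe is needed because arbitrary closures are not
    // automatically UnwindSafe; the caller promises the state stays usable.
    match catch_unwind(AssertUnwindSafe(job)) {
        Err(panic_payload) => {
            // Recover a readable message from the panic payload when possible.
            let msg = panic_payload
                .downcast_ref::<&str>()
                .map(|s| s.to_string())
                .or_else(|| panic_payload.downcast_ref::<String>().cloned())
                .unwrap_or_else(|| "unknown panic".to_string());
            eprintln!("Panic during job execution: {}", msg);
        }
        Ok(Err(e)) => eprintln!("Error during job execution: {}", e),
        Ok(Ok(())) => {}
    }
}

fn main() {
    run_job(|| Ok(()));
    run_job(|| Err("query failed".to_string()));
    run_job(|| panic!("worker job panicked"));
    // All three calls return; the process keeps running after the panic.
    println!("still alive");
}
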
rust/cubestore/cubestore/src/metastore/rocks_table.rs

Lines changed: 8 additions & 3 deletions
@@ -573,7 +573,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
     fn migration_check_table(&self) -> Result<(), CubeError> {
         let snapshot = self.snapshot();

-        let table_info = snapshot.get_pinned(
+        let table_info = snapshot.get(
             &RowKey::TableInfo {
                 table_id: Self::table_id(),
             }
@@ -633,7 +633,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
     fn migration_check_indexes(&self) -> Result<(), CubeError> {
         let snapshot = self.snapshot();
         for index in Self::indexes().into_iter() {
-            let index_info = snapshot.get_pinned(
+            let index_info = snapshot.get(
                 &RowKey::SecondaryIndexInfo {
                     index_id: Self::index_id(index.get_id()),
                 }
@@ -977,7 +977,7 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {
             RowKey::SecondaryIndex(Self::index_id(index_id), secondary_key_hash, row_id);
         let secondary_index_key = secondary_index_row_key.to_bytes();

-        if let Some(secondary_key_bytes) = self.db().get_pinned(&secondary_index_key)? {
+        if let Some(secondary_key_bytes) = self.db().get(&secondary_index_key)? {
             let index_value_version = RocksSecondaryIndex::value_version(secondary_index);
             let new_value = match RocksSecondaryIndexValue::from_bytes(
                 &secondary_key_bytes,
@@ -1102,6 +1102,11 @@ pub trait RocksTable: BaseRocksTable + Debug + Send + Sync {

     fn get_row(&self, row_id: u64) -> Result<Option<IdRow<Self::T>>, CubeError> {
         let ref db = self.snapshot();
+        // Use pinned access to avoid double copying. Zero-copy deserialization would be ideal,
+        // but we're using flex buffers with serde, which copies String values during
+        // deserialization. There is a way to solve it by using &[u8] types, but it's not worth
+        // the effort right now.
+        //
+        // Let's avoid copying on the row lookup, but accept the copy on deserialization.
         let res = db.get_pinned(RowKey::Table(Self::table_id(), row_id).to_bytes())?;

         if let Some(buffer) = res {

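The comment added to get_row is about the two read paths the rocksdb crate exposes: get copies the value into an owned Vec<u8>, while get_pinned returns a DBPinnableSlice that borrows RocksDB's internal buffer, leaving deserialization as the only copy. A rough sketch of the difference, using a throwaway database path purely for illustration:

use rocksdb::{Options, DB};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/pinned-demo")?; // throwaway path for the example

    db.put(b"row:1", b"payload")?;

    // get() allocates and copies the value into an owned Vec<u8>.
    if let Some(owned) = db.get(b"row:1")? {
        assert_eq!(owned.as_slice(), b"payload");
    }

    // get_pinned() hands back a DBPinnableSlice borrowing RocksDB's own buffer;
    // deref it as &[u8] and copy only when the caller actually needs ownership.
    if let Some(pinned) = db.get_pinned(b"row:1")? {
        assert_eq!(&*pinned, b"payload");
    }

    Ok(())
}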