Skip to content

Commit 040f64f

Browse files
committed
chore: fmt
1 parent 4b88711 commit 040f64f

File tree

2 files changed

+79
-20
lines changed

2 files changed

+79
-20
lines changed

rust/cubestore/cubestore/benches/cachestore_queue.rs

Lines changed: 75 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
2-
use cubestore::cachestore::{CacheStore, QueueAddPayload, QueueItemStatus, RocksCacheStore};
2+
use cubestore::cachestore::{
3+
CacheStore, QueueAddPayload, QueueItemStatus, QueueKey, RocksCacheStore,
4+
};
35
use cubestore::config::{Config, CubeServices};
46
use cubestore::CubeError;
57
use std::sync::Arc;
@@ -25,6 +27,14 @@ fn prepare_cachestore(name: &str) -> Result<Arc<RocksCacheStore>, CubeError> {
2527
Ok(cachestore)
2628
}
2729

30+
fn generate_queue_path(queue_path: &str, queue_id: usize) -> String {
31+
format!(
32+
"{}:{}",
33+
queue_path,
34+
format!("{:x}", md5::compute(queue_id.to_be_bytes()))
35+
)
36+
}
37+
2838
async fn do_insert(
2939
cachestore: &Arc<RocksCacheStore>,
3040
total: usize,
@@ -33,11 +43,7 @@ async fn do_insert(
3343
) {
3444
for i in 0..total {
3545
let fut = cachestore.queue_add(QueueAddPayload {
36-
path: format!(
37-
"{}:{}",
38-
queue_path,
39-
format!("{:x}", md5::compute(i.to_be_bytes()))
40-
),
46+
path: generate_queue_path(queue_path, i),
4147
value: "a".repeat(size_kb * 1024), // size in bytes
4248
priority: 0,
4349
orphaned: None,
@@ -50,15 +56,15 @@ async fn do_insert(
5056

5157
fn do_insert_bench(c: &mut Criterion, runtime: &Runtime, total: usize, size_kb: usize) {
5258
let cachestore = runtime.block_on(async {
53-
prepare_cachestore(&format!("cachestore_queue_insert_{}", size_kb)).unwrap()
59+
prepare_cachestore(&format!("cachestore_queue_add_{}", size_kb)).unwrap()
5460
});
5561

5662
c.bench_with_input(
57-
BenchmarkId::new(format!("insert queues:1, size:{} kb", size_kb), total),
63+
BenchmarkId::new(format!("queue_add queues:1, size:{} kb", size_kb), total),
5864
&(total, size_kb),
5965
|b, (total, size_kb)| {
6066
b.to_async(runtime)
61-
.iter(|| do_insert(&cachestore, *total, *size_kb, "STANDALONE#queue:1"));
67+
.iter(|| do_insert(&cachestore, *total, *size_kb, "STANDALONE#queue"));
6268
},
6369
);
6470
}
@@ -69,7 +75,12 @@ async fn do_list(
6975
total: usize,
7076
) {
7177
for _ in 0..total {
72-
let fut = cachestore.queue_list("queue:1".to_string(), status_filter.clone(), true, false);
78+
let fut = cachestore.queue_list(
79+
"STANDALONE#queue:1".to_string(),
80+
status_filter.clone(),
81+
true,
82+
false,
83+
);
7384

7485
let res = fut.await;
7586
assert!(res.is_ok());
@@ -92,19 +103,19 @@ fn do_list_bench(
92103
))
93104
.unwrap();
94105

95-
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue:1").await;
96-
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue:2").await;
97-
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue:3").await;
98-
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue:4").await;
99-
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue:5").await;
106+
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue").await;
107+
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue").await;
108+
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue").await;
109+
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue").await;
110+
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue").await;
100111

101112
cachestore
102113
});
103114

104115
c.bench_with_input(
105116
BenchmarkId::new(
106117
format!(
107-
"list status_filter: {:?} queues:5, size:{} kb, per_queue:{}",
118+
"queue_list status_filter: {:?} queues:5, size:{} kb, per_queue:{}",
108119
status_filter, size_kb, per_queue
109120
),
110121
total,
@@ -117,6 +128,52 @@ fn do_list_bench(
117128
);
118129
}
119130

131+
async fn do_get(cachestore: &Arc<RocksCacheStore>, total: usize) {
132+
for i in 0..total {
133+
let fut = cachestore.queue_get(QueueKey::ByPath(generate_queue_path(
134+
"STANDALONE#queue",
135+
i + ((i - 1) * 5),
136+
)));
137+
138+
let res = fut.await;
139+
assert!(res.is_ok());
140+
}
141+
}
142+
143+
fn do_get_bench(
144+
c: &mut Criterion,
145+
runtime: &Runtime,
146+
per_queue: usize,
147+
size_kb: usize,
148+
total: usize,
149+
) {
150+
let cachestore = runtime.block_on(async {
151+
let cachestore = prepare_cachestore(&format!("cachestore_queue_get_{}", size_kb)).unwrap();
152+
153+
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue").await;
154+
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue").await;
155+
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue").await;
156+
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue").await;
157+
do_insert(&cachestore, per_queue, size_kb, "STANDALONE#queue").await;
158+
159+
cachestore
160+
});
161+
162+
c.bench_with_input(
163+
BenchmarkId::new(
164+
format!(
165+
"queue_get queues:5, size:{} kb, per_queue:{}",
166+
size_kb, per_queue
167+
),
168+
total,
169+
),
170+
&total,
171+
|b, total| {
172+
b.to_async(runtime).iter(|| do_get(&cachestore, *total));
173+
},
174+
);
175+
}
176+
120177
fn do_benches(c: &mut Criterion) {
121178
let runtime = Builder::new_multi_thread().enable_all().build().unwrap();
122179

@@ -126,6 +183,8 @@ fn do_benches(c: &mut Criterion) {
126183

127184
do_list_bench(c, &runtime, Some(QueueItemStatus::Pending), 1_000, 128, 128);
128185
do_list_bench(c, &runtime, Some(QueueItemStatus::Active), 1_000, 128, 128);
186+
187+
do_get_bench(c, &runtime, 10_000, 128, 128);
129188
}
130189

131190
criterion_group!(benches, do_benches);

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1312,7 +1312,7 @@ impl CacheStore for RocksCacheStore {
13121312
.map(|item| item.into_row().key)
13131313
.collect();
13141314
if active.len() >= (allow_concurrency as usize) {
1315-
return Ok(QueueRetrieveResponse::NotFound { pending, active });
1315+
return Ok(QueueRetrieveResponse::NotEnoughConcurrency { pending, active });
13161316
}
13171317

13181318
let id_row = queue_schema.get_single_opt_row_by_index(
@@ -1328,7 +1328,7 @@ impl CacheStore for RocksCacheStore {
13281328
if id_row.get_row().get_status() == &QueueItemStatus::Pending {
13291329
let mut new = id_row.get_row().clone();
13301330
new.status = QueueItemStatus::Active;
1331-
// It's an important to insert heartbeat, because
1331+
// It's important to insert heartbeat, because
13321332
// without that created datetime will be used for orphaned filtering
13331333
new.update_heartbeat();
13341334

@@ -1425,8 +1425,8 @@ impl CacheStore for RocksCacheStore {
14251425
key: QueueKey,
14261426
timeout: u64,
14271427
) -> Result<Option<QueueResultResponse>, CubeError> {
1428-
// It's an important to open listener at the beginning to protect race condition
1429-
// it will fix position (subscribe) of broadcast channel
1428+
// It's important to open listener at the beginning to protect race condition
1429+
// it will fix the position (subscribe) of a broadcast channel
14301430
let listener = self.get_listener().await;
14311431

14321432
let store_in_result = self.lookup_queue_result_by_key(key.clone()).await?;

0 commit comments

Comments
 (0)