Skip to content

Commit 0f4a431

Browse files
authored
feat(cubestore): Correct queue add handling (#6022)
1 parent cbf75e9 commit 0f4a431

File tree

7 files changed

+97
-51
lines changed

7 files changed

+97
-51
lines changed

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -52,19 +52,22 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
5252
addedToQueueTime: new Date().getTime()
5353
};
5454

55-
const _rows = await this.driver.query('QUEUE ADD PRIORITY ? ? ?', [
55+
const rows = await this.driver.query('QUEUE ADD PRIORITY ? ? ?', [
5656
priority,
5757
this.prefixKey(this.redisHash(queryKey)),
5858
JSON.stringify(data)
5959
]);
60+
if (rows && rows.length) {
61+
return [
62+
rows[0].added === 'true' ? 1 : 0,
63+
null,
64+
null,
65+
parseInt(rows[0].pending, 10),
66+
data.addedToQueueTime
67+
];
68+
}
6069

61-
return [
62-
1,
63-
null,
64-
null,
65-
1,
66-
data.addedToQueueTime
67-
];
70+
throw new Error('Empty response on QUEUE ADD');
6871
}
6972

7073
// TODO: Looks useless, because we can do it in one step - getQueriesToCancel

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 42 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,23 @@ impl RocksCacheStore {
261261
})
262262
.await
263263
}
264+
265+
fn queue_count_by_prefix_and_status(
266+
db_ref: DbTableRef,
267+
prefix: &Option<String>,
268+
status: QueueItemStatus,
269+
) -> Result<u64, CubeError> {
270+
let queue_schema = QueueItemRocksTable::new(db_ref.clone());
271+
let index_key =
272+
QueueItemIndexKey::ByPrefixAndStatus(prefix.clone().unwrap_or("".to_string()), status);
273+
queue_schema.count_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefixAndStatus)
274+
}
275+
}
276+
277+
#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
278+
pub struct QueueAddResponse {
279+
pub added: bool,
280+
pub pending: u64,
264281
}
265282

266283
#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
@@ -285,7 +302,7 @@ pub trait CacheStore: DIService + Send + Sync {
285302

286303
// queue
287304
async fn queue_all(&self) -> Result<Vec<IdRow<QueueItem>>, CubeError>;
288-
async fn queue_add(&self, item: QueueItem) -> Result<bool, CubeError>;
305+
async fn queue_add(&self, item: QueueItem) -> Result<QueueAddResponse, CubeError>;
289306
async fn queue_truncate(&self) -> Result<(), CubeError>;
290307
async fn queue_get(&self, key: String) -> Result<Option<IdRow<QueueItem>>, CubeError>;
291308
async fn queue_to_cancel(
@@ -453,19 +470,32 @@ impl CacheStore for RocksCacheStore {
453470
.await
454471
}
455472

456-
async fn queue_add(&self, item: QueueItem) -> Result<bool, CubeError> {
473+
async fn queue_add(&self, item: QueueItem) -> Result<QueueAddResponse, CubeError> {
457474
self.store
458475
.write_operation(move |db_ref, batch_pipe| {
459476
let queue_schema = QueueItemRocksTable::new(db_ref.clone());
460477
let index_key = QueueItemIndexKey::ByPath(item.get_path());
461478
let id_row_opt = queue_schema
462479
.get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?;
463480

464-
if id_row_opt.is_none() {
481+
let pending = Self::queue_count_by_prefix_and_status(
482+
db_ref,
483+
item.get_prefix(),
484+
QueueItemStatus::Pending,
485+
)?;
486+
487+
let added = if id_row_opt.is_none() {
465488
queue_schema.insert(item, batch_pipe)?;
466-
}
467489

468-
Ok(true)
490+
true
491+
} else {
492+
false
493+
};
494+
495+
Ok(QueueAddResponse {
496+
added,
497+
pending: if added { pending + 1 } else { pending },
498+
})
469499
})
470500
.await
471501
}
@@ -629,26 +659,12 @@ impl CacheStore for RocksCacheStore {
629659

630660
if let Some(id_row) = id_row_opt {
631661
if id_row.get_row().get_status() == &QueueItemStatus::Pending {
632-
// TODO: Introduce count + Active index?
633-
let index_key = QueueItemIndexKey::ByPrefix(
634-
if let Some(prefix) = id_row.get_row().get_prefix() {
635-
prefix.clone()
636-
} else {
637-
"".to_string()
638-
},
639-
);
640-
let in_queue = queue_schema
641-
.get_rows_by_index(&index_key, &QueueItemRocksIndex::ByPrefix)?;
642-
643-
let mut current_active = 0;
644-
645-
for item in in_queue {
646-
if item.get_row().get_status() == &QueueItemStatus::Active {
647-
current_active += 1;
648-
}
649-
}
650-
651-
if current_active >= allow_concurrency {
662+
let current_active = Self::queue_count_by_prefix_and_status(
663+
db_ref,
664+
id_row.get_row().get_prefix(),
665+
QueueItemStatus::Active,
666+
)?;
667+
if current_active >= (allow_concurrency as u64) {
652668
return Ok(None);
653669
}
654670

@@ -835,7 +851,7 @@ impl CacheStore for ClusterCacheStoreClient {
835851
panic!("CacheStore cannot be used on the worker node! queue_all was used.")
836852
}
837853

838-
async fn queue_add(&self, _item: QueueItem) -> Result<bool, CubeError> {
854+
async fn queue_add(&self, _item: QueueItem) -> Result<QueueAddResponse, CubeError> {
839855
panic!("CacheStore cannot be used on the worker node! queue_add was used.")
840856
}
841857

rust/cubestore/cubestore/src/cachestore/compaction.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ impl MetaStoreCacheCompactionFilter {
3535
}
3636
}
3737

38-
#[cfg(debug_assertions)]
3938
impl Drop for MetaStoreCacheCompactionFilter {
4039
fn drop(&mut self) {
4140
let elapsed = Utc::now() - self.current;

rust/cubestore/cubestore/src/cachestore/lazy.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::cachestore::cache_rocksstore::QueueAddResponse;
12
use crate::cachestore::{
23
CacheItem, CacheStore, QueueItem, QueueItemStatus, QueueResultResponse, RocksCacheStore,
34
};
@@ -203,7 +204,7 @@ impl CacheStore for LazyRocksCacheStore {
203204
self.init().await?.queue_all().await
204205
}
205206

206-
async fn queue_add(&self, item: QueueItem) -> Result<bool, CubeError> {
207+
async fn queue_add(&self, item: QueueItem) -> Result<QueueAddResponse, CubeError> {
207208
self.init().await?.queue_add(item).await
208209
}
209210

rust/cubestore/cubestore/src/cachestore/queue_item.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -217,22 +217,22 @@ impl QueueItem {
217217
#[derive(Clone, Copy, Debug)]
218218
pub(crate) enum QueueItemRocksIndex {
219219
ByPath = 1,
220-
ByStatus = 2,
220+
ByPrefixAndStatus = 2,
221221
ByPrefix = 3,
222222
}
223223

224224
rocks_table_impl!(QueueItem, QueueItemRocksTable, TableId::QueueItems, {
225225
vec![
226226
Box::new(QueueItemRocksIndex::ByPath),
227-
Box::new(QueueItemRocksIndex::ByStatus),
227+
Box::new(QueueItemRocksIndex::ByPrefixAndStatus),
228228
Box::new(QueueItemRocksIndex::ByPrefix),
229229
]
230230
});
231231

232232
#[derive(Hash, Clone, Debug)]
233233
pub enum QueueItemIndexKey {
234234
ByPath(String),
235-
ByStatus(QueueItemStatus),
235+
ByPrefixAndStatus(String, QueueItemStatus),
236236
ByPrefix(String),
237237
}
238238

@@ -242,13 +242,12 @@ impl RocksSecondaryIndex<QueueItem, QueueItemIndexKey> for QueueItemRocksIndex {
242242
fn typed_key_by(&self, row: &QueueItem) -> QueueItemIndexKey {
243243
match self {
244244
QueueItemRocksIndex::ByPath => QueueItemIndexKey::ByPath(row.get_path()),
245-
QueueItemRocksIndex::ByStatus => QueueItemIndexKey::ByStatus(row.get_status().clone()),
245+
QueueItemRocksIndex::ByPrefixAndStatus => QueueItemIndexKey::ByPrefixAndStatus(
246+
row.get_prefix().clone().unwrap_or("".to_string()),
247+
row.get_status().clone(),
248+
),
246249
QueueItemRocksIndex::ByPrefix => {
247-
QueueItemIndexKey::ByPrefix(if let Some(prefix) = row.get_prefix() {
248-
prefix.clone()
249-
} else {
250-
"".to_string()
251-
})
250+
QueueItemIndexKey::ByPrefix(row.get_prefix().clone().unwrap_or("".to_string()))
252251
}
253252
}
254253
}
@@ -257,8 +256,9 @@ impl RocksSecondaryIndex<QueueItem, QueueItemIndexKey> for QueueItemRocksIndex {
257256
match key {
258257
QueueItemIndexKey::ByPath(s) => s.as_bytes().to_vec(),
259258
QueueItemIndexKey::ByPrefix(s) => s.as_bytes().to_vec(),
260-
QueueItemIndexKey::ByStatus(s) => {
261-
let mut r = Vec::with_capacity(1);
259+
QueueItemIndexKey::ByPrefixAndStatus(prefix, s) => {
260+
let mut r = Vec::with_capacity(prefix.len() + 1);
261+
r.extend_from_slice(&prefix.as_bytes());
262262

263263
match s {
264264
QueueItemStatus::Pending => r.push(0_u8),
@@ -274,15 +274,15 @@ impl RocksSecondaryIndex<QueueItem, QueueItemIndexKey> for QueueItemRocksIndex {
274274
fn is_unique(&self) -> bool {
275275
match self {
276276
QueueItemRocksIndex::ByPath => true,
277-
QueueItemRocksIndex::ByStatus => false,
277+
QueueItemRocksIndex::ByPrefixAndStatus => false,
278278
QueueItemRocksIndex::ByPrefix => false,
279279
}
280280
}
281281

282282
fn version(&self) -> u32 {
283283
match self {
284284
QueueItemRocksIndex::ByPath => 1,
285-
QueueItemRocksIndex::ByStatus => 1,
285+
QueueItemRocksIndex::ByPrefixAndStatus => 2,
286286
QueueItemRocksIndex::ByPrefix => 1,
287287
}
288288
}

rust/cubestore/cubestore/src/metastore/rocks_table.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -519,6 +519,23 @@ pub trait RocksTable: Debug + Send + Sync {
519519
Ok(existing_keys)
520520
}
521521

522+
fn count_rows_by_index<K: Debug + Hash>(
523+
&self,
524+
row_key: &K,
525+
secondary_index: &impl RocksSecondaryIndex<Self::T, K>,
526+
) -> Result<u64, CubeError> {
527+
#[cfg(debug_assertions)]
528+
if RocksSecondaryIndex::is_unique(secondary_index) {
529+
return Err(CubeError::internal(format!(
530+
"Wrong usage of count_rows_by_index, called on unique index for {:?} table",
531+
self
532+
)));
533+
}
534+
535+
let rows_ids = self.get_row_ids_by_index(row_key, secondary_index)?;
536+
Ok(rows_ids.len() as u64)
537+
}
538+
522539
fn get_rows_by_index<K: Debug>(
523540
&self,
524541
row_key: &K,

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1167,7 +1167,8 @@ impl SqlService for SqlServiceImpl {
11671167
priority,
11681168
value,
11691169
} => {
1170-
self.cachestore
1170+
let response = self
1171+
.cachestore
11711172
.queue_add(QueueItem::new(
11721173
key.value,
11731174
value,
@@ -1176,7 +1177,16 @@ impl SqlService for SqlServiceImpl {
11761177
))
11771178
.await?;
11781179

1179-
Ok(Arc::new(DataFrame::new(vec![], vec![])))
1180+
Ok(Arc::new(DataFrame::new(
1181+
vec![
1182+
Column::new("added".to_string(), ColumnType::Boolean, 0),
1183+
Column::new("pending".to_string(), ColumnType::Int, 1),
1184+
],
1185+
vec![Row::new(vec![
1186+
TableValue::Boolean(response.added),
1187+
TableValue::Int(response.pending as i64),
1188+
])],
1189+
)))
11801190
}
11811191
CubeStoreStatement::QueueTruncate {} => {
11821192
self.cachestore.queue_truncate().await?;

0 commit comments

Comments
 (0)