Skip to content

Commit bd489cd

Browse files
authored
feat(cubestore): Queue - correct sort over priority (+created) (#6094)
1 parent d79d3aa commit bd489cd

File tree

8 files changed

+148
-44
lines changed

8 files changed

+148
-44
lines changed

packages/cubejs-query-orchestrator/src/orchestrator/QueryQueue.js

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import R from 'ramda';
22
import { getEnv, getProcessUid } from '@cubejs-backend/shared';
3-
import { QueueDriverInterface } from '@cubejs-backend/base-driver';
3+
import { QueueDriverInterface, QueryKey, QueryKeyHash } from '@cubejs-backend/base-driver';
44
import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver';
55

66
import { TimeoutError } from './TimeoutError';
@@ -139,7 +139,7 @@ export class QueryQueue {
139139
/**
140140
* Returns stream object which will be used to pipe data from data source.
141141
*
142-
* @param {*} queryKeyHash
142+
* @param {QueryKeyHash} queryKeyHash
143143
*/
144144
getQueryStream(queryKeyHash) {
145145
if (!this.streams.queued.has(queryKeyHash)) {
@@ -453,6 +453,10 @@ export class QueryQueue {
453453
async shutdown() {
454454
if (this.reconcilePromise) {
455455
await this.reconcilePromise;
456+
457+
return true;
458+
} else {
459+
return false;
456460
}
457461
}
458462

@@ -741,7 +745,7 @@ export class QueryQueue {
741745
* Processing query specified by the `queryKey`. This method incapsulate most
742746
* of the logic related with the queues updates, heartbeating, etc.
743747
*
744-
* @param {string} queryKeyHashed
748+
* @param {QueryKeyHash} queryKeyHashed
745749
* @return {Promise<{ result: undefined | Object, error: string | undefined }>}
746750
*/
747751
async processQuery(queryKeyHashed) {
@@ -971,8 +975,8 @@ export class QueryQueue {
971975
/**
972976
* Returns hash sum of the specified `queryKey`.
973977
*
974-
* @param {*} queryKey
975-
* @returns {string}
978+
* @param {QueryKey} queryKey
979+
* @returns {QueryKeyHash}
976980
*/
977981
redisHash(queryKey) {
978982
return this.queueDriver.redisHash(queryKey);

packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,14 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
6161
});
6262

6363
async function awaitProcessing() {
64-
await queue.shutdown();
65-
await Promise.all(processMessagePromises);
66-
await Promise.all(processCancelPromises);
64+
// process query can call reconcileQueue
65+
while (await queue.shutdown() || processMessagePromises.length || processCancelPromises.length) {
66+
await Promise.all(processMessagePromises);
67+
processMessagePromises = [];
6768

68-
processMessagePromises = [];
69-
processCancelPromises = [];
69+
await Promise.all(processCancelPromises);
70+
processCancelPromises = [];
71+
}
7072
}
7173

7274
afterEach(async () => {
@@ -211,7 +213,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
211213
});
212214

213215
test('queue hash process persistent flag properly', () => {
214-
const query = ['select * from table'];
216+
const query: QueryKey = ['select * from table', []];
215217
const key1 = queue.redisHash(query);
216218
// @ts-ignore
217219
query.persistent = false;
@@ -250,7 +252,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
250252
});
251253

252254
test('removed before reconciled', async () => {
253-
const query = ['select * from'];
255+
const query: QueryKey = ['select * from', []];
254256
const key = queue.redisHash(query);
255257
await queue.processQuery(key);
256258
const result = await queue.executeInQueue('foo', key, query);

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ pub trait CacheStore: DIService + Send + Sync {
328328
&self,
329329
prefix: String,
330330
orphaned_timeout: Option<u32>,
331-
stalled_timeout: Option<u32>,
331+
heartbeat_timeout: Option<u32>,
332332
) -> Result<Vec<IdRow<QueueItem>>, CubeError>;
333333
async fn queue_list(
334334
&self,
@@ -555,7 +555,7 @@ impl CacheStore for RocksCacheStore {
555555
&self,
556556
prefix: String,
557557
orphaned_timeout: Option<u32>,
558-
stalled_timeout: Option<u32>,
558+
heartbeat_timeout: Option<u32>,
559559
) -> Result<Vec<IdRow<QueueItem>>, CubeError> {
560560
self.store
561561
.read_operation(move |db_ref| {
@@ -568,22 +568,26 @@ impl CacheStore for RocksCacheStore {
568568

569569
let res = items.into_iter().filter(|item| {
570570
if item.get_row().get_status() == &QueueItemStatus::Pending {
571-
if let Some(stalled_timeout) = stalled_timeout {
571+
return if let Some(orphaned_timeout) = orphaned_timeout {
572572
let elapsed = now - item.get_row().get_created().clone();
573-
if elapsed.num_milliseconds() > stalled_timeout as i64 {
574-
return true;
573+
if elapsed.num_milliseconds() > orphaned_timeout as i64 {
574+
true
575+
} else {
576+
false
575577
}
576-
}
578+
} else {
579+
false
580+
};
577581
}
578582

579583
if item.get_row().get_status() == &QueueItemStatus::Active {
580-
if let Some(orphaned_timeout) = orphaned_timeout {
584+
if let Some(heartbeat_timeout) = heartbeat_timeout {
581585
let elapsed = if let Some(heartbeat) = item.get_row().get_heartbeat() {
582586
now - heartbeat.clone()
583587
} else {
584588
now - item.get_row().get_created().clone()
585589
};
586-
if elapsed.num_milliseconds() > orphaned_timeout as i64 {
590+
if elapsed.num_milliseconds() > heartbeat_timeout as i64 {
587591
return true;
588592
}
589593
}
@@ -619,9 +623,7 @@ impl CacheStore for RocksCacheStore {
619623
if priority_sort {
620624
Ok(items
621625
.into_iter()
622-
.sorted_by(|a, b| {
623-
b.get_row().get_priority().cmp(a.get_row().get_priority())
624-
})
626+
.sorted_by(|a, b| b.row.cmp(&a.row))
625627
.collect())
626628
} else {
627629
Ok(items)
@@ -966,7 +968,7 @@ crate::di_service!(ClusterCacheStoreClient, [CacheStore]);
966968

967969
#[cfg(test)]
968970
mod tests {
969-
use crate::cachestore::*;
971+
use super::*;
970972
use crate::CubeError;
971973

972974
#[tokio::test]

rust/cubestore/cubestore/src/cachestore/lazy.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -220,11 +220,11 @@ impl CacheStore for LazyRocksCacheStore {
220220
&self,
221221
prefix: String,
222222
orphaned_timeout: Option<u32>,
223-
stalled_timeout: Option<u32>,
223+
heartbeat_timeout: Option<u32>,
224224
) -> Result<Vec<IdRow<QueueItem>>, CubeError> {
225225
self.init()
226226
.await?
227-
.queue_to_cancel(prefix, orphaned_timeout, stalled_timeout)
227+
.queue_to_cancel(prefix, orphaned_timeout, heartbeat_timeout)
228228
.await
229229
}
230230

rust/cubestore/cubestore/src/cachestore/queue_item.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use crate::{base_rocks_secondary_index, rocks_table_new, CubeError};
66
use chrono::serde::ts_seconds;
77
use chrono::{DateTime, Duration, Utc};
88
use rocksdb::WriteBatch;
9+
use std::cmp::Ordering;
910

1011
use serde::{Deserialize, Deserializer, Serialize};
1112

@@ -78,6 +79,22 @@ pub struct QueueItem {
7879

7980
impl RocksEntity for QueueItem {}
8081

82+
impl Ord for QueueItem {
83+
fn cmp(&self, other: &Self) -> Ordering {
84+
if self.priority == other.priority {
85+
other.created.cmp(&self.created)
86+
} else {
87+
self.priority.cmp(&other.priority)
88+
}
89+
}
90+
}
91+
92+
impl PartialOrd for QueueItem {
93+
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
94+
Some(self.cmp(other))
95+
}
96+
}
97+
8198
impl QueueItem {
8299
pub fn new(path: String, value: String, status: QueueItemStatus, priority: i64) -> Self {
83100
let parts: Vec<&str> = path.rsplitn(2, ":").collect();
@@ -328,3 +345,82 @@ impl RocksSecondaryIndex<QueueItem, QueueItemIndexKey> for QueueItemRocksIndex {
328345
*self as IndexId
329346
}
330347
}
348+
349+
#[cfg(test)]
350+
mod tests {
351+
use super::*;
352+
use crate::CubeError;
353+
use itertools::Itertools;
354+
355+
#[test]
356+
fn test_queue_item_sort() -> Result<(), CubeError> {
357+
let priority0_1 =
358+
QueueItem::new("1".to_string(), "1".to_string(), QueueItemStatus::Active, 0);
359+
let priority0_2 =
360+
QueueItem::new("2".to_string(), "2".to_string(), QueueItemStatus::Active, 0);
361+
let priority0_3 =
362+
QueueItem::new("3".to_string(), "3".to_string(), QueueItemStatus::Active, 0);
363+
let priority10_4 = QueueItem::new(
364+
"4".to_string(),
365+
"4".to_string(),
366+
QueueItemStatus::Active,
367+
10,
368+
);
369+
let priority0_5 =
370+
QueueItem::new("5".to_string(), "5".to_string(), QueueItemStatus::Active, 0);
371+
let priority_n5_6 = QueueItem::new(
372+
"6".to_string(),
373+
"6".to_string(),
374+
QueueItemStatus::Active,
375+
-5,
376+
);
377+
378+
assert_eq!(
379+
vec![
380+
priority0_1.clone(),
381+
priority0_2.clone(),
382+
priority0_3.clone(),
383+
priority10_4.clone(),
384+
priority_n5_6.clone(),
385+
priority0_5.clone()
386+
]
387+
.into_iter()
388+
.sorted_by(|a, b| b.cmp(&a))
389+
.map(|item| item.get_key().clone())
390+
.collect::<Vec<String>>(),
391+
vec![
392+
"4".to_string(),
393+
"1".to_string(),
394+
"2".to_string(),
395+
"3".to_string(),
396+
"5".to_string(),
397+
"6".to_string()
398+
]
399+
);
400+
401+
assert_eq!(
402+
vec![
403+
priority10_4,
404+
priority0_1,
405+
priority0_5,
406+
priority0_2,
407+
priority0_3,
408+
priority_n5_6
409+
]
410+
.into_iter()
411+
.sorted_by(|a, b| b.cmp(&a))
412+
.map(|item| item.get_key().clone())
413+
.collect::<Vec<String>>(),
414+
vec![
415+
"4".to_string(),
416+
"1".to_string(),
417+
"2".to_string(),
418+
"3".to_string(),
419+
"5".to_string(),
420+
"6".to_string()
421+
]
422+
);
423+
424+
Ok(())
425+
}
426+
}

rust/cubestore/cubestore/src/sql/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1268,12 +1268,12 @@ impl SqlService for SqlServiceImpl {
12681268
}
12691269
QueueCommand::ToCancel {
12701270
prefix,
1271+
heartbeat_timeout,
12711272
orphaned_timeout,
1272-
stalled_timeout,
12731273
} => {
12741274
let rows = self
12751275
.cachestore
1276-
.queue_to_cancel(prefix.value, orphaned_timeout, stalled_timeout)
1276+
.queue_to_cancel(prefix.value, orphaned_timeout, heartbeat_timeout)
12771277
.await?;
12781278

12791279
let columns = vec![Column::new("id".to_string(), ColumnType::String, 0)];

rust/cubestore/cubestore/src/sql/parser.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ pub enum QueueCommand {
102102
},
103103
ToCancel {
104104
prefix: Ident,
105+
heartbeat_timeout: Option<u32>,
105106
orphaned_timeout: Option<u32>,
106-
stalled_timeout: Option<u32>,
107107
},
108108
List {
109109
prefix: Ident,
@@ -380,31 +380,31 @@ impl<'a> CubeStoreParser<'a> {
380380
key: self.parser.parse_identifier()?,
381381
},
382382
"stalled" => {
383-
let stalled_timeout = self.parse_integer("stalled timeout", false)?;
383+
let heartbeat_timeout = Some(self.parse_integer("heartbeat timeout", false)?);
384384

385385
QueueCommand::ToCancel {
386386
prefix: self.parser.parse_identifier()?,
387387
orphaned_timeout: None,
388-
stalled_timeout: Some(stalled_timeout),
388+
heartbeat_timeout,
389389
}
390390
}
391391
"orphaned" => {
392-
let orphaned_timeout = self.parse_integer("orphaned timeout", false)?;
392+
let orphaned_timeout = Some(self.parse_integer("orphaned timeout", false)?);
393393

394394
QueueCommand::ToCancel {
395395
prefix: self.parser.parse_identifier()?,
396-
orphaned_timeout: Some(orphaned_timeout),
397-
stalled_timeout: None,
396+
heartbeat_timeout: None,
397+
orphaned_timeout,
398398
}
399399
}
400400
"to_cancel" => {
401-
let stalled_timeout = self.parse_integer("stalled timeout", false)?;
402-
let orphaned_timeout = self.parse_integer("orphaned timeout", false)?;
401+
let heartbeat_timeout = Some(self.parse_integer("heartbeat timeout", false)?);
402+
let orphaned_timeout = Some(self.parse_integer("orphaned timeout", false)?);
403403

404404
QueueCommand::ToCancel {
405405
prefix: self.parser.parse_identifier()?,
406-
orphaned_timeout: Some(stalled_timeout),
407-
stalled_timeout: Some(orphaned_timeout),
406+
heartbeat_timeout,
407+
orphaned_timeout,
408408
}
409409
}
410410
"pending" => {

yarn.lock

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10242,14 +10242,14 @@ caniuse-api@^3.0.0:
1024210242
lodash.uniq "^4.5.0"
1024310243

1024410244
caniuse-lite@^1.0.0, caniuse-lite@^1.0.30000981, caniuse-lite@^1.0.30001032, caniuse-lite@^1.0.30001109, caniuse-lite@^1.0.30001125, caniuse-lite@^1.0.30001286:
10245-
version "1.0.30001373"
10246-
resolved "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001373.tgz"
10247-
integrity sha512-pJYArGHrPp3TUqQzFYRmP/lwJlj8RCbVe3Gd3eJQkAV8SAC6b19XS9BjMvRdvaS8RMkaTN8ZhoHP6S1y8zzwEQ==
10245+
version "1.0.30001450"
10246+
resolved "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001450.tgz"
10247+
integrity sha512-qMBmvmQmFXaSxexkjjfMvD5rnDL0+m+dUMZKoDYsGG8iZN29RuYh9eRoMvKsT6uMAWlyUUGDEQGJJYjzCIO9ew==
1024810248

1024910249
caniuse-lite@^1.0.30001400:
10250-
version "1.0.30001446"
10251-
resolved "https://registry.yarnpkg.com/caniuse-lite/-/caniuse-lite-1.0.30001446.tgz#6d4ba828ab19f49f9bcd14a8430d30feebf1e0c5"
10252-
integrity sha512-fEoga4PrImGcwUUGEol/PoFCSBnSkA9drgdkxXkJLsUBOnJ8rs3zDv6ApqYXGQFOyMPsjh79naWhF4DAxbF8rw==
10250+
version "1.0.30001450"
10251+
resolved "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001450.tgz"
10252+
integrity sha512-qMBmvmQmFXaSxexkjjfMvD5rnDL0+m+dUMZKoDYsGG8iZN29RuYh9eRoMvKsT6uMAWlyUUGDEQGJJYjzCIO9ew==
1025310253

1025410254
1025510255
version "1.0.0"

0 commit comments

Comments
 (0)