Skip to content

Commit 24b4798

Browse files
authored
feat(cubestore): Queue - expose processingId in queue_add (#6540)
1 parent 4880b1f commit 24b4798

File tree

3 files changed

+88
-11
lines changed

3 files changed

+88
-11
lines changed

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 78 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8074,30 +8074,92 @@ async fn limit_pushdown_unique_key(service: Box<dyn SqlClient>) {
80748074
}
80758075

80768076
async fn queue_full_workflow(service: Box<dyn SqlClient>) {
8077-
service
8077+
let add_response = service
80788078
.exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
80798079
.await
80808080
.unwrap();
8081+
assert_queue_add_columns(&add_response);
8082+
assert_eq!(
8083+
add_response.get_rows(),
8084+
&vec![Row::new(vec![
8085+
TableValue::String("1".to_string()),
8086+
TableValue::Boolean(true),
8087+
TableValue::Int(1)
8088+
])]
8089+
);
80818090

8082-
service
8091+
let add_response = service
80838092
.exec_query(r#"QUEUE ADD PRIORITY 10 "STANDALONE#queue:2" "payload2";"#)
80848093
.await
80858094
.unwrap();
8095+
assert_queue_add_columns(&add_response);
8096+
assert_eq!(
8097+
add_response.get_rows(),
8098+
&vec![Row::new(vec![
8099+
TableValue::String("2".to_string()),
8100+
TableValue::Boolean(true),
8101+
TableValue::Int(2)
8102+
])]
8103+
);
80868104

8087-
service
8105+
let add_response = service
80888106
.exec_query(r#"QUEUE ADD PRIORITY 100 "STANDALONE#queue:3" "payload3";"#)
80898107
.await
80908108
.unwrap();
8109+
assert_queue_add_columns(&add_response);
8110+
assert_eq!(
8111+
add_response.get_rows(),
8112+
&vec![Row::new(vec![
8113+
TableValue::String("3".to_string()),
8114+
TableValue::Boolean(true),
8115+
TableValue::Int(3)
8116+
])]
8117+
);
80918118

8092-
service
8119+
let add_response = service
80938120
.exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:4" "payload4";"#)
80948121
.await
80958122
.unwrap();
8123+
assert_queue_add_columns(&add_response);
8124+
assert_eq!(
8125+
add_response.get_rows(),
8126+
&vec![Row::new(vec![
8127+
TableValue::String("4".to_string()),
8128+
TableValue::Boolean(true),
8129+
TableValue::Int(4)
8130+
])]
8131+
);
80968132

8097-
service
8133+
let add_response = service
80988134
.exec_query(r#"QUEUE ADD PRIORITY -1 "STANDALONE#queue:5" "payload5";"#)
80998135
.await
81008136
.unwrap();
8137+
assert_queue_add_columns(&add_response);
8138+
assert_eq!(
8139+
add_response.get_rows(),
8140+
&vec![Row::new(vec![
8141+
TableValue::String("5".to_string()),
8142+
TableValue::Boolean(true),
8143+
TableValue::Int(5)
8144+
])]
8145+
);
8146+
8147+
// deduplication check
8148+
{
8149+
let add_response = service
8150+
.exec_query(r#"QUEUE ADD PRIORITY 1 "STANDALONE#queue:1" "payload1";"#)
8151+
.await
8152+
.unwrap();
8153+
assert_queue_add_columns(&add_response);
8154+
assert_eq!(
8155+
add_response.get_rows(),
8156+
&vec![Row::new(vec![
8157+
TableValue::String("1".to_string()),
8158+
TableValue::Boolean(false),
8159+
TableValue::Int(5)
8160+
])]
8161+
);
8162+
}
81018163

81028164
{
81038165
let pending_response = service
@@ -8273,6 +8335,17 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {
82738335
}
82748336
}
82758337

8338+
fn assert_queue_add_columns(response: &Arc<DataFrame>) {
8339+
assert_eq!(
8340+
response.get_columns(),
8341+
&vec![
8342+
Column::new("id".to_string(), ColumnType::String, 0),
8343+
Column::new("added".to_string(), ColumnType::Boolean, 1),
8344+
Column::new("pending".to_string(), ColumnType::Int, 2),
8345+
]
8346+
);
8347+
}
8348+
82768349
fn assert_queue_retrieve_columns(response: &Arc<DataFrame>) {
82778350
assert_eq!(
82788351
response.get_columns(),

rust/cubestore/cubestore/src/cachestore/cache_rocksstore.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ impl RocksCacheStore {
332332

333333
#[derive(Clone, Serialize, Deserialize, Debug, Eq, PartialEq, Hash)]
334334
pub struct QueueAddResponse {
335+
pub id: u64,
335336
pub added: bool,
336337
pub pending: u64,
337338
}
@@ -572,15 +573,16 @@ impl CacheStore for RocksCacheStore {
572573
let index_key = QueueItemIndexKey::ByPath(item.get_path());
573574
let id_row_opt = queue_schema
574575
.get_single_opt_row_by_index(&index_key, &QueueItemRocksIndex::ByPath)?;
575-
let added = if id_row_opt.is_none() {
576-
queue_schema.insert(item, batch_pipe)?;
577576

578-
true
577+
let (id, added) = if let Some(row) = id_row_opt {
578+
(row.id, false)
579579
} else {
580-
false
580+
let row = queue_schema.insert(item, batch_pipe)?;
581+
(row.id, true)
581582
};
582583

583584
Ok(QueueAddResponse {
585+
id,
584586
added,
585587
pending: if added { pending + 1 } else { pending },
586588
})

rust/cubestore/cubestore/src/sql/cachestore.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,12 @@ impl CacheStoreSqlService {
173173
(
174174
Arc::new(DataFrame::new(
175175
vec![
176-
Column::new("added".to_string(), ColumnType::Boolean, 0),
177-
Column::new("pending".to_string(), ColumnType::Int, 1),
176+
Column::new("id".to_string(), ColumnType::String, 0),
177+
Column::new("added".to_string(), ColumnType::Boolean, 1),
178+
Column::new("pending".to_string(), ColumnType::Int, 2),
178179
],
179180
vec![Row::new(vec![
181+
TableValue::String(response.id.to_string()),
180182
TableValue::Boolean(response.added),
181183
TableValue::Int(response.pending as i64),
182184
])],

0 commit comments

Comments
 (0)