Skip to content

Commit 9924a1a

Browse files
authored
feat(cubestore): Support negative priority for queue (#6031)
1 parent f05d244 commit 9924a1a

File tree

4 files changed

+83
-83
lines changed

4 files changed

+83
-83
lines changed

packages/cubejs-cubestore-driver/src/CubeStoreQueueDriver.ts

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,6 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
3939
priority: number,
4040
options: AddToQueueOptions
4141
): Promise<AddToQueueResponse> {
42-
// TODO: Fix sqlparser, support negative number
43-
priority = priority < 0 ? 0 : priority;
44-
4542
const data = {
4643
queryHandler,
4744
query,

packages/cubejs-query-orchestrator/test/unit/QueryQueue.abstract.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ export const QueryQueueTest = (name: string, options: QueryQueueTestOptions = {}
132132
expect(await queue.getQueryStage('12')).toEqual(undefined);
133133
});
134134

135-
nonCubestoreTest('negative priority', async () => {
135+
test('negative priority', async () => {
136136
delayCount = 0;
137137
const results = [];
138138

rust/cubestore/cubestore-sql-tests/src/tests.rs

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6430,7 +6430,12 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {
64306430
.unwrap();
64316431

64326432
service
6433-
.exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:4" "payload3";"#)
6433+
.exec_query(r#"QUEUE ADD PRIORITY 50 "STANDALONE#queue:4" "payload4";"#)
6434+
.await
6435+
.unwrap();
6436+
6437+
service
6438+
.exec_query(r#"QUEUE ADD PRIORITY -1 "STANDALONE#queue:5" "payload5";"#)
64346439
.await
64356440
.unwrap();
64366441

@@ -6470,6 +6475,11 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {
64706475
TableValue::String("pending".to_string()),
64716476
TableValue::Null
64726477
]),
6478+
Row::new(vec![
6479+
TableValue::String("5".to_string()),
6480+
TableValue::String("pending".to_string()),
6481+
TableValue::Null
6482+
]),
64736483
]
64746484
);
64756485
}
@@ -6484,7 +6494,7 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {
64846494

64856495
{
64866496
let retrieve_response = service
6487-
.exec_query(r#"QUEUE RETRIEVE CONCURRENCY 2 "STANDALONE#queue:3""#)
6497+
.exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:3""#)
64886498
.await
64896499
.unwrap();
64906500
assert_eq!(
@@ -6503,6 +6513,22 @@ async fn queue_full_workflow(service: Box<dyn SqlClient>) {
65036513
);
65046514
}
65056515

6516+
{
6517+
// concurrency limit
6518+
let retrieve_response = service
6519+
.exec_query(r#"QUEUE RETRIEVE CONCURRENCY 1 "STANDALONE#queue:4""#)
6520+
.await
6521+
.unwrap();
6522+
assert_eq!(
6523+
retrieve_response.get_columns(),
6524+
&vec![
6525+
Column::new("payload".to_string(), ColumnType::String, 0),
6526+
Column::new("extra".to_string(), ColumnType::String, 1),
6527+
]
6528+
);
6529+
assert_eq!(retrieve_response.get_rows().len(), 0);
6530+
}
6531+
65066532
{
65076533
let active_response = service
65086534
.exec_query(r#"QUEUE ACTIVE "STANDALONE#queue""#)

rust/cubestore/cubestore/src/sql/parser.rs

Lines changed: 54 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ impl<'a> CubeStoreParser<'a> {
225225
"set" => {
226226
let nx = self.parse_custom_token(&"nx");
227227
let ttl = if self.parse_custom_token(&"ttl") {
228-
Some(self.parse_number("ttl")?)
228+
Some(self.parse_integer("ttl", false)?)
229229
} else {
230230
None
231231
};
@@ -257,17 +257,47 @@ impl<'a> CubeStoreParser<'a> {
257257
}
258258
}
259259

260-
fn parse_number(&mut self, var_name: &str) -> Result<u32, ParserError> {
260+
fn parse_integer<R: num::Integer + std::str::FromStr>(
261+
&mut self,
262+
var_name: &str,
263+
allow_negative: bool,
264+
) -> Result<R, ParserError>
265+
where
266+
<R as std::str::FromStr>::Err: std::fmt::Display,
267+
{
268+
let is_negative = match self.parser.peek_token() {
269+
Token::Minus => {
270+
self.parser.next_token();
271+
true
272+
}
273+
_ => false,
274+
};
275+
261276
match self.parser.parse_number_value()? {
262-
Value::Number(var, false) => var.parse::<u32>().map_err(|err| {
263-
ParserError::ParserError(format!(
264-
"{} must be a positive integer, error: {}",
265-
var_name, err
266-
))
267-
}),
277+
Value::Number(var, false) => {
278+
let value = if is_negative {
279+
"-".to_string() + &var
280+
} else {
281+
var
282+
};
283+
284+
if is_negative && !allow_negative {
285+
return Err(ParserError::ParserError(format!(
286+
"{} must be a positive integer, actual: {}",
287+
var_name, value
288+
)));
289+
}
290+
291+
value.parse::<R>().map_err(|err| {
292+
ParserError::ParserError(format!(
293+
"{} must be a valid integer, error: {}",
294+
var_name, err
295+
))
296+
})
297+
}
268298
x => {
269299
return Err(ParserError::ParserError(format!(
270-
"{} must be a positive integer, actual: {:?}",
300+
"{} must be a valid integer, actual: {:?}",
271301
var_name, x
272302
)))
273303
}
@@ -276,22 +306,11 @@ impl<'a> CubeStoreParser<'a> {
276306

277307
pub fn parse_metastore(&mut self) -> Result<Statement, ParserError> {
278308
if self.parse_custom_token("set_current") {
279-
match self.parser.parse_number_value()? {
280-
Value::Number(id, _) => Ok(Statement::System(SystemCommand::Metastore(
281-
MetastoreCommand::SetCurrent {
282-
id: id.parse::<u128>().map_err(|e| {
283-
ParserError::ParserError(format!(
284-
"Can't parse metastore snapshot id: {}",
285-
e
286-
))
287-
})?,
288-
},
289-
))),
290-
x => Err(ParserError::ParserError(format!(
291-
"Snapshot id expected but {:?} found",
292-
x
293-
))),
294-
}
309+
Ok(Statement::System(SystemCommand::Metastore(
310+
MetastoreCommand::SetCurrent {
311+
id: self.parse_integer("metastore snapshot id", false)?,
312+
},
313+
)))
295314
} else {
296315
Err(ParserError::ParserError(
297316
"Unknown metastore command".to_string(),
@@ -312,24 +331,7 @@ impl<'a> CubeStoreParser<'a> {
312331
match command.as_str() {
313332
"add" => {
314333
let priority = if self.parse_custom_token(&"priority") {
315-
match self.parser.parse_number_value()? {
316-
Value::Number(priority, _) => {
317-
let r = priority.parse::<i64>().map_err(|err| {
318-
ParserError::ParserError(format!(
319-
"priority must be a positive integer, error: {}",
320-
err
321-
))
322-
})?;
323-
324-
r
325-
}
326-
x => {
327-
return Err(ParserError::ParserError(format!(
328-
"priority must be a positive integer, actual: {:?}",
329-
x
330-
)))
331-
}
332-
}
334+
self.parse_integer(&"priority", true)?
333335
} else {
334336
0
335337
};
@@ -358,7 +360,7 @@ impl<'a> CubeStoreParser<'a> {
358360
key: self.parser.parse_identifier()?,
359361
}),
360362
"stalled" => {
361-
let stalled_timeout = self.parse_number("stalled timeout")?;
363+
let stalled_timeout = self.parse_integer("stalled timeout", false)?;
362364

363365
Ok(Statement::QueueToCancel {
364366
prefix: self.parser.parse_identifier()?,
@@ -367,7 +369,7 @@ impl<'a> CubeStoreParser<'a> {
367369
})
368370
}
369371
"orphaned" => {
370-
let orphaned_timeout = self.parse_number("orphaned timeout")?;
372+
let orphaned_timeout = self.parse_integer("orphaned timeout", false)?;
371373

372374
Ok(Statement::QueueToCancel {
373375
prefix: self.parser.parse_identifier()?,
@@ -376,8 +378,8 @@ impl<'a> CubeStoreParser<'a> {
376378
})
377379
}
378380
"to_cancel" => {
379-
let stalled_timeout = self.parse_number("stalled timeout")?;
380-
let orphaned_timeout = self.parse_number("orphaned timeout")?;
381+
let stalled_timeout = self.parse_integer("stalled timeout", false)?;
382+
let orphaned_timeout = self.parse_integer("orphaned timeout", false)?;
381383

382384
Ok(Statement::QueueToCancel {
383385
prefix: self.parser.parse_identifier()?,
@@ -417,7 +419,7 @@ impl<'a> CubeStoreParser<'a> {
417419
}
418420
"retrieve" => {
419421
let concurrency = if self.parse_custom_token(&"concurrency") {
420-
self.parse_number("concurrency")?
422+
self.parse_integer("concurrency", false)?
421423
} else {
422424
1
423425
};
@@ -431,24 +433,7 @@ impl<'a> CubeStoreParser<'a> {
431433
key: self.parser.parse_identifier()?,
432434
}),
433435
"result_blocking" => {
434-
let timeout = match self.parser.parse_number_value()? {
435-
Value::Number(concurrency, false) => {
436-
let r = concurrency.parse::<u64>().map_err(|err| {
437-
ParserError::ParserError(format!(
438-
"TIMEOUT must be a positive integer, error: {}",
439-
err
440-
))
441-
})?;
442-
443-
r
444-
}
445-
x => {
446-
return Err(ParserError::ParserError(format!(
447-
"TIMEOUT must be a positive integer, actual: {:?}",
448-
x
449-
)))
450-
}
451-
};
436+
let timeout = self.parse_integer(&"timeout", false)?;
452437

453438
Ok(Statement::QueueResultBlocking {
454439
timeout,
@@ -470,17 +455,9 @@ impl<'a> CubeStoreParser<'a> {
470455
{
471456
Ok(Statement::System(SystemCommand::KillAllJobs))
472457
} else if self.parse_custom_token("repartition") {
473-
match self.parser.parse_number_value()? {
474-
Value::Number(id, _) => Ok(Statement::System(SystemCommand::Repartition {
475-
partition_id: id.parse::<u64>().map_err(|e| {
476-
ParserError::ParserError(format!("Can't parse partition id: {}", e))
477-
})?,
478-
})),
479-
x => Err(ParserError::ParserError(format!(
480-
"Partition id expected but {:?} found",
481-
x
482-
))),
483-
}
458+
Ok(Statement::System(SystemCommand::Repartition {
459+
partition_id: self.parse_integer("partition id", false)?,
460+
}))
484461
} else if self.parse_custom_token("metastore") {
485462
self.parse_metastore()
486463
} else if self.parse_custom_token("panic") && self.parse_custom_token("worker") {

0 commit comments

Comments
 (0)