File tree Expand file tree Collapse file tree 2 files changed +35
-7
lines changed
cubejs-cubestore-driver/src
cubejs-query-orchestrator/src/orchestrator Expand file tree Collapse file tree 2 files changed +35
-7
lines changed Original file line number Diff line number Diff line change @@ -136,12 +136,31 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
136136 }
137137
138138 public async getActiveAndToProcess ( ) : Promise < GetActiveAndToProcessResponse > {
139+ const active : QueryKeysTuple [ ] = [ ] ;
140+ const toProcess : QueryKeysTuple [ ] = [ ] ;
141+
142+ const rows = await this . driver . query < CubeStoreListResponse > ( 'QUEUE LIST ?' , [
143+ this . options . redisQueuePrefix
144+ ] ) ;
145+ if ( rows . length ) {
146+ for ( const row of rows ) {
147+ if ( row . status === 'active' ) {
148+ active . push ( [
149+ row . id as QueryKeyHash ,
150+ row . queue_id ? parseInt ( row . queue_id , 10 ) : null ,
151+ ] ) ;
152+ } else {
153+ toProcess . push ( [
154+ row . id as QueryKeyHash ,
155+ row . queue_id ? parseInt ( row . queue_id , 10 ) : null ,
156+ ] ) ;
157+ }
158+ }
159+ }
160+
139161 return [
140- // We don't return active queries, because it's useless
141- // There is only one place where it's used, and it's QueryQueue.reconcileQueueImpl
142- // Cube Store provides strict guarantees that queue item cannot be active & pending in the same time
143- [ ] ,
144- await this . getToProcessQueries ( )
162+ active ,
163+ toProcess ,
145164 ] ;
146165 }
147166
Original file line number Diff line number Diff line change @@ -543,7 +543,16 @@ export class QueryQueue {
543543 }
544544 } ) ) ;
545545
546- const [ _active , toProcess ] = await queueConnection . getActiveAndToProcess ( ) ;
546+ const [ active , toProcess ] = await queueConnection . getActiveAndToProcess ( ) ;
547+
548+ /**
549+ * Important notice: Concurrency configuration works per a specific queue, not per node.
550+ *
551+ * In production clusters where it contains N nodes, it shares the same concurrency. It leads to a point
552+ * where every node tries to pick up jobs as much as concurrency is defined for the whole cluster. To minimize
553+ * the effect of competition between nodes, it's important to reduce the number of tries to process by active jobs.
554+ */
555+ const toProcessLimit = active . length >= this . concurrency ? 1 : this . concurrency - active . length ;
547556
548557 const tasks = toProcess
549558 . filter ( ( [ queryKey , _queueId ] ) => {
@@ -559,7 +568,7 @@ export class QueryQueue {
559568 return false ;
560569 }
561570 } )
562- . slice ( 0 , this . concurrency )
571+ . slice ( 0 , toProcessLimit )
563572 . map ( ( [ queryKey , queueId ] ) => this . sendProcessMessageFn ( queryKey , queueId ) ) ;
564573
565574 await Promise . all ( tasks ) ;
You can’t perform that action at this time.
0 commit comments