@@ -558,14 +558,19 @@ export class QueryQueue {
558558 } ) ) ;
559559
560560 /**
561- * There is a bug somewhere in Redis (maybe in memory too?),
562- * which doesn't remove queue item from pending, while it's in active state
563- *
564561 * TODO(ovr): Check LocalQueueDriver for strict guarantees that item cannot be in active & pending in the same time
565- * TODO(ovr): Migrate to getToProcessQueries after removal of Redis
566562 */
567563 const [ active , toProcess ] = await queueConnection . getActiveAndToProcess ( ) ;
568564
565+ /**
566+ * Important notice: Concurrency configuration works per a specific queue, not per node.
567+ *
568+ * In production clusters where it contains N nodes, it shares the same concurrency. It leads to a point
569+ * where every node tries to pick up jobs as much as concurrency is defined for the whole cluster. To minimize
570+ * the effect of competition between nodes, it's important to reduce the number of tries to process by active jobs.
571+ */
572+ const toProcessLimit = active . length >= this . concurrency ? 1 : this . concurrency - active . length ;
573+
569574 await Promise . all (
570575 R . pipe (
571576 R . filter ( ( [ queryKey , _queueId ] ) => {
@@ -585,7 +590,7 @@ export class QueryQueue {
585590 return false ;
586591 }
587592 } ) ,
588- R . take ( this . concurrency ) ,
593+ R . take ( toProcessLimit ) ,
589594 R . map ( ( ( [ queryKey , queueId ] ) => this . sendProcessMessageFn ( queryKey , queueId ) ) )
590595 ) ( toProcess )
591596 ) ;
@@ -743,8 +748,8 @@ export class QueryQueue {
743748 }
744749
745750 /**
746- * Processing query specified by the `queryKey`. This method encapsulate most
747- * of the logic related with the queues updates, heartbeat, etc.
751+ * Processing query specified by the `queryKey`. This method encapsulates most
752+ * of the logic related to the queue updates, heartbeat, etc.
748753 *
749754 * @param {QueryKeyHash } queryKeyHashed
750755 * @param {QueueId | null } queueId Supported by new Cube Store and Memory
0 commit comments