11import R from 'ramda' ;
22import { EventEmitter } from 'events' ;
33import { getEnv , getProcessUid } from '@cubejs-backend/shared' ;
4- import { QueueDriverInterface , QueryKey , QueryKeyHash , QueueId , QueryDef } from '@cubejs-backend/base-driver' ;
4+ import {
5+ QueueDriverInterface ,
6+ QueryKey ,
7+ QueryKeyHash ,
8+ QueueId ,
9+ QueryDef ,
10+ QueryStageStateResponse
11+ } from '@cubejs-backend/base-driver' ;
512import { CubeStoreQueueDriver } from '@cubejs-backend/cubestore-driver' ;
613
714import { TimeoutError } from './TimeoutError' ;
@@ -557,31 +564,20 @@ export class QueryQueue {
557564 }
558565 } ) ) ;
559566
560- /**
561- * There is a bug somewhere in Redis (maybe in memory too?),
562- * which doesn't remove queue item from pending, while it's in active state
563- *
564- * TODO(ovr): Check LocalQueueDriver for strict guarantees that item cannot be in active & pending in the same time
565- * TODO(ovr): Migrate to getToProcessQueries after removal of Redis
566- */
567- const [ active , toProcess ] = await queueConnection . getActiveAndToProcess ( ) ;
567+ const [ _active , toProcess ] = await queueConnection . getActiveAndToProcess ( ) ;
568568
569569 await Promise . all (
570570 R . pipe (
571571 R . filter ( ( [ queryKey , _queueId ] ) => {
572- if ( active . findIndex ( ( [ p , _a ] ) => p === queryKey ) === - 1 ) {
573- const subKeys = queryKey . split ( '@' ) ;
574- if ( subKeys . length === 1 ) {
575- // common queries
576- return true ;
577- } else if ( subKeys [ 1 ] === this . processUid ) {
578- // current process persistent queries
579- return true ;
580- } else {
581- // other processes persistent queries
582- return false ;
583- }
572+ const subKeys = queryKey . split ( '@' ) ;
573+ if ( subKeys . length === 1 ) {
574+ // common queries
575+ return true ;
576+ } else if ( subKeys [ 1 ] === this . processUid ) {
577+ // current process persistent queries
578+ return true ;
584579 } else {
580+ // other processes persistent queries
585581 return false ;
586582 }
587583 } ) ,
@@ -627,7 +623,7 @@ export class QueryQueue {
627623 * Returns the list of queries planned to be processed and the list of active
628624 * queries.
629625 *
630- * @returns {Array }
626+ * @returns {Promise<QueryStageStateResponse> }
631627 */
632628 async fetchQueryStageState ( ) {
633629 const queueConnection = await this . queueDriver . createConnection ( ) ;
@@ -644,26 +640,25 @@ export class QueryQueue {
644640 *
645641 * @param {* } stageQueryKey
646642 * @param {number= } priorityFilter
647- * @param {Array = } queryStageState
643+ * @param {QueryStageStateResponse = } queryStageState
648644 * @returns {Promise<undefined> | Promise<{ stage: string, timeElapsed: number }> }
649645 */
650646 async getQueryStage ( stageQueryKey , priorityFilter , queryStageState ) {
651647 const [ active , toProcess , allQueryDefs ] = queryStageState || await this . fetchQueryStageState ( ) ;
652648
653- const queryDefs = toProcess . map ( k => allQueryDefs [ k ] ) . filter ( q => ! ! q ) ;
654- const queryInQueue = queryDefs . find (
649+ const queryInQueue = Object . values ( allQueryDefs ) . find (
655650 q => this . redisHash ( q . stageQueryKey ) === this . redisHash ( stageQueryKey ) &&
656651 ( priorityFilter != null ? q . priority === priorityFilter : true )
657652 ) ;
658-
659653 if ( queryInQueue ) {
660654 if ( active . indexOf ( this . redisHash ( queryInQueue . queryKey ) ) !== - 1 ) {
661655 return {
662656 stage : 'Executing query' ,
663657 timeElapsed : queryInQueue . startQueryTime ? new Date ( ) . getTime ( ) - queryInQueue . startQueryTime : undefined
664658 } ;
665659 }
666- const index = queryDefs . filter ( q => active . indexOf ( this . redisHash ( q . queryKey ) ) === - 1 ) . indexOf ( queryInQueue ) ;
660+
661+ const index = toProcess . indexOf ( this . redisHash ( queryInQueue . queryKey ) ) ;
667662 if ( index !== - 1 ) {
668663 return index !== - 1 ? { stage : `#${ index + 1 } in queue` } : undefined ;
669664 }
0 commit comments