@@ -77,7 +77,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
7777 this . prefixKey ( queryKey )
7878 ] ) ;
7979 if ( rows && rows . length ) {
80- return this . decodeQueryDefFromRow ( rows [ 0 ] ) ;
80+ return this . decodeQueryDefFromRow ( rows [ 0 ] , 'cancelQuery' ) ;
8181 }
8282
8383 return null ;
@@ -148,7 +148,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
148148
149149 for ( const row of rows ) {
150150 if ( ! onlyKeys ) {
151- defs [ row . id ] = this . decodeQueryDefFromRow ( row ) ;
151+ defs [ row . id ] = this . decodeQueryDefFromRow ( row , 'getQueryStageState' ) ;
152152 }
153153
154154 if ( row . status === 'pending' ) {
@@ -168,7 +168,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
168168 this . prefixKey ( this . redisHash ( queryKey ) ) ,
169169 ] ) ;
170170 if ( rows && rows . length ) {
171- return JSON . parse ( rows [ 0 ] . value ) ;
171+ return this . decodeQueryDefFromRow ( rows [ 0 ] , 'getResult' ) ;
172172 }
173173
174174 return null ;
@@ -199,7 +199,11 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
199199 return rows . map ( ( row ) => row . id ) ;
200200 }
201201
202- protected decodeQueryDefFromRow ( row : any ) : QueryDef {
202+ protected decodeQueryDefFromRow ( row : { payload : string , extra ?: string } , method : string ) : QueryDef {
203+ if ( ! row . payload ) {
204+ throw new Error ( `Field payload is empty, incorrect response for ${ method } method` ) ;
205+ }
206+
203207 const payload = JSON . parse ( row . payload ) ;
204208
205209 if ( row . extra ) {
@@ -214,7 +218,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
214218 this . prefixKey ( this . redisHash ( queryKey ) )
215219 ] ) ;
216220 if ( rows && rows . length ) {
217- return this . decodeQueryDefFromRow ( rows [ 0 ] ) ;
221+ return this . decodeQueryDefFromRow ( rows [ 0 ] , 'getQueryDef' ) ;
218222 }
219223
220224 return null ;
@@ -243,7 +247,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
243247 const active = [ this . redisHash ( queryKey ) ] ;
244248 const toProcess = 0 ;
245249 const lockAcquired = true ;
246- const def = this . decodeQueryDefFromRow ( rows [ 0 ] ) ;
250+ const def = this . decodeQueryDefFromRow ( rows [ 0 ] , 'retrieveForProcessing' ) ;
247251
248252 return [
249253 addedCount , null , active , toProcess , def , lockAcquired
@@ -259,7 +263,7 @@ class CubestoreQueueDriverConnection implements QueueDriverConnectionInterface {
259263 this . prefixKey ( this . redisHash ( queryKey ) ) ,
260264 ] ) ;
261265 if ( rows && rows . length ) {
262- return this . decodeQueryDefFromRow ( rows [ 0 ] ) ;
266+ return this . decodeQueryDefFromRow ( rows [ 0 ] , 'getResultBlocking' ) ;
263267 }
264268
265269 return null ;
0 commit comments