@@ -281,15 +281,19 @@ class PreAggregationLoader {
281281 if ( versionEntry . structure_version !== newVersionEntry . structure_version ) {
282282 this . logger ( 'Invalidating pre-aggregation structure' , {
283283 preAggregation : this . preAggregation ,
284- requestId : this . requestId
284+ requestId : this . requestId ,
285+ queryKey : this . preAggregationQueryKey ( invalidationKeys ) ,
286+ newVersionEntry
285287 } ) ;
286288 await this . executeInQueue ( invalidationKeys , this . priority ( 10 ) , newVersionEntry ) ;
287289 return mostRecentTargetTableName ( ) ;
288290 } else if ( versionEntry . content_version !== newVersionEntry . content_version ) {
289291 if ( this . waitForRenew ) {
290292 this . logger ( 'Waiting for pre-aggregation renew' , {
291293 preAggregation : this . preAggregation ,
292- requestId : this . requestId
294+ requestId : this . requestId ,
295+ queryKey : this . preAggregationQueryKey ( invalidationKeys ) ,
296+ newVersionEntry
293297 } ) ;
294298 await this . executeInQueue ( invalidationKeys , this . priority ( 0 ) , newVersionEntry ) ;
295299 return mostRecentTargetTableName ( ) ;
@@ -300,7 +304,9 @@ class PreAggregationLoader {
300304 } else {
301305 this . logger ( 'Creating pre-aggregation from scratch' , {
302306 preAggregation : this . preAggregation ,
303- requestId : this . requestId
307+ requestId : this . requestId ,
308+ queryKey : this . preAggregationQueryKey ( invalidationKeys ) ,
309+ newVersionEntry
304310 } ) ;
305311 await this . executeInQueue ( invalidationKeys , this . priority ( 10 ) , newVersionEntry ) ;
306312 return mostRecentTargetTableName ( ) ;
@@ -345,7 +351,9 @@ class PreAggregationLoader {
345351 scheduleRefresh ( invalidationKeys , newVersionEntry ) {
346352 this . logger ( 'Refreshing pre-aggregation content' , {
347353 preAggregation : this . preAggregation ,
348- requestId : this . requestId
354+ requestId : this . requestId ,
355+ queryKey : this . preAggregationQueryKey ( invalidationKeys ) ,
356+ newVersionEntry
349357 } ) ;
350358 this . executeInQueue ( invalidationKeys , this . priority ( 0 ) , newVersionEntry )
351359 . catch ( e => {
@@ -360,24 +368,29 @@ class PreAggregationLoader {
360368 executeInQueue ( invalidationKeys , priority , newVersionEntry ) {
361369 return this . preAggregations . getQueue ( ) . executeInQueue (
362370 'query' ,
363- [ this . preAggregation . loadSql , invalidationKeys ] ,
371+ this . preAggregationQueryKey ( invalidationKeys ) ,
364372 {
365373 preAggregation : this . preAggregation ,
366374 preAggregationsTablesToTempTables : this . preAggregationsTablesToTempTables ,
367375 newVersionEntry,
368- requestId : this . requestId
376+ requestId : this . requestId ,
377+ invalidationKeys
369378 } ,
370379 priority ,
371380 // eslint-disable-next-line no-use-before-define
372381 { stageQueryKey : PreAggregations . preAggregationQueryCacheKey ( this . preAggregation ) , requestId : this . requestId }
373382 ) ;
374383 }
375384
385+ preAggregationQueryKey ( invalidationKeys ) {
386+ return [ this . preAggregation . loadSql , invalidationKeys ] ;
387+ }
388+
376389 targetTableName ( versionEntry ) {
377390 return `${ versionEntry . table_name } _${ versionEntry . content_version } _${ versionEntry . structure_version } _${ versionEntry . last_updated_at } ` ;
378391 }
379392
380- refresh ( newVersionEntry ) {
393+ refresh ( newVersionEntry , invalidationKeys ) {
381394 return ( client ) => {
382395 let refreshStrategy = this . refreshImplStoreInSourceStrategy ;
383396 if ( this . preAggregation . external ) {
@@ -388,12 +401,23 @@ class PreAggregationLoader {
388401 this . refreshImplStreamExternalStrategy : this . refreshImplTempTableExternalStrategy ;
389402 }
390403 return cancelCombinator (
391- saveCancelFn => refreshStrategy . bind ( this ) ( client , newVersionEntry , saveCancelFn )
404+ saveCancelFn => refreshStrategy . bind ( this ) ( client , newVersionEntry , saveCancelFn , invalidationKeys )
392405 ) ;
393406 } ;
394407 }
395408
396- async refreshImplStoreInSourceStrategy ( client , newVersionEntry , saveCancelFn ) {
409+ logExecutingSql ( invalidationKeys , query , params , targetTableName , newVersionEntry ) {
410+ this . logger ( 'Executing Load Pre Aggregation SQL' , {
411+ queryKey : this . preAggregationQueryKey ( invalidationKeys ) ,
412+ query,
413+ values : params ,
414+ targetTableName,
415+ requestId : this . requestId ,
416+ newVersionEntry,
417+ } ) ;
418+ }
419+
420+ async refreshImplStoreInSourceStrategy ( client , newVersionEntry , saveCancelFn , invalidationKeys ) {
397421 const [ loadSql , params ] =
398422 Array . isArray ( this . preAggregation . loadSql ) ? this . preAggregation . loadSql : [ this . preAggregation . loadSql , [ ] ] ;
399423 const targetTableName = this . targetTableName ( newVersionEntry ) ;
@@ -402,13 +426,7 @@ class PreAggregationLoader {
402426 this . preAggregation . tableName ,
403427 targetTableName
404428 ) ;
405- this . logger ( 'Executing Load Pre Aggregation SQL' , {
406- queryKey : this . preAggregation . loadSql ,
407- query,
408- values : params ,
409- targetTableName,
410- requestId : this . requestId
411- } ) ;
429+ this . logExecutingSql ( invalidationKeys , query , params , targetTableName , newVersionEntry ) ;
412430 await saveCancelFn ( client . loadPreAggregationIntoTable (
413431 targetTableName ,
414432 query ,
@@ -420,7 +438,7 @@ class PreAggregationLoader {
420438 await this . loadCache . reset ( this . preAggregation ) ;
421439 }
422440
423- async refreshImplTempTableExternalStrategy ( client , newVersionEntry , saveCancelFn ) {
441+ async refreshImplTempTableExternalStrategy ( client , newVersionEntry , saveCancelFn , invalidationKeys ) {
424442 const [ loadSql , params ] =
425443 Array . isArray ( this . preAggregation . loadSql ) ? this . preAggregation . loadSql : [ this . preAggregation . loadSql , [ ] ] ;
426444 await client . createSchemaIfNotExists ( this . preAggregation . preAggregationsSchema ) ;
@@ -430,13 +448,7 @@ class PreAggregationLoader {
430448 this . preAggregation . tableName ,
431449 targetTableName
432450 ) ;
433- this . logger ( 'Executing Load Pre Aggregation SQL' , {
434- queryKey : this . preAggregation . loadSql ,
435- query,
436- values : params ,
437- targetTableName,
438- requestId : this . requestId
439- } ) ;
451+ this . logExecutingSql ( invalidationKeys , query , params , targetTableName , newVersionEntry ) ;
440452 await saveCancelFn ( client . loadPreAggregationIntoTable (
441453 targetTableName ,
442454 query ,
@@ -448,13 +460,14 @@ class PreAggregationLoader {
448460 await this . dropOrphanedTables ( client , targetTableName , saveCancelFn ) ;
449461 }
450462
451- async refreshImplStreamExternalStrategy ( client , newVersionEntry , saveCancelFn ) {
463+ async refreshImplStreamExternalStrategy ( client , newVersionEntry , saveCancelFn , invalidationKeys ) {
452464 const [ sql , params ] =
453465 Array . isArray ( this . preAggregation . sql ) ? this . preAggregation . sql : [ this . preAggregation . sql , [ ] ] ;
454466 if ( ! client . downloadQueryResults ) {
455467 throw new Error ( `Can't load external pre-aggregation: source driver doesn't support downloadQueryResults()` ) ;
456468 }
457469
470+ this . logExecutingSql ( invalidationKeys , sql , params , this . targetTableName ( newVersionEntry ) , newVersionEntry ) ;
458471 this . logger ( 'Downloading external pre-aggregation via query' , {
459472 preAggregation : this . preAggregation ,
460473 requestId : this . requestId
@@ -614,7 +627,7 @@ class PreAggregations {
614627 if ( ! this . queue ) {
615628 this . queue = QueryCache . createQueue ( `SQL_PRE_AGGREGATIONS_${ this . redisPrefix } ` , this . driverFactory , ( client , q ) => {
616629 const {
617- preAggregation, preAggregationsTablesToTempTables, newVersionEntry, requestId
630+ preAggregation, preAggregationsTablesToTempTables, newVersionEntry, requestId, invalidationKeys
618631 } = q ;
619632 const loader = new PreAggregationLoader (
620633 this . redisPrefix ,
@@ -627,7 +640,7 @@ class PreAggregations {
627640 new PreAggregationLoadCache ( this . redisPrefix , this . driverFactory , this . queryCache , this , { requestId } ) ,
628641 { requestId }
629642 ) ;
630- return loader . refresh ( newVersionEntry ) ( client ) ;
643+ return loader . refresh ( newVersionEntry , invalidationKeys ) ( client ) ;
631644 } , {
632645 concurrency : 1 ,
633646 logger : this . logger ,
0 commit comments