File tree Expand file tree Collapse file tree 3 files changed +23
-8
lines changed
internal-packages/run-engine/src Expand file tree Collapse file tree 3 files changed +23
-8
lines changed Original file line number Diff line number Diff line change @@ -1414,6 +1414,11 @@ export class RunEngine {
1414
1414
throw new NotImplementedError ( "There shouldn't be a heartbeat for QUEUED_EXECUTING" ) ;
1415
1415
}
1416
1416
case "PENDING_EXECUTING" : {
1417
+ this . logger . log ( "RunEngine stalled snapshot PENDING_EXECUTING" , {
1418
+ runId,
1419
+ snapshotId : latestSnapshot . id ,
1420
+ } ) ;
1421
+
1417
1422
//the run didn't start executing, we need to requeue it
1418
1423
const run = await prisma . taskRun . findFirst ( {
1419
1424
where : { id : runId } ,
Original file line number Diff line number Diff line change @@ -143,6 +143,15 @@ export class DequeueSystem {
143
143
const orgId = message . message . orgId ;
144
144
const runId = message . messageId ;
145
145
146
+ this . $ . logger . info ( "DequeueSystem.dequeueFromWorkerQueue dequeued message" , {
147
+ runId,
148
+ orgId,
149
+ environmentId : message . message . environmentId ,
150
+ environmentType : message . message . environmentType ,
151
+ workerQueueLength : message . workerQueueLength ?? 0 ,
152
+ workerQueue,
153
+ } ) ;
154
+
146
155
span . setAttribute ( "run_id" , runId ) ;
147
156
span . setAttribute ( "org_id" , orgId ) ;
148
157
span . setAttribute ( "environment_id" , message . message . environmentId ) ;
Original file line number Diff line number Diff line change @@ -1417,27 +1417,28 @@ export class RunQueue {
1417
1417
1418
1418
const pipeline = this . redis . pipeline ( ) ;
1419
1419
1420
- const workerQueueKeys = new Set < string > ( ) ;
1420
+ const operations = [ ] ;
1421
1421
1422
1422
for ( const message of messages ) {
1423
1423
const workerQueueKey = this . keys . workerQueueKey (
1424
1424
this . #getWorkerQueueFromMessage( message . message )
1425
1425
) ;
1426
1426
1427
- workerQueueKeys . add ( workerQueueKey ) ;
1428
-
1429
1427
const messageKeyValue = this . keys . messageKey ( message . message . orgId , message . messageId ) ;
1430
1428
1429
+ operations . push ( {
1430
+ workerQueueKey : workerQueueKey ,
1431
+ messageId : message . messageId ,
1432
+ } ) ;
1433
+
1431
1434
pipeline . rpush ( workerQueueKey , messageKeyValue ) ;
1432
1435
}
1433
1436
1434
- span . setAttribute ( "worker_queue_count" , workerQueueKeys . size ) ;
1435
- span . setAttribute ( "worker_queue_keys" , Array . from ( workerQueueKeys ) ) ;
1437
+ span . setAttribute ( "operations_count" , operations . length ) ;
1436
1438
1437
- this . logger . debug ( "enqueueMessagesToWorkerQueues pipeline " , {
1439
+ this . logger . info ( "enqueueMessagesToWorkerQueues" , {
1438
1440
service : this . name ,
1439
- messages,
1440
- workerQueueKeys : Array . from ( workerQueueKeys ) ,
1441
+ operations,
1441
1442
} ) ;
1442
1443
1443
1444
await pipeline . exec ( ) ;
You can’t perform that action at this time.
0 commit comments