File tree Expand file tree Collapse file tree 3 files changed +23
-8
lines changed
internal-packages/run-engine/src Expand file tree Collapse file tree 3 files changed +23
-8
lines changed Original file line number Diff line number Diff line change @@ -1205,6 +1205,11 @@ export class RunEngine {
1205
1205
throw new NotImplementedError ( "There shouldn't be a heartbeat for QUEUED_EXECUTING" ) ;
1206
1206
}
1207
1207
case "PENDING_EXECUTING" : {
1208
+ this . logger . log ( "RunEngine stalled snapshot PENDING_EXECUTING" , {
1209
+ runId,
1210
+ snapshotId : latestSnapshot . id ,
1211
+ } ) ;
1212
+
1208
1213
//the run didn't start executing, we need to requeue it
1209
1214
const run = await prisma . taskRun . findFirst ( {
1210
1215
where : { id : runId } ,
Original file line number Diff line number Diff line change @@ -143,6 +143,15 @@ export class DequeueSystem {
143
143
const orgId = message . message . orgId ;
144
144
const runId = message . messageId ;
145
145
146
+ this . $ . logger . info ( "DequeueSystem.dequeueFromWorkerQueue dequeued message" , {
147
+ runId,
148
+ orgId,
149
+ environmentId : message . message . environmentId ,
150
+ environmentType : message . message . environmentType ,
151
+ workerQueueLength : message . workerQueueLength ?? 0 ,
152
+ workerQueue,
153
+ } ) ;
154
+
146
155
span . setAttribute ( "run_id" , runId ) ;
147
156
span . setAttribute ( "org_id" , orgId ) ;
148
157
span . setAttribute ( "environment_id" , message . message . environmentId ) ;
Original file line number Diff line number Diff line change @@ -1369,27 +1369,28 @@ export class RunQueue {
1369
1369
1370
1370
const pipeline = this . redis . pipeline ( ) ;
1371
1371
1372
- const workerQueueKeys = new Set < string > ( ) ;
1372
+ const operations = [ ] ;
1373
1373
1374
1374
for ( const message of messages ) {
1375
1375
const workerQueueKey = this . keys . workerQueueKey (
1376
1376
this . #getWorkerQueueFromMessage( message . message )
1377
1377
) ;
1378
1378
1379
- workerQueueKeys . add ( workerQueueKey ) ;
1380
-
1381
1379
const messageKeyValue = this . keys . messageKey ( message . message . orgId , message . messageId ) ;
1382
1380
1381
+ operations . push ( {
1382
+ workerQueueKey : workerQueueKey ,
1383
+ messageId : message . messageId ,
1384
+ } ) ;
1385
+
1383
1386
pipeline . rpush ( workerQueueKey , messageKeyValue ) ;
1384
1387
}
1385
1388
1386
- span . setAttribute ( "worker_queue_count" , workerQueueKeys . size ) ;
1387
- span . setAttribute ( "worker_queue_keys" , Array . from ( workerQueueKeys ) ) ;
1389
+ span . setAttribute ( "operations_count" , operations . length ) ;
1388
1390
1389
- this . logger . debug ( "enqueueMessagesToWorkerQueues pipeline " , {
1391
+ this . logger . info ( "enqueueMessagesToWorkerQueues" , {
1390
1392
service : this . name ,
1391
- messages,
1392
- workerQueueKeys : Array . from ( workerQueueKeys ) ,
1393
+ operations,
1393
1394
} ) ;
1394
1395
1395
1396
await pipeline . exec ( ) ;
You can’t perform that action at this time.
0 commit comments