Skip to content

Commit 743b8db

Browse files
authored
chore(run-engine): add additional logging around dequeueing and worker queues (#2562)
1 parent eb0263e commit 743b8db

File tree

3 files changed

+23
-8
lines changed

3 files changed

+23
-8
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,6 +1414,11 @@ export class RunEngine {
14141414
throw new NotImplementedError("There shouldn't be a heartbeat for QUEUED_EXECUTING");
14151415
}
14161416
case "PENDING_EXECUTING": {
1417+
this.logger.log("RunEngine stalled snapshot PENDING_EXECUTING", {
1418+
runId,
1419+
snapshotId: latestSnapshot.id,
1420+
});
1421+
14171422
//the run didn't start executing, we need to requeue it
14181423
const run = await prisma.taskRun.findFirst({
14191424
where: { id: runId },

internal-packages/run-engine/src/engine/systems/dequeueSystem.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,15 @@ export class DequeueSystem {
143143
const orgId = message.message.orgId;
144144
const runId = message.messageId;
145145

146+
this.$.logger.info("DequeueSystem.dequeueFromWorkerQueue dequeued message", {
147+
runId,
148+
orgId,
149+
environmentId: message.message.environmentId,
150+
environmentType: message.message.environmentType,
151+
workerQueueLength: message.workerQueueLength ?? 0,
152+
workerQueue,
153+
});
154+
146155
span.setAttribute("run_id", runId);
147156
span.setAttribute("org_id", orgId);
148157
span.setAttribute("environment_id", message.message.environmentId);

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1417,27 +1417,28 @@ export class RunQueue {
14171417

14181418
const pipeline = this.redis.pipeline();
14191419

1420-
const workerQueueKeys = new Set<string>();
1420+
const operations = [];
14211421

14221422
for (const message of messages) {
14231423
const workerQueueKey = this.keys.workerQueueKey(
14241424
this.#getWorkerQueueFromMessage(message.message)
14251425
);
14261426

1427-
workerQueueKeys.add(workerQueueKey);
1428-
14291427
const messageKeyValue = this.keys.messageKey(message.message.orgId, message.messageId);
14301428

1429+
operations.push({
1430+
workerQueueKey: workerQueueKey,
1431+
messageId: message.messageId,
1432+
});
1433+
14311434
pipeline.rpush(workerQueueKey, messageKeyValue);
14321435
}
14331436

1434-
span.setAttribute("worker_queue_count", workerQueueKeys.size);
1435-
span.setAttribute("worker_queue_keys", Array.from(workerQueueKeys));
1437+
span.setAttribute("operations_count", operations.length);
14361438

1437-
this.logger.debug("enqueueMessagesToWorkerQueues pipeline", {
1439+
this.logger.info("enqueueMessagesToWorkerQueues", {
14381440
service: this.name,
1439-
messages,
1440-
workerQueueKeys: Array.from(workerQueueKeys),
1441+
operations,
14411442
});
14421443

14431444
await pipeline.exec();

0 commit comments

Comments
 (0)