diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index 81cbb0379c..c7d0855c53 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -1862,10 +1862,12 @@ export class RunQueue { // Call this every 10 minutes private async scanConcurrencySets() { if (this.abortController.signal.aborted) { + this.logger.info("Abort signal received, skipping concurrency scan"); + return; } - this.logger.debug("Scanning concurrency sets for completed runs"); + this.logger.info("Scanning concurrency sets for completed runs"); const stats = { streamCallbacks: 0, @@ -1918,7 +1920,7 @@ export class RunQueue { return; } - this.logger.debug("Processing concurrency keys from stream", { + this.logger.info("Processing concurrency keys from stream", { keys: uniqueKeys, }); @@ -1988,27 +1990,29 @@ export class RunQueue { } private async processCurrentConcurrencyRunIds(concurrencyKey: string, runIds: string[]) { - this.logger.debug(`Processing concurrency set with ${runIds.length} runs`, { + this.logger.info("Processing concurrency set with runs", { concurrencyKey, - runIds: runIds.slice(0, 5), // Log first 5 for debugging + runIds: runIds.slice(0, 5), // Log first 5 for debugging, + runIdsLength: runIds.length, }); // Call the callback to determine which runs are completed const completedRuns = await this.options.concurrencySweeper?.callback(runIds); if (!completedRuns) { - this.logger.debug("No completed runs found in concurrency set", { concurrencyKey }); + this.logger.info("No completed runs found in concurrency set", { concurrencyKey }); return; } if (completedRuns.length === 0) { - this.logger.debug("No completed runs found in concurrency set", { concurrencyKey }); + this.logger.info("No completed runs found in concurrency set", { concurrencyKey }); return; } - this.logger.debug(`Found ${completedRuns.length} completed runs to mark for ack`, { + this.logger.info("Found completed runs to mark for ack", { concurrencyKey, completedRunIds: completedRuns.map((r) => r.id).slice(0, 5), + completedRunIdsLength: completedRuns.length, }); // Mark the completed runs for acknowledgment @@ -2032,7 +2036,7 @@ export class RunQueue { const count = await this.redis.zadd(markedForAckKey, ...args); - this.logger.debug(`Marked ${count} runs for acknowledgment`, { + this.logger.info("Marked runs for acknowledgment", { markedForAckKey, count, });