feat: implement global FIFO queue for Evals runs #7971
- Add Redis-based queue management with run-queue, active-run, and dispatcher lock
- Modify createRun to enqueue runs instead of immediate spawning
- Implement auto-advance mechanism when runs complete
- Add UI status column showing Running/Queued/Completed states
- Add queue position display for queued runs
- Add cancel button for queued runs
- Preserve per-run task concurrency via PQueue

Addresses issue #7966
    }
}, [run.id])

const getStatusBadge = () => {
User-facing strings (e.g. 'Loading...', 'Running', 'Queued', 'Completed', 'Unknown') are hardcoded. Consider using the i18n translation function to support multiple languages.
This comment was generated because it violated a code review rule: irule_C0ez7Rji6ANcGkkX.
I reviewed my own code and found bugs I created 5 minutes ago. Classic.
if (setActive !== "OK") {
    // Another process may have set an active run, put this run back in the queue
    console.log("Failed to set active run, requeueing")
    await redis.lPush(getRunQueueKey(), runId.toString())
Is this intentional? Using lPush here means the run gets added to the front of the queue (LIFO) instead of the back (FIFO). This breaks the FIFO ordering when requeueing. Should this be rPush to maintain FIFO order?
- await redis.lPush(getRunQueueKey(), runId.toString())
+ await redis.rPush(getRunQueueKey(), runId.toString())
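Which end a requeue should target depends on which end the dispatcher pops from, and the excerpts quoted in this thread do not show the enqueue or dequeue calls. The following is a minimal in-memory sketch of Redis LIST ordering, assuming (hypothetically) that runs are enqueued with rPush and dequeued with lPop; `ListModel` and `requeueDemo` are illustrative names, not code from this PR.

```typescript
// In-memory stand-in for a Redis LIST; index 0 is the head (left end).
class ListModel {
  private items: string[] = []

  // LPUSH: insert at the head.
  lPush(value: string): void {
    this.items.unshift(value)
  }

  // RPUSH: append at the tail.
  rPush(value: string): void {
    this.items.push(value)
  }

  // LPOP: remove and return the head, or null when the list is empty.
  lPop(): string | null {
    const head = this.items.shift()
    return head === undefined ? null : head
  }

  snapshot(): string[] {
    return [...this.items]
  }
}

// Assumption: producers use rPush and the dispatcher uses lPop.
// Dequeue one run, then put it back with the chosen command.
function requeueDemo(method: "lPush" | "rPush"): string[] {
  const queue = new ListModel()
  for (const id of ["1", "2", "3"]) queue.rPush(id)

  const runId = queue.lPop() // "1", the oldest run
  if (runId !== null) {
    if (method === "lPush") queue.lPush(runId)
    else queue.rPush(runId)
  }
  return queue.snapshot()
}

console.log(requeueDemo("lPush")) // [ '1', '2', '3' ]
console.log(requeueDemo("rPush")) // [ '2', '3', '1' ]
```

Under the opposite convention (lPush to enqueue, rPop to dequeue) the two outcomes swap, so the dequeue call in the PR is worth checking before settling on either command.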
    console.error(`Failed to spawn controller for run ${runId}:`, error)
    // Clear active run and requeue on spawn failure
    await redis.del(getActiveRunKey())
    await redis.lPush(getRunQueueKey(), runId.toString())
Similar issue here - using lPush for requeueing on spawn failure breaks FIFO order. Consider using rPush to maintain the queue order:
- await redis.lPush(getRunQueueKey(), runId.toString())
+ await redis.rPush(getRunQueueKey(), runId.toString())
// Set as active run with generous TTL (1 hour)
const setActive = await setActiveRun(nextRunId, 3600)

if (!setActive) {
If setting the active run fails here, the run has already been dequeued but isn't being requeued. This could cause the run to be lost. Consider adding error recovery:
- if (!setActive) {
+ if (!setActive) {
+     // This shouldn't happen but handle it gracefully
+     logger.error(`Failed to set run ${nextRunId} as active, requeueing`)
+     // Requeue the run at the front since it was just dequeued
+     const redis = await redisClient()
+     await redis.lPush(getRunQueueKey(), nextRunId.toString())
+     return
+ }
fetchStatus()
// Refresh status every 5 seconds for non-completed runs
const interval = !run.taskMetricsId ? setInterval(fetchStatus, 5000) : null
Could we clear the interval immediately when run.taskMetricsId becomes truthy to prevent potential memory leaks? The current logic might continue polling briefly after completion:
- const interval = !run.taskMetricsId ? setInterval(fetchStatus, 5000) : null
+ // Refresh status every 5 seconds for non-completed runs
+ let interval: NodeJS.Timeout | null = null
+ if (!run.taskMetricsId) {
+     interval = setInterval(() => {
+         // Check if run completed during interval
+         if (run.taskMetricsId && interval) {
+             clearInterval(interval)
+             interval = null
+         } else {
+             fetchStatus()
+         }
+     }, 5000)
+ }
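A self-stopping poller can also be sketched outside of React. The version below is a hedged illustration, not the component's code: `startPolling`, `Timer`, and `realTimer` are hypothetical names, and the completion check is passed as a function so that each tick reads the current value rather than one captured when the interval was created.

```typescript
// Minimal timer abstraction so the poller can be driven by a fake clock in tests.
type Timer = {
  set: (fn: () => void, ms: number) => unknown
  clear: (handle: unknown) => void
}

const realTimer: Timer = {
  set: (fn, ms) => setInterval(fn, ms),
  clear: (handle) => clearInterval(handle as ReturnType<typeof setInterval>),
}

// Fetches once immediately, then every `intervalMs` until `isCompleted()` returns true.
// Returns a stop function suitable for use as an effect cleanup.
function startPolling(
  fetchStatus: () => void,
  isCompleted: () => boolean,
  intervalMs: number = 5000,
  timer: Timer = realTimer,
): () => void {
  fetchStatus()
  if (isCompleted()) return () => {}

  let handle: unknown = timer.set(() => {
    if (isCompleted()) {
      stop() // completed since the last tick: stop without another fetch
      return
    }
    fetchStatus()
  }, intervalMs)

  function stop(): void {
    if (handle !== null) {
      timer.clear(handle)
      handle = null // makes repeated stop() calls harmless
    }
  }

  return stop
}
```

In a component, the returned function would typically be handed back from the effect so the interval is cleared on unmount as well as on completion.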
    childProcess.unref()
}

export async function dispatchNextRun() {
The dispatch logic is duplicated between this file and packages/evals/src/cli/runEvals.ts. Could we extract this to a shared module to avoid maintenance issues and ensure consistency? This would make future updates easier and reduce the risk of the implementations diverging.
    return activeRunId ? parseInt(activeRunId, 10) : null
}

export const setActiveRun = async (runId: number, ttlSeconds: number = 3600): Promise<boolean> => {
The default TTL of 3600 seconds (1 hour) is hardcoded here and in multiple other places. Consider defining this as a constant at the module level for easier configuration:
- export const setActiveRun = async (runId: number, ttlSeconds: number = 3600): Promise<boolean> => {
+ const DEFAULT_ACTIVE_RUN_TTL = 3600 // 1 hour in seconds
+ export const setActiveRun = async (runId: number, ttlSeconds: number = DEFAULT_ACTIVE_RUN_TTL): Promise<boolean> => {
hannesrudolph left a comment
Suggestions:
1) Dispatcher lock: use a tokenized lock (store a random token as the value; release only if the token matches) and increase or renew the TTL to cover spawn time; see packages/evals/src/cli/runEvals.ts (https://github.com/RooCodeInc/Roo-Code/blob/b58ce4eecc598c5c554cfaab8d1a5c61743c7772/packages/evals/src/cli/runEvals.ts).
2) Atomicity: make dequeue -> setActive -> spawn atomic (WATCH/MULTI or Lua); consider BLMOVE/BRPOPLPUSH; see apps/web-evals/src/actions/runs.ts (https://github.com/RooCodeInc/Roo-Code/blob/b58ce4eecc598c5c554cfaab8d1a5c61743c7772/apps/web-evals/src/actions/runs.ts).
3) Active-run TTL: refresh it alongside the heartbeat so the TTL cannot expire mid-run.
4) UI: avoid window.location.reload in the cancel flow; prefer router.refresh or revalidatePath; see apps/web-evals/src/components/home/run.tsx.
5) Observability: add logs/metrics around dispatch decisions and lock acquisition.
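To make the tokenized-lock suggestion concrete, here is a minimal in-memory sketch of the pattern. It is an illustration under stated assumptions, not the PR's code: `LockStore`, `LockEntry`, and `staleReleaseDemo` are hypothetical names, time is passed in explicitly so the behaviour is deterministic, and the fixed token strings stand in for random tokens that real callers would generate per acquisition.

```typescript
interface LockEntry {
  token: string
  expiresAt: number // epoch milliseconds
}

// In-memory stand-in for the Redis key space holding the lock.
class LockStore {
  private entries = new Map<string, LockEntry>()

  // Returns the entry only if it has not expired at time `now`.
  private live(key: string, now: number): LockEntry | undefined {
    const entry = this.entries.get(key)
    return entry !== undefined && entry.expiresAt > now ? entry : undefined
  }

  // Models `SET key token NX PX ttlMs`: succeeds only when no unexpired holder exists.
  acquire(key: string, token: string, ttlMs: number, now: number): boolean {
    if (this.live(key, now) !== undefined) return false
    this.entries.set(key, { token, expiresAt: now + ttlMs })
    return true
  }

  // Compare-and-extend: only the current holder may push the expiry forward.
  renew(key: string, token: string, ttlMs: number, now: number): boolean {
    const entry = this.live(key, now)
    if (entry === undefined || entry.token !== token) return false
    entry.expiresAt = now + ttlMs
    return true
  }

  // Compare-and-delete: a caller whose lock expired cannot remove a newer holder's lock.
  release(key: string, token: string, now: number): boolean {
    const entry = this.live(key, now)
    if (entry === undefined || entry.token !== token) return false
    this.entries.delete(key)
    return true
  }
}

// Scenario: dispatcher A's lock expires while it is still spawning, then B takes over.
function staleReleaseDemo() {
  const store = new LockStore()
  const key = "evals:dispatcher:lock"
  const aAcquired = store.acquire(key, "token-A", 1000, 0)
  const bAcquiredEarly = store.acquire(key, "token-B", 1000, 500) // A still holds it
  const bAcquiredLate = store.acquire(key, "token-B", 1000, 1500) // A's TTL has lapsed
  const aReleased = store.release(key, "token-A", 1600) // rejected: token mismatch
  const bReleased = store.release(key, "token-B", 1700)
  return { aAcquired, bAcquiredEarly, bAcquiredLate, aReleased, bReleased }
}
```

Against a real Redis server the compare-and-delete and compare-and-extend steps need to run atomically, which is usually done with a small Lua script; a plain GET followed by DEL leaves a window in which the lock can change hands.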
Summary
This PR implements a global FIFO queue for evaluation runs as requested in #7966. The implementation ensures only one run executes at a time, with additional runs queued automatically.
Changes
Queue Management (Redis-based)
packages/evals/src/cli/redis.ts:
- evals:run-queue (LIST) for FIFO queue of run IDs
- evals:active-run (STRING) for currently executing run with TTL for crash safety
- evals:dispatcher:lock (STRING) for serializing dispatch operations
Run Creation & Dispatch
createRun() in apps/web-evals/src/actions/runs.ts:
- dispatchNextRun() function that handles queue processing
Auto-advance Mechanism
runEvals() in packages/evals/src/cli/runEvals.ts:
UI Updates
Key Features
✅ Global FIFO queue - Only one run executes at a time
✅ Automatic queue advancement - Next run starts when current completes
✅ Crash safety - TTL on active run and dispatcher lock
✅ Race condition handling - Distributed locking pattern
✅ Minimal UI changes - Status column and cancel button
✅ Preserved concurrency - Per-run task parallelism unchanged
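The features listed above can be modelled in a few lines to show how they interact. This is a single-process sketch under simplifying assumptions: it ignores Redis, locking, TTLs, and crashes, and `RunQueueModel` with its methods is a hypothetical name rather than anything defined in the PR.

```typescript
type RunStatus = "Running" | "Queued" | "Unknown"

// Single-process model of the global queue: at most one active run at a time.
class RunQueueModel {
  private queue: number[] = []
  private active: number | null = null
  readonly started: number[] = [] // order in which runs were dispatched

  get activeRun(): number | null {
    return this.active
  }

  // Creating a run enqueues it and then attempts a dispatch.
  enqueue(runId: number): void {
    this.queue.push(runId)
    this.dispatchNext()
  }

  // Starts the oldest queued run, but only when nothing is currently active.
  dispatchNext(): void {
    if (this.active !== null) return
    const next = this.queue.shift()
    if (next === undefined) return
    this.active = next
    this.started.push(next)
  }

  // Auto-advance: finishing the active run clears it and dispatches the next one.
  complete(runId: number): void {
    if (this.active !== runId) return
    this.active = null
    this.dispatchNext()
  }

  // Only queued (not running) runs can be cancelled.
  cancel(runId: number): boolean {
    const index = this.queue.indexOf(runId)
    if (index === -1) return false
    this.queue.splice(index, 1)
    return true
  }

  // 1-based position in the queue, or null when the run is not queued.
  position(runId: number): number | null {
    const index = this.queue.indexOf(runId)
    return index === -1 ? null : index + 1
  }

  status(runId: number): RunStatus {
    if (this.active === runId) return "Running"
    return this.queue.includes(runId) ? "Queued" : "Unknown"
  }
}
```

For example, enqueueing runs 1, 2, and 3 leaves run 1 running with runs 2 and 3 at positions 1 and 2; cancelling run 2 and then completing run 1 starts run 3 next.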
Testing
- pnpm check-types
- pnpm lint
Notes
- runs.status column for analytics
Closes #7966
cc @hannesrudolph
Important
Implement a global FIFO queue for evaluation runs using Redis, ensuring single execution at a time with UI updates for real-time status.
- redis.ts for FIFO queue (evals:run-queue), active run (evals:active-run), and dispatcher lock (evals:dispatcher:lock).
- createRun() in runs.ts to enqueue runs instead of immediate execution.
- dispatchNextRun() in runs.ts and runEvals.ts for queue processing with distributed locking.
- runEvals() in runEvals.ts to clear active run status and dispatch next run on completion.
- run.tsx and runs.tsx to show run status (Running, Queued, Completed).
- run.tsx.
- run.tsx.
This description was created for b58ce4e. It will automatically update as commits are pushed.