feat: global FIFO queue for Evals runs (#7966) #7981
New file (`@@ -0,0 +1,113 @@`):

```ts
"use server"

import fs from "fs"
import { spawn } from "child_process"
import { revalidatePath } from "next/cache"

import { deleteRun as _deleteRun } from "@roo-code/evals"

import { redisClient } from "@/lib/server/redis"

const RUN_QUEUE_KEY = "evals:run-queue"
const ACTIVE_RUN_KEY = "evals:active-run"
const DISPATCH_LOCK_KEY = "evals:dispatcher:lock"
const ACTIVE_RUN_TTL_SECONDS = 60 * 60 * 12 // 12 hours
const DISPATCH_LOCK_TTL_SECONDS = 30

async function spawnController(runId: number) {
```
**Contributor:** This function is duplicated in
```ts
	const isRunningInDocker = fs.existsSync("/.dockerenv")

	const dockerArgs = [
		`--name evals-controller-${runId}`,
		"--rm",
		"--network evals_default",
		"-v /var/run/docker.sock:/var/run/docker.sock",
		"-v /tmp/evals:/var/log/evals",
		"-e HOST_EXECUTION_METHOD=docker",
	]

	const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${runId}`

	const command = isRunningInDocker
		? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"`
		: cliCommand

	const childProcess = spawn("sh", ["-c", command], {
		detached: true,
		stdio: ["ignore", "pipe", "pipe"],
	})

	// Best-effort logging of controller output
	try {
		const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" })
		childProcess.stdout?.pipe(logStream)
		childProcess.stderr?.pipe(logStream)
	} catch (_error) {
		// Intentionally ignore logging pipe errors
	}

	childProcess.unref()
}
```
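The same spawn logic appears again in the evals package later in this PR (second new file below). A minimal way to share it is sketched here only as an illustration; the export name `spawnEvalsController`, its location, and the `containerized` parameter are assumptions, not part of the PR:

```ts
// Hypothetical shared helper that both the web action and the evals CLI could import.
import fs from "node:fs"
import { spawn } from "node:child_process"

export function spawnEvalsController(runId: number, containerized: boolean, logPath = "/tmp/roo-code-evals.log") {
	const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${runId}`

	const dockerArgs = [
		`--name evals-controller-${runId}`,
		"--rm",
		"--network evals_default",
		"-v /var/run/docker.sock:/var/run/docker.sock",
		"-v /tmp/evals:/var/log/evals",
		"-e HOST_EXECUTION_METHOD=docker",
	]

	const command = containerized
		? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"`
		: cliCommand

	const child = spawn("sh", ["-c", command], { detached: true, stdio: ["ignore", "pipe", "pipe"] })

	// Best-effort logging; failures to open the log file are deliberately ignored.
	try {
		const log = fs.createWriteStream(logPath, { flags: "a" })
		child.stdout?.pipe(log)
		child.stderr?.pipe(log)
	} catch {
		// ignore
	}

	child.unref()
}
```

Each caller would then pass its own Docker detection: `fs.existsSync("/.dockerenv")` in the web app, `isDockerContainer()` in the package.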
```ts
/**
 * Enqueue a run into the global FIFO (idempotent).
 */
export async function enqueueRun(runId: number) {
	const redis = await redisClient()
	const exists = await redis.lPos(RUN_QUEUE_KEY, runId.toString())
	if (exists === null) {
		await redis.rPush(RUN_QUEUE_KEY, runId.toString())
	}
	revalidatePath("/runs")
}

/**
 * Dispatcher: if no active run, pop next from queue and start controller.
 * Uses a short-lived lock to avoid races between concurrent dispatchers.
 */
export async function dispatchNextRun() {
	const redis = await redisClient()

	// Try to acquire dispatcher lock
	const locked = await redis.set(DISPATCH_LOCK_KEY, "1", { NX: true, EX: DISPATCH_LOCK_TTL_SECONDS })
```
**Contributor:** Is there a potential race condition here? If the dispatcher lock expires (30s) while we're still processing, another dispatcher could start processing the same queue. Consider either:
```ts
	if (!locked) return

	try {
		// If an active run is present, nothing to do.
		const active = await redis.get(ACTIVE_RUN_KEY)
		if (active) return

		const nextId = await redis.lPop(RUN_QUEUE_KEY)
		if (!nextId) return

		const ok = await redis.set(ACTIVE_RUN_KEY, nextId, { NX: true, EX: ACTIVE_RUN_TTL_SECONDS })
		if (!ok) {
			// put it back to preserve order and exit
			await redis.lPush(RUN_QUEUE_KEY, nextId)
			return
		}

		await spawnController(Number(nextId))
	} finally {
		await redis.del(DISPATCH_LOCK_KEY).catch(() => {})
	}
}
```
**Copilot** suggested adding error logging when releasing the lock:

```diff
-		await redis.del(DISPATCH_LOCK_KEY).catch(() => {})
+		await redis.del(DISPATCH_LOCK_KEY).catch((err) => {
+			console.error("Failed to delete dispatcher lock key:", err)
+		})
```

**Contributor:** I agree with Copilot - could we add error logging here for debugging purposes? Silent failures make troubleshooting difficult in production.
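On the lock-expiry race raised above: a common mitigation, sketched here under assumptions (it reuses `redisClient` and the key constants from the file above and assumes the node-redis v4 client; the token scheme and Lua script are not part of this PR), is to store a unique token in the lock and release it only if the token still matches, so a dispatcher whose lock has already expired cannot delete a newer holder's lock:

```ts
import { randomUUID } from "node:crypto"

// Release the lock only if it still holds our token (atomic check-and-delete).
const RELEASE_IF_OWNER = `
if redis.call("get", KEYS[1]) == ARGV[1] then
  return redis.call("del", KEYS[1])
end
return 0
`

export async function withDispatchLock(work: () => Promise<void>) {
	const redis = await redisClient()
	const token = randomUUID()

	const locked = await redis.set(DISPATCH_LOCK_KEY, token, { NX: true, EX: DISPATCH_LOCK_TTL_SECONDS })
	if (!locked) return

	try {
		await work()
	} finally {
		// If the TTL expired and another dispatcher now owns the lock, leave it alone.
		await redis.eval(RELEASE_IF_OWNER, { keys: [DISPATCH_LOCK_KEY], arguments: [token] }).catch(() => {})
	}
}
```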
Changed file (`@@ -1,9 +1,9 @@`):

```ts
"use server"

import * as path from "path"
import fs from "fs"
import { fileURLToPath } from "url"
import { spawn } from "child_process"

import { enqueueRun, dispatchNextRun } from "@/actions/queue"

import { revalidatePath } from "next/cache"
import pMap from "p-map"
```
@@ -52,41 +52,9 @@ export async function createRun({ suite, exercises = [], systemPrompt, timeout, | |
| revalidatePath("/runs") | ||
|
|
||
| try { | ||
| const isRunningInDocker = fs.existsSync("/.dockerenv") | ||
|
|
||
| const dockerArgs = [ | ||
| `--name evals-controller-${run.id}`, | ||
| "--rm", | ||
| "--network evals_default", | ||
| "-v /var/run/docker.sock:/var/run/docker.sock", | ||
| "-v /tmp/evals:/var/log/evals", | ||
| "-e HOST_EXECUTION_METHOD=docker", | ||
| ] | ||
|
|
||
| const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${run.id}` | ||
|
|
||
| const command = isRunningInDocker | ||
| ? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"` | ||
| : cliCommand | ||
|
|
||
| console.log("spawn ->", command) | ||
|
|
||
| const childProcess = spawn("sh", ["-c", command], { | ||
| detached: true, | ||
| stdio: ["ignore", "pipe", "pipe"], | ||
| }) | ||
|
|
||
| const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" }) | ||
|
|
||
| if (childProcess.stdout) { | ||
| childProcess.stdout.pipe(logStream) | ||
| } | ||
|
|
||
| if (childProcess.stderr) { | ||
| childProcess.stderr.pipe(logStream) | ||
| } | ||
|
|
||
| childProcess.unref() | ||
| // Enqueue the run and attempt to dispatch if no active run exists. | ||
| await enqueueRun(run.id) | ||
| await dispatchNextRun() | ||
| } catch (error) { | ||
| console.error(error) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This error handling only logs to console. Should we consider:
Silent failures could leave users confused about why their run isn't queued. |
||
| } | ||
|
|
||
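On surfacing enqueue failures: one option, sketched only as an illustration (the result shape, helper name, and the client-side toast are assumptions, not part of this PR), is to return an explicit result from the server action so the UI can react instead of relying on server logs:

```ts
// Hypothetical: report queueing failures to the caller instead of only logging them.
type EnqueueOutcome = { queued: true } | { queued: false; error: string }

async function enqueueAndDispatch(runId: number): Promise<EnqueueOutcome> {
	try {
		await enqueueRun(runId)
		await dispatchNextRun()
		return { queued: true }
	} catch (error) {
		console.error(error)
		return { queued: false, error: error instanceof Error ? error.message : "Failed to enqueue run" }
	}
}

// A client component could then show feedback, e.g.:
//   const outcome = await enqueueAndDispatch(run.id)
//   if (!outcome.queued) toast.error(`Run created but not queued: ${outcome.error}`)
```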
Changed file (`@@ -1,10 +1,13 @@`):

```diff
 import { useCallback, useState, useRef } from "react"
 import Link from "next/link"
-import { Ellipsis, ClipboardList, Copy, Check, LoaderCircle, Trash } from "lucide-react"
+import { useQuery } from "@tanstack/react-query"
+import { Ellipsis, ClipboardList, Copy, Check, LoaderCircle, Trash, XCircle } from "lucide-react"

 import type { Run as EvalsRun, TaskMetrics as EvalsTaskMetrics } from "@roo-code/evals"

 import { deleteRun } from "@/actions/runs"
+import { getHeartbeat } from "@/actions/heartbeat"
+import { getQueuePosition, cancelQueuedRun } from "@/actions/queue"
 import { formatCurrency, formatDuration, formatTokens, formatToolUsageSuccessRate } from "@/lib/formatters"
 import { useCopyRun } from "@/hooks/use-copy-run"
 import {
```
@@ -35,6 +38,23 @@ export function Run({ run, taskMetrics }: RunProps) { | |||||||||
| const continueRef = useRef<HTMLButtonElement>(null) | ||||||||||
| const { isPending, copyRun, copied } = useCopyRun(run.id) | ||||||||||
|
|
||||||||||
| // Poll heartbeat and queue position for status column | ||||||||||
| const { data: heartbeat } = useQuery({ | ||||||||||
| queryKey: ["getHeartbeat", run.id], | ||||||||||
| queryFn: () => getHeartbeat(run.id), | ||||||||||
| refetchInterval: 10_000, | ||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The 10-second polling interval might be excessive for long-running queues. Consider making this configurable or using a progressive interval (e.g., start at 10s, increase to 30s after a few polls)? |
||||||||||
| }) | ||||||||||
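On the polling concern: `refetchInterval` also accepts a function, so a progressive back-off needs only a small change. A sketch assuming TanStack Query v5's callback signature; the thresholds are illustrative, not part of this PR:

```ts
// Poll every 10s at first, then back off to 30s after a handful of refetches.
const { data: heartbeat } = useQuery({
	queryKey: ["getHeartbeat", run.id],
	queryFn: () => getHeartbeat(run.id),
	refetchInterval: (query) => (query.state.dataUpdateCount < 5 ? 10_000 : 30_000),
})
```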
```ts
	const { data: queuePosition } = useQuery({
		queryKey: ["getQueuePosition", run.id],
		queryFn: () => getQueuePosition(run.id),
		refetchInterval: 10_000,
	})

	const isCompleted = !!run.taskMetricsId
	const isRunning = !!heartbeat
	const isQueued = !isCompleted && !isRunning && queuePosition !== null && queuePosition !== undefined
```
**Copilot** suggested simplifying the check:

```diff
-	const isQueued = !isCompleted && !isRunning && queuePosition !== null && queuePosition !== undefined
+	const isQueued = !isCompleted && !isRunning && queuePosition != null
```

**Contributor:** I agree with Copilot's suggestion here. Could we simplify this to use `queuePosition != null` which checks for both null and undefined?
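The rendering that consumes these flags is not shown in this diff. A hypothetical status cell, using only names imported above (`LoaderCircle`, `XCircle`, `cancelQueuedRun`), might look roughly like this; the markup, labels, and whether `queuePosition` is zero- or one-based are assumptions:

```tsx
// Illustrative only: running shows a spinner, queued shows position plus a cancel affordance.
function StatusCell(props: {
	isRunning: boolean
	isQueued: boolean
	queuePosition: number | null | undefined
	onCancel: () => void
}) {
	if (props.isRunning) return <LoaderCircle className="size-4 animate-spin" />

	if (props.isQueued)
		return (
			<button type="button" onClick={props.onCancel} title="Cancel queued run">
				Queued #{props.queuePosition} <XCircle className="size-4" />
			</button>
		)

	return null
}

// Usage inside the Run row (hypothetical):
// <StatusCell isRunning={isRunning} isQueued={isQueued} queuePosition={queuePosition}
//             onCancel={() => cancelQueuedRun(run.id)} />
```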
New file (`@@ -0,0 +1,89 @@`):

```ts
import fs from "node:fs"
import { spawn } from "node:child_process"

import { redisClient } from "./redis.js"
import { isDockerContainer } from "./utils.js"

const RUN_QUEUE_KEY = "evals:run-queue"
const ACTIVE_RUN_KEY = "evals:active-run"
const DISPATCH_LOCK_KEY = "evals:dispatcher:lock"
const ACTIVE_RUN_TTL_SECONDS = 60 * 60 * 12 // 12 hours
const DISPATCH_LOCK_TTL_SECONDS = 30

async function spawnController(runId: number) {
	const containerized = isDockerContainer()

	const dockerArgs = [
		`--name evals-controller-${runId}`,
		"--rm",
		"--network evals_default",
		"-v /var/run/docker.sock:/var/run/docker.sock",
		"-v /tmp/evals:/var/log/evals",
		"-e HOST_EXECUTION_METHOD=docker",
	]

	const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${runId}`
	const command = containerized ? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"` : cliCommand

	const childProcess = spawn("sh", ["-c", command], {
		detached: true,
		stdio: ["ignore", "pipe", "pipe"],
	})

	// Best-effort logging of controller output (host path or container path)
	try {
		const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" })
		childProcess.stdout?.pipe(logStream)
		childProcess.stderr?.pipe(logStream)
	} catch {
```
**Contributor:** Empty catch blocks here and at line 85. For consistency with the web implementation, should we at least add a comment explaining why errors are ignored, or consider logging them?
```ts
		// ignore logging errors
	}

	childProcess.unref()
}

/**
 * Clear the active-run marker (if any) and try to dispatch the next run in FIFO order.
 * Uses a short-lived lock to avoid races with other dispatchers (web app or other controllers).
 */
export async function finishActiveRunAndDispatch() {
	const redis = await redisClient()

	// Clear the active run marker first (if exists). We do not care if it was already expired.
	try {
		await redis.del(ACTIVE_RUN_KEY)
	} catch {
		// ignore
	}

	// Try to acquire dispatcher lock (NX+EX). If we don't get it, another dispatcher will handle it.
	const locked = await redis.set(DISPATCH_LOCK_KEY, "1", { NX: true, EX: DISPATCH_LOCK_TTL_SECONDS })
	if (!locked) return

	try {
		// If another process re-marked active-run meanwhile, bail out.
		const active = await redis.get(ACTIVE_RUN_KEY)
		if (active) return

		// Pop next run id from the head of the queue.
		const nextId = await redis.lPop(RUN_QUEUE_KEY)
		if (!nextId) return

		// Mark as active (with TTL) to provide crash safety.
		const ok = await redis.set(ACTIVE_RUN_KEY, nextId, { NX: true, EX: ACTIVE_RUN_TTL_SECONDS })
		if (!ok) {
			// Could not set active (race). Push id back to the head to preserve order and exit.
			await redis.lPush(RUN_QUEUE_KEY, nextId)
			return
		}

		// Spawn the next controller in background.
		await spawnController(Number(nextId))
	} finally {
		try {
			await redis.del(DISPATCH_LOCK_KEY)
		} catch {
			// ignore
		}
	}
}
```
**Reviewer:** The 12-hour TTL seems quite generous. If a run crashes without clearing the active marker, the queue could be blocked for up to 12 hours. Consider:
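One way to address this without relying on a long TTL (a sketch under assumptions: it reuses `redisClient` and `ACTIVE_RUN_KEY` from the file above; `markRunActive`, `keepActiveRunAlive`, and the intervals are hypothetical, not part of this PR) is a short TTL that the running controller keeps extending, so a crashed run unblocks the queue within minutes instead of hours:

```ts
const ACTIVE_RUN_SHORT_TTL_SECONDS = 5 * 60 // released ~5 minutes after a crash
const ACTIVE_RUN_REFRESH_MS = 60_000 // extended once a minute while the run is alive

export async function markRunActive(runId: number) {
	const redis = await redisClient()
	return redis.set(ACTIVE_RUN_KEY, runId.toString(), { NX: true, EX: ACTIVE_RUN_SHORT_TTL_SECONDS })
}

export function keepActiveRunAlive(runId: number) {
	const timer = setInterval(async () => {
		const redis = await redisClient()
		// Only refresh the TTL while we are still the active run.
		const active = await redis.get(ACTIVE_RUN_KEY)
		if (active === runId.toString()) {
			await redis.expire(ACTIVE_RUN_KEY, ACTIVE_RUN_SHORT_TTL_SECONDS)
		}
	}, ACTIVE_RUN_REFRESH_MS)

	// The controller would call the returned function when the run finishes or fails.
	return () => clearInterval(timer)
}
```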