Skip to content

Commit ef4b530

Browse files
committed
feat: global FIFO queue for Evals runs (#7966)
1 parent 72bc790 commit ef4b530

File tree

7 files changed

+340
-39
lines changed

7 files changed

+340
-39
lines changed
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
"use server"
2+
3+
import fs from "fs"
4+
import { spawn } from "child_process"
5+
import { revalidatePath } from "next/cache"
6+
7+
import { deleteRun as _deleteRun } from "@roo-code/evals"
8+
9+
import { redisClient } from "@/lib/server/redis"
10+
11+
// Redis list of queued run ids; ids are appended with rPush and taken from the head with lPop.
const RUN_QUEUE_KEY = "evals:run-queue"
12+
// Redis key that stores the id of the run currently marked as active.
const ACTIVE_RUN_KEY = "evals:active-run"
13+
// Redis key used as a set-if-absent (NX) lock around the dispatch sequence.
const DISPATCH_LOCK_KEY = "evals:dispatcher:lock"
14+
// Expiry applied to the active-run marker when it is set.
const ACTIVE_RUN_TTL_SECONDS = 60 * 60 * 12 // 12 hours
15+
// Expiry applied to the dispatcher lock when it is acquired.
const DISPATCH_LOCK_TTL_SECONDS = 30
16+
17+
/**
 * Launch the evals controller for `runId` as a detached background process.
 *
 * When `/.dockerenv` exists the controller is started through `docker run` on the
 * `evals-runner` image; otherwise the pnpm CLI command is executed directly. The
 * controller's stdout and stderr are appended to `/tmp/roo-code-evals.log` on a
 * best-effort basis: logging problems never propagate to the caller.
 *
 * @param runId - Id of the run, passed to the CLI as `--runId`.
 */
async function spawnController(runId: number) {
	const isRunningInDocker = fs.existsSync("/.dockerenv")

	const dockerArgs = [
		`--name evals-controller-${runId}`,
		"--rm",
		"--network evals_default",
		"-v /var/run/docker.sock:/var/run/docker.sock",
		"-v /tmp/evals:/var/log/evals",
		"-e HOST_EXECUTION_METHOD=docker",
	]

	const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${runId}`

	const command = isRunningInDocker
		? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"`
		: cliCommand

	const childProcess = spawn("sh", ["-c", command], {
		detached: true,
		stdio: ["ignore", "pipe", "pipe"],
	})

	// A failed spawn is reported through the "error" event. Without a listener
	// that event becomes an uncaught exception in the server process.
	childProcess.on("error", (error) => {
		console.error(`failed to spawn controller for run ${runId}`, error)
	})

	// Best-effort logging of controller output
	try {
		const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" })

		// Open/write failures surface asynchronously as "error" events, which the
		// surrounding try/catch cannot observe. Ignore them, but keep draining the
		// child's output so the controller never blocks on a full pipe.
		logStream.on("error", () => {
			childProcess.stdout?.resume()
			childProcess.stderr?.resume()
		})

		// Both streams share one sink, so neither may end it when it finishes
		// (the default `end: true` would close the log under the other stream).
		childProcess.stdout?.pipe(logStream, { end: false })
		childProcess.stderr?.pipe(logStream, { end: false })

		// "close" fires once the process is gone and its stdio streams are closed.
		childProcess.once("close", () => logStream.end())
	} catch (_error) {
		// Intentionally ignore logging pipe errors
	}

	childProcess.unref()
}
51+
52+
/**
 * Append a run to the tail of the global FIFO queue unless it is already listed.
 *
 * The membership check and the push are two separate Redis commands, so the
 * "already queued" guard is not atomic across concurrent callers.
 *
 * @param runId - Id of the run to queue.
 */
export async function enqueueRun(runId: number) {
	const client = await redisClient()
	const member = runId.toString()

	const position = await client.lPos(RUN_QUEUE_KEY, member)
	const alreadyQueued = position !== null

	if (!alreadyQueued) {
		await client.rPush(RUN_QUEUE_KEY, member)
	}

	revalidatePath("/runs")
}
63+
64+
/**
 * Dispatcher: if no run is marked active, pop the next id from the queue, mark it
 * active and start its controller.
 *
 * A short-lived NX lock keeps concurrent dispatchers from running this sequence at
 * the same time; callers that fail to obtain the lock return without doing anything.
 *
 * If starting the controller throws, the active marker is cleared and the id is put
 * back at the head of the queue before the error is rethrown, so the run is neither
 * lost nor left blocking the queue until the marker's TTL expires.
 */
export async function dispatchNextRun() {
	const redis = await redisClient()

	// Try to acquire dispatcher lock
	const locked = await redis.set(DISPATCH_LOCK_KEY, "1", { NX: true, EX: DISPATCH_LOCK_TTL_SECONDS })
	if (!locked) return

	try {
		// If an active run is present, nothing to do.
		const active = await redis.get(ACTIVE_RUN_KEY)
		if (active) return

		const nextId = await redis.lPop(RUN_QUEUE_KEY)
		if (!nextId) return

		const ok = await redis.set(ACTIVE_RUN_KEY, nextId, { NX: true, EX: ACTIVE_RUN_TTL_SECONDS })
		if (!ok) {
			// put it back to preserve order and exit
			await redis.lPush(RUN_QUEUE_KEY, nextId)
			return
		}

		try {
			await spawnController(Number(nextId))
		} catch (error) {
			// Undo the pop and the active marker (best effort) so a later dispatch
			// can retry this run instead of waiting for the marker to expire.
			await redis.lPush(RUN_QUEUE_KEY, nextId).catch(() => {})
			await redis.del(ACTIVE_RUN_KEY).catch(() => {})
			throw error
		}
	} finally {
		// Release the lock even when dispatch bailed out or failed.
		await redis.del(DISPATCH_LOCK_KEY).catch(() => {})
	}
}
95+
96+
/**
 * Look up where a run sits in the global FIFO queue.
 *
 * @param runId - Id of the run to look for.
 * @returns The 1-based queue position, or `null` when the run is not in the queue.
 */
export async function getQueuePosition(runId: number): Promise<number | null> {
	const client = await redisClient()
	const zeroBasedIndex = await client.lPos(RUN_QUEUE_KEY, String(runId))

	if (zeroBasedIndex === null) {
		return null
	}

	return zeroBasedIndex + 1
}
104+
105+
/**
 * Remove a queued run from the FIFO queue and delete the run record.
 *
 * The record is deleted only when an entry was actually removed from the queue.
 * If the id is no longer queued (for example it was dispatched between the UI's
 * last poll and the click), the run record is left untouched.
 *
 * @param runId - Id of the queued run to cancel.
 */
export async function cancelQueuedRun(runId: number) {
	const redis = await redisClient()

	// lRem reports how many list entries it removed; 0 means the run was not queued.
	const removed = await redis.lRem(RUN_QUEUE_KEY, 1, runId.toString())

	if (removed > 0) {
		await _deleteRun(runId)
	}

	revalidatePath("/runs")
}

apps/web-evals/src/actions/runs.ts

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
"use server"
22

33
import * as path from "path"
4-
import fs from "fs"
54
import { fileURLToPath } from "url"
6-
import { spawn } from "child_process"
5+
6+
import { enqueueRun, dispatchNextRun } from "@/actions/queue"
77

88
import { revalidatePath } from "next/cache"
99
import pMap from "p-map"
@@ -52,41 +52,9 @@ export async function createRun({ suite, exercises = [], systemPrompt, timeout,
5252
revalidatePath("/runs")
5353

5454
try {
55-
const isRunningInDocker = fs.existsSync("/.dockerenv")
56-
57-
const dockerArgs = [
58-
`--name evals-controller-${run.id}`,
59-
"--rm",
60-
"--network evals_default",
61-
"-v /var/run/docker.sock:/var/run/docker.sock",
62-
"-v /tmp/evals:/var/log/evals",
63-
"-e HOST_EXECUTION_METHOD=docker",
64-
]
65-
66-
const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${run.id}`
67-
68-
const command = isRunningInDocker
69-
? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"`
70-
: cliCommand
71-
72-
console.log("spawn ->", command)
73-
74-
const childProcess = spawn("sh", ["-c", command], {
75-
detached: true,
76-
stdio: ["ignore", "pipe", "pipe"],
77-
})
78-
79-
const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" })
80-
81-
if (childProcess.stdout) {
82-
childProcess.stdout.pipe(logStream)
83-
}
84-
85-
if (childProcess.stderr) {
86-
childProcess.stderr.pipe(logStream)
87-
}
88-
89-
childProcess.unref()
55+
// Enqueue the run and attempt to dispatch if no active run exists.
56+
await enqueueRun(run.id)
57+
await dispatchNextRun()
9058
} catch (error) {
9159
console.error(error)
9260
}

apps/web-evals/src/components/home/run.tsx

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import { useCallback, useState, useRef } from "react"
22
import Link from "next/link"
3-
import { Ellipsis, ClipboardList, Copy, Check, LoaderCircle, Trash } from "lucide-react"
3+
import { useQuery } from "@tanstack/react-query"
4+
import { Ellipsis, ClipboardList, Copy, Check, LoaderCircle, Trash, XCircle } from "lucide-react"
45

56
import type { Run as EvalsRun, TaskMetrics as EvalsTaskMetrics } from "@roo-code/evals"
67

78
import { deleteRun } from "@/actions/runs"
9+
import { getHeartbeat } from "@/actions/heartbeat"
10+
import { getQueuePosition, cancelQueuedRun } from "@/actions/queue"
811
import { formatCurrency, formatDuration, formatTokens, formatToolUsageSuccessRate } from "@/lib/formatters"
912
import { useCopyRun } from "@/hooks/use-copy-run"
1013
import {
@@ -35,6 +38,23 @@ export function Run({ run, taskMetrics }: RunProps) {
3538
const continueRef = useRef<HTMLButtonElement>(null)
3639
const { isPending, copyRun, copied } = useCopyRun(run.id)
3740

41+
// Poll heartbeat and queue position for status column
42+
const { data: heartbeat } = useQuery({
43+
queryKey: ["getHeartbeat", run.id],
44+
queryFn: () => getHeartbeat(run.id),
45+
refetchInterval: 10_000,
46+
})
47+
48+
const { data: queuePosition } = useQuery({
49+
queryKey: ["getQueuePosition", run.id],
50+
queryFn: () => getQueuePosition(run.id),
51+
refetchInterval: 10_000,
52+
})
53+
54+
const isCompleted = !!run.taskMetricsId
55+
const isRunning = !!heartbeat
56+
const isQueued = !isCompleted && !isRunning && queuePosition !== null && queuePosition !== undefined
57+
3858
const onConfirmDelete = useCallback(async () => {
3959
if (!deleteRunId) {
4060
return
@@ -51,6 +71,9 @@ export function Run({ run, taskMetrics }: RunProps) {
5171
return (
5272
<>
5373
<TableRow>
74+
<TableCell>
75+
{isCompleted ? "Completed" : isRunning ? "Running" : isQueued ? <>Queued (#{queuePosition})</> : ""}
76+
</TableCell>
5477
<TableCell>{run.model}</TableCell>
5578
<TableCell>{run.passed}</TableCell>
5679
<TableCell>{run.failed}</TableCell>
@@ -116,6 +139,21 @@ export function Run({ run, taskMetrics }: RunProps) {
116139
</div>
117140
</DropdownMenuItem>
118141
)}
142+
{isQueued && (
143+
<DropdownMenuItem
144+
onClick={async () => {
145+
try {
146+
await cancelQueuedRun(run.id)
147+
} catch (error) {
148+
console.error(error)
149+
}
150+
}}>
151+
<div className="flex items-center gap-1">
152+
<XCircle />
153+
<div>Cancel</div>
154+
</div>
155+
</DropdownMenuItem>
156+
)}
119157
<DropdownMenuItem
120158
onClick={() => {
121159
setDeleteRunId(run.id)

apps/web-evals/src/components/home/runs.tsx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ export function Runs({ runs }: { runs: RunWithTaskMetrics[] }) {
1818
<Table className="border border-t-0">
1919
<TableHeader>
2020
<TableRow>
21+
<TableHead>Status</TableHead>
2122
<TableHead>Model</TableHead>
2223
<TableHead>Passed</TableHead>
2324
<TableHead>Failed</TableHead>
@@ -34,7 +35,7 @@ export function Runs({ runs }: { runs: RunWithTaskMetrics[] }) {
3435
runs.map(({ taskMetrics, ...run }) => <Row key={run.id} run={run} taskMetrics={taskMetrics} />)
3536
) : (
3637
<TableRow>
37-
<TableCell colSpan={9} className="text-center">
38+
<TableCell colSpan={10} className="text-center">
3839
No eval runs yet.
3940
<Button variant="link" onClick={() => router.push("/runs/new")}>
4041
Launch

packages/evals/src/cli/queue.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import fs from "node:fs"
2+
import { spawn } from "node:child_process"
3+
4+
import { redisClient } from "./redis.js"
5+
import { isDockerContainer } from "./utils.js"
6+
7+
// Redis list of queued run ids; the next run is taken from its head with lPop.
const RUN_QUEUE_KEY = "evals:run-queue"
8+
// Redis key that stores the id of the run currently marked as active.
const ACTIVE_RUN_KEY = "evals:active-run"
9+
// Redis key used as a set-if-absent (NX) lock around the dispatch sequence.
const DISPATCH_LOCK_KEY = "evals:dispatcher:lock"
10+
// Expiry applied to the active-run marker when it is set.
const ACTIVE_RUN_TTL_SECONDS = 60 * 60 * 12 // 12 hours
11+
// Expiry applied to the dispatcher lock when it is acquired.
const DISPATCH_LOCK_TTL_SECONDS = 30
12+
13+
/**
 * Launch the evals controller for `runId` as a detached background process.
 *
 * When `isDockerContainer()` reports true the controller is started through
 * `docker run` on the `evals-runner` image; otherwise the pnpm CLI command is
 * executed directly. Controller stdout and stderr are appended to
 * `/tmp/roo-code-evals.log` on a best-effort basis: logging problems never
 * propagate to the caller.
 *
 * @param runId - Id of the run, passed to the CLI as `--runId`.
 */
async function spawnController(runId: number) {
	const containerized = isDockerContainer()

	const dockerArgs = [
		`--name evals-controller-${runId}`,
		"--rm",
		"--network evals_default",
		"-v /var/run/docker.sock:/var/run/docker.sock",
		"-v /tmp/evals:/var/log/evals",
		"-e HOST_EXECUTION_METHOD=docker",
	]

	const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${runId}`
	const command = containerized ? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"` : cliCommand

	const childProcess = spawn("sh", ["-c", command], {
		detached: true,
		stdio: ["ignore", "pipe", "pipe"],
	})

	// A failed spawn is reported through the "error" event. Without a listener
	// that event becomes an uncaught exception in this process.
	childProcess.on("error", (error) => {
		console.error(`failed to spawn controller for run ${runId}`, error)
	})

	// Best-effort logging of controller output (host path or container path)
	try {
		const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" })

		// Open/write failures surface asynchronously as "error" events, which the
		// surrounding try/catch cannot observe. Ignore them, but keep draining the
		// child's output so the controller never blocks on a full pipe.
		logStream.on("error", () => {
			childProcess.stdout?.resume()
			childProcess.stderr?.resume()
		})

		// Both streams share one sink, so neither may end it when it finishes
		// (the default `end: true` would close the log under the other stream).
		childProcess.stdout?.pipe(logStream, { end: false })
		childProcess.stderr?.pipe(logStream, { end: false })

		// "close" fires once the process is gone and its stdio streams are closed.
		childProcess.once("close", () => logStream.end())
	} catch {
		// ignore logging errors
	}

	childProcess.unref()
}
44+
45+
/**
 * Clear the active-run marker (if any) and try to dispatch the next run in FIFO order.
 *
 * A short-lived NX lock keeps concurrent dispatchers from running the dispatch
 * sequence at the same time; if the lock cannot be obtained this function returns
 * after clearing the marker.
 *
 * If starting the next controller throws, the new active marker is cleared and the
 * id is put back at the head of the queue before the error is rethrown, so the run
 * is neither lost nor left blocking the queue until the marker's TTL expires.
 */
export async function finishActiveRunAndDispatch() {
	const redis = await redisClient()

	// Clear the active run marker first (if exists). We do not care if it was already expired.
	try {
		await redis.del(ACTIVE_RUN_KEY)
	} catch {
		// ignore
	}

	// Try to acquire dispatcher lock (NX+EX). If we don't get it, another dispatcher holds it.
	const locked = await redis.set(DISPATCH_LOCK_KEY, "1", { NX: true, EX: DISPATCH_LOCK_TTL_SECONDS })
	if (!locked) return

	try {
		// If another process re-marked active-run meanwhile, bail out.
		const active = await redis.get(ACTIVE_RUN_KEY)
		if (active) return

		// Pop next run id from the head of the queue.
		const nextId = await redis.lPop(RUN_QUEUE_KEY)
		if (!nextId) return

		// Mark as active (with TTL) so the marker eventually expires on its own.
		const ok = await redis.set(ACTIVE_RUN_KEY, nextId, { NX: true, EX: ACTIVE_RUN_TTL_SECONDS })
		if (!ok) {
			// Could not set active (race). Push id back to the head to preserve order and exit.
			await redis.lPush(RUN_QUEUE_KEY, nextId)
			return
		}

		// Spawn the next controller in background.
		try {
			await spawnController(Number(nextId))
		} catch (error) {
			// Undo the pop and the active marker (best effort, each step on its own)
			// so a later dispatch can retry this run.
			try {
				await redis.lPush(RUN_QUEUE_KEY, nextId)
			} catch {
				// ignore
			}
			try {
				await redis.del(ACTIVE_RUN_KEY)
			} catch {
				// ignore
			}
			throw error
		}
	} finally {
		// Release the lock even when dispatch bailed out or failed.
		try {
			await redis.del(DISPATCH_LOCK_KEY)
		} catch {
			// ignore
		}
	}
}

packages/evals/src/cli/runEvals.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { EVALS_REPO_PATH } from "../exercises/index.js"
66
import { Logger, getTag, isDockerContainer, resetEvalsRepo, commitEvalsRepoChanges } from "./utils.js"
77
import { startHeartbeat, stopHeartbeat } from "./redis.js"
88
import { processTask, processTaskInContainer } from "./runTask.js"
9+
import { finishActiveRunAndDispatch } from "./queue.js"
910

1011
export const runEvals = async (runId: number) => {
1112
const run = await findRun(runId)
@@ -67,6 +68,7 @@ export const runEvals = async (runId: number) => {
6768
} finally {
6869
logger.info("cleaning up")
6970
stopHeartbeat(run.id, heartbeat)
71+
await finishActiveRunAndDispatch()
7072
logger.close()
7173
}
7274
}

0 commit comments

Comments
 (0)