Skip to content

Commit b58ce4e

Browse files
committed
feat: implement global FIFO queue for Evals runs
- Add Redis-based queue management with run-queue, active-run, and dispatcher lock
- Modify createRun to enqueue runs instead of spawning them immediately
- Implement auto-advance mechanism when runs complete
- Add UI status column showing Running/Queued/Completed states
- Add queue position display for queued runs
- Add cancel button for queued runs
- Preserve per-run task concurrency via PQueue

Addresses issue #7966
1 parent 9ea7173 commit b58ce4e

File tree

5 files changed

+484
-84
lines changed

5 files changed

+484
-84
lines changed

apps/web-evals/src/actions/runs.ts

Lines changed: 159 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,118 @@ import {
1515
deleteRun as _deleteRun,
1616
createTask,
1717
getExercisesForLanguage,
18+
findRun,
1819
} from "@roo-code/evals"
1920

2021
import { CreateRun } from "@/lib/schemas"
22+
import { redisClient } from "@/lib/server/redis"
2123

2224
const EVALS_REPO_PATH = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "../../../../../evals")
2325

26+
// Queue management keys (matching the ones in packages/evals/src/cli/redis.ts)
27+
/** Redis key of the list that holds the ids of runs waiting to be dispatched. */
function getRunQueueKey(): string {
	return "evals:run-queue"
}

/** Redis key that stores the id of the run currently being executed. */
function getActiveRunKey(): string {
	return "evals:active-run"
}

/** Redis key used as a short-lived mutex around the dispatch step. */
function getDispatcherLockKey(): string {
	return "evals:dispatcher:lock"
}
30+
31+
/**
 * Launches the evals controller CLI for `runId` as a detached background
 * process and appends its stdout/stderr to `/tmp/roo-code-evals.log`.
 *
 * When this process is itself running inside Docker (detected through
 * `/.dockerenv`), the CLI is started in a fresh `evals-runner` container;
 * otherwise it is executed directly through `sh -c`.
 *
 * @param runId - Id of the run the controller should execute.
 * @returns A promise that resolves once the child process has been spawned.
 * @throws If `runId` is not an integer, or if the child process could not be
 *   spawned (the promise rejects with the child's `error` event payload).
 */
async function spawnController(runId: number): Promise<void> {
	// The id is interpolated into a shell command, so reject anything that is
	// not a plain integer at runtime, regardless of the static type.
	if (!Number.isSafeInteger(runId)) {
		throw new Error(`Invalid run id: ${runId}`)
	}

	const isRunningInDocker = fs.existsSync("/.dockerenv")

	const dockerArgs = [
		`--name evals-controller-${runId}`,
		"--rm",
		"--network evals_default",
		"-v /var/run/docker.sock:/var/run/docker.sock",
		"-v /tmp/evals:/var/log/evals",
		"-e HOST_EXECUTION_METHOD=docker",
	]

	const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${runId}`

	const command = isRunningInDocker
		? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"`
		: cliCommand

	console.log("spawn ->", command)

	const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" })

	// Without a listener, a failure to open or write the log file would surface
	// as an unhandled "error" event and take the whole server process down.
	logStream.on("error", (error) => {
		console.error(`Log stream error for run ${runId}:`, error)
	})

	const childProcess = spawn("sh", ["-c", command], {
		detached: true,
		stdio: ["ignore", "pipe", "pipe"],
	})

	// Two sources share one destination: `end: false` stops the first source
	// that finishes from closing the log while the other is still writing.
	childProcess.stdout?.pipe(logStream, { end: false })
	childProcess.stderr?.pipe(logStream, { end: false })

	// Close the log once the child's stdio streams have all been closed.
	childProcess.once("close", () => {
		logStream.end()
	})

	try {
		// Spawn failures are reported asynchronously through the "error" event;
		// waiting here lets the caller's try/catch actually observe them.
		await new Promise<void>((resolve, reject) => {
			childProcess.once("spawn", () => resolve())
			childProcess.once("error", reject)
		})
	} catch (error) {
		logStream.end()
		throw error
	}

	// Let the controller outlive this process without holding its event loop.
	childProcess.unref()
}
68+
69+
/**
 * Starts the next queued run if no run is currently active.
 *
 * The whole step is guarded by a Redis lock (10 second TTL) so that only one
 * dispatcher acts at a time. Under the lock it:
 *  1. returns early if the active-run key is already set;
 *  2. pops run ids from the head of the run queue, discarding entries that
 *     are not plain decimal integers, until a valid id or an empty queue;
 *  3. marks that run as active (1 hour TTL) and spawns its controller.
 *
 * If the run cannot be marked active or its controller fails to spawn, the
 * run is pushed back onto the head of the queue so it keeps its position.
 *
 * @returns A promise that resolves when the dispatch attempt has finished,
 *   whether or not a run was started.
 */
export async function dispatchNextRun(): Promise<void> {
	const redis = await redisClient()

	// Unique per attempt, so that releasing the lock can verify ownership.
	const lockToken = `${process.pid}:${Date.now()}:${Math.random().toString(36).slice(2)}`

	// Try to acquire dispatcher lock (10 second TTL)
	const lockAcquired = await redis.set(getDispatcherLockKey(), lockToken, {
		NX: true,
		EX: 10,
	})

	if (lockAcquired !== "OK") {
		console.log("Dispatcher lock already held, skipping dispatch")
		return
	}

	try {
		// Check if there's already an active run
		const activeRunId = await redis.get(getActiveRunKey())
		if (activeRunId) {
			console.log(`Run ${activeRunId} is already active, skipping dispatch`)
			return
		}

		// Pop the next run from the queue, skipping malformed entries so that a
		// corrupt value can never be recorded as the active run.
		let nextRunId = await redis.lPop(getRunQueueKey())
		while (nextRunId !== null && !/^\d+$/.test(nextRunId)) {
			console.warn(`Discarding invalid run id from queue: ${JSON.stringify(nextRunId)}`)
			nextRunId = await redis.lPop(getRunQueueKey())
		}

		if (!nextRunId) {
			console.log("No runs in queue")
			return
		}

		const runId = parseInt(nextRunId, 10)
		console.log(`Dispatching run ${runId}`)

		// Set as active run with generous TTL (1 hour default, will be cleared when run completes)
		const setActive = await redis.set(getActiveRunKey(), runId.toString(), {
			NX: true,
			EX: 3600,
		})

		if (setActive !== "OK") {
			// Another process may have set an active run, put this run back in the queue
			console.log("Failed to set active run, requeueing")
			await redis.lPush(getRunQueueKey(), runId.toString())
			return
		}

		// Spawn the controller for this run
		try {
			await spawnController(runId)
			console.log(`Successfully spawned controller for run ${runId}`)
		} catch (error) {
			console.error(`Failed to spawn controller for run ${runId}:`, error)
			// Clear active run and requeue on spawn failure
			await redis.del(getActiveRunKey())
			await redis.lPush(getRunQueueKey(), runId.toString())
		}
	} finally {
		// Release the lock only if we still own it. If the TTL expired and
		// another dispatcher acquired the lock meanwhile, a plain DEL would
		// remove *their* lock; the script makes compare-and-delete atomic.
		try {
			await redis.eval(
				'if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end',
				{ keys: [getDispatcherLockKey()], arguments: [lockToken] },
			)
		} catch (error) {
			// The lock expires on its own; do not mask the outcome of the dispatch.
			console.error("Failed to release dispatcher lock:", error)
		}
	}
}
129+
24130
// eslint-disable-next-line @typescript-eslint/no-unused-vars
25131
export async function createRun({ suite, exercises = [], systemPrompt, timeout, ...values }: CreateRun) {
26132
const run = await _createRun({
@@ -51,50 +157,70 @@ export async function createRun({ suite, exercises = [], systemPrompt, timeout,
51157

52158
revalidatePath("/runs")
53159

160+
// Add run to queue
161+
const redis = await redisClient()
162+
await redis.rPush(getRunQueueKey(), run.id.toString())
163+
console.log(`Run ${run.id} added to queue`)
164+
165+
// Try to dispatch if no active run
54166
try {
55-
const isRunningInDocker = fs.existsSync("/.dockerenv")
167+
await dispatchNextRun()
168+
} catch (error) {
169+
console.error("Error dispatching run:", error)
170+
}
56171

57-
const dockerArgs = [
58-
`--name evals-controller-${run.id}`,
59-
"--rm",
60-
"--network evals_default",
61-
"-v /var/run/docker.sock:/var/run/docker.sock",
62-
"-v /tmp/evals:/var/log/evals",
63-
"-e HOST_EXECUTION_METHOD=docker",
64-
]
172+
return run
173+
}
65174

66-
const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${run.id}`
175+
/**
 * Deletes the run identified by `runId` and then revalidates the `/runs`
 * path so the listing is re-rendered.
 *
 * @param runId - Id of the run to delete.
 */
export async function deleteRun(runId: number): Promise<void> {
	await _deleteRun(runId)

	revalidatePath("/runs")
}
67179

68-
const command = isRunningInDocker
69-
? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"`
70-
: cliCommand
180+
/**
 * Cancels a run that is still waiting in the run queue.
 *
 * Removes one occurrence of `runId` from the Redis run-queue list. Only when
 * an entry was actually removed is the run also deleted through `deleteRun`.
 *
 * @param runId - Id of the queued run to cancel.
 * @returns `true` if the run was found in the queue and removed, `false` if
 *   it was not in the queue (in which case nothing is deleted).
 */
export async function cancelQueuedRun(runId: number) {
181+
const redis = await redisClient()
71182

72-
console.log("spawn ->", command)
183+
// Remove from queue
184+
const removed = await redis.lRem(getRunQueueKey(), 1, runId.toString())
73185

74-
const childProcess = spawn("sh", ["-c", command], {
75-
detached: true,
76-
stdio: ["ignore", "pipe", "pipe"],
77-
})
186+
if (removed > 0) {
187+
console.log(`Removed run ${runId} from queue`)
188+
// Delete the run from database
189+
await deleteRun(runId)
190+
return true
191+
}
78192

79-
const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" })
193+
return false
194+
}
80195

81-
if (childProcess.stdout) {
82-
childProcess.stdout.pipe(logStream)
83-
}
196+
/**
 * Reports the queue status of a run as one of "running", "queued",
 * "completed" or "unknown".
 *
 * The checks are made in this order, and the first match wins:
 *  1. the active-run key holds this run's id -> "running";
 *  2. the id appears in the run-queue list -> "queued", with its 1-based
 *     position in that list;
 *  3. a `heartbeat:<runId>` key exists -> "running";
 *  4. `findRun(runId)` returns a run with a `taskMetricsId` -> "completed";
 *  5. otherwise -> "unknown".
 *
 * @param runId - Id of the run to look up.
 * @returns An object with `status` and `position`; `position` is `null`
 *   for every status except "queued".
 */
export async function getRunQueueStatus(runId: number) {
196+
const redis = await redisClient()
84198

85-
if (childProcess.stderr) {
86-
childProcess.stderr.pipe(logStream)
87-
}
199+
// Check if run is active
200+
const activeRunId = await redis.get(getActiveRunKey())
201+
if (activeRunId === runId.toString()) {
202+
return { status: "running" as const, position: null }
203+
}
88204

89-
childProcess.unref()
90-
} catch (error) {
91-
console.error(error)
205+
// Check position in queue
206+
const queue = await redis.lRange(getRunQueueKey(), 0, -1)
207+
const position = queue.indexOf(runId.toString())
208+
209+
if (position !== -1) {
210+
return { status: "queued" as const, position: position + 1 }
92211
}
93212

94-
return run
95-
}
213+
// Check if run has a heartbeat (running but not marked as active - edge case)
214+
const heartbeat = await redis.get(`heartbeat:${runId}`)
215+
if (heartbeat) {
216+
return { status: "running" as const, position: null }
217+
}
96218

97-
export async function deleteRun(runId: number) {
98-
await _deleteRun(runId)
99-
revalidatePath("/runs")
219+
// Run is completed or not found
220+
const run = await findRun(runId)
221+
if (run?.taskMetricsId) {
222+
return { status: "completed" as const, position: null }
223+
}
224+
225+
return { status: "unknown" as const, position: null }
100226
}

0 commit comments

Comments
 (0)