diff --git a/apps/web-evals/src/actions/runs.ts b/apps/web-evals/src/actions/runs.ts index 2eae1f6804..fa61ca8478 100644 --- a/apps/web-evals/src/actions/runs.ts +++ b/apps/web-evals/src/actions/runs.ts @@ -1,9 +1,7 @@ "use server" import * as path from "path" -import fs from "fs" import { fileURLToPath } from "url" -import { spawn } from "child_process" import { revalidatePath } from "next/cache" import pMap from "p-map" @@ -15,18 +13,25 @@ import { deleteRun as _deleteRun, createTask, getExercisesForLanguage, + updateRun as _updateRun, } from "@roo-code/evals" import { CreateRun } from "@/lib/schemas" +import { enqueueRun, dequeueRun } from "@/lib/server/queue" +import { startQueueProcessor } from "@/lib/server/queue-processor" const EVALS_REPO_PATH = path.resolve(path.dirname(fileURLToPath(import.meta.url)), "../../../../../evals") +// Start the queue processor when the server starts +startQueueProcessor().catch(console.error) + // eslint-disable-next-line @typescript-eslint/no-unused-vars export async function createRun({ suite, exercises = [], systemPrompt, timeout, ...values }: CreateRun) { const run = await _createRun({ ...values, timeout, socketPath: "", // TODO: Get rid of this. + status: "queued", // Set initial status to queued }) if (suite === "partial") { @@ -49,52 +54,29 @@ export async function createRun({ suite, exercises = [], systemPrompt, timeout, } } - revalidatePath("/runs") - - try { - const isRunningInDocker = fs.existsSync("/.dockerenv") - - const dockerArgs = [ - `--name evals-controller-${run.id}`, - "--rm", - "--network evals_default", - "-v /var/run/docker.sock:/var/run/docker.sock", - "-v /tmp/evals:/var/log/evals", - "-e HOST_EXECUTION_METHOD=docker", - ] - - const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${run.id}` + // Add run to queue and get position + const queuePosition = await enqueueRun(run.id) - const command = isRunningInDocker - ? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"` - : cliCommand - - console.log("spawn ->", command) - - const childProcess = spawn("sh", ["-c", command], { - detached: true, - stdio: ["ignore", "pipe", "pipe"], - }) - - const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" }) - - if (childProcess.stdout) { - childProcess.stdout.pipe(logStream) - } + // Update run with queue position + await _updateRun(run.id, { queuePosition }) - if (childProcess.stderr) { - childProcess.stderr.pipe(logStream) - } - - childProcess.unref() - } catch (error) { - console.error(error) - } + revalidatePath("/runs") - return run + return { ...run, queuePosition } } export async function deleteRun(runId: number) { + // Try to remove from queue if it's queued + await dequeueRun(runId) + await _deleteRun(runId) revalidatePath("/runs") } + +export async function cancelRun(runId: number) { + // Import the cancelQueuedRun function + const { cancelQueuedRun } = await import("@/lib/server/queue-processor") + + await cancelQueuedRun(runId) + revalidatePath("/runs") +} diff --git a/apps/web-evals/src/app/api/queue/status/route.ts b/apps/web-evals/src/app/api/queue/status/route.ts new file mode 100644 index 0000000000..4c3b21c0af --- /dev/null +++ b/apps/web-evals/src/app/api/queue/status/route.ts @@ -0,0 +1,12 @@ +import { NextResponse } from "next/server" +import { getQueueStats } from "@/lib/server/queue" + +export async function GET() { + try { + const stats = await getQueueStats() + return NextResponse.json(stats) + } catch (error) { + console.error("Error fetching queue stats:", error) + return NextResponse.json({ error: "Failed to fetch queue status" }, { status: 500 }) + } +} diff --git a/apps/web-evals/src/components/home/run.tsx b/apps/web-evals/src/components/home/run.tsx index c35673885c..adb725080c 100644 --- a/apps/web-evals/src/components/home/run.tsx +++ b/apps/web-evals/src/components/home/run.tsx @@ -1,10 +1,10 @@ import { useCallback, useState, useRef } from "react" import Link from "next/link" -import { Ellipsis, ClipboardList, Copy, Check, LoaderCircle, Trash } from "lucide-react" +import { Ellipsis, ClipboardList, Copy, Check, LoaderCircle, Trash, Clock, Play, X } from "lucide-react" import type { Run as EvalsRun, TaskMetrics as EvalsTaskMetrics } from "@roo-code/evals" -import { deleteRun } from "@/actions/runs" +import { deleteRun, cancelRun } from "@/actions/runs" import { formatCurrency, formatDuration, formatTokens, formatToolUsageSuccessRate } from "@/lib/formatters" import { useCopyRun } from "@/hooks/use-copy-run" import { @@ -23,16 +23,19 @@ import { AlertDialogFooter, AlertDialogHeader, AlertDialogTitle, + Badge, } from "@/components/ui" type RunProps = { - run: EvalsRun + run: EvalsRun & { status?: string; queuePosition?: number | null } taskMetrics: EvalsTaskMetrics | null } export function Run({ run, taskMetrics }: RunProps) { const [deleteRunId, setDeleteRunId] = useState() + const [cancelRunId, setCancelRunId] = useState() const continueRef = useRef(null) + const cancelRef = useRef(null) const { isPending, copyRun, copied } = useCopyRun(run.id) const onConfirmDelete = useCallback(async () => { @@ -48,16 +51,62 @@ export function Run({ run, taskMetrics }: RunProps) { } }, [deleteRunId]) + const onConfirmCancel = useCallback(async () => { + if (!cancelRunId) { + return + } + + try { + await cancelRun(cancelRunId) + setCancelRunId(undefined) + } catch (error) { + console.error(error) + } + }, [cancelRunId]) + + const getStatusBadge = () => { + if (run.status === "queued") { + return ( + + + Queued {run.queuePosition ? `#${run.queuePosition}` : ""} + + ) + } else if (run.status === "running") { + return ( + + + Running + + ) + } else if (run.status === "cancelled") { + return ( + + + Cancelled + + ) + } + return null + } + return ( <> - {run.model} - {run.passed} - {run.failed} - {run.passed + run.failed > 0 && ( - {((run.passed / (run.passed + run.failed)) * 100).toFixed(1)}% - )} +
+ {run.model} + {getStatusBadge()} +
+
+ {run.status === "completed" || run.status === "failed" ? run.passed : "-"} + {run.status === "completed" || run.status === "failed" ? run.failed : "-"} + + {run.status === "completed" || run.status === "failed" + ? run.passed + run.failed > 0 && ( + {((run.passed / (run.passed + run.failed)) * 100).toFixed(1)}% + ) + : "-"} {taskMetrics && ( @@ -116,6 +165,18 @@ export function Run({ run, taskMetrics }: RunProps) { )} + {run.status === "queued" && ( + { + setCancelRunId(run.id) + setTimeout(() => cancelRef.current?.focus(), 0) + }}> +
+ +
Cancel
+
+
+ )} { setDeleteRunId(run.id) @@ -144,6 +205,22 @@ export function Run({ run, taskMetrics }: RunProps) { + setCancelRunId(undefined)}> + + + Cancel queued run? + + This will remove the run from the queue. The run will not be executed. + + + + Keep in Queue + + Cancel Run + + + + ) } diff --git a/apps/web-evals/src/lib/server/queue-processor.ts b/apps/web-evals/src/lib/server/queue-processor.ts new file mode 100644 index 0000000000..3f30d09c3c --- /dev/null +++ b/apps/web-evals/src/lib/server/queue-processor.ts @@ -0,0 +1,218 @@ +import { spawn } from "child_process" +import fs from "fs" + +import { updateRun, findRun } from "@roo-code/evals" + +import { + getNextRun, + setActiveRun, + clearActiveRun, + acquireQueueLock, + releaseQueueLock, + getActiveRun, + getQueuedRuns, + dequeueRun, +} from "./queue" + +const POLL_INTERVAL = 5000 // 5 seconds + +let isProcessing = false + +/** + * Start processing the queue + */ +export async function startQueueProcessor(): Promise { + if (isProcessing) { + console.log("Queue processor is already running") + return + } + + isProcessing = true + console.log("Starting queue processor...") + + // Process queue in a loop + while (isProcessing) { + try { + await processNextInQueue() + } catch (error) { + console.error("Error processing queue:", error) + } + + // Wait before checking again + await new Promise((resolve) => setTimeout(resolve, POLL_INTERVAL)) + } +} + +/** + * Stop processing the queue + */ +export function stopQueueProcessor(): void { + console.log("Stopping queue processor...") + isProcessing = false +} + +/** + * Process the next run in the queue + */ +async function processNextInQueue(): Promise { + // Try to acquire lock + const hasLock = await acquireQueueLock() + if (!hasLock) { + // Another processor has the lock + return + } + + try { + // Check if there's already an active run + const activeRun = await getActiveRun() + if (activeRun) { + // Check if the run is still actually running + const run = await findRun(activeRun) + if (run.status === "running") { + // Still running, wait + return + } else { + // Run finished but wasn't cleared, clear it + await clearActiveRun() + } + } + + // Get next run from queue + const queuedRun = await getNextRun() + if (!queuedRun) { + // Queue is empty + return + } + + // Update queue positions for remaining runs + await updateQueuePositions() + + // Start the run + await startRun(queuedRun.runId) + } finally { + await releaseQueueLock() + } +} + +/** + * Start executing a run + */ +async function startRun(runId: number): Promise { + console.log(`Starting run ${runId}...`) + + try { + // Mark run as active + await setActiveRun(runId) + + // Update run status to running + await updateRun(runId, { + status: "running", + queuePosition: null, + }) + + // Execute the run (similar to existing createRun logic) + const isRunningInDocker = fs.existsSync("/.dockerenv") + + const dockerArgs = [ + `--name evals-controller-${runId}`, + "--rm", + "--network evals_default", + "-v /var/run/docker.sock:/var/run/docker.sock", + "-v /tmp/evals:/var/log/evals", + "-e HOST_EXECUTION_METHOD=docker", + ] + + const cliCommand = `pnpm --filter @roo-code/evals cli --runId ${runId}` + + const command = isRunningInDocker + ? `docker run ${dockerArgs.join(" ")} evals-runner sh -c "${cliCommand}"` + : cliCommand + + console.log("spawn ->", command) + + const childProcess = spawn("sh", ["-c", command], { + detached: true, + stdio: ["ignore", "pipe", "pipe"], + }) + + const logStream = fs.createWriteStream("/tmp/roo-code-evals.log", { flags: "a" }) + + if (childProcess.stdout) { + childProcess.stdout.pipe(logStream) + } + + if (childProcess.stderr) { + childProcess.stderr.pipe(logStream) + } + + // When process exits, update status and clear active run + childProcess.on("exit", async (code) => { + console.log(`Run ${runId} exited with code ${code}`) + + try { + // Update run status + await updateRun(runId, { + status: code === 0 ? "completed" : "failed", + }) + + // Clear active run + await clearActiveRun() + } catch (error) { + console.error(`Error updating run ${runId} status:`, error) + } + }) + + childProcess.unref() + } catch (error) { + console.error(`Error starting run ${runId}:`, error) + + // Update status to failed and clear active run + await updateRun(runId, { status: "failed" }) + await clearActiveRun() + + throw error + } +} + +/** + * Update queue positions for all queued runs + */ +async function updateQueuePositions(): Promise { + const queuedRuns = await getQueuedRuns() + + // Update each run's queue position + for (let i = 0; i < queuedRuns.length; i++) { + const position = i + 1 + const queuedRun = queuedRuns[i] + if (queuedRun) { + await updateRun(queuedRun.runId, { queuePosition: position }) + } + } +} + +/** + * Cancel a queued run + */ +export async function cancelQueuedRun(runId: number): Promise { + const run = await findRun(runId) + + if (run.status !== "queued") { + throw new Error(`Run ${runId} is not queued (status: ${run.status})`) + } + + // Remove from queue + const removed = await dequeueRun(runId) + + if (removed) { + // Update run status + await updateRun(runId, { + status: "cancelled", + queuePosition: null, + }) + + // Update positions for remaining queued runs + await updateQueuePositions() + } + + return removed +} diff --git a/apps/web-evals/src/lib/server/queue.ts b/apps/web-evals/src/lib/server/queue.ts new file mode 100644 index 0000000000..37f48de121 --- /dev/null +++ b/apps/web-evals/src/lib/server/queue.ts @@ -0,0 +1,168 @@ +import { redisClient } from "./redis" + +const QUEUE_KEY = "evals:run:queue" +const ACTIVE_RUN_KEY = "evals:run:active" +const QUEUE_LOCK_KEY = "evals:queue:lock" +const LOCK_TTL = 60 // seconds + +export interface QueuedRun { + runId: number + addedAt: number +} + +/** + * Add a run to the queue + */ +export async function enqueueRun(runId: number): Promise { + const redis = await redisClient() + const queuedRun: QueuedRun = { + runId, + addedAt: Date.now(), + } + + // Add to queue and return position (1-based) + await redis.rPush(QUEUE_KEY, JSON.stringify(queuedRun)) + const position = await redis.lLen(QUEUE_KEY) + + return position +} + +/** + * Remove a run from the queue (for cancellation) + */ +export async function dequeueRun(runId: number): Promise { + const redis = await redisClient() + + // Get all items in queue + const items = await redis.lRange(QUEUE_KEY, 0, -1) + + // Find and remove the run + for (const item of items) { + const queuedRun: QueuedRun = JSON.parse(item) + if (queuedRun.runId === runId) { + await redis.lRem(QUEUE_KEY, 1, item) + return true + } + } + + return false +} + +/** + * Get the next run from the queue + */ +export async function getNextRun(): Promise { + const redis = await redisClient() + + // Pop from the front of the queue + const item = await redis.lPop(QUEUE_KEY) + if (!item) { + return null + } + + return JSON.parse(item) as QueuedRun +} + +/** + * Get current queue position for a run + */ +export async function getQueuePosition(runId: number): Promise { + const redis = await redisClient() + + // Get all items in queue + const items = await redis.lRange(QUEUE_KEY, 0, -1) + + // Find position (1-based) + for (let i = 0; i < items.length; i++) { + const item = items[i] + if (item) { + const queuedRun: QueuedRun = JSON.parse(item) + if (queuedRun.runId === runId) { + return i + 1 + } + } + } + + return null +} + +/** + * Get all queued runs + */ +export async function getQueuedRuns(): Promise { + const redis = await redisClient() + + const items = await redis.lRange(QUEUE_KEY, 0, -1) + return items.map((item) => JSON.parse(item) as QueuedRun) +} + +/** + * Set the active run + */ +export async function setActiveRun(runId: number): Promise { + const redis = await redisClient() + await redis.set(ACTIVE_RUN_KEY, runId.toString()) +} + +/** + * Get the active run + */ +export async function getActiveRun(): Promise { + const redis = await redisClient() + const runId = await redis.get(ACTIVE_RUN_KEY) + return runId ? parseInt(runId, 10) : null +} + +/** + * Clear the active run + */ +export async function clearActiveRun(): Promise { + const redis = await redisClient() + await redis.del(ACTIVE_RUN_KEY) +} + +/** + * Try to acquire a lock for queue processing + */ +export async function acquireQueueLock(): Promise { + const redis = await redisClient() + + // Try to set lock with NX (only if not exists) and EX (expiry) + const result = await redis.set(QUEUE_LOCK_KEY, "1", { + NX: true, + EX: LOCK_TTL, + }) + + return result === "OK" +} + +/** + * Release the queue processing lock + */ +export async function releaseQueueLock(): Promise { + const redis = await redisClient() + await redis.del(QUEUE_LOCK_KEY) +} + +/** + * Get queue statistics + */ +export async function getQueueStats(): Promise<{ + queueLength: number + activeRun: number | null + queuedRuns: QueuedRun[] +}> { + const redis = await redisClient() + + const [queueLength, activeRun, queuedRuns] = await Promise.all([ + redis.lLen(QUEUE_KEY), + getActiveRun(), + getQueuedRuns(), + ]) + + return { + queueLength, + activeRun, + queuedRuns, + } +} diff --git a/packages/evals/src/db/migrations/0003_add_queue_fields_to_runs.sql b/packages/evals/src/db/migrations/0003_add_queue_fields_to_runs.sql new file mode 100644 index 0000000000..eadc61a80b --- /dev/null +++ b/packages/evals/src/db/migrations/0003_add_queue_fields_to_runs.sql @@ -0,0 +1,13 @@ +-- Add status and queuePosition fields to runs table +ALTER TABLE runs +ADD COLUMN status TEXT DEFAULT 'queued' NOT NULL, +ADD COLUMN queue_position INTEGER; + +-- Update existing runs to have 'completed' status +UPDATE runs +SET status = 'completed' +WHERE passed > 0 OR failed > 0; + +-- Add index for status field for better query performance +CREATE INDEX idx_runs_status ON runs(status); +CREATE INDEX idx_runs_queue_position ON runs(queue_position); \ No newline at end of file diff --git a/packages/evals/src/db/schema.ts b/packages/evals/src/db/schema.ts index 66588c792c..695121331a 100644 --- a/packages/evals/src/db/schema.ts +++ b/packages/evals/src/db/schema.ts @@ -27,6 +27,8 @@ export const runs = pgTable("runs", { timeout: integer().default(5).notNull(), passed: integer().default(0).notNull(), failed: integer().default(0).notNull(), + status: text().default("queued").notNull(), // 'queued', 'running', 'completed', 'failed' + queuePosition: integer("queue_position"), createdAt: timestamp("created_at").notNull(), })