diff --git a/runner/orchestration/build-repair.ts b/runner/orchestration/build-repair.ts index acfe55f..f5f7eeb 100644 --- a/runner/orchestration/build-repair.ts +++ b/runner/orchestration/build-repair.ts @@ -71,6 +71,7 @@ export async function repairAndBuild( rootPromptDef, directory, workerConcurrencyQueue, + abortSignal, attempts, progress ); @@ -89,6 +90,7 @@ async function handleRepairResponse( rootPromptDef: RootPromptDefinition, directory: string, workerConcurrencyQueue: PQueue, + abortSignal: AbortSignal, attempts: number, progress: ProgressLogger ) { @@ -107,9 +109,15 @@ async function handleRepairResponse( mergeRepairFiles(repairResponse.outputFiles, finalOutputFiles); writeResponseFiles(directory, finalOutputFiles, env, rootPromptDef.name); - const buildResult = await workerConcurrencyQueue.add( - () => runBuild(evalID, gateway, directory, env, rootPromptDef, progress), - { throwOnTimeout: true } + const buildResult = await runBuild( + evalID, + gateway, + directory, + env, + rootPromptDef, + abortSignal, + workerConcurrencyQueue, + progress ); return { diff --git a/runner/orchestration/build-serve-loop.ts b/runner/orchestration/build-serve-loop.ts index 0e9bebf..5df5e2e 100644 --- a/runner/orchestration/build-serve-loop.ts +++ b/runner/orchestration/build-serve-loop.ts @@ -5,9 +5,7 @@ import { Environment } from '../configuration/environment.js'; import { AttemptDetails, LlmContextFile, - LlmResponseFile, RootPromptDefinition, - Usage, } from '../shared-interfaces.js'; import { DEFAULT_MAX_REPAIR_ATTEMPTS } from '../configuration/constants.js'; import { ProgressLogger } from '../progress/progress-logger.js'; @@ -59,9 +57,15 @@ export async function attemptBuild( const finalOutputFiles = initialResponse.files.map((file) => ({ ...file, })); - const initialBuildResult = await workerConcurrencyQueue.add( - () => runBuild(evalID, gateway, directory, env, rootPromptDef, progress), - { throwOnTimeout: true } + const initialBuildResult = await runBuild( + evalID, + gateway, + directory, + env, + rootPromptDef, + abortSignal, + workerConcurrencyQueue, + progress ); let repairAttempts = 0; const maxRepairAttempts = gateway.shouldRetryFailedBuilds(evalID) @@ -122,6 +126,8 @@ export async function attemptBuild( directory, env, rootPromptDef, + workerConcurrencyQueue, + abortSignal, progress, skipScreenshots, skipAxeTesting, @@ -176,6 +182,8 @@ export async function attemptBuild( directory, env, rootPromptDef, + workerConcurrencyQueue, + abortSignal, progress, skipScreenshots, skipAxeTesting, diff --git a/runner/orchestration/build-worker.ts b/runner/orchestration/build-worker.ts index 006659c..927f7ba 100644 --- a/runner/orchestration/build-worker.ts +++ b/runner/orchestration/build-worker.ts @@ -6,6 +6,7 @@ import { Environment } from '../configuration/environment.js'; import { ProgressLogger } from '../progress/progress-logger.js'; import { RootPromptDefinition } from '../shared-interfaces.js'; import { EvalID, Gateway } from './gateway.js'; +import PQueue from 'p-queue'; /** Attempts to build the code. */ export async function runBuild( @@ -14,6 +15,8 @@ export async function runBuild( appDirectoryPath: string, env: Environment, rootPromptDef: RootPromptDefinition, + abortSignal: AbortSignal, + workerConcurrencyQueue: PQueue, progress: ProgressLogger ): Promise { progress.log(rootPromptDef, 'build', `Building the app`); @@ -24,6 +27,8 @@ export async function runBuild( env, appDirectoryPath, rootPromptDef, + workerConcurrencyQueue, + abortSignal, progress ); if (result.status === BuildResultStatus.SUCCESS) { diff --git a/runner/orchestration/gateway.ts b/runner/orchestration/gateway.ts index 6519310..da631c0 100644 --- a/runner/orchestration/gateway.ts +++ b/runner/orchestration/gateway.ts @@ -1,3 +1,4 @@ +import PQueue from 'p-queue'; import { LlmGenerateFilesContext } from '../codegen/llm-runner.js'; import { Environment } from '../configuration/environment.js'; import { ProgressLogger } from '../progress/progress-logger.js'; @@ -41,6 +42,8 @@ export interface Gateway { env: Env, appDirectoryPath: string, rootPromptDef: RootPromptDefinition, + workerConcurrencyQueue: PQueue, + abortSignal: AbortSignal, progress: ProgressLogger ): Promise; diff --git a/runner/orchestration/gateways/local_gateway.ts b/runner/orchestration/gateways/local_gateway.ts index 1cb69fd..b39a0e3 100644 --- a/runner/orchestration/gateways/local_gateway.ts +++ b/runner/orchestration/gateways/local_gateway.ts @@ -21,6 +21,7 @@ import { killChildProcessGracefully } from '../../utils/kill-gracefully.js'; import { ProgressLogger } from '../../progress/progress-logger.js'; import { serveApp } from '../../workers/serve-testing/serve-app.js'; import { LocalEnvironment } from '../../configuration/environment-local.js'; +import PQueue from 'p-queue'; let uniqueIDs = 0; @@ -70,6 +71,8 @@ export class LocalGateway implements Gateway { env: LocalEnvironment, appDirectoryPath: string, rootPromptDef: RootPromptDefinition, + workerConcurrencyQueue: PQueue, + abortSignal: AbortSignal, progress: ProgressLogger ): Promise { const buildParams: BuildWorkerMessage = { @@ -78,21 +81,29 @@ export class LocalGateway implements Gateway { buildCommand: env.buildCommand, }; - return new Promise((resolve, reject) => { - const child: ChildProcess = fork( - path.resolve(import.meta.dirname, '../../workers/builder/worker.js') - ); - child.send(buildParams); + return workerConcurrencyQueue.add( + () => + new Promise((resolve, reject) => { + const child: ChildProcess = fork( + path.resolve( + import.meta.dirname, + '../../workers/builder/worker.js' + ), + { signal: abortSignal } + ); + child.send(buildParams); - child.on('message', async (result: BuildWorkerResponseMessage) => { - await killChildProcessGracefully(child); - resolve(result.payload); - }); - child.on('error', async (err) => { - await killChildProcessGracefully(child); - reject(err); - }); - }); + child.on('message', async (result: BuildWorkerResponseMessage) => { + await killChildProcessGracefully(child); + resolve(result.payload); + }); + child.on('error', async (err) => { + await killChildProcessGracefully(child); + reject(err); + }); + }), + { throwOnTimeout: true } + ); } async serveBuild( diff --git a/runner/orchestration/serve-testing-worker.ts b/runner/orchestration/serve-testing-worker.ts index 7a19f0f..ca5b369 100644 --- a/runner/orchestration/serve-testing-worker.ts +++ b/runner/orchestration/serve-testing-worker.ts @@ -11,6 +11,7 @@ import { } from '../workers/serve-testing/worker-types.js'; import { EvalID, Gateway } from './gateway.js'; import { BrowserAgentTaskInput } from '../testing/browser-agent/models.js'; +import PQueue from 'p-queue'; /** Attempts to run & test an eval app. */ export async function serveAndTestApp( @@ -19,6 +20,8 @@ export async function serveAndTestApp( appDirectoryPath: string, env: Environment, rootPromptDef: RootPromptDefinition, + workerConcurrencyQueue: PQueue, + abortSignal: AbortSignal, progress: ProgressLogger, skipScreenshots: boolean, skipAxeTesting: boolean, @@ -43,37 +46,41 @@ export async function serveAndTestApp( userJourneyAgentTaskInput, }; - return await new Promise((resolve, reject) => { - const child: ChildProcess = fork( - path.resolve( - import.meta.dirname, - '../workers/serve-testing/worker.js' - ) - ); - child.send(serveParams); + return await workerConcurrencyQueue.add( + () => + new Promise((resolve, reject) => { + const child: ChildProcess = fork( + path.resolve( + import.meta.dirname, + '../workers/serve-testing/worker.js' + ), + { signal: abortSignal } + ); + child.send(serveParams); - child.on( - 'message', - async (result: ServeTestingWorkerResponseMessage) => { - if (result.type === 'result') { + child.on( + 'message', + async (result: ServeTestingWorkerResponseMessage) => { + if (result.type === 'result') { + await killChildProcessGracefully(child); + resolve(result.payload); + } else { + progress.log( + rootPromptDef, + result.payload.state, + result.payload.message, + result.payload.details + ); + } + } + ); + child.on('error', async (err) => { await killChildProcessGracefully(child); - resolve(result.payload); - } else { - progress.log( - rootPromptDef, - result.payload.state, - result.payload.message, - result.payload.details - ); - } - } - ); - child.on('error', async (err) => { - console.error('Caught error'); - await killChildProcessGracefully(child); - reject(err); - }); - }); + reject(err); + }); + }), + { throwOnTimeout: true } + ); } );