Skip to content

Commit 6cf046d

Browse files
committed
fix: improve stability by properly limiting worker instances
With the remote environment refactorings, we no longer did guard the serve testing logic via the `workerConcurrencyQueue`. This commit makes sure this is still the case + properly passes around abort signals.
1 parent 9098b1f commit 6cf046d

File tree

6 files changed

+93
-51
lines changed

6 files changed

+93
-51
lines changed

runner/orchestration/build-repair.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ export async function repairAndBuild(
7171
rootPromptDef,
7272
directory,
7373
workerConcurrencyQueue,
74+
abortSignal,
7475
attempts,
7576
progress
7677
);
@@ -89,6 +90,7 @@ async function handleRepairResponse(
8990
rootPromptDef: RootPromptDefinition,
9091
directory: string,
9192
workerConcurrencyQueue: PQueue,
93+
abortSignal: AbortSignal,
9294
attempts: number,
9395
progress: ProgressLogger
9496
) {
@@ -107,9 +109,15 @@ async function handleRepairResponse(
107109
mergeRepairFiles(repairResponse.outputFiles, finalOutputFiles);
108110
writeResponseFiles(directory, finalOutputFiles, env, rootPromptDef.name);
109111

110-
const buildResult = await workerConcurrencyQueue.add(
111-
() => runBuild(evalID, gateway, directory, env, rootPromptDef, progress),
112-
{ throwOnTimeout: true }
112+
const buildResult = await runBuild(
113+
evalID,
114+
gateway,
115+
directory,
116+
env,
117+
rootPromptDef,
118+
abortSignal,
119+
workerConcurrencyQueue,
120+
progress
113121
);
114122

115123
return {

runner/orchestration/build-serve-loop.ts

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,7 @@ import { Environment } from '../configuration/environment.js';
55
import {
66
AttemptDetails,
77
LlmContextFile,
8-
LlmResponseFile,
98
RootPromptDefinition,
10-
Usage,
119
} from '../shared-interfaces.js';
1210
import { DEFAULT_MAX_REPAIR_ATTEMPTS } from '../configuration/constants.js';
1311
import { ProgressLogger } from '../progress/progress-logger.js';
@@ -59,9 +57,15 @@ export async function attemptBuild(
5957
const finalOutputFiles = initialResponse.files.map((file) => ({
6058
...file,
6159
}));
62-
const initialBuildResult = await workerConcurrencyQueue.add(
63-
() => runBuild(evalID, gateway, directory, env, rootPromptDef, progress),
64-
{ throwOnTimeout: true }
60+
const initialBuildResult = await runBuild(
61+
evalID,
62+
gateway,
63+
directory,
64+
env,
65+
rootPromptDef,
66+
abortSignal,
67+
workerConcurrencyQueue,
68+
progress
6569
);
6670
let repairAttempts = 0;
6771
const maxRepairAttempts = gateway.shouldRetryFailedBuilds(evalID)
@@ -122,6 +126,8 @@ export async function attemptBuild(
122126
directory,
123127
env,
124128
rootPromptDef,
129+
workerConcurrencyQueue,
130+
abortSignal,
125131
progress,
126132
skipScreenshots,
127133
skipAxeTesting,
@@ -176,6 +182,8 @@ export async function attemptBuild(
176182
directory,
177183
env,
178184
rootPromptDef,
185+
workerConcurrencyQueue,
186+
abortSignal,
179187
progress,
180188
skipScreenshots,
181189
skipAxeTesting,

runner/orchestration/build-worker.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { Environment } from '../configuration/environment.js';
66
import { ProgressLogger } from '../progress/progress-logger.js';
77
import { RootPromptDefinition } from '../shared-interfaces.js';
88
import { EvalID, Gateway } from './gateway.js';
9+
import PQueue from 'p-queue';
910

1011
/** Attempts to build the code. */
1112
export async function runBuild(
@@ -14,6 +15,8 @@ export async function runBuild(
1415
appDirectoryPath: string,
1516
env: Environment,
1617
rootPromptDef: RootPromptDefinition,
18+
abortSignal: AbortSignal,
19+
workerConcurrencyQueue: PQueue,
1720
progress: ProgressLogger
1821
): Promise<BuildResult> {
1922
progress.log(rootPromptDef, 'build', `Building the app`);
@@ -24,6 +27,8 @@ export async function runBuild(
2427
env,
2528
appDirectoryPath,
2629
rootPromptDef,
30+
workerConcurrencyQueue,
31+
abortSignal,
2732
progress
2833
);
2934
if (result.status === BuildResultStatus.SUCCESS) {

runner/orchestration/gateway.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import PQueue from 'p-queue';
12
import { LlmGenerateFilesContext } from '../codegen/llm-runner.js';
23
import { Environment } from '../configuration/environment.js';
34
import { ProgressLogger } from '../progress/progress-logger.js';
@@ -41,6 +42,8 @@ export interface Gateway<Env extends Environment> {
4142
env: Env,
4243
appDirectoryPath: string,
4344
rootPromptDef: RootPromptDefinition,
45+
workerConcurrencyQueue: PQueue,
46+
abortSignal: AbortSignal,
4447
progress: ProgressLogger
4548
): Promise<BuildResult>;
4649

runner/orchestration/gateways/local_gateway.ts

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { killChildProcessGracefully } from '../../utils/kill-gracefully.js';
2121
import { ProgressLogger } from '../../progress/progress-logger.js';
2222
import { serveApp } from '../../workers/serve-testing/serve-app.js';
2323
import { LocalEnvironment } from '../../configuration/environment-local.js';
24+
import PQueue from 'p-queue';
2425

2526
let uniqueIDs = 0;
2627

@@ -70,6 +71,8 @@ export class LocalGateway implements Gateway<LocalEnvironment> {
7071
env: LocalEnvironment,
7172
appDirectoryPath: string,
7273
rootPromptDef: RootPromptDefinition,
74+
workerConcurrencyQueue: PQueue,
75+
abortSignal: AbortSignal,
7376
progress: ProgressLogger
7477
): Promise<BuildResult> {
7578
const buildParams: BuildWorkerMessage = {
@@ -78,21 +81,29 @@ export class LocalGateway implements Gateway<LocalEnvironment> {
7881
buildCommand: env.buildCommand,
7982
};
8083

81-
return new Promise<BuildResult>((resolve, reject) => {
82-
const child: ChildProcess = fork(
83-
path.resolve(import.meta.dirname, '../../workers/builder/worker.js')
84-
);
85-
child.send(buildParams);
84+
return workerConcurrencyQueue.add(
85+
() =>
86+
new Promise<BuildResult>((resolve, reject) => {
87+
const child: ChildProcess = fork(
88+
path.resolve(
89+
import.meta.dirname,
90+
'../../workers/builder/worker.js'
91+
),
92+
{ signal: abortSignal }
93+
);
94+
child.send(buildParams);
8695

87-
child.on('message', async (result: BuildWorkerResponseMessage) => {
88-
await killChildProcessGracefully(child);
89-
resolve(result.payload);
90-
});
91-
child.on('error', async (err) => {
92-
await killChildProcessGracefully(child);
93-
reject(err);
94-
});
95-
});
96+
child.on('message', async (result: BuildWorkerResponseMessage) => {
97+
await killChildProcessGracefully(child);
98+
resolve(result.payload);
99+
});
100+
child.on('error', async (err) => {
101+
await killChildProcessGracefully(child);
102+
reject(err);
103+
});
104+
}),
105+
{ throwOnTimeout: true }
106+
);
96107
}
97108

98109
async serveBuild<T>(

runner/orchestration/serve-testing-worker.ts

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
} from '../workers/serve-testing/worker-types.js';
1212
import { EvalID, Gateway } from './gateway.js';
1313
import { BrowserAgentTaskInput } from '../testing/browser-agent/models.js';
14+
import PQueue from 'p-queue';
1415

1516
/** Attempts to run & test an eval app. */
1617
export async function serveAndTestApp(
@@ -19,6 +20,8 @@ export async function serveAndTestApp(
1920
appDirectoryPath: string,
2021
env: Environment,
2122
rootPromptDef: RootPromptDefinition,
23+
workerConcurrencyQueue: PQueue,
24+
abortSignal: AbortSignal,
2225
progress: ProgressLogger,
2326
skipScreenshots: boolean,
2427
skipAxeTesting: boolean,
@@ -43,37 +46,41 @@ export async function serveAndTestApp(
4346
userJourneyAgentTaskInput,
4447
};
4548

46-
return await new Promise<ServeTestingResult>((resolve, reject) => {
47-
const child: ChildProcess = fork(
48-
path.resolve(
49-
import.meta.dirname,
50-
'../workers/serve-testing/worker.js'
51-
)
52-
);
53-
child.send(serveParams);
49+
return await workerConcurrencyQueue.add(
50+
() =>
51+
new Promise<ServeTestingResult>((resolve, reject) => {
52+
const child: ChildProcess = fork(
53+
path.resolve(
54+
import.meta.dirname,
55+
'../workers/serve-testing/worker.js'
56+
),
57+
{ signal: abortSignal }
58+
);
59+
child.send(serveParams);
5460

55-
child.on(
56-
'message',
57-
async (result: ServeTestingWorkerResponseMessage) => {
58-
if (result.type === 'result') {
61+
child.on(
62+
'message',
63+
async (result: ServeTestingWorkerResponseMessage) => {
64+
if (result.type === 'result') {
65+
await killChildProcessGracefully(child);
66+
resolve(result.payload);
67+
} else {
68+
progress.log(
69+
rootPromptDef,
70+
result.payload.state,
71+
result.payload.message,
72+
result.payload.details
73+
);
74+
}
75+
}
76+
);
77+
child.on('error', async (err) => {
5978
await killChildProcessGracefully(child);
60-
resolve(result.payload);
61-
} else {
62-
progress.log(
63-
rootPromptDef,
64-
result.payload.state,
65-
result.payload.message,
66-
result.payload.details
67-
);
68-
}
69-
}
70-
);
71-
child.on('error', async (err) => {
72-
console.error('Caught error');
73-
await killChildProcessGracefully(child);
74-
reject(err);
75-
});
76-
});
79+
reject(err);
80+
});
81+
}),
82+
{ throwOnTimeout: true }
83+
);
7784
}
7885
);
7986

0 commit comments

Comments
 (0)