Skip to content

Commit 1d0c889

Browse files
leosvelperezFrozenPandaz
authored andcommitted
fix(core): ensure postTasksExecution fires on SIGINT for continuous tasks (#34876)
## Current Behavior When pressing Ctrl+C on continuous tasks, `postTasksExecution` never fires because `process.exit()` in `running-tasks.ts` SIGINT handlers kills the process before the orchestrator can run async cleanup and lifecycle hooks. ## Expected Behavior `postTasksExecution` fires reliably on Ctrl+C with complete task results, without causing stale `RunningTasksService` DB entries that block subsequent `nx` invocations with "Waiting for ... in another nx process" messages. ## Technical Details Re-applies #34623 (reverted in #34869) with fixes for the issues that caused CI failures. **Root cause of the revert**: Removing `process.exit()` from SIGINT handlers left the nx process alive during async cleanup. The `running_tasks` DB entry persisted with a still-alive PID, so new nx processes saw it as a running task and hung. **Fix 1 — Early DB cleanup**: Synchronously remove all owned DB entries in the SIGINT handler *before* starting async cleanup. This replicates the cleanup that `process.exit()` + Rust `Drop` previously provided — from any external observer's perspective, the tasks are gone immediately. **Fix 2 — Always register onExit in startContinuousTask**: The original PR added a guard that skipped `onExit` registration for initiating tasks, assuming `executeNextBatchOfTasksUsingTaskSchedule` would handle it. But `runContinuousTasks()` (used by DTE agents and Playwright) calls `startContinuousTask` directly without going through `run()`. The missing handler meant task exit was never processed — no DB cleanup, no lifecycle hooks. This caused verdaccio (local-registry) to become unreachable on DTE agents. The fix always registers `onExit` in `startContinuousTask` and simplifies the `executeNextBatchOfTasksUsingTaskSchedule` handler to only unblock the thread. Changes: - Remove `process.exit()` from `running-tasks.ts` SIGINT handlers (lets orchestrator run) - Replace `cleanupDone` boolean with `cleanupPromise` (fixes SIGINT/SIGTERM race) - Set `stopRequested = true` for SIGTERM/SIGHUP (correct task classification) - Early synchronous DB cleanup in non-TUI SIGINT handler - Always register `onExit` handler in `startContinuousTask` for both initiating and non-initiating tasks - Simplify initiating task handler in `executeNextBatchOfTasksUsingTaskSchedule` to only call `res()`
1 parent 31cc85e commit 1d0c889

File tree

2 files changed

+79
-51
lines changed

2 files changed

+79
-51
lines changed

packages/nx/src/executors/run-commands/running-tasks.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -525,8 +525,6 @@ class RunningNodeProcess implements RunningTask {
525525
});
526526
process.on('SIGINT', () => {
527527
this.kill('SIGTERM');
528-
// we exit here because we don't need to write anything to cache.
529-
process.exit(signalToCode('SIGINT'));
530528
});
531529
process.on('SIGTERM', () => {
532530
this.kill('SIGTERM');
@@ -710,8 +708,6 @@ function registerProcessListener(
710708
});
711709
process.on('SIGINT', () => {
712710
runningTask.kill('SIGTERM');
713-
// we exit here because we don't need to write anything to cache.
714-
process.exit(signalToCode('SIGINT'));
715711
});
716712
process.on('SIGTERM', () => {
717713
runningTask.kill('SIGTERM');

packages/nx/src/tasks-runner/task-orchestrator.ts

Lines changed: 79 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ export class TaskOrchestrator {
110110
{ runningTask: RunningTask; stopping: boolean }
111111
>();
112112
private discreteTaskExitHandled = new Map<string, Promise<void>>();
113-
private cleanupDone = false;
113+
private continuousTaskExitHandled = new Map<string, Promise<void>>();
114+
private cleanupPromise: Promise<void> | null = null;
114115

115116
// endregion internal state
116117

@@ -232,16 +233,7 @@ export class TaskOrchestrator {
232233
const runningTask = await this.startContinuousTask(task, groupId);
233234

234235
if (this.initializingTaskIds.has(task.id)) {
235-
await new Promise<void>((res) => {
236-
runningTask.onExit((code) => {
237-
if (!this.tuiEnabled) {
238-
if (code > 128) {
239-
process.exit(code);
240-
}
241-
}
242-
res();
243-
});
244-
});
236+
await this.continuousTaskExitHandled.get(task.id);
245237
}
246238
} else {
247239
await this.applyFromCacheOrRunTask(doNotSkipCache, task, groupId);
@@ -944,7 +936,15 @@ export class TaskOrchestrator {
944936
groupId,
945937
ownsRunningTasksService: false,
946938
});
947-
this.registerContinuousTaskExitHandler(runningTask, task, groupId, false);
939+
this.continuousTaskExitHandled.set(
940+
task.id,
941+
new Promise<void>((resolve) => {
942+
runningTask.onExit(async (code) => {
943+
await this.handleContinuousTaskExit(code, task, groupId, false);
944+
resolve();
945+
});
946+
})
947+
);
948948

949949
// task is already running by another process, we schedule the next tasks
950950
// and release the threads
@@ -997,7 +997,15 @@ export class TaskOrchestrator {
997997
groupId,
998998
ownsRunningTasksService: true,
999999
});
1000-
this.registerContinuousTaskExitHandler(childProcess, task, groupId, true);
1000+
this.continuousTaskExitHandled.set(
1001+
task.id,
1002+
new Promise<void>((resolve) => {
1003+
childProcess.onExit(async (code) => {
1004+
await this.handleContinuousTaskExit(code, task, groupId, true);
1005+
resolve();
1006+
});
1007+
})
1008+
);
10011009
await this.scheduleNextTasksAndReleaseThreads();
10021010

10031011
return childProcess;
@@ -1259,42 +1267,40 @@ export class TaskOrchestrator {
12591267

12601268
// endregion utils
12611269

1262-
private registerContinuousTaskExitHandler(
1263-
runningTask: RunningTask,
1270+
private async handleContinuousTaskExit(
1271+
code: number,
12641272
task: Task,
12651273
groupId: number,
12661274
ownsRunningTasksService: boolean
12671275
) {
1268-
runningTask.onExit(async (code) => {
1269-
// If cleanup already completed this task, nothing left to do
1270-
if (this.completedTasks[task.id] !== undefined) {
1271-
return;
1272-
}
1276+
// If cleanup already completed this task, nothing left to do
1277+
if (this.completedTasks[task.id] !== undefined) {
1278+
return;
1279+
}
12731280

1274-
const stoppingReason = this.runningContinuousTasks.get(
1275-
task.id
1276-
)?.stoppingReason;
1277-
if (stoppingReason || EXPECTED_TERMINATION_SIGNALS.has(code)) {
1278-
const reason =
1279-
stoppingReason === 'fulfilled' ? 'fulfilled' : 'interrupted';
1280-
await this.completeContinuousTask(
1281-
task,
1282-
groupId,
1283-
ownsRunningTasksService,
1284-
reason
1285-
);
1286-
} else {
1287-
console.error(
1288-
`Task "${task.id}" is continuous but exited with code ${code}`
1289-
);
1290-
await this.completeContinuousTask(
1291-
task,
1292-
groupId,
1293-
ownsRunningTasksService,
1294-
'crashed'
1295-
);
1296-
}
1297-
});
1281+
const stoppingReason = this.runningContinuousTasks.get(
1282+
task.id
1283+
)?.stoppingReason;
1284+
if (stoppingReason || EXPECTED_TERMINATION_SIGNALS.has(code)) {
1285+
const reason =
1286+
stoppingReason === 'fulfilled' ? 'fulfilled' : 'interrupted';
1287+
await this.completeContinuousTask(
1288+
task,
1289+
groupId,
1290+
ownsRunningTasksService,
1291+
reason
1292+
);
1293+
} else {
1294+
console.error(
1295+
`Task "${task.id}" is continuous but exited with code ${code}`
1296+
);
1297+
await this.completeContinuousTask(
1298+
task,
1299+
groupId,
1300+
ownsRunningTasksService,
1301+
'crashed'
1302+
);
1303+
}
12981304
}
12991305

13001306
private async completeContinuousTask(
@@ -1330,11 +1336,14 @@ export class TaskOrchestrator {
13301336
}
13311337

13321338
private async cleanup() {
1333-
if (this.cleanupDone) {
1334-
return;
1339+
if (this.cleanupPromise) {
1340+
return this.cleanupPromise;
13351341
}
1336-
this.cleanupDone = true;
1342+
this.cleanupPromise = this.performCleanup();
1343+
return this.cleanupPromise;
1344+
}
13371345

1346+
private async performCleanup() {
13381347
// Mark all running tasks for intentional stop
13391348
const reason = this.stopRequested ? 'interrupted' : 'fulfilled';
13401349
for (const entry of this.runningContinuousTasks.values()) {
@@ -1396,6 +1405,27 @@ export class TaskOrchestrator {
13961405
private setupSignalHandlers() {
13971406
process.once('SIGINT', () => {
13981407
this.stopRequested = true;
1408+
if (!this.tuiEnabled) {
1409+
// Synchronously remove DB entries before async cleanup to prevent
1410+
// new nx processes from seeing stale "Waiting for ..." messages.
1411+
// This replicates the cleanup that process.exit() + Rust Drop
1412+
// previously provided.
1413+
for (const [taskId, { ownsRunningTasksService }] of this
1414+
.runningContinuousTasks) {
1415+
if (ownsRunningTasksService) {
1416+
this.runningTasksService?.removeRunningTask(taskId);
1417+
}
1418+
}
1419+
// Silence output — pnpm (and similar wrappers) may exit before nx
1420+
// finishes cleanup, returning the shell prompt. Any output after
1421+
// that point would appear after the prompt.
1422+
const noop = (_chunk, _encoding, callback) => {
1423+
if (callback) callback();
1424+
return true;
1425+
};
1426+
process.stdout.write = noop as any;
1427+
process.stderr.write = noop as any;
1428+
}
13991429
this.cleanup().finally(() => {
14001430
if (this.resolveStopPromise) {
14011431
this.resolveStopPromise();
@@ -1405,13 +1435,15 @@ export class TaskOrchestrator {
14051435
});
14061436
});
14071437
process.once('SIGTERM', () => {
1438+
this.stopRequested = true;
14081439
this.cleanup().finally(() => {
14091440
if (this.resolveStopPromise) {
14101441
this.resolveStopPromise();
14111442
}
14121443
});
14131444
});
14141445
process.once('SIGHUP', () => {
1446+
this.stopRequested = true;
14151447
this.cleanup().finally(() => {
14161448
if (this.resolveStopPromise) {
14171449
this.resolveStopPromise();

0 commit comments

Comments
 (0)