Skip to content

Commit c24f3a5

Browse files
committed
Reset deployed run worker and also get the warm start stuff to work properly
1 parent 8798352 commit c24f3a5

File tree

11 files changed

+114
-22
lines changed

11 files changed

+114
-22
lines changed

packages/cli-v3/src/entryPoints/dev-run-worker.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,7 @@ let _tracingSDK: TracingSDK | undefined;
250250
let _executionMeasurement: UsageMeasurement | undefined;
251251
let _cancelController = new AbortController();
252252
let _lastFlushPromise: Promise<void> | undefined;
253+
let _sharedWorkerRuntime: SharedRuntimeManager | undefined;
253254

254255
function resetExecutionEnvironment() {
255256
_execution = undefined;
@@ -265,7 +266,7 @@ function resetExecutionEnvironment() {
265266
usageTimeoutManager.reset();
266267
runMetadataManager.reset();
267268
waitUntilManager.reset();
268-
sharedWorkerRuntime.reset();
269+
_sharedWorkerRuntime?.reset();
269270
durableClock.reset();
270271
taskContext.disable();
271272

@@ -478,6 +479,8 @@ const zodIpc = new ZodIpcConnection({
478479
});
479480
}
480481
} finally {
482+
_execution = undefined;
483+
_isRunning = false;
481484
log(`[${new Date().toISOString()}] Task run completed`);
482485
}
483486
} catch (err) {
@@ -515,7 +518,7 @@ const zodIpc = new ZodIpcConnection({
515518
await flushAll(timeoutInMs);
516519
},
517520
RESOLVE_WAITPOINT: async ({ waitpoint }) => {
518-
sharedWorkerRuntime.resolveWaitpoints([waitpoint]);
521+
_sharedWorkerRuntime?.resolveWaitpoints([waitpoint]);
519522
},
520523
},
521524
});
@@ -606,8 +609,8 @@ async function flushMetadata(timeoutInMs: number = 10_000) {
606609
};
607610
}
608611

609-
const sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, showInternalLogs);
610-
runtime.setGlobalRuntimeManager(sharedWorkerRuntime);
612+
_sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, showInternalLogs);
613+
runtime.setGlobalRuntimeManager(_sharedWorkerRuntime);
611614

612615
const heartbeatInterval = parseInt(heartbeatIntervalMs ?? "30000", 10);
613616

packages/cli-v3/src/entryPoints/managed-run-worker.ts

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
runMetadata,
1919
runtime,
2020
runTimelineMetrics,
21+
taskContext,
2122
TaskRunErrorCodes,
2223
TaskRunExecution,
2324
timeout,
@@ -58,6 +59,7 @@ import sourceMapSupport from "source-map-support";
5859
import { env } from "std-env";
5960
import { normalizeImportPath } from "../utilities/normalizeImportPath.js";
6061
import { VERSION } from "../version.js";
62+
import { promiseWithResolvers } from "@trigger.dev/core/utils";
6163

6264
sourceMapSupport.install({
6365
handleUncaughtExceptions: false,
@@ -110,15 +112,18 @@ lifecycleHooks.setGlobalLifecycleHooksManager(standardLifecycleHooksManager);
110112
const standardRunTimelineMetricsManager = new StandardRunTimelineMetricsManager();
111113
runTimelineMetrics.setGlobalManager(standardRunTimelineMetricsManager);
112114

113-
resourceCatalog.setGlobalResourceCatalog(new StandardResourceCatalog());
115+
const standardResourceCatalog = new StandardResourceCatalog();
116+
resourceCatalog.setGlobalResourceCatalog(standardResourceCatalog);
114117

115118
const durableClock = new DurableClock();
116119
clock.setGlobalClock(durableClock);
120+
117121
const runMetadataManager = new StandardMetadataManager(
118122
apiClientManager.clientOrThrow(),
119123
getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev"
120124
);
121125
runMetadata.setGlobalManager(runMetadataManager);
126+
122127
const waitUntilManager = new StandardWaitUntilManager();
123128
waitUntil.setGlobalManager(waitUntilManager);
124129
// Wait for all streams to finish before completing the run
@@ -236,7 +241,30 @@ let _isRunning = false;
236241
let _isCancelled = false;
237242
let _tracingSDK: TracingSDK | undefined;
238243
let _executionMeasurement: UsageMeasurement | undefined;
239-
const cancelController = new AbortController();
244+
let _cancelController = new AbortController();
245+
let _lastFlushPromise: Promise<void> | undefined;
246+
let _sharedWorkerRuntime: SharedRuntimeManager | undefined;
247+
248+
/**
 * Returns the worker to a clean per-run state.
 *
 * Clears the module-level run state, swaps in a fresh cancel controller,
 * resets each manager listed below, disables the task context, and logs that
 * the reset happened. The EXECUTE_TASK_RUN handler calls this before setting
 * up a run, after awaiting any flush that is still pending.
 */
function resetExecutionEnvironment() {
249+
// Per-run module state.
_execution = undefined;
250+
_isRunning = false;
251+
_isCancelled = false;
252+
_executionMeasurement = undefined;
253+
// The CANCEL handler aborts this controller, and an aborted controller stays
// aborted, so the next run needs a new one for its signal.
_cancelController = new AbortController();
254+
255+
// Module-level manager instances.
standardLocalsManager.reset();
256+
standardLifecycleHooksManager.reset();
257+
standardRunTimelineMetricsManager.reset();
258+
// usage and timeout are reset through their API objects; the handler calls
// initializeUsageManager() right after this function returns.
usage.reset();
259+
timeout.reset();
260+
runMetadataManager.reset();
261+
waitUntilManager.reset();
262+
// Declared as possibly undefined; it is assigned further down the module.
_sharedWorkerRuntime?.reset();
263+
durableClock.reset();
264+
taskContext.disable();
265+
266+
console.log(`[${new Date().toISOString()}] Reset execution environment`);
267+
}
240268

241269
const zodIpc = new ZodIpcConnection({
242270
listenSchema: WorkerToExecutorMessageCatalog,
@@ -253,6 +281,22 @@ const zodIpc = new ZodIpcConnection({
253281
});
254282
}
255283

284+
console.log(
285+
`[${new Date().toISOString()}] Received EXECUTE_TASK_RUN isWarmStart ${String(isWarmStart)}`
286+
);
287+
288+
if (_lastFlushPromise) {
289+
const now = performance.now();
290+
291+
await _lastFlushPromise;
292+
293+
const duration = performance.now() - now;
294+
295+
console.log(`[${new Date().toISOString()}] Awaited last flush in ${duration}ms`);
296+
}
297+
298+
resetExecutionEnvironment();
299+
256300
initializeUsageManager({
257301
usageIntervalMs: getEnvVar("USAGE_HEARTBEAT_INTERVAL_MS"),
258302
usageEventUrl: getEnvVar("USAGE_EVENT_URL"),
@@ -421,7 +465,7 @@ const zodIpc = new ZodIpcConnection({
421465

422466
const timeoutController = timeout.abortAfterTimeout(execution.run.maxDuration);
423467

424-
const signal = AbortSignal.any([cancelController.signal, timeoutController.signal]);
468+
const signal = AbortSignal.any([_cancelController.signal, timeoutController.signal]);
425469

426470
const { result } = await executor.execute(execution, metadata, traceContext, signal);
427471

@@ -442,6 +486,8 @@ const zodIpc = new ZodIpcConnection({
442486
} finally {
443487
_execution = undefined;
444488
_isRunning = false;
489+
490+
console.log(`[${new Date().toISOString()}] Task run completed`);
445491
}
446492
} catch (err) {
447493
console.error("Failed to execute task", err);
@@ -467,7 +513,7 @@ const zodIpc = new ZodIpcConnection({
467513
},
468514
CANCEL: async ({ timeoutInMs }) => {
469515
_isCancelled = true;
470-
cancelController.abort("run cancelled");
516+
_cancelController.abort("run cancelled");
471517
await callCancelHooks(timeoutInMs);
472518
if (_executionMeasurement) {
473519
usage.stop(_executionMeasurement);
@@ -478,7 +524,7 @@ const zodIpc = new ZodIpcConnection({
478524
await flushAll(timeoutInMs);
479525
},
480526
RESOLVE_WAITPOINT: async ({ waitpoint }) => {
481-
sharedWorkerRuntime.resolveWaitpoints([waitpoint]);
527+
_sharedWorkerRuntime?.resolveWaitpoints([waitpoint]);
482528
},
483529
},
484530
});
@@ -498,6 +544,10 @@ async function callCancelHooks(timeoutInMs: number = 10_000) {
498544
async function flushAll(timeoutInMs: number = 10_000) {
499545
const now = performance.now();
500546

547+
const { promise, resolve } = promiseWithResolvers<void>();
548+
549+
_lastFlushPromise = promise;
550+
501551
const results = await Promise.allSettled([
502552
flushUsage(timeoutInMs),
503553
flushTracingSDK(timeoutInMs),
@@ -530,6 +580,9 @@ async function flushAll(timeoutInMs: number = 10_000) {
530580
const duration = performance.now() - now;
531581

532582
console.log(`Flushed all in ${duration}ms`);
583+
584+
// Resolve the last flush promise
585+
resolve();
533586
}
534587

535588
async function flushUsage(timeoutInMs: number = 10_000) {
@@ -597,9 +650,8 @@ function initializeUsageManager({
597650
timeout.setGlobalManager(new UsageTimeoutManager(devUsageManager));
598651
}
599652

600-
const sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, true);
601-
602-
runtime.setGlobalRuntimeManager(sharedWorkerRuntime);
653+
_sharedWorkerRuntime = new SharedRuntimeManager(zodIpc, true);
654+
runtime.setGlobalRuntimeManager(_sharedWorkerRuntime);
603655

604656
process.title = "trigger-managed-worker";
605657

packages/cli-v3/src/entryPoints/managed/controller.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,9 @@ export class ManagedRunController {
318318
logger: this.logger,
319319
supervisorSocket: this.socket,
320320
taskRunProcessProvider: this.taskRunProcessProvider,
321-
}).prepareForExecution({
321+
});
322+
323+
await this.currentExecution.prepareForExecution({
322324
taskRunEnv: previousTaskRunEnv,
323325
});
324326
}

packages/cli-v3/src/entryPoints/managed/execution.ts

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ export class RunExecution {
128128
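* Prepares the execution by acquiring a task run process (requested as a warm start) for the given task run environment variables.
* Asynchronous: callers must await it.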
129129
* This should be called before executing, typically after a successful run to prepare for the next one.
130130
*/
131-
public prepareForExecution(opts: RunExecutionPrepareOptions): this {
131+
public async prepareForExecution(opts: RunExecutionPrepareOptions) {
132132
if (this.isShuttingDown) {
133133
throw new Error("prepareForExecution called after execution shut down");
134134
}
@@ -137,11 +137,10 @@ export class RunExecution {
137137
throw new Error("prepareForExecution called after process was already created");
138138
}
139139

140-
// Store the environment for later use, don't create process yet
141-
// The process will be created when needed in executeRun
142-
this.currentTaskRunEnv = opts.taskRunEnv;
143-
144-
return this;
140+
this.taskRunProcess = await this.taskRunProcessProvider.getProcess({
141+
taskRunEnv: opts.taskRunEnv,
142+
isWarmStart: true,
143+
});
145144
}
146145

147146
private attachTaskRunProcessHandlers(taskRunProcess: TaskRunProcess): void {
@@ -183,6 +182,10 @@ export class RunExecution {
183182
* Returns true if the process provider holds a persistent process, or if no run has been started yet and we're prepared for the next run.
184183
*/
185184
get canExecute(): boolean {
185+
if (this.taskRunProcessProvider.hasPersistentProcess) {
186+
return true;
187+
}
188+
186189
// If we've ever had a run ID, this execution can't be reused
187190
if (this._runFriendlyId) {
188191
return false;
@@ -359,7 +362,7 @@ export class RunExecution {
359362
throw new Error("Cannot start attempt: missing run or snapshot manager");
360363
}
361364

362-
this.sendDebugLog("starting attempt");
365+
this.sendDebugLog("starting attempt", { isWarmStart: String(isWarmStart) });
363366

364367
const attemptStartedAt = Date.now();
365368

@@ -404,7 +407,7 @@ export class RunExecution {
404407
podScheduledAt: this.podScheduledAt?.getTime(),
405408
});
406409

407-
this.sendDebugLog("started attempt");
410+
this.sendDebugLog("started attempt", { start: start.data });
408411

409412
return { ...start.data, metrics };
410413
}
@@ -464,7 +467,9 @@ export class RunExecution {
464467
return;
465468
}
466469

467-
const [executeError] = await tryCatch(this.executeRunWrapper(start));
470+
const [executeError] = await tryCatch(
471+
this.executeRunWrapper({ ...start, isWarmStart: runOpts.isWarmStart })
472+
);
468473

469474
if (executeError) {
470475
this.sendDebugLog("failed to execute run", { error: executeError.message });

packages/cli-v3/src/entryPoints/managed/taskRunProcessProvider.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ export class TaskRunProcessProvider {
3535
this.processKeepAliveMaxExecutionCount = opts.processKeepAliveMaxExecutionCount;
3636
}
3737

38+
/**
 * Whether this provider currently holds a persistent process, i.e. whether
 * `persistentProcess` is truthy. RunExecution.canExecute reads this to decide
 * if the execution can run.
 */
get hasPersistentProcess(): boolean {
39+
return !!this.persistentProcess;
40+
}
41+
3842
async handleImmediateRetry(): Promise<void> {
3943
if (!this.processKeepAliveEnabled) {
4044
// For immediate retries, we need to ensure we have a clean process

packages/core/src/v3/timeout/api.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ class NoopTimeoutManager implements TimeoutManager {
77
abortAfterTimeout(timeoutInSeconds?: number): AbortController {
88
return new AbortController();
99
}
10+
11+
/** Intentionally empty; present to satisfy the `reset` member of TimeoutManager. */
reset() {}
1012
}
1113

1214
const NOOP_TIMEOUT_MANAGER = new NoopTimeoutManager();
@@ -40,6 +42,11 @@ export class TimeoutAPI implements TimeoutManager {
4042
unregisterGlobal(API_NAME);
4143
}
4244

45+
/**
 * Resets the currently registered timeout manager (or the no-op fallback when
 * none is registered), then calls disable() on this API.
 */
public reset() {
46+
// Reset first: #getManager() falls back to the no-op manager when nothing is
// registered. NOTE(review): disable() presumably unregisters the global
// manager, which would make this order significant — confirm.
this.#getManager().reset();
47+
this.disable();
48+
}
49+
4350
#getManager(): TimeoutManager {
4451
return getGlobal(API_NAME) ?? NOOP_TIMEOUT_MANAGER;
4552
}

packages/core/src/v3/timeout/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/** Contract for timeout managers; implemented by NoopTimeoutManager and TimeoutAPI. */
export interface TimeoutManager {
22
/** Returns an AbortController for the given timeout; the timeout (in seconds) may be omitted. */
abortAfterTimeout: (timeoutInSeconds?: number) => AbortController;
33
/** Optional abort signal exposed by the manager. */
signal?: AbortSignal;
4+
/** Resets the manager; takes no arguments and returns nothing. Invoked via TimeoutAPI.reset(). */
reset: () => void;
45
}
56

67
export class TaskRunExceededMaxDuration extends Error {

packages/core/src/v3/usage/api.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ export class UsageAPI implements UsageManager {
4848
return this.#getUsageManager().flush();
4949
}
5050

51+
/**
 * Resets the currently registered usage manager (or the no-op fallback when
 * none is registered), then calls disable() on this API.
 */
public reset() {
52+
// Reset first: #getUsageManager() falls back to the no-op manager when
// nothing is registered. NOTE(review): disable() presumably unregisters the
// global manager, which would make this order significant — confirm.
this.#getUsageManager().reset();
53+
this.disable();
54+
}
55+
5156
#getUsageManager(): UsageManager {
5257
return getGlobal(API_NAME) ?? NOOP_USAGE_MANAGER;
5358
}

packages/core/src/v3/usage/noopUsageManager.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,8 @@ export class NoopUsageManager implements UsageManager {
2626
sample(): UsageSample | undefined {
2727
return undefined;
2828
}
29+
30+
/** Intentionally does nothing; present so the no-op manager exposes `reset` like the other usage managers. */
reset(): void {
31+
// Noop: nothing to clear.
32+
}
2933
}

packages/core/src/v3/usage/prodUsageManager.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,14 @@ export class ProdUsageManager implements UsageManager {
2727
return typeof this._usageClient !== "undefined";
2828
}
2929

30+
/**
 * Returns this manager to its initial, non-reporting state.
 *
 * Resets the delegate usage manager, aborts the abort controller that was in
 * use, installs a fresh controller, and clears the usage client, the current
 * measurement and the last sample.
 */
reset(): void {
  this.delegageUsageManager.reset();

  // disable() stops work by aborting `_abortController`. Abort the outgoing
  // controller before replacing it; otherwise anything still observing the old
  // signal would never be told to stop.
  this._abortController?.abort();
  this._abortController = new AbortController();

  this._usageClient = undefined;
  this._measurement = undefined;
  this._lastSample = undefined;
}
37+
3038
disable(): void {
3139
this.delegageUsageManager.disable();
3240
this._abortController?.abort();

0 commit comments

Comments
 (0)