Skip to content

Commit bc32cf1

Browse files
feat(workflow): Automatically cleanup AsyncLocalStorage on workflow context disposal (#1871)
Co-authored-by: Chris Olszewski <chris.olszewski@temporal.io>
1 parent 868d330 commit bc32cf1

File tree

15 files changed

+309
-85
lines changed

15 files changed

+309
-85
lines changed

packages/interceptors-opentelemetry/src/workflow/context-manager.ts

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
1+
import { type AsyncLocalStorage } from 'async_hooks';
12
import * as otel from '@opentelemetry/api';
2-
import { ensureWorkflowModuleLoaded, getWorkflowModuleIfAvailable } from './workflow-module-loader';
3-
4-
const AsyncLocalStorage = getWorkflowModuleIfAvailable()?.AsyncLocalStorage;
3+
import { ensureWorkflowModuleLoaded } from './workflow-module-loader';
54

65
export class ContextManager implements otel.ContextManager {
7-
// If `@temporalio/workflow` is not available, ignore for now.
8-
// When ContextManager is constructed module resolution error will be thrown.
9-
protected storage = AsyncLocalStorage ? new AsyncLocalStorage<otel.Context>() : undefined;
6+
// The workflow sandbox provides AsyncLocalStorage through globalThis.
7+
protected storage: AsyncLocalStorage<otel.Context> = new (globalThis as any).AsyncLocalStorage();
108

119
public constructor() {
1210
ensureWorkflowModuleLoaded();

packages/test/src/test-interceptors.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ import { cleanOptionalStackTrace, compareStackTrace, RUN_INTEGRATION_TESTS, Work
1616
import { defaultOptions } from './mock-native-worker';
1717
import {
1818
checkDisposeRan,
19+
conditionWithTimeoutAfterDisposal,
1920
continueAsNewToDifferentWorkflow,
2021
initAndResetFlag,
2122
interceptorExample,
2223
internalsInterceptorExample,
24+
successString,
2325
unblockOrCancel,
2426
} from './workflows';
2527
import { getSecretQuery, unblockWithSecretSignal } from './workflows/interceptor-example';
@@ -312,4 +314,41 @@ if (RUN_INTEGRATION_TESTS) {
312314
t.true(disposeFlagSetNow);
313315
});
314316
});
317+
318+
// Test to trigger GH #1866
319+
// When `reuseV8Context: true`, dispose() calls disableStorage() which disables the
320+
// AsyncLocalStorage instance that stores cancellation scope.
321+
// This causes CancellationScope.current() to return rootScope instead of the correct
322+
// inner scope for workflows that continue afterward.
323+
//
324+
// The bug manifests in condition() with timeout: the finally block calls
325+
// CancellationScope.current().cancel() to clean up.
326+
// When storage is disabled, this incorrectly cancels the rootScope, failing the workflow with "Workflow cancelled".
327+
test.serial('workflow disposal does not break CancellationScope in other workflows in reusable vm', async (t) => {
328+
const taskQueue = 'test-reusable-vm-disposal-cancellation-scope';
329+
const worker = await Worker.create({
330+
...defaultOptions,
331+
taskQueue,
332+
});
333+
334+
const client = new WorkflowClient();
335+
const result = await worker.runUntil(async () => {
336+
// Fill the cache with a workflow that completes immediately
337+
await client.execute(successString, { taskQueue, workflowId: uuid4() });
338+
339+
// Start the condition workflow
340+
const conditionHandle = await client.start(conditionWithTimeoutAfterDisposal, {
341+
taskQueue,
342+
workflowId: uuid4(),
343+
});
344+
345+
// Run another workflow to trigger an eviction and a dispose() call while
346+
// conditionWithTimeoutAfterDisposal is cached and waiting
347+
await client.execute(successString, { taskQueue, workflowId: uuid4() });
348+
349+
// If dispose incorrectly disables the cancellation scope storage, then it will fail with CancelledFailure: "Workflow cancelled"
350+
return await conditionHandle.result();
351+
});
352+
t.is(result, 'done');
353+
});
315354
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
import type { AsyncLocalStorage } from 'async_hooks';
2+
import * as workflow from '@temporalio/workflow';
3+
import { helpers, makeTestFunction } from './helpers-integration';
4+
import { unblockSignal } from './workflows/testenv-test-workflows';
5+
6+
const test = makeTestFunction({
7+
workflowsPath: __filename,
8+
workflowInterceptorModules: [require.resolve('./workflows/otel-interceptors')],
9+
});
10+
11+
export async function asyncLocalStorageWorkflow(explicitlyDisable: boolean): Promise<void> {
12+
const myAls: AsyncLocalStorage<unknown> = new (globalThis as any).AsyncLocalStorage('My Workflow ALS');
13+
try {
14+
await myAls.run({}, async () => {
15+
let signalReceived = false;
16+
workflow.setHandler(unblockSignal, () => {
17+
signalReceived = true;
18+
});
19+
await workflow.condition(() => signalReceived);
20+
});
21+
} finally {
22+
if (explicitlyDisable) {
23+
myAls.disable();
24+
}
25+
}
26+
}
27+
28+
test("AsyncLocalStorage in workflow context doesn't throw when disabled", async (t) => {
29+
const { createWorker, startWorkflow } = helpers(t);
30+
31+
const worker = await createWorker({
32+
// We disable the workflow cache to ensure that some workflow executions will get
33+
// evicted from cache, forcing early disposal of the corresponding workflow vms.
34+
maxCachedWorkflows: 0,
35+
maxConcurrentWorkflowTaskExecutions: 1,
36+
37+
sinks: {
38+
exporters: {
39+
export: {
40+
fn: () => void 0,
41+
},
42+
},
43+
},
44+
});
45+
46+
await worker.runUntil(async () => {
47+
const wfs = await Promise.all([
48+
startWorkflow(asyncLocalStorageWorkflow, { args: [true] }),
49+
startWorkflow(asyncLocalStorageWorkflow, { args: [false] }),
50+
startWorkflow(asyncLocalStorageWorkflow, { args: [true] }),
51+
startWorkflow(asyncLocalStorageWorkflow, { args: [false] }),
52+
]);
53+
54+
await Promise.all([wfs[0].signal(unblockSignal), wfs[1].signal(unblockSignal)]);
55+
await Promise.all([wfs[0].result(), wfs[1].result()]);
56+
});
57+
58+
// We're only asserting that no error is thrown. There's unfortunately no way
59+
// to programmatically confirm that ALS instances were properly disposed.
60+
t.pass();
61+
});

packages/test/src/test-workflows.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,10 @@ test.beforeEach(async (t) => {
9797
};
9898
});
9999

100+
test.afterEach(async (t) => {
101+
await t.context.workflow?.dispose();
102+
});
103+
100104
async function createWorkflow(
101105
workflowType: string,
102106
runId: string,

packages/test/src/workflows/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ export * from './promise-then-promise';
5959
export * from './race';
6060
export * from './random';
6161
export * from './reject-promise';
62+
export * from './reusable-vm-disposal-bug';
6263
export * from './run-activity-in-different-task-queue';
6364
export * from './scope-cancelled-while-waiting-on-external-workflow-cancellation';
6465
export * from './set-timeout-after-microtasks';
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { condition } from '@temporalio/workflow';
2+
3+
/**
4+
* Workflow for reproducing GH #1866
5+
*
6+
* When the bug is present:
7+
* 1. First condition times out
8+
* 2. finally block calls CancellationScope.current().cancel()
9+
* 3. With disabled storage, current() returns rootScope
10+
* 4. rootScope.cancel() is called, failing the workflow with "Workflow cancelled"
11+
*
12+
* The actual failure happens on the second `condition`: when creating the new
13+
* cancellation scope, we see that the parent/root scope is already canceled.
14+
*/
15+
export async function conditionWithTimeoutAfterDisposal(): Promise<string> {
16+
const alwaysFalse = false;
17+
await condition(() => alwaysFalse, '500ms');
18+
await condition(() => alwaysFalse, '500ms');
19+
return 'done';
20+
}

packages/worker/src/workflow/reusable-vm.ts

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
2727
*
2828
* Use the {@link context} getter instead
2929
*/
30-
private _context?: vm.Context;
30+
private _context?: vm.Context & typeof globalThis;
3131
private pristineObj?: object;
3232

3333
constructor(
@@ -42,12 +42,12 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
4242
ReusableVMWorkflowCreator.unhandledRejectionHandlerHasBeenSet = true;
4343
}
4444

45-
this._context = vm.createContext({}, { microtaskMode: 'afterEvaluate' });
45+
this._context = vm.createContext({}, { microtaskMode: 'afterEvaluate' }) as vm.Context & typeof globalThis;
4646
vm.runInContext(
4747
`{
4848
const __TEMPORAL_CALL_INTO_SCOPE = () => {
49-
const [holder, fn, args] = globalThis.__TEMPORAL_ARGS__;
50-
delete globalThis.__TEMPORAL_ARGS__;
49+
const [holder, fn, args] = globalThis.__temporal_args;
50+
delete globalThis.__temporal_args;
5151
5252
if (globalThis.__TEMPORAL_BAG_HOLDER__ !== holder) {
5353
if (globalThis.__TEMPORAL_BAG_HOLDER__ !== undefined) {
@@ -81,13 +81,11 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
8181
{ timeout: isolateExecutionTimeoutMs, displayErrors: true }
8282
);
8383

84-
this.injectGlobals(this._context);
85-
8684
const sharedModules = new Map<string | symbol, any>();
8785
const __webpack_module_cache__ = new Proxy(
8886
{},
8987
{
90-
get: (_, p) => {
88+
get: (_, p: string) => {
9189
// Try the shared modules first
9290
const sharedModule = sharedModules.get(p);
9391
if (sharedModule) {
@@ -96,7 +94,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
9694
const moduleCache = this.context.__TEMPORAL_ACTIVATOR__?.moduleCache;
9795
return moduleCache?.get(p);
9896
},
99-
set: (_, p, val) => {
97+
set: (_, p: string, val) => {
10098
const moduleCache = this.context.__TEMPORAL_ACTIVATOR__?.moduleCache;
10199
if (moduleCache != null) {
102100
moduleCache.set(p, val);
@@ -115,6 +113,8 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
115113
configurable: false,
116114
});
117115

116+
this.injectGlobals(this._context);
117+
118118
script.runInContext(this.context);
119119

120120
// The V8 context is really composed of two distinct objects: the 'this._context' object on the outside, and another
@@ -127,7 +127,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
127127
...Object.getOwnPropertyNames(this.pristineObj),
128128
...Object.getOwnPropertySymbols(this.pristineObj),
129129
]) {
130-
if (k !== 'globalThis') {
130+
if (k !== 'globalThis' && k !== '__temporal_globalSandboxDestructors') {
131131
const v: PropertyDescriptor = (this.pristineObj as any)[k];
132132
v.value = deepFreeze(v.value);
133133
}
@@ -136,7 +136,7 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
136136
for (const v of sharedModules.values()) deepFreeze(v);
137137
}
138138

139-
protected get context(): vm.Context {
139+
protected get context(): vm.Context & typeof globalThis {
140140
const { _context } = this;
141141
if (_context == null) {
142142
throw new IllegalStateError('Tried to use v8 context after Workflow creator was destroyed');
@@ -159,31 +159,31 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
159159
async createWorkflow(options: WorkflowCreateOptions): Promise<Workflow> {
160160
const context = this.context;
161161
const holder: BagHolder = { bag: this.pristineObj! };
162-
163162
const { isolateExecutionTimeoutMs } = this;
163+
164164
const workflowModule: WorkflowModule = new Proxy(
165165
{},
166166
{
167167
get(_: any, fn: string) {
168168
return (...args: any[]) => {
169169
// By the time we get out of this call, all microtasks will have been executed
170-
context.__TEMPORAL_ARGS__ = [holder, fn, args];
170+
context.__temporal_args = [holder, fn, args];
171171
return callIntoVmScript.runInContext(context, {
172172
timeout: isolateExecutionTimeoutMs,
173173
displayErrors: true,
174174
});
175175
};
176176
},
177177
}
178-
) as any;
178+
);
179179

180180
workflowModule.initRuntime({
181181
...options,
182182
sourceMap: this.workflowBundle.sourceMap,
183183
getTimeOfDay: native.getTimeOfDay,
184184
registeredActivityNames: this.registeredActivityNames,
185185
});
186-
const activator = context['__TEMPORAL_ACTIVATOR__'];
186+
const activator = context.__TEMPORAL_ACTIVATOR__!;
187187
const newVM = new ReusableVMWorkflow(options.info.runId, context, activator, workflowModule);
188188
ReusableVMWorkflowCreator.workflowByRunId.set(options.info.runId, newVM);
189189
return newVM;
@@ -210,8 +210,12 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
210210
* Cleanup the pre-compiled script
211211
*/
212212
public async destroy(): Promise<void> {
213-
globalHandlers.removeWorkflowBundle(this.workflowBundle);
214-
delete this._context;
213+
try {
214+
vm.runInContext(`__TEMPORAL__.api.destroy()`, this.context);
215+
} finally {
216+
globalHandlers.removeWorkflowBundle(this.workflowBundle);
217+
delete this._context;
218+
}
215219
}
216220
}
217221

packages/worker/src/workflow/vm-shared.ts

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import v8 from 'node:v8';
22
import vm from 'node:vm';
3-
import { AsyncLocalStorage } from 'node:async_hooks';
3+
import { AsyncLocalStorage as AsyncLocalStorageOriginal } from 'node:async_hooks';
44
import assert from 'node:assert';
55
import { URL, URLSearchParams } from 'node:url';
66
import { TextDecoder, TextEncoder } from 'node:util';
@@ -18,6 +18,9 @@ import { convertDeploymentVersion } from '../utils';
1818
import { Workflow } from './interface';
1919
import { WorkflowBundleWithSourceMapAndFilename } from './workflow-worker-thread/input';
2020

21+
// We need this import for the ambient global extensions
22+
import '@temporalio/workflow/lib/global-attributes'; // eslint-disable-line import/no-unassigned-import
23+
2124
// Best effort to catch unhandled rejections from workflow code.
2225
// We crash the thread if we cannot find the culprit.
2326
export function setUnhandledRejectionHandler(getWorkflowByRunId: (runId: string) => BaseVMWorkflow | undefined): void {
@@ -89,8 +92,9 @@ function formatCallsiteName(callsite: NodeJS.CallSite): string | null {
8992
* Inject global objects as well as console.[log|...] into a vm context.
9093
*/
9194
export function injectGlobals(context: vm.Context): void {
95+
const sandboxGlobalThis = context as typeof globalThis;
96+
9297
const globals = {
93-
AsyncLocalStorage,
9498
URL,
9599
URLSearchParams,
96100
assert,
@@ -99,25 +103,61 @@ export function injectGlobals(context: vm.Context): void {
99103
AbortController,
100104
};
101105
for (const [k, v] of Object.entries(globals)) {
102-
Object.defineProperty(context, k, { value: v, writable: false, enumerable: true, configurable: false });
106+
Object.defineProperty(sandboxGlobalThis, k, { value: v, writable: false, enumerable: true, configurable: false });
103107
}
104108

105109
const consoleMethods = ['log', 'warn', 'error', 'info', 'debug'] as const;
106110
type ConsoleMethod = (typeof consoleMethods)[number];
107111
function makeConsoleFn(level: ConsoleMethod) {
108112
return function (...args: unknown[]) {
109-
const { info } = context.__TEMPORAL_ACTIVATOR__;
110-
if (info.isReplaying) return;
111-
console[level](`[${info.workflowType}(${info.workflowId})]`, ...args);
113+
if (sandboxGlobalThis.__TEMPORAL_ACTIVATOR__ === undefined) {
114+
// This should not happen in a normal execution environment, but this is
115+
// often handy while debugging the SDK, and costs nothing to keep around.
116+
console[level](`[not in workflow context]`, ...args);
117+
} else {
118+
const { info } = sandboxGlobalThis.__TEMPORAL_ACTIVATOR__!;
119+
if (info.unsafe.isReplaying) return;
120+
console[level](`[${info.workflowType}(${info.workflowId})]`, ...args);
121+
}
112122
};
113123
}
114124
const consoleObject = Object.fromEntries(consoleMethods.map((level) => [level, makeConsoleFn(level)]));
115-
Object.defineProperty(context, 'console', {
125+
Object.defineProperty(sandboxGlobalThis, 'console', {
116126
value: consoleObject,
117127
writable: true,
118128
enumerable: false,
119129
configurable: true,
120130
});
131+
132+
class AsyncLocalStorage extends AsyncLocalStorageOriginal<any> {
133+
constructor(private name: string = 'anonymous') {
134+
super();
135+
136+
const activator = sandboxGlobalThis.__TEMPORAL_ACTIVATOR__;
137+
if (activator) {
138+
activator.workflowSandboxDestructors.push(this.disable.bind(this));
139+
} else {
140+
if (sandboxGlobalThis.__temporal_globalSandboxDestructors === undefined)
141+
Object.defineProperty(sandboxGlobalThis, '__temporal_globalSandboxDestructors', {
142+
value: [],
143+
writable: false,
144+
enumerable: false,
145+
configurable: false,
146+
});
147+
sandboxGlobalThis.__temporal_globalSandboxDestructors!.push(this.disable.bind(this));
148+
}
149+
}
150+
151+
disable(): void {
152+
super.disable();
153+
}
154+
}
155+
Object.defineProperty(sandboxGlobalThis, 'AsyncLocalStorage', {
156+
value: AsyncLocalStorage,
157+
writable: false,
158+
enumerable: true,
159+
configurable: false,
160+
});
121161
}
122162

123163
/**

0 commit comments

Comments
 (0)