Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion packages/test/src/test-interceptors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@ import { WorkflowClient, WorkflowFailedError } from '@temporalio/client';
import { ApplicationFailure, TerminatedFailure } from '@temporalio/common';
import { DefaultLogger, Runtime } from '@temporalio/worker';
import { defaultPayloadConverter, WorkflowInfo } from '@temporalio/workflow';
import { cleanOptionalStackTrace, compareStackTrace, RUN_INTEGRATION_TESTS, Worker } from './helpers';
import { cleanOptionalStackTrace, compareStackTrace, REUSE_V8_CONTEXT, RUN_INTEGRATION_TESTS, Worker } from './helpers';
import { defaultOptions } from './mock-native-worker';
import {
checkDisposeRan,
conditionWithTimeoutAfterDisposal,
continueAsNewToDifferentWorkflow,
initAndResetFlag,
interceptorExample,
internalsInterceptorExample,
successString,
unblockOrCancel,
} from './workflows';
import { getSecretQuery, unblockWithSecretSignal } from './workflows/interceptor-example';
Expand Down Expand Up @@ -312,4 +314,43 @@ if (RUN_INTEGRATION_TESTS) {
t.true(disposeFlagSetNow);
});
});

if (REUSE_V8_CONTEXT) {
// Test to trigger GH #1866
// When `reuseV8Context: true`, dispose() calls disableStorage() which disables the
// AsyncLocalStorage instance that stores cancellation scope.
// This causes CancellationScope.current() to return rootScope instead of the correct
// inner scope for workflows that continue afterward.
//
// The bug manifests in condition() with timeout: the finally block calls
// CancellationScope.current().cancel() to clean up.
// When storage is disabled, this incorrectly cancels the rootScope, failing the workflow with "Workflow cancelled".
test.serial('workflow disposal does not break CancellationScope in other workflows in reusable vm', async (t) => {
const taskQueue = 'test-reusable-vm-disposal-cancellation-scope';
const worker = await Worker.create({
...defaultOptions,
taskQueue,
});

const client = new WorkflowClient();
const result = await worker.runUntil(async () => {
// Fill the cache with workflow that complete immediately
await client.execute(successString, { taskQueue, workflowId: uuid4() });

// Start the condition workflow
const conditionHandle = await client.start(conditionWithTimeoutAfterDisposal, {
taskQueue,
workflowId: uuid4(),
});

// Run another workflow to trigger an evictions and disposal() while
// conditionWithTimeoutAfterDisposal is cached and waiting
await client.execute(successString, { taskQueue, workflowId: uuid4() });

// If dispose incorrectly disables the cancellation scope storage, then it will fail with CancelledFailure: "Workflow cancelled"
return await conditionHandle.result();
});
t.is(result, 'done');
});
}
}
1 change: 1 addition & 0 deletions packages/test/src/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export * from './promise-then-promise';
export * from './race';
export * from './random';
export * from './reject-promise';
export * from './reusable-vm-disposal-bug';
export * from './run-activity-in-different-task-queue';
export * from './scope-cancelled-while-waiting-on-external-workflow-cancellation';
export * from './set-timeout-after-microtasks';
Expand Down
20 changes: 20 additions & 0 deletions packages/test/src/workflows/reusable-vm-disposal-bug.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { condition } from '@temporalio/workflow';

/**
* Workflow for reproducing the GH #1866
*
* When the bug is present:
* 1. First condition times out
* 2. finally block calls CancellationScope.current().cancel()
* 3. With disabled storage, current() returns rootScope
* 4. rootScope.cancel() is called, failing the workflow with "Workflow cancelled"
*
* Actual failure happens on the second `condition` where when creating the new
* cancellation scope, we see that the parent/root scope is already canceled.
*/
export async function conditionWithTimeoutAfterDisposal(): Promise<string> {
const alwaysFalse = false;
await condition(() => alwaysFalse, '500ms');
await condition(() => alwaysFalse, '500ms');
return 'done';
}
9 changes: 9 additions & 0 deletions packages/worker/src/workflow/reusable-vm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,15 @@ export class ReusableVMWorkflowCreator implements WorkflowCreator {
* Cleanup the pre-compiled script
*/
public async destroy(): Promise<void> {
if (this._context) {
vm.runInContext(
`{
__TEMPORAL__.api.destroy();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if there's a better way to call this

}`,
this._context,
{ timeout: this.isolateExecutionTimeoutMs, displayErrors: true }
);
}
globalHandlers.removeWorkflowBundle(this.workflowBundle);
delete this._context;
}
Expand Down
1 change: 1 addition & 0 deletions packages/worker/src/workflow/vm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ export class VMWorkflowCreator implements WorkflowCreator {
export class VMWorkflow extends BaseVMWorkflow {
public async dispose(): Promise<void> {
this.workflowModule.dispose();
this.workflowModule.destroy();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding back the disableStore/disableUpdateStore calls that were present in dispose before this PR.

VMWorkflowCreator.workflowByRunId.delete(this.runId);
delete this.context;
}
Expand Down
14 changes: 8 additions & 6 deletions packages/workflow/src/worker-interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
import { encodeVersioningBehavior, IllegalStateError, WorkflowFunctionWithOptions } from '@temporalio/common';
import { composeInterceptors } from '@temporalio/common/lib/interceptors';
import { coresdk } from '@temporalio/proto';
import { disableStorage } from './cancellation-scope';
import { disableUpdateStorage } from './update-scope';
import { WorkflowInterceptorsFactory } from './interceptors';
import { WorkflowCreateOptionsInternal } from './interfaces';
import { Activator } from './internals';
import { setActivatorUntyped, getActivator } from './global-attributes';
import { disableStorage } from './cancellation-scope';
import { disableUpdateStorage } from './update-scope';

// Export the type for use on the "worker" side
export { PromiseStackStore } from './internals';
Expand Down Expand Up @@ -256,16 +256,18 @@ export function dispose(): void {
const activator = getActivator();
activator.rethrowSynchronously = true;
try {
const dispose = composeInterceptors(activator.interceptors.internals, 'dispose', async () => {
disableStorage();
disableUpdateStorage();
});
const dispose = composeInterceptors(activator.interceptors.internals, 'dispose', async () => {});
dispose({});
} finally {
activator.rethrowSynchronously = false;
}
}

export function destroy(): void {
disableStorage();
disableUpdateStorage();
}

function isWorkflowFunctionWithOptions(obj: any): obj is WorkflowFunctionWithOptions<any[], any> {
if (obj == null) return false;
return Object.hasOwn(obj, 'workflowDefinitionOptions');
Expand Down
Loading