From 0a29eb27f6532fdd2a5c3fde3d69d9268c8dfe6d Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Sun, 1 Jun 2025 07:09:24 +0200 Subject: [PATCH 01/14] impl --- packages/activity/src/index.ts | 29 ++++++++++-- .../src/activity-cancellation-details.ts | 45 +++++++++++++++++++ packages/common/src/index.ts | 1 + packages/core-bridge/src/worker.rs | 9 +--- packages/worker/src/activity.ts | 13 ++++-- packages/worker/src/worker.ts | 16 +++++-- 6 files changed, 96 insertions(+), 17 deletions(-) create mode 100644 packages/common/src/activity-cancellation-details.ts diff --git a/packages/activity/src/index.ts b/packages/activity/src/index.ts index f0ba9da60..2b97f9200 100644 --- a/packages/activity/src/index.ts +++ b/packages/activity/src/index.ts @@ -48,7 +48,7 @@ * * 1. `await` on {@link Context.cancelled | `Context.current().cancelled`} or * {@link Context.sleep | `Context.current().sleep()`}, which each throw a {@link CancelledFailure}. - * 1. Pass the context's {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} at + * 2. Pass the context's {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} at * {@link Context.cancellationSignal | `Context.current().cancellationSignal`} to a library that supports it. * * ### Examples @@ -70,7 +70,16 @@ */ import { AsyncLocalStorage } from 'node:async_hooks'; -import { Logger, Duration, LogLevel, LogMetadata, MetricMeter, Priority } from '@temporalio/common'; +import { + Logger, + Duration, + LogLevel, + LogMetadata, + MetricMeter, + Priority, + ActivityCancellationDetailsHolder, + ActivityCancellationDetails, +} from '@temporalio/common'; import { msToNumber } from '@temporalio/common/lib/time'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; @@ -289,6 +298,11 @@ export class Context { */ public readonly metricMeter: MetricMeter; + /** + * Holder object for activity cancellation details + */ + public readonly cancellationDetails: ActivityCancellationDetailsHolder; + /** * **Not** meant to instantiated by Activity code, used by the worker. * @@ -300,7 +314,8 @@ export class Context { cancellationSignal: AbortSignal, heartbeat: (details?: any) => void, log: Logger, - metricMeter: MetricMeter + metricMeter: MetricMeter, + details: ActivityCancellationDetailsHolder ) { this.info = info; this.cancelled = cancelled; @@ -308,6 +323,7 @@ export class Context { this.heartbeatFn = heartbeat; this.log = log; this.metricMeter = metricMeter; + this.cancellationDetails = details; } /** @@ -427,6 +443,13 @@ export function cancelled(): Promise { return Context.current().cancelled; } +/** + * Returns the cancellation details for this activity, if any. + */ +export function cancellationDetails(): ActivityCancellationDetails | undefined { + return Context.current().cancellationDetails.details; +} + /** * Return an {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} that can be used to * react to Activity cancellation. diff --git a/packages/common/src/activity-cancellation-details.ts b/packages/common/src/activity-cancellation-details.ts new file mode 100644 index 000000000..0c41e30fc --- /dev/null +++ b/packages/common/src/activity-cancellation-details.ts @@ -0,0 +1,45 @@ +import type { coresdk } from '@temporalio/proto'; + +export interface ActivityCancellationDetailsHolder { + details?: ActivityCancellationDetails; +} + +/** + * Provides the reasons for the activity's cancellation. Cancellation details are set once and do not change once set. + */ +export class ActivityCancellationDetails { + readonly notFound: boolean; + readonly cancelRequested: boolean; + readonly paused: boolean; + readonly timedOut: boolean; + readonly workerShutdown: boolean; + + private constructor( + notFound: boolean = false, + cancelRequested: boolean = false, + paused: boolean = false, + timedOut: boolean = false, + workerShutdown: boolean = false + ) { + this.notFound = notFound; + this.cancelRequested = cancelRequested; + this.paused = paused; + this.timedOut = timedOut; + this.workerShutdown = workerShutdown; + } + + static fromProto( + proto: coresdk.activity_task.IActivityCancellationDetails | null | undefined + ): ActivityCancellationDetails { + if (proto == null) { + return new ActivityCancellationDetails(); + } + return new ActivityCancellationDetails( + proto.isNotFound ?? false, + proto.isCancelled ?? false, + proto.isPaused ?? false, + proto.isTimedOut ?? false, + proto.isWorkerShutdown ?? false + ); + } +} diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index ba703f3f0..2c9c50b08 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -8,6 +8,7 @@ import * as encoding from './encoding'; import * as helpers from './type-helpers'; export * from './activity-options'; +export * from './activity-cancellation-details'; export * from './converter/data-converter'; export * from './converter/failure-converter'; export * from './converter/payload-codec'; diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index 7a418beca..872bfd60e 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -520,9 +520,7 @@ mod config { impl From for CorePollerBehavior { fn from(val: PollerBehavior) -> Self { match val { - PollerBehavior::SimpleMaximum { maximum } => { - Self::SimpleMaximum(maximum) - } + PollerBehavior::SimpleMaximum { maximum } => Self::SimpleMaximum(maximum), PollerBehavior::Autoscaling { minimum, maximum, @@ -771,10 +769,7 @@ mod custom_slot_supplier { slot_type: SK::kind().into(), task_queue: ctx.task_queue().to_string(), worker_identity: ctx.worker_identity().to_string(), - worker_deployment_version: ctx - .worker_deployment_version() - .clone() - .map(Into::into), + worker_deployment_version: ctx.worker_deployment_version().clone().map(Into::into), is_sticky: ctx.is_sticky(), }; diff --git a/packages/worker/src/activity.ts b/packages/worker/src/activity.ts index df0c10b1d..091d116bc 100644 --- a/packages/worker/src/activity.ts +++ b/packages/worker/src/activity.ts @@ -1,6 +1,8 @@ import 'abort-controller/polyfill'; // eslint-disable-line import/no-unassigned-import import { asyncLocalStorage, CompleteAsyncError, Context, Info } from '@temporalio/activity'; import { + ActivityCancellationDetails, + ActivityCancellationDetailsHolder, ActivityFunction, ApplicationFailure, CancelledFailure, @@ -34,8 +36,9 @@ export type CancelReason = export class Activity { protected cancelReason?: CancelReason; + protected cancellationDetails: ActivityCancellationDetailsHolder; public readonly context: Context; - public cancel: (reason: CancelReason) => void = () => undefined; + public cancel: (reason: CancelReason, details: ActivityCancellationDetails) => void = () => undefined; public readonly abortController: AbortController = new AbortController(); public readonly interceptors: { inbound: ActivityInboundCallsInterceptor[]; @@ -65,10 +68,11 @@ export class Activity { ) { this.workerLogger = LoggerWithComposedMetadata.compose(workerLogger, this.getLogAttributes.bind(this)); this.metricMeter = MetricMeterWithComposedTags.compose(workerMetricMeter, this.getMetricTags.bind(this)); - + this.cancellationDetails = {}; const promise = new Promise((_, reject) => { - this.cancel = (reason: CancelReason) => { + this.cancel = (reason: CancelReason, details: ActivityCancellationDetails) => { this.cancelReason = reason; + this.cancellationDetails.details = details; const err = new CancelledFailure(reason); this.abortController.abort(err); reject(err); @@ -81,7 +85,8 @@ export class Activity { this.heartbeatCallback, // This is the activity context logger, to be used exclusively from user code LoggerWithComposedMetadata.compose(this.workerLogger, { sdkComponent: SdkComponent.activity }), - this.metricMeter + this.metricMeter, + this.cancellationDetails ); // Prevent unhandled rejection promise.catch(() => undefined); diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index 7a1487d72..04d694321 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -33,6 +33,7 @@ import { TypedSearchAttributes, decodePriority, MetricMeter, + ActivityCancellationDetails, } from '@temporalio/common'; import { decodeArrayFromPayloads, @@ -988,7 +989,12 @@ export class Worker { base64TaskToken, details, onError() { - activity?.cancel('HEARTBEAT_DETAILS_CONVERSION_FAILED'); // activity must be defined + // activity must be defined + // empty cancellation details, not corresponding detail for heartbeat detail conversion failure + activity?.cancel( + 'HEARTBEAT_DETAILS_CONVERSION_FAILED', + ActivityCancellationDetails.fromProto(undefined) + ); }, }), this.logger, @@ -1027,11 +1033,15 @@ export class Worker { // NOTE: activity will not be considered cancelled until it confirms cancellation (by throwing a CancelledFailure) this.logger.trace('Cancelling activity', activityLogAttributes(activity.info)); const reason = task.cancel?.reason; + const cancellationDetails = task.cancel?.details; if (reason === undefined || reason === null) { // Special case of Lang side cancellation during shutdown (see `activity.shutdown.evict` above) - activity.cancel('WORKER_SHUTDOWN'); + activity.cancel('WORKER_SHUTDOWN', ActivityCancellationDetails.fromProto(cancellationDetails)); } else { - activity.cancel(coresdk.activity_task.ActivityCancelReason[reason] as CancelReason); + activity.cancel( + coresdk.activity_task.ActivityCancelReason[reason] as CancelReason, + ActivityCancellationDetails.fromProto(cancellationDetails) + ); } break; } From 8787f579107c5b7f4114230fac242e9bc2af38fe Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 2 Jun 2025 20:23:50 +0200 Subject: [PATCH 02/14] add pause error for external activity heartbeating --- packages/client/src/async-completion-client.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/packages/client/src/async-completion-client.ts b/packages/client/src/async-completion-client.ts index fd41fd134..8a481bcc2 100644 --- a/packages/client/src/async-completion-client.ts +++ b/packages/client/src/async-completion-client.ts @@ -35,6 +35,13 @@ export class ActivityCompletionError extends Error {} @SymbolBasedInstanceOfError('ActivityCancelledError') export class ActivityCancelledError extends Error {} +/** + * Thrown by {@link AsyncCompletionClient.heartbeat} when the reporting Activity + * has been paused. + */ +@SymbolBasedInstanceOfError('ActivityPausedError') +export class ActivityPausedError extends Error {} + /** * Options used to configure {@link AsyncCompletionClient} */ @@ -211,6 +218,7 @@ export class AsyncCompletionClient extends BaseClient { async heartbeat(taskTokenOrFullActivityId: Uint8Array | FullActivityId, details?: unknown): Promise { const payloads = await encodeToPayloads(this.dataConverter, details); let cancelRequested = false; + let paused = false; try { if (taskTokenOrFullActivityId instanceof Uint8Array) { const response = await this.workflowService.recordActivityTaskHeartbeat({ @@ -220,6 +228,7 @@ export class AsyncCompletionClient extends BaseClient { details: { payloads }, }); cancelRequested = !!response.cancelRequested; + paused = !!response.activityPaused; } else { const response = await this.workflowService.recordActivityTaskHeartbeatById({ identity: this.options.identity, @@ -228,6 +237,7 @@ export class AsyncCompletionClient extends BaseClient { details: { payloads }, }); cancelRequested = !!response.cancelRequested; + paused = !!response.activityPaused; } } catch (err) { this.handleError(err); @@ -235,5 +245,8 @@ export class AsyncCompletionClient extends BaseClient { if (cancelRequested) { throw new ActivityCancelledError('cancelled'); } + if (paused) { + throw new ActivityPausedError('paused'); + } } } From 582e4f014cd5804998faa88e5006953c0b3a40df Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 2 Jun 2025 20:27:41 +0200 Subject: [PATCH 03/14] add logging for activity pause, emit application failure on activity pause --- packages/worker/src/activity.ts | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/packages/worker/src/activity.ts b/packages/worker/src/activity.ts index 091d116bc..93056fece 100644 --- a/packages/worker/src/activity.ts +++ b/packages/worker/src/activity.ts @@ -143,7 +143,11 @@ export class Activity { (error instanceof CancelledFailure || isAbortError(error)) && this.context.cancellationSignal.aborted ) { - this.workerLogger.debug('Activity completed as cancelled', { durationMs }); + if (this.context.cancellationDetails.details?.paused) { + this.workerLogger.debug('Activity paused', { durationMs }); + } else { + this.workerLogger.debug('Activity completed as cancelled', { durationMs }); + } } else if (error instanceof CompleteAsyncError) { this.workerLogger.debug('Activity will complete asynchronously', { durationMs }); } else { @@ -176,9 +180,21 @@ export class Activity { } else if (this.cancelReason) { // Either a CancelledFailure that we threw or AbortError from AbortController if (err instanceof CancelledFailure) { - const failure = await encodeErrorToFailure(this.dataConverter, err); - failure.stackTrace = undefined; - return { cancelled: { failure } }; + // If cancel due to activity pause, emit an application failure for the pause. + if (this.context.cancellationDetails.details?.paused) { + return { + failed: { + failure: await encodeErrorToFailure( + this.dataConverter, + new ApplicationFailure('Activity paused', 'ActivityPause') + ), + }, + }; + } else { + const failure = await encodeErrorToFailure(this.dataConverter, err); + failure.stackTrace = undefined; + return { cancelled: { failure } }; + } } else if (isAbortError(err)) { return { cancelled: { failure: { source: FAILURE_SOURCE, canceledFailureInfo: {} } } }; } From 9f7f0dafd4b797e9c51c7a8060a2f9c27d68dbab Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 2 Jun 2025 20:28:17 +0200 Subject: [PATCH 04/14] add 'reset' to activity cancellation details --- packages/common/src/activity-cancellation-details.ts | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/common/src/activity-cancellation-details.ts b/packages/common/src/activity-cancellation-details.ts index 0c41e30fc..f51d63f74 100644 --- a/packages/common/src/activity-cancellation-details.ts +++ b/packages/common/src/activity-cancellation-details.ts @@ -13,19 +13,22 @@ export class ActivityCancellationDetails { readonly paused: boolean; readonly timedOut: boolean; readonly workerShutdown: boolean; + readonly reset: boolean; private constructor( notFound: boolean = false, cancelRequested: boolean = false, paused: boolean = false, timedOut: boolean = false, - workerShutdown: boolean = false + workerShutdown: boolean = false, + reset: boolean = false ) { this.notFound = notFound; this.cancelRequested = cancelRequested; this.paused = paused; this.timedOut = timedOut; this.workerShutdown = workerShutdown; + this.reset = reset; } static fromProto( @@ -39,7 +42,8 @@ export class ActivityCancellationDetails { proto.isCancelled ?? false, proto.isPaused ?? false, proto.isTimedOut ?? false, - proto.isWorkerShutdown ?? false + proto.isWorkerShutdown ?? false, + proto.isReset ?? false ); } } From a9a93398b510ab40f68caf52de92eb807ed762f1 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 2 Jun 2025 20:28:31 +0200 Subject: [PATCH 05/14] add tests --- .../heartbeat-cancellation-details.ts | 24 ++++ packages/test/src/helpers-integration.ts | 45 ++++++- .../test/src/test-integration-workflows.ts | 126 +++++++++++++++++- .../src/mocking-activity-environment.ts | 2 +- 4 files changed, 193 insertions(+), 4 deletions(-) create mode 100644 packages/test/src/activities/heartbeat-cancellation-details.ts diff --git a/packages/test/src/activities/heartbeat-cancellation-details.ts b/packages/test/src/activities/heartbeat-cancellation-details.ts new file mode 100644 index 000000000..9a7d7daf4 --- /dev/null +++ b/packages/test/src/activities/heartbeat-cancellation-details.ts @@ -0,0 +1,24 @@ +import { ActivityCancellationDetails } from '@temporalio/common'; +import * as activity from '@temporalio/activity'; + +export async function heartbeatCancellationDetailsActivity( + catchErr: boolean +): Promise { + // Exit early if we've already run this activity. + if (activity.activityInfo().heartbeatDetails === 'finally-complete') { + return activity.cancellationDetails(); + } + // eslint-disable-next-line no-constant-condition + while (true) { + try { + activity.heartbeat(); + await activity.sleep(300); + } catch (err) { + if (err instanceof activity.CancelledFailure && catchErr) { + return activity.cancellationDetails(); + } + activity.heartbeat('finally-complete'); + throw err; + } + } +} diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index bbaad5ff8..7ae6e53fc 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -30,7 +30,7 @@ import * as workflow from '@temporalio/workflow'; import { temporal } from '@temporalio/proto'; import { defineSearchAttributeKey, SearchAttributeType } from '@temporalio/common/lib/search-attributes'; import { ConnectionInjectorInterceptor } from './activities/interceptors'; -import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions } from './helpers'; +import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions, waitUntil } from './helpers'; export interface Context { env: TestWorkflowEnvironment; @@ -38,6 +38,7 @@ export interface Context { } const defaultDynamicConfigOptions = [ + 'frontend.activityAPIsEnabled=true', 'frontend.enableExecuteMultiOperation=true', 'frontend.workerVersioningDataAPIs=true', 'frontend.workerVersioningWorkflowAPIs=true', @@ -284,6 +285,48 @@ export function configurableHelpers( }; } +export async function assertPendingActivityExistsEventually( + handle: WorkflowHandle, + activityId: string, + timeoutMs: number +): Promise { + let activityInfo: temporal.api.workflow.v1.IPendingActivityInfo | undefined; + try { + await waitUntil(async () => { + const desc = await handle.describe(); + activityInfo = desc.raw.pendingActivities?.find((pa) => pa.activityId === activityId); + return activityInfo !== undefined; + }, timeoutMs); + } catch { + throw new Error(`Unable to find pending activity for activity ${activityId}`); + } + return activityInfo as temporal.api.workflow.v1.IPendingActivityInfo; +} + +export async function setActivityPauseState(handle: WorkflowHandle, activityId: string, pause: boolean): Promise { + const desc = await handle.describe(); + const req = { + namespace: handle.client.options.namespace, + execution: { + workflowId: desc.raw.workflowExecutionInfo?.execution?.workflowId, + runId: desc.raw.workflowExecutionInfo?.execution?.runId, + }, + id: activityId, + }; + if (pause) { + await handle.client.workflowService.pauseActivity(req); + } else { + await handle.client.workflowService.unpauseActivity(req); + } + await waitUntil(async () => { + const info = await assertPendingActivityExistsEventually(handle, activityId, 10000); + if (pause) { + return info.paused ?? false; + } + return !info.paused; + }, 10000); +} + export function helpers(t: ExecutionContext, testEnv: TestWorkflowEnvironment = t.context.env): Helpers { return configurableHelpers(t, t.context.workflowBundle, testEnv); } diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index ea05b71f6..2ca7dc4a7 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -2,7 +2,7 @@ import { setTimeout as setTimeoutPromise } from 'timers/promises'; import { randomUUID } from 'crypto'; import { ExecutionContext } from 'ava'; import { firstValueFrom, Subject } from 'rxjs'; -import { WorkflowFailedError } from '@temporalio/client'; +import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client'; import * as activity from '@temporalio/activity'; import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; import { TestWorkflowEnvironment } from '@temporalio/testing'; @@ -11,6 +11,7 @@ import * as workflow from '@temporalio/workflow'; import { defineQuery, defineSignal } from '@temporalio/workflow'; import { SdkFlags } from '@temporalio/workflow/lib/flags'; import { + ActivityCancellationDetails, ActivityCancellationType, ApplicationFailure, defineSearchAttributeKey, @@ -22,9 +23,17 @@ import { import { signalSchedulingWorkflow } from './activities/helpers'; import { activityStartedSignal } from './workflows/definitions'; import * as workflows from './workflows'; -import { Context, createLocalTestEnvironment, helpers, makeTestFunction } from './helpers-integration'; +import { + assertPendingActivityExistsEventually, + Context, + createLocalTestEnvironment, + helpers, + makeTestFunction, + setActivityPauseState, +} from './helpers-integration'; import { overrideSdkInternalFlag } from './mock-internal-flags'; import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers'; +import { heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details'; const test = makeTestFunction({ workflowsPath: __filename, @@ -1414,3 +1423,116 @@ test('Workflow can return root workflow', async (t) => { t.deepEqual(result, 'empty test-root-workflow-length'); }); }); + +export async function heartbeatPauseWorkflow( + activityId: string, + catchErr: boolean, + maximumAttempts: number +): Promise> { + const { heartbeatCancellationDetailsActivity } = workflow.proxyActivities({ + startToCloseTimeout: '5s', + activityId, + retry: { + maximumAttempts, + }, + heartbeatTimeout: '1s', + }); + const { heartbeatCancellationDetailsActivity2 } = workflow.proxyActivities({ + startToCloseTimeout: '5s', + activityId: `${activityId}-2`, + retry: { + maximumAttempts, + }, + heartbeatTimeout: '1s', + }); + const results = []; + results.push( + await heartbeatCancellationDetailsActivity(catchErr), + await heartbeatCancellationDetailsActivity2(catchErr) + ); + return results; +} + +test('Activity pause returns expected cancellation details', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker({ + activities: { + heartbeatCancellationDetailsActivity, + heartbeatCancellationDetailsActivity2: heartbeatCancellationDetailsActivity, + }, + }); + + await worker.runUntil(async () => { + const testActivityId = randomUUID(); + const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] }); + + const activityInfo = await assertPendingActivityExistsEventually(handle, testActivityId, 5000); + t.true(activityInfo.paused === false); + await setActivityPauseState(handle, testActivityId, true); + const activityInfo2 = await assertPendingActivityExistsEventually(handle, `${testActivityId}-2`, 5000); + t.true(activityInfo2.paused === false); + await setActivityPauseState(handle, `${testActivityId}-2`, true); + const result = await handle.result(); + t.deepEqual(result[0], { + cancelRequested: false, + notFound: false, + paused: true, + timedOut: false, + workerShutdown: false, + reset: false, + }); + t.deepEqual(result[1], { + cancelRequested: false, + notFound: false, + paused: true, + timedOut: false, + workerShutdown: false, + reset: false, + }); + }); +}); + +test('Activity can pause and unpause', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + async function checkHeartbeatDetailsExist(handle: WorkflowHandle, activityId: string) { + const activityInfo = await assertPendingActivityExistsEventually(handle, activityId, 5000); + if (activityInfo.heartbeatDetails?.payloads) { + for (const payload of activityInfo.heartbeatDetails?.payloads || []) { + if (payload.data && payload.data?.length > 0) { + return true; + } + } + } + return false; + } + + const worker = await createWorker({ + activities: { + heartbeatCancellationDetailsActivity, + heartbeatCancellationDetailsActivity2: heartbeatCancellationDetailsActivity, + }, + }); + + await worker.runUntil(async () => { + const testActivityId = randomUUID(); + const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, false, 2] }); + const activityInfo = await assertPendingActivityExistsEventually(handle, testActivityId, 5000); + t.true(activityInfo.paused === false); + await setActivityPauseState(handle, testActivityId, true); + await waitUntil(async () => { + return await checkHeartbeatDetailsExist(handle, testActivityId); + }, 5000); + await setActivityPauseState(handle, testActivityId, false); + const activityInfo2 = await assertPendingActivityExistsEventually(handle, `${testActivityId}-2`, 5000); + t.true(activityInfo2.paused === false); + await setActivityPauseState(handle, `${testActivityId}-2`, true); + await waitUntil(async () => { + return await checkHeartbeatDetailsExist(handle, `${testActivityId}-2`); + }, 5000); + await setActivityPauseState(handle, `${testActivityId}-2`, false); + const result = await handle.result(); + // Undefined values are converted to null by data converter. + t.true(result[0] === null); + t.true(result[1] === null); + }); +}); diff --git a/packages/testing/src/mocking-activity-environment.ts b/packages/testing/src/mocking-activity-environment.ts index b9f9c05b0..8b4344158 100644 --- a/packages/testing/src/mocking-activity-environment.ts +++ b/packages/testing/src/mocking-activity-environment.ts @@ -30,7 +30,7 @@ export interface MockActivityEnvironmentOptions { * will immediately be in a cancelled state. */ export class MockActivityEnvironment extends events.EventEmitter { - public cancel: (reason?: any) => void = () => undefined; + public cancel: (reason?: any, details?: any) => void = () => undefined; public readonly context: activity.Context; private readonly activity: Activity; From e79f51ecab90790e38db1c373851dd3e29b393a5 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 23 Jul 2025 14:41:58 -0400 Subject: [PATCH 06/14] make 'cancel' in mock activity env more well-defined, make cancellation details field in activity context private, make activity cancellation tests less flaky --- packages/activity/src/index.ts | 10 ++- .../src/activity-cancellation-details.ts | 46 +++++------ packages/test/src/helpers-integration.ts | 26 ++----- .../test/src/test-integration-workflows.ts | 78 +++++++++++++------ packages/test/src/test-mockactivityenv.ts | 2 +- .../src/mocking-activity-environment.ts | 18 ++++- packages/worker/src/activity.ts | 4 +- 7 files changed, 107 insertions(+), 77 deletions(-) diff --git a/packages/activity/src/index.ts b/packages/activity/src/index.ts index 2b97f9200..66c4f3e2f 100644 --- a/packages/activity/src/index.ts +++ b/packages/activity/src/index.ts @@ -301,7 +301,7 @@ export class Context { /** * Holder object for activity cancellation details */ - public readonly cancellationDetails: ActivityCancellationDetailsHolder; + private readonly _cancellationDetails: ActivityCancellationDetailsHolder; /** * **Not** meant to instantiated by Activity code, used by the worker. @@ -323,7 +323,7 @@ export class Context { this.heartbeatFn = heartbeat; this.log = log; this.metricMeter = metricMeter; - this.cancellationDetails = details; + this._cancellationDetails = details; } /** @@ -363,6 +363,10 @@ export class Context { }); return Promise.race([this.cancelled.finally(() => clearTimeout(handle)), timer]); }; + + public cancellationDetails(): ActivityCancellationDetails | undefined { + return this._cancellationDetails.details; + } } /** @@ -447,7 +451,7 @@ export function cancelled(): Promise { * Returns the cancellation details for this activity, if any. */ export function cancellationDetails(): ActivityCancellationDetails | undefined { - return Context.current().cancellationDetails.details; + return Context.current().cancellationDetails(); } /** diff --git a/packages/common/src/activity-cancellation-details.ts b/packages/common/src/activity-cancellation-details.ts index f51d63f74..63488ee88 100644 --- a/packages/common/src/activity-cancellation-details.ts +++ b/packages/common/src/activity-cancellation-details.ts @@ -4,6 +4,15 @@ export interface ActivityCancellationDetailsHolder { details?: ActivityCancellationDetails; } +export interface ActivityCancellationDetailsOptions { + notFound?: boolean; + cancelRequested?: boolean; + paused?: boolean; + timedOut?: boolean; + workerShutdown?: boolean; + reset?: boolean; +} + /** * Provides the reasons for the activity's cancellation. Cancellation details are set once and do not change once set. */ @@ -15,20 +24,13 @@ export class ActivityCancellationDetails { readonly workerShutdown: boolean; readonly reset: boolean; - private constructor( - notFound: boolean = false, - cancelRequested: boolean = false, - paused: boolean = false, - timedOut: boolean = false, - workerShutdown: boolean = false, - reset: boolean = false - ) { - this.notFound = notFound; - this.cancelRequested = cancelRequested; - this.paused = paused; - this.timedOut = timedOut; - this.workerShutdown = workerShutdown; - this.reset = reset; + public constructor(options: ActivityCancellationDetailsOptions = {}) { + this.notFound = options.notFound ?? false; + this.cancelRequested = options.cancelRequested ?? false; + this.paused = options.paused ?? false; + this.timedOut = options.timedOut ?? false; + this.workerShutdown = options.workerShutdown ?? false; + this.reset = options.reset ?? false; } static fromProto( @@ -37,13 +39,13 @@ export class ActivityCancellationDetails { if (proto == null) { return new ActivityCancellationDetails(); } - return new ActivityCancellationDetails( - proto.isNotFound ?? false, - proto.isCancelled ?? false, - proto.isPaused ?? false, - proto.isTimedOut ?? false, - proto.isWorkerShutdown ?? false, - proto.isReset ?? false - ); + return new ActivityCancellationDetails({ + notFound: proto.isNotFound ?? false, + cancelRequested: proto.isCancelled ?? false, + paused: proto.isPaused ?? false, + timedOut: proto.isTimedOut ?? false, + workerShutdown: proto.isWorkerShutdown ?? false, + reset: proto.isReset ?? false + }); } } diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index 7ae6e53fc..515427e7a 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -285,24 +285,6 @@ export function configurableHelpers( }; } -export async function assertPendingActivityExistsEventually( - handle: WorkflowHandle, - activityId: string, - timeoutMs: number -): Promise { - let activityInfo: temporal.api.workflow.v1.IPendingActivityInfo | undefined; - try { - await waitUntil(async () => { - const desc = await handle.describe(); - activityInfo = desc.raw.pendingActivities?.find((pa) => pa.activityId === activityId); - return activityInfo !== undefined; - }, timeoutMs); - } catch { - throw new Error(`Unable to find pending activity for activity ${activityId}`); - } - return activityInfo as temporal.api.workflow.v1.IPendingActivityInfo; -} - export async function setActivityPauseState(handle: WorkflowHandle, activityId: string, pause: boolean): Promise { const desc = await handle.describe(); const req = { @@ -319,11 +301,13 @@ export async function setActivityPauseState(handle: WorkflowHandle, activityId: await handle.client.workflowService.unpauseActivity(req); } await waitUntil(async () => { - const info = await assertPendingActivityExistsEventually(handle, activityId, 10000); + const { raw } = await handle.describe(); + const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId); + if (!activityInfo) return false; if (pause) { - return info.paused ?? false; + return activityInfo.paused ?? false; } - return !info.paused; + return !activityInfo.paused; }, 10000); } diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 2ca7dc4a7..947a8f827 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -24,7 +24,6 @@ import { signalSchedulingWorkflow } from './activities/helpers'; import { activityStartedSignal } from './workflows/definitions'; import * as workflows from './workflows'; import { - assertPendingActivityExistsEventually, Context, createLocalTestEnvironment, helpers, @@ -1424,6 +1423,9 @@ test('Workflow can return root workflow', async (t) => { }); }); +export const activityStartedQuery = workflow.defineQuery('activityStarted'); +export const proceedSignal = workflow.defineSignal<[]>('proceed'); + export async function heartbeatPauseWorkflow( activityId: string, catchErr: boolean, @@ -1445,12 +1447,34 @@ export async function heartbeatPauseWorkflow( }, heartbeatTimeout: '1s', }); - const results = []; - results.push( - await heartbeatCancellationDetailsActivity(catchErr), - await heartbeatCancellationDetailsActivity2(catchErr) - ); - return results; + + let activity1Started = false; + let activity2Started = false; + + workflow.setHandler(activityStartedQuery, (num) => { + if (num === 1) return activity1Started; + return activity2Started; + }); + + let proceed = false; + workflow.setHandler(proceedSignal, () => { + proceed = true; + }); + + activity1Started = true; + const promise1 = heartbeatCancellationDetailsActivity(catchErr); + + // Wait for the test to pause activity 1 and signal us to continue + await workflow.condition(() => proceed); + proceed = false; // reset for next step + + activity2Started = true; + const promise2 = heartbeatCancellationDetailsActivity2(catchErr); + + // Wait for the test to pause activity 2 and signal us to continue + await workflow.condition(() => proceed); + + return Promise.all([promise1, promise2]); } test('Activity pause returns expected cancellation details', async (t) => { @@ -1466,12 +1490,14 @@ test('Activity pause returns expected cancellation details', async (t) => { const testActivityId = randomUUID(); const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] }); - const activityInfo = await assertPendingActivityExistsEventually(handle, testActivityId, 5000); - t.true(activityInfo.paused === false); + await waitUntil(async () => handle.query(activityStartedQuery, 1), 5000); await setActivityPauseState(handle, testActivityId, true); - const activityInfo2 = await assertPendingActivityExistsEventually(handle, `${testActivityId}-2`, 5000); - t.true(activityInfo2.paused === false); + await handle.signal(proceedSignal); + + await waitUntil(async () => handle.query(activityStartedQuery, 2), 5000); await setActivityPauseState(handle, `${testActivityId}-2`, true); + await handle.signal(proceedSignal); + const result = await handle.result(); t.deepEqual(result[0], { cancelRequested: false, @@ -1494,11 +1520,14 @@ test('Activity pause returns expected cancellation details', async (t) => { test('Activity can pause and unpause', async (t) => { const { createWorker, startWorkflow } = helpers(t); - async function checkHeartbeatDetailsExist(handle: WorkflowHandle, activityId: string) { - const activityInfo = await assertPendingActivityExistsEventually(handle, activityId, 5000); + async function checkHeartbeatDetailsExist(handle: WorkflowHandle, activityId: string): Promise { + const { raw } = await handle.describe(); + const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId); + if (!activityInfo) return false; + if (activityInfo.heartbeatDetails?.payloads) { - for (const payload of activityInfo.heartbeatDetails?.payloads || []) { - if (payload.data && payload.data?.length > 0) { + for (const payload of activityInfo.heartbeatDetails.payloads) { + if (payload.data && payload.data.length > 0) { return true; } } @@ -1516,20 +1545,19 @@ test('Activity can pause and unpause', async (t) => { await worker.runUntil(async () => { const testActivityId = randomUUID(); const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, false, 2] }); - const activityInfo = await assertPendingActivityExistsEventually(handle, testActivityId, 5000); - t.true(activityInfo.paused === false); + + await waitUntil(async () => handle.query(activityStartedQuery, 1), 5000); await setActivityPauseState(handle, testActivityId, true); - await waitUntil(async () => { - return await checkHeartbeatDetailsExist(handle, testActivityId); - }, 5000); + await waitUntil(async () => checkHeartbeatDetailsExist(handle, testActivityId), 5000); await setActivityPauseState(handle, testActivityId, false); - const activityInfo2 = await assertPendingActivityExistsEventually(handle, `${testActivityId}-2`, 5000); - t.true(activityInfo2.paused === false); + await handle.signal(proceedSignal); + + await waitUntil(async () => handle.query(activityStartedQuery, 2), 5000); await setActivityPauseState(handle, `${testActivityId}-2`, true); - await waitUntil(async () => { - return await checkHeartbeatDetailsExist(handle, `${testActivityId}-2`); - }, 5000); + await waitUntil(async () => checkHeartbeatDetailsExist(handle, `${testActivityId}-2`), 5000); await setActivityPauseState(handle, `${testActivityId}-2`, false); + await handle.signal(proceedSignal); + const result = await handle.result(); // Undefined values are converted to null by data converter. t.true(result[0] === null); diff --git a/packages/test/src/test-mockactivityenv.ts b/packages/test/src/test-mockactivityenv.ts index 6811cf06b..feb50b543 100644 --- a/packages/test/src/test-mockactivityenv.ts +++ b/packages/test/src/test-mockactivityenv.ts @@ -24,7 +24,7 @@ test('MockActivityEnvironment emits heartbeat events and can be cancelled', asyn const env = new MockActivityEnvironment(); env.on('heartbeat', (d: unknown) => { if (d === 6) { - env.cancel('test'); + env.cancel('CANCELLED'); } }); await t.throwsAsync( diff --git a/packages/testing/src/mocking-activity-environment.ts b/packages/testing/src/mocking-activity-environment.ts index 8b4344158..189375cc7 100644 --- a/packages/testing/src/mocking-activity-environment.ts +++ b/packages/testing/src/mocking-activity-environment.ts @@ -9,10 +9,11 @@ import { defaultPayloadConverter, MetricMeter, noopMetricMeter, + ActivityCancellationDetails, } from '@temporalio/common'; import { LoggerWithComposedMetadata } from '@temporalio/common/lib/logger'; import { ActivityInterceptorsFactory, DefaultLogger } from '@temporalio/worker'; -import { Activity } from '@temporalio/worker/lib/activity'; +import { Activity, CancelReason } from '@temporalio/worker/lib/activity'; export interface MockActivityEnvironmentOptions { interceptors?: ActivityInterceptorsFactory[]; @@ -30,7 +31,10 @@ export interface MockActivityEnvironmentOptions { * will immediately be in a cancelled state. */ export class MockActivityEnvironment extends events.EventEmitter { - public cancel: (reason?: any, details?: any) => void = () => undefined; + public cancel: ( + reason?: CancelReason, + details?: ActivityCancellationDetails + ) => void = () => undefined; public readonly context: activity.Context; private readonly activity: Activity; @@ -52,7 +56,15 @@ export class MockActivityEnvironment extends events.EventEmitter { opts?.interceptors ?? [] ); this.context = this.activity.context; - this.cancel = this.activity.cancel; + this.cancel = ( + reason?: CancelReason, + details?: ActivityCancellationDetails + ) => { + // Default to CANCELLED if nothing provided. + const r = reason ?? "CANCELLED"; + const d = details ?? new ActivityCancellationDetails({ cancelRequested: true }); + this.activity.cancel(r, d); + } } /** diff --git a/packages/worker/src/activity.ts b/packages/worker/src/activity.ts index 7a501b0e9..a36f1f13c 100644 --- a/packages/worker/src/activity.ts +++ b/packages/worker/src/activity.ts @@ -144,7 +144,7 @@ export class Activity { (error instanceof CancelledFailure || isAbortError(error)) && this.context.cancellationSignal.aborted ) { - if (this.context.cancellationDetails.details?.paused) { + if (this.context.cancellationDetails()?.paused) { this.workerLogger.debug('Activity paused', { durationMs }); } else { this.workerLogger.debug('Activity completed as cancelled', { durationMs }); @@ -187,7 +187,7 @@ export class Activity { // Either a CancelledFailure that we threw or AbortError from AbortController if (err instanceof CancelledFailure) { // If cancel due to activity pause, emit an application failure for the pause. - if (this.context.cancellationDetails.details?.paused) { + if (this.context.cancellationDetails()?.paused) { return { failed: { failure: await encodeErrorToFailure( From 589cae87f62eb818899af58acf0d4d856e6e0130 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 23 Jul 2025 14:48:25 -0400 Subject: [PATCH 07/14] formatting/linting --- .../common/src/activity-cancellation-details.ts | 2 +- .../testing/src/mocking-activity-environment.ts | 16 +++++----------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/packages/common/src/activity-cancellation-details.ts b/packages/common/src/activity-cancellation-details.ts index 63488ee88..ae2a36500 100644 --- a/packages/common/src/activity-cancellation-details.ts +++ b/packages/common/src/activity-cancellation-details.ts @@ -45,7 +45,7 @@ export class ActivityCancellationDetails { paused: proto.isPaused ?? false, timedOut: proto.isTimedOut ?? false, workerShutdown: proto.isWorkerShutdown ?? false, - reset: proto.isReset ?? false + reset: proto.isReset ?? false, }); } } diff --git a/packages/testing/src/mocking-activity-environment.ts b/packages/testing/src/mocking-activity-environment.ts index 189375cc7..943f5f4b7 100644 --- a/packages/testing/src/mocking-activity-environment.ts +++ b/packages/testing/src/mocking-activity-environment.ts @@ -31,10 +31,7 @@ export interface MockActivityEnvironmentOptions { * will immediately be in a cancelled state. */ export class MockActivityEnvironment extends events.EventEmitter { - public cancel: ( - reason?: CancelReason, - details?: ActivityCancellationDetails - ) => void = () => undefined; + public cancel: (reason?: CancelReason, details?: ActivityCancellationDetails) => void = () => undefined; public readonly context: activity.Context; private readonly activity: Activity; @@ -56,15 +53,12 @@ export class MockActivityEnvironment extends events.EventEmitter { opts?.interceptors ?? [] ); this.context = this.activity.context; - this.cancel = ( - reason?: CancelReason, - details?: ActivityCancellationDetails - ) => { + this.cancel = (reason?: CancelReason, details?: ActivityCancellationDetails) => { // Default to CANCELLED if nothing provided. - const r = reason ?? "CANCELLED"; + const r = reason ?? 'CANCELLED'; const d = details ?? new ActivityCancellationDetails({ cancelRequested: true }); - this.activity.cancel(r, d); - } + this.activity.cancel(r, d); + }; } /** From 5e6201307b46a432040617b4221d9a46b9259e29 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 23 Jul 2025 15:05:01 -0400 Subject: [PATCH 08/14] mock activity hb test fix --- packages/test/src/test-mockactivityenv.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/test/src/test-mockactivityenv.ts b/packages/test/src/test-mockactivityenv.ts index feb50b543..dd8916b25 100644 --- a/packages/test/src/test-mockactivityenv.ts +++ b/packages/test/src/test-mockactivityenv.ts @@ -35,7 +35,7 @@ test('MockActivityEnvironment emits heartbeat events and can be cancelled', asyn }, 3), { instanceOf: activity.CancelledFailure, - message: 'test', + message: 'CANCELLED', } ); }); From d62e1399a6825218f2a4ad7bf1b44f5ccbfdbd77 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 23 Jul 2025 15:39:57 -0400 Subject: [PATCH 09/14] bump pause helper timeout --- packages/test/src/helpers-integration.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index 515427e7a..679606938 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -308,7 +308,7 @@ export async function setActivityPauseState(handle: WorkflowHandle, activityId: return activityInfo.paused ?? false; } return !activityInfo.paused; - }, 10000); + }, 15000); } export function helpers(t: ExecutionContext, testEnv: TestWorkflowEnvironment = t.context.env): Helpers { From e1431f254703193331209d5a3ba630432807320e Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 31 Jul 2025 16:43:11 -0400 Subject: [PATCH 10/14] address review suggestions --- packages/activity/src/index.ts | 17 +++++++++++++---- packages/common/src/index.ts | 2 +- packages/worker/src/activity.ts | 6 +++--- 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/packages/activity/src/index.ts b/packages/activity/src/index.ts index 66c4f3e2f..32f80da13 100644 --- a/packages/activity/src/index.ts +++ b/packages/activity/src/index.ts @@ -77,11 +77,11 @@ import { LogMetadata, MetricMeter, Priority, - ActivityCancellationDetailsHolder, ActivityCancellationDetails, } from '@temporalio/common'; import { msToNumber } from '@temporalio/common/lib/time'; import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers'; +import { ActivityCancellationDetailsHolder } from '@temporalio/common/lib/activity-cancellation-details'; export { ActivityFunction, @@ -364,7 +364,13 @@ export class Context { return Promise.race([this.cancelled.finally(() => clearTimeout(handle)), timer]); }; - public cancellationDetails(): ActivityCancellationDetails | undefined { + /** + * Return the cancellation details for this activity, if any. + * @returns an object with boolean properties that describes the reason for cancellation, or undefined if not cancelled. + * + * @experimental Activity cancellation details include usage of experimental features such as activity pause, and may be subject to change. + */ + public get cancellationDetails(): ActivityCancellationDetails | undefined { return this._cancellationDetails.details; } } @@ -448,10 +454,13 @@ export function cancelled(): Promise { } /** - * Returns the cancellation details for this activity, if any. + * Return the cancellation details for this activity, if any. + * @returns an object with boolean properties that describes the reason for cancellation, or undefined if not cancelled. + * + * @experimental Activity cancellation details include usage of experimental features such as activity pause, and may be subject to change. */ export function cancellationDetails(): ActivityCancellationDetails | undefined { - return Context.current().cancellationDetails(); + return Context.current().cancellationDetails; } /** diff --git a/packages/common/src/index.ts b/packages/common/src/index.ts index 2c9c50b08..095023ad8 100644 --- a/packages/common/src/index.ts +++ b/packages/common/src/index.ts @@ -8,7 +8,7 @@ import * as encoding from './encoding'; import * as helpers from './type-helpers'; export * from './activity-options'; -export * from './activity-cancellation-details'; +export { ActivityCancellationDetailsOptions, ActivityCancellationDetails } from './activity-cancellation-details'; export * from './converter/data-converter'; export * from './converter/failure-converter'; export * from './converter/payload-codec'; diff --git a/packages/worker/src/activity.ts b/packages/worker/src/activity.ts index a36f1f13c..59388abf1 100644 --- a/packages/worker/src/activity.ts +++ b/packages/worker/src/activity.ts @@ -2,7 +2,6 @@ import 'abort-controller/polyfill'; // eslint-disable-line import/no-unassigned- import { asyncLocalStorage, CompleteAsyncError, Context, Info } from '@temporalio/activity'; import { ActivityCancellationDetails, - ActivityCancellationDetailsHolder, ActivityFunction, ApplicationFailure, ApplicationFailureCategory, @@ -21,6 +20,7 @@ import { isAbortError } from '@temporalio/common/lib/type-helpers'; import { Logger, LoggerWithComposedMetadata } from '@temporalio/common/lib/logger'; import { MetricMeterWithComposedTags } from '@temporalio/common/lib/metrics'; import { coresdk } from '@temporalio/proto'; +import { ActivityCancellationDetailsHolder } from '@temporalio/common/lib/activity-cancellation-details'; import { ActivityExecuteInput, ActivityInboundCallsInterceptor, @@ -144,7 +144,7 @@ export class Activity { (error instanceof CancelledFailure || isAbortError(error)) && this.context.cancellationSignal.aborted ) { - if (this.context.cancellationDetails()?.paused) { + if (this.context.cancellationDetails?.paused) { this.workerLogger.debug('Activity paused', { durationMs }); } else { this.workerLogger.debug('Activity completed as cancelled', { durationMs }); @@ -187,7 +187,7 @@ export class Activity { // Either a CancelledFailure that we threw or AbortError from AbortController if (err instanceof CancelledFailure) { // If cancel due to activity pause, emit an application failure for the pause. - if (this.context.cancellationDetails()?.paused) { + if (this.context.cancellationDetails?.paused) { return { failed: { failure: await encodeErrorToFailure( From 95697140fe7730e94bb2e832cf9fee536dc2be82 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 31 Jul 2025 17:29:11 -0400 Subject: [PATCH 11/14] ignore ts prune --- packages/common/src/activity-cancellation-details.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/common/src/activity-cancellation-details.ts b/packages/common/src/activity-cancellation-details.ts index ae2a36500..e4ed1845c 100644 --- a/packages/common/src/activity-cancellation-details.ts +++ b/packages/common/src/activity-cancellation-details.ts @@ -1,5 +1,6 @@ import type { coresdk } from '@temporalio/proto'; +// ts-prune-ignore-next export interface ActivityCancellationDetailsHolder { details?: ActivityCancellationDetails; } From 3ee7227b21f02a2adbed4df34205fc8c9c75e6a5 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 13 Aug 2025 11:14:28 -0400 Subject: [PATCH 12/14] simplify cancellation details tests, reduce flakiness --- .../heartbeat-cancellation-details.ts | 2 +- packages/test/src/helpers-integration.ts | 22 ++++ .../test/src/test-integration-workflows.ts | 115 +++++++++--------- 3 files changed, 78 insertions(+), 61 deletions(-) diff --git a/packages/test/src/activities/heartbeat-cancellation-details.ts b/packages/test/src/activities/heartbeat-cancellation-details.ts index 9a7d7daf4..18a147f09 100644 --- a/packages/test/src/activities/heartbeat-cancellation-details.ts +++ b/packages/test/src/activities/heartbeat-cancellation-details.ts @@ -11,7 +11,7 @@ export async function heartbeatCancellationDetailsActivity( // eslint-disable-next-line no-constant-condition while (true) { try { - activity.heartbeat(); + activity.heartbeat('heartbeated'); await activity.sleep(300); } catch (err) { if (err instanceof activity.CancelledFailure && catchErr) { diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index 679606938..c9a20f666 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -311,6 +311,28 @@ export async function setActivityPauseState(handle: WorkflowHandle, activityId: }, 15000); } +// Helper function to check if an activity has heartbeated +export async function hasActivityHeartbeat( + handle: WorkflowHandle, + activityId: string, + expectedContent?: string +): Promise { + const { raw } = await handle.describe(); + const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId); + const heartbeatData = activityInfo?.heartbeatDetails?.payloads?.[0]?.data; + if (!heartbeatData) return false; + + // If no expected content specified, just check that heartbeat data exists + if (!expectedContent) return true; + + try { + const decoded = Buffer.from(heartbeatData).toString(); + return decoded.includes(expectedContent); + } catch { + return false; + } +} + export function helpers(t: ExecutionContext, testEnv: TestWorkflowEnvironment = t.context.env): Helpers { return configurableHelpers(t, t.context.workflowBundle, testEnv); } diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index b7cdb3b33..2a6a8fc33 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -2,7 +2,7 @@ import { setTimeout as setTimeoutPromise } from 'timers/promises'; import { randomUUID } from 'crypto'; import { ExecutionContext } from 'ava'; import { firstValueFrom, Subject } from 'rxjs'; -import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client'; +import { WorkflowFailedError } from '@temporalio/client'; import * as activity from '@temporalio/activity'; import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; import { TestWorkflowEnvironment } from '@temporalio/testing'; @@ -27,6 +27,7 @@ import * as workflows from './workflows'; import { Context, createLocalTestEnvironment, + hasActivityHeartbeat, helpers, makeTestFunction, setActivityPauseState, @@ -1449,6 +1450,55 @@ test('Workflow can return root workflow', async (t) => { }); }); +export async function heartbeatPauseWorkflowBasic( + activityId: string, + catchErr: boolean +): Promise { + const { heartbeatCancellationDetailsActivity } = workflow.proxyActivities({ + startToCloseTimeout: '5s', + activityId, + retry: { + maximumAttempts: 1, + }, + heartbeatTimeout: '1s', + }); + + return await heartbeatCancellationDetailsActivity(catchErr); +} + +test('Activity pause returns expected cancellation details', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + + const worker = await createWorker({ + activities: { + heartbeatCancellationDetailsActivity, + }, + }); + + await worker.runUntil(async () => { + const testActivityId = randomUUID(); + const handle = await startWorkflow(heartbeatPauseWorkflowBasic, { + args: [testActivityId, true], + }); + + // Wait for activity to start heartbeating + await waitUntil(async () => hasActivityHeartbeat(handle, testActivityId, 'heartbeated'), 5000); + // Now pause the activity + await setActivityPauseState(handle, testActivityId, true); + // Get the result - should contain pause cancellation details + const result = await handle.result(); + + t.deepEqual(result, { + cancelRequested: false, + notFound: false, + paused: true, + timedOut: false, + workerShutdown: false, + reset: false, + }); + }); +}); + export const activityStartedQuery = workflow.defineQuery('activityStarted'); export const proceedSignal = workflow.defineSignal<[]>('proceed'); @@ -1487,15 +1537,15 @@ export async function heartbeatPauseWorkflow( proceed = true; }); - activity1Started = true; const promise1 = heartbeatCancellationDetailsActivity(catchErr); + activity1Started = true; // Wait for the test to pause activity 1 and signal us to continue await workflow.condition(() => proceed); proceed = false; // reset for next step - activity2Started = true; const promise2 = heartbeatCancellationDetailsActivity2(catchErr); + activity2Started = true; // Wait for the test to pause activity 2 and signal us to continue await workflow.condition(() => proceed); @@ -1503,63 +1553,8 @@ export async function heartbeatPauseWorkflow( return Promise.all([promise1, promise2]); } -test('Activity pause returns expected cancellation details', async (t) => { - const { createWorker, startWorkflow } = helpers(t); - const worker = await createWorker({ - activities: { - heartbeatCancellationDetailsActivity, - heartbeatCancellationDetailsActivity2: heartbeatCancellationDetailsActivity, - }, - }); - - await worker.runUntil(async () => { - const testActivityId = randomUUID(); - const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] }); - - await waitUntil(async () => handle.query(activityStartedQuery, 1), 5000); - await setActivityPauseState(handle, testActivityId, true); - await handle.signal(proceedSignal); - - await waitUntil(async () => handle.query(activityStartedQuery, 2), 5000); - await setActivityPauseState(handle, `${testActivityId}-2`, true); - await handle.signal(proceedSignal); - - const result = await handle.result(); - t.deepEqual(result[0], { - cancelRequested: false, - notFound: false, - paused: true, - timedOut: false, - workerShutdown: false, - reset: false, - }); - t.deepEqual(result[1], { - cancelRequested: false, - notFound: false, - paused: true, - timedOut: false, - workerShutdown: false, - reset: false, - }); - }); -}); - test('Activity can pause and unpause', async (t) => { const { createWorker, startWorkflow } = helpers(t); - async function checkHeartbeatDetailsExist(handle: WorkflowHandle, activityId: string): Promise { - const { raw } = await handle.describe(); - const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId); - if (!activityInfo) return false; - - if (activityInfo.heartbeatDetails?.payloads) { - for (const payload of activityInfo.heartbeatDetails.payloads) { - if (payload.data && payload.data.length > 0) { - return true; - } - } - } - return false; - } const worker = await createWorker({ activities: { @@ -1574,13 +1569,13 @@ test('Activity can pause and unpause', async (t) => { await waitUntil(async () => handle.query(activityStartedQuery, 1), 5000); await setActivityPauseState(handle, testActivityId, true); - await waitUntil(async () => checkHeartbeatDetailsExist(handle, testActivityId), 5000); + await waitUntil(async () => hasActivityHeartbeat(handle, testActivityId, 'finally-complete'), 5000); await setActivityPauseState(handle, testActivityId, false); await handle.signal(proceedSignal); await waitUntil(async () => handle.query(activityStartedQuery, 2), 5000); await setActivityPauseState(handle, `${testActivityId}-2`, true); - await waitUntil(async () => checkHeartbeatDetailsExist(handle, `${testActivityId}-2`), 5000); + await waitUntil(async () => hasActivityHeartbeat(handle, `${testActivityId}-2`, 'finally-complete'), 5000); await setActivityPauseState(handle, `${testActivityId}-2`, false); await handle.signal(proceedSignal); From e93afc327ead92d46f960a264b00d32b26c7b5e3 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 15 Aug 2025 11:21:10 -0400 Subject: [PATCH 13/14] simplify further, eliminate flakiness --- packages/test/src/helpers-integration.ts | 13 ++- .../test/src/test-integration-workflows.ts | 99 +++++-------------- 2 files changed, 32 insertions(+), 80 deletions(-) diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index c9a20f666..70af5d574 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -303,11 +303,14 @@ export async function setActivityPauseState(handle: WorkflowHandle, activityId: await waitUntil(async () => { const { raw } = await handle.describe(); const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId); - if (!activityInfo) return false; - if (pause) { - return activityInfo.paused ?? false; - } - return !activityInfo.paused; + // If we are pausing: success when either + // • paused flag is true OR + // • the activity vanished (it completed / retried) + if (pause) return activityInfo ? activityInfo.paused ?? false : true; + // If we are unpausing: success when either + // • paused flag is false OR + // • the activity vanished (already completed) + return activityInfo ? !activityInfo.paused : true; }, 15000); } diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 19bdf14a8..976849482 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -1418,15 +1418,16 @@ test('Workflow can return root workflow', async (t) => { }); }); -export async function heartbeatPauseWorkflowBasic( +export async function heartbeatPauseWorkflow( activityId: string, - catchErr: boolean + catchErr: boolean, + maximumAttempts: number ): Promise { const { heartbeatCancellationDetailsActivity } = workflow.proxyActivities({ startToCloseTimeout: '5s', activityId, retry: { - maximumAttempts: 1, + maximumAttempts, }, heartbeatTimeout: '1s', }); @@ -1445,12 +1446,18 @@ test('Activity pause returns expected cancellation details', async (t) => { await worker.runUntil(async () => { const testActivityId = randomUUID(); - const handle = await startWorkflow(heartbeatPauseWorkflowBasic, { - args: [testActivityId, true], + const handle = await startWorkflow(heartbeatPauseWorkflow, { + args: [testActivityId, true, 1], }); - // Wait for activity to start heartbeating - await waitUntil(async () => hasActivityHeartbeat(handle, testActivityId, 'heartbeated'), 5000); + // Wait for activity to appear in pending activities AND start heartbeating + await waitUntil(async () => { + const { raw } = await handle.describe(); + const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId); + // Check both: activity exists and has heartbeated + return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated'))); + }, 10000); + // Now pause the activity await setActivityPauseState(handle, testActivityId, true); // Get the result - should contain pause cancellation details @@ -1467,67 +1474,12 @@ test('Activity pause returns expected cancellation details', async (t) => { }); }); -export const activityStartedQuery = workflow.defineQuery('activityStarted'); -export const proceedSignal = workflow.defineSignal<[]>('proceed'); - -export async function heartbeatPauseWorkflow( - activityId: string, - catchErr: boolean, - maximumAttempts: number -): Promise> { - const { heartbeatCancellationDetailsActivity } = workflow.proxyActivities({ - startToCloseTimeout: '5s', - activityId, - retry: { - maximumAttempts, - }, - heartbeatTimeout: '1s', - }); - const { heartbeatCancellationDetailsActivity2 } = workflow.proxyActivities({ - startToCloseTimeout: '5s', - activityId: `${activityId}-2`, - retry: { - maximumAttempts, - }, - heartbeatTimeout: '1s', - }); - - let activity1Started = false; - let activity2Started = false; - - workflow.setHandler(activityStartedQuery, (num) => { - if (num === 1) return activity1Started; - return activity2Started; - }); - - let proceed = false; - workflow.setHandler(proceedSignal, () => { - proceed = true; - }); - - const promise1 = heartbeatCancellationDetailsActivity(catchErr); - activity1Started = true; - - // Wait for the test to pause activity 1 and signal us to continue - await workflow.condition(() => proceed); - proceed = false; // reset for next step - - const promise2 = heartbeatCancellationDetailsActivity2(catchErr); - activity2Started = true; - - // Wait for the test to pause activity 2 and signal us to continue - await workflow.condition(() => proceed); - - return Promise.all([promise1, promise2]); -} - -test('Activity can pause and unpause', async (t) => { +test('Activity can be cancelled via pause and retry after unpause', async (t) => { const { createWorker, startWorkflow } = helpers(t); const worker = await createWorker({ activities: { heartbeatCancellationDetailsActivity, - heartbeatCancellationDetailsActivity2: heartbeatCancellationDetailsActivity, }, }); @@ -1535,22 +1487,19 @@ test('Activity can pause and unpause', async (t) => { const testActivityId = randomUUID(); const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, false, 2] }); - await waitUntil(async () => handle.query(activityStartedQuery, 1), 5000); + // Wait for it to exist and heartbeat + await waitUntil(async () => { + const { raw } = await handle.describe(); + const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId); + return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated'))); + }, 10000); + await setActivityPauseState(handle, testActivityId, true); - await waitUntil(async () => hasActivityHeartbeat(handle, testActivityId, 'finally-complete'), 5000); + await waitUntil(async () => hasActivityHeartbeat(handle, testActivityId, 'finally-complete'), 10000); await setActivityPauseState(handle, testActivityId, false); - await handle.signal(proceedSignal); - - await waitUntil(async () => handle.query(activityStartedQuery, 2), 5000); - await setActivityPauseState(handle, `${testActivityId}-2`, true); - await waitUntil(async () => hasActivityHeartbeat(handle, `${testActivityId}-2`, 'finally-complete'), 5000); - await setActivityPauseState(handle, `${testActivityId}-2`, false); - await handle.signal(proceedSignal); const result = await handle.result(); - // Undefined values are converted to null by data converter. - t.true(result[0] === null); - t.true(result[1] === null); + t.true(result == null); }); }); From 30f050d3b36a78d763520017ce09f645a37adcd5 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 15 Aug 2025 11:37:36 -0400 Subject: [PATCH 14/14] linting --- packages/common/src/internal-non-workflow/codec-helpers.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/common/src/internal-non-workflow/codec-helpers.ts b/packages/common/src/internal-non-workflow/codec-helpers.ts index 7f57e314b..70d3bd3de 100644 --- a/packages/common/src/internal-non-workflow/codec-helpers.ts +++ b/packages/common/src/internal-non-workflow/codec-helpers.ts @@ -1,5 +1,5 @@ import { Payload } from '../interfaces'; -import { arrayFromPayloads, fromPayloadsAtIndex, PayloadConverter, toPayloads } from '../converter/payload-converter'; +import { arrayFromPayloads, fromPayloadsAtIndex, toPayloads } from '../converter/payload-converter'; import { PayloadConverterError } from '../errors'; import { PayloadCodec } from '../converter/payload-codec'; import { ProtoFailure } from '../failure';