diff --git a/packages/test/src/activities/heartbeat-cancellation-details.ts b/packages/test/src/activities/heartbeat-cancellation-details.ts index 18a147f09..f02cb600d 100644 --- a/packages/test/src/activities/heartbeat-cancellation-details.ts +++ b/packages/test/src/activities/heartbeat-cancellation-details.ts @@ -1,23 +1,64 @@ import { ActivityCancellationDetails } from '@temporalio/common'; import * as activity from '@temporalio/activity'; +export interface ActivityState { + pause?: boolean; + unpause?: boolean; + reset?: boolean; + shouldRetry?: boolean; +} + export async function heartbeatCancellationDetailsActivity( - catchErr: boolean + state: ActivityState ): Promise { + const info = activity.activityInfo(); // Exit early if we've already run this activity. - if (activity.activityInfo().heartbeatDetails === 'finally-complete') { + if (info.attempt > 1) { return activity.cancellationDetails(); } + + // Otherwise, either pause or reset this activity (or both). + const client = activity.getClient(); + const req = { + namespace: client.options.namespace, + execution: { + workflowId: info.workflowExecution.workflowId, + runId: info.workflowExecution.runId, + }, + id: info.activityId, + }; + // Pause AND reset the activity. + if (state.pause && state.reset) { + await Promise.all([client.workflowService.pauseActivity(req), client.workflowService.resetActivity(req)]); + // Just pause. + } else if (state.pause) { + await client.workflowService.pauseActivity(req); + // Just reset. + } else if (state.reset) { + await client.workflowService.resetActivity(req); + } + // eslint-disable-next-line no-constant-condition while (true) { try { - activity.heartbeat('heartbeated'); + // Heartbeat to propagate cancellation signals from pause/reset. + activity.heartbeat(); await activity.sleep(300); } catch (err) { - if (err instanceof activity.CancelledFailure && catchErr) { + // If we encountered an unexpected, non-cancellation failure, + // throw a non-retryable error to fail the activity. + if (!(err instanceof activity.CancelledFailure)) { + throw activity.ApplicationFailure.nonRetryable('Unexpected failure', 'Error', err); + } + // If we don't want the activity to retry, return the cancellation details immediately. + if (!state.shouldRetry) { return activity.cancellationDetails(); } - activity.heartbeat('finally-complete'); + // Unpause if requested (a paused activity with not retry). + if (state.unpause) { + await client.workflowService.unpauseActivity(req); + } + // Re-throw the cancellation to retry the activity throw err; } } diff --git a/packages/test/src/helpers-integration.ts b/packages/test/src/helpers-integration.ts index 3e73562c1..9d0d6fab0 100644 --- a/packages/test/src/helpers-integration.ts +++ b/packages/test/src/helpers-integration.ts @@ -29,7 +29,7 @@ import { import * as workflow from '@temporalio/workflow'; import { temporal } from '@temporalio/proto'; import { defineSearchAttributeKey, SearchAttributeType } from '@temporalio/common/lib/search-attributes'; -import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions, waitUntil } from './helpers'; +import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions } from './helpers'; export interface Context { env: TestWorkflowEnvironment; @@ -296,89 +296,6 @@ export function configurableHelpers( }; } -export async function setActivityState( - handle: WorkflowHandle, - activityId: string, - state: 'pause' | 'unpause' | 'reset' | 'pause & reset' -): Promise { - const desc = await handle.describe(); - const req = { - namespace: handle.client.options.namespace, - execution: { - workflowId: desc.raw.workflowExecutionInfo?.execution?.workflowId, - runId: desc.raw.workflowExecutionInfo?.execution?.runId, - }, - id: activityId, - }; - if (state === 'pause') { - await handle.client.workflowService.pauseActivity(req); - } else if (state === 'unpause') { - await handle.client.workflowService.unpauseActivity(req); - } else if (state === 'reset') { - await handle.client.workflowService.resetActivity({ ...req, resetHeartbeat: true }); - } else { - await Promise.all([ - handle.client.workflowService.pauseActivity(req), - handle.client.workflowService.resetActivity({ ...req, resetHeartbeat: true }), - ]); - } - await waitUntil(async () => { - const { raw } = await handle.describe(); - const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId); - // If we are pausing: success when either - // • paused flag is true OR - // • the activity vanished (it completed / retried) - if (state === 'pause') { - if (!activityInfo) { - return true; // Activity vanished (completed/retried) - } - return activityInfo.paused ?? false; - } else if (state === 'unpause') { - // If we are unpausing: success when either - // • paused flag is false OR - // • the activity vanished (already completed) - return activityInfo ? !activityInfo.paused : true; - } else if (state === 'reset') { - // If we are resetting, success when either - // • heartbeat details have been reset OR - // • the activity vanished (completed / retried) - return activityInfo ? activityInfo.heartbeatDetails === null : true; - } else { - // If we are pausing & resetting, success when either - // • activity is paused AND heartbeat details have been reset OR - // • the activity vanished (completed / retried) - if (!activityInfo) { - return true; // Activity vanished (completed/retried) - } - const isPaused = activityInfo.paused ?? false; - const isHeartbeatReset = activityInfo.heartbeatDetails === null; - return isPaused && isHeartbeatReset; - } - }, 15000); -} - -// Helper function to check if an activity has heartbeated -export async function hasActivityHeartbeat( - handle: WorkflowHandle, - activityId: string, - expectedContent?: string -): Promise { - const { raw } = await handle.describe(); - const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId); - const heartbeatData = activityInfo?.heartbeatDetails?.payloads?.[0]?.data; - if (!heartbeatData) return false; - - // If no expected content specified, just check that heartbeat data exists - if (!expectedContent) return true; - - try { - const decoded = Buffer.from(heartbeatData).toString(); - return decoded.includes(expectedContent); - } catch { - return false; - } -} - export function helpers(t: ExecutionContext, testEnv: TestWorkflowEnvironment = t.context.env): Helpers { return configurableHelpers(t, t.context.workflowBundle, testEnv); } diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 1a4c8e0c8..3b0604d17 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -42,16 +42,9 @@ import { encode } from '@temporalio/common/lib/encoding'; import { signalSchedulingWorkflow } from './activities/helpers'; import { activityStartedSignal } from './workflows/definitions'; import * as workflows from './workflows'; -import { - Context, - createLocalTestEnvironment, - hasActivityHeartbeat, - helpers, - makeTestFunction, - setActivityState, -} from './helpers-integration'; +import { Context, createLocalTestEnvironment, helpers, makeTestFunction } from './helpers-integration'; import { overrideSdkInternalFlag } from './mock-internal-flags'; -import { heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details'; +import { ActivityState, heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details'; import { loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers'; const test = makeTestFunction({ @@ -1441,26 +1434,22 @@ test('Workflow can return root workflow', async (t) => { }); }); -export async function heartbeatPauseWorkflow( - activityId: string, - catchErr: boolean, - maximumAttempts: number +export async function heartbeatCancellationWorkflow( + state: ActivityState ): Promise { const { heartbeatCancellationDetailsActivity } = workflow.proxyActivities({ startToCloseTimeout: '5s', - activityId, retry: { - maximumAttempts, + maximumAttempts: 2, }, heartbeatTimeout: '1s', }); - return await heartbeatCancellationDetailsActivity(catchErr); + return await heartbeatCancellationDetailsActivity(state); } test('Activity pause returns expected cancellation details', async (t) => { - const { createWorker, startWorkflow } = helpers(t); - + const { createWorker, executeWorkflow } = helpers(t); const worker = await createWorker({ activities: { heartbeatCancellationDetailsActivity, @@ -1468,24 +1457,10 @@ test('Activity pause returns expected cancellation details', async (t) => { }); await worker.runUntil(async () => { - const testActivityId = randomUUID(); - const handle = await startWorkflow(heartbeatPauseWorkflow, { - args: [testActivityId, true, 1], + const result = await executeWorkflow(heartbeatCancellationWorkflow, { + args: [{ pause: true }], }); - // Wait for activity to appear in pending activities AND start heartbeating - await waitUntil(async () => { - const { raw } = await handle.describe(); - const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId); - // Check both: activity exists and has heartbeated - return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated'))); - }, 10000); - - // Now pause the activity - await setActivityState(handle, testActivityId, 'pause'); - // Get the result - should contain pause cancellation details - const result = await handle.result(); - t.deepEqual(result, { cancelRequested: false, notFound: false, @@ -1498,7 +1473,7 @@ test('Activity pause returns expected cancellation details', async (t) => { }); test('Activity can be cancelled via pause and retry after unpause', async (t) => { - const { createWorker, startWorkflow } = helpers(t); + const { createWorker, executeWorkflow } = helpers(t); const worker = await createWorker({ activities: { @@ -1507,27 +1482,17 @@ test('Activity can be cancelled via pause and retry after unpause', async (t) => }); await worker.runUntil(async () => { - const testActivityId = randomUUID(); - const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, false, 2] }); - - // Wait for it to exist and heartbeat - await waitUntil(async () => { - const { raw } = await handle.describe(); - const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId); - return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated'))); - }, 10000); - - await setActivityState(handle, testActivityId, 'pause'); - await waitUntil(async () => hasActivityHeartbeat(handle, testActivityId, 'finally-complete'), 10000); - await setActivityState(handle, testActivityId, 'unpause'); - - const result = await handle.result(); + const result = await executeWorkflow(heartbeatCancellationWorkflow, { + args: [{ pause: true, unpause: true, shouldRetry: true }], + }); + // Note that we expect the result to be null because unpausing an activity + // resets the activity context (akin to starting the activity anew) t.true(result == null); }); }); -test('Activity reset returns expected cancellation details', async (t) => { - const { createWorker, startWorkflow } = helpers(t); +test('Activity reset without retry returns expected cancellation details', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); const worker = await createWorker({ activities: { heartbeatCancellationDetailsActivity, @@ -1535,18 +1500,7 @@ test('Activity reset returns expected cancellation details', async (t) => { }); await worker.runUntil(async () => { - const testActivityId = randomUUID(); - const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] }); - - // Wait for it to exist and heartbeat - await waitUntil(async () => { - const { raw } = await handle.describe(); - const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId); - return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated'))); - }, 10000); - - await setActivityState(handle, testActivityId, 'reset'); - const result = await handle.result(); + const result = await executeWorkflow(heartbeatCancellationWorkflow, { args: [{ reset: true }] }); t.deepEqual(result, { cancelRequested: false, notFound: false, @@ -1558,8 +1512,8 @@ test('Activity reset returns expected cancellation details', async (t) => { }); }); -test('Activity set as both paused and reset returns expected cancellation details', async (t) => { - const { createWorker, startWorkflow } = helpers(t); +test('Activity reset with retry returns expected cancellation details', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); const worker = await createWorker({ activities: { heartbeatCancellationDetailsActivity, @@ -1567,18 +1521,21 @@ test('Activity set as both paused and reset returns expected cancellation detail }); await worker.runUntil(async () => { - const testActivityId = randomUUID(); - const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] }); + const result = await executeWorkflow(heartbeatCancellationWorkflow, { args: [{ reset: true, shouldRetry: true }] }); + t.true(result == null); + }); +}); - // Wait for it to exist and heartbeat - await waitUntil(async () => { - const { raw } = await handle.describe(); - const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId); - return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated'))); - }, 10000); +test('Activity paused and reset returns expected cancellation details', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); + const worker = await createWorker({ + activities: { + heartbeatCancellationDetailsActivity, + }, + }); - await setActivityState(handle, testActivityId, 'pause & reset'); - const result = await handle.result(); + await worker.runUntil(async () => { + const result = await executeWorkflow(heartbeatCancellationWorkflow, { args: [{ pause: true, reset: true }] }); t.deepEqual(result, { cancelRequested: false, notFound: false,