Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 46 additions & 5 deletions packages/test/src/activities/heartbeat-cancellation-details.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,64 @@
import { ActivityCancellationDetails } from '@temporalio/common';
import * as activity from '@temporalio/activity';

export interface ActivityState {
pause?: boolean;
unpause?: boolean;
reset?: boolean;
shouldRetry?: boolean;
}

export async function heartbeatCancellationDetailsActivity(
catchErr: boolean
state: ActivityState
): Promise<ActivityCancellationDetails | undefined> {
const info = activity.activityInfo();
// Exit early if we've already run this activity.
if (activity.activityInfo().heartbeatDetails === 'finally-complete') {
if (info.attempt > 1) {
return activity.cancellationDetails();
}

// Otherwise, either pause or reset this activity (or both).
const client = activity.getClient();
const req = {
namespace: client.options.namespace,
execution: {
workflowId: info.workflowExecution.workflowId,
runId: info.workflowExecution.runId,
},
id: info.activityId,
};
// Pause AND reset the activity.
if (state.pause && state.reset) {
await Promise.all([client.workflowService.pauseActivity(req), client.workflowService.resetActivity(req)]);
// Just pause.
} else if (state.pause) {
await client.workflowService.pauseActivity(req);
// Just reset.
} else if (state.reset) {
await client.workflowService.resetActivity(req);
}

// eslint-disable-next-line no-constant-condition
while (true) {
try {
activity.heartbeat('heartbeated');
// Heartbeat to propagate cancellation signals from pause/reset.
activity.heartbeat();
await activity.sleep(300);
} catch (err) {
if (err instanceof activity.CancelledFailure && catchErr) {
// If we encountered an unexpected, non-cancellation failure,
// throw a non-retryable error to fail the activity.
if (!(err instanceof activity.CancelledFailure)) {
throw activity.ApplicationFailure.nonRetryable('Unexpected failure', 'Error', err);
}
// If we don't want the activity to retry, return the cancellation details immediately.
if (!state.shouldRetry) {
return activity.cancellationDetails();
}
activity.heartbeat('finally-complete');
// Unpause if requested (a paused activity with not retry).
if (state.unpause) {
await client.workflowService.unpauseActivity(req);
}
// Re-throw the cancellation to retry the activity
throw err;
}
}
Expand Down
85 changes: 1 addition & 84 deletions packages/test/src/helpers-integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
import * as workflow from '@temporalio/workflow';
import { temporal } from '@temporalio/proto';
import { defineSearchAttributeKey, SearchAttributeType } from '@temporalio/common/lib/search-attributes';
import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions, waitUntil } from './helpers';
import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions } from './helpers';

export interface Context {
env: TestWorkflowEnvironment;
Expand Down Expand Up @@ -296,89 +296,6 @@ export function configurableHelpers<T>(
};
}

export async function setActivityState(
handle: WorkflowHandle,
activityId: string,
state: 'pause' | 'unpause' | 'reset' | 'pause & reset'
): Promise<void> {
const desc = await handle.describe();
const req = {
namespace: handle.client.options.namespace,
execution: {
workflowId: desc.raw.workflowExecutionInfo?.execution?.workflowId,
runId: desc.raw.workflowExecutionInfo?.execution?.runId,
},
id: activityId,
};
if (state === 'pause') {
await handle.client.workflowService.pauseActivity(req);
} else if (state === 'unpause') {
await handle.client.workflowService.unpauseActivity(req);
} else if (state === 'reset') {
await handle.client.workflowService.resetActivity({ ...req, resetHeartbeat: true });
} else {
await Promise.all([
handle.client.workflowService.pauseActivity(req),
handle.client.workflowService.resetActivity({ ...req, resetHeartbeat: true }),
]);
}
await waitUntil(async () => {
const { raw } = await handle.describe();
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId);
// If we are pausing: success when either
// • paused flag is true OR
// • the activity vanished (it completed / retried)
if (state === 'pause') {
if (!activityInfo) {
return true; // Activity vanished (completed/retried)
}
return activityInfo.paused ?? false;
} else if (state === 'unpause') {
// If we are unpausing: success when either
// • paused flag is false OR
// • the activity vanished (already completed)
return activityInfo ? !activityInfo.paused : true;
} else if (state === 'reset') {
// If we are resetting, success when either
// • heartbeat details have been reset OR
// • the activity vanished (completed / retried)
return activityInfo ? activityInfo.heartbeatDetails === null : true;
} else {
// If we are pausing & resetting, success when either
// • activity is paused AND heartbeat details have been reset OR
// • the activity vanished (completed / retried)
if (!activityInfo) {
return true; // Activity vanished (completed/retried)
}
const isPaused = activityInfo.paused ?? false;
const isHeartbeatReset = activityInfo.heartbeatDetails === null;
return isPaused && isHeartbeatReset;
}
}, 15000);
}

// Helper function to check if an activity has heartbeated
export async function hasActivityHeartbeat(
handle: WorkflowHandle,
activityId: string,
expectedContent?: string
): Promise<boolean> {
const { raw } = await handle.describe();
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId);
const heartbeatData = activityInfo?.heartbeatDetails?.payloads?.[0]?.data;
if (!heartbeatData) return false;

// If no expected content specified, just check that heartbeat data exists
if (!expectedContent) return true;

try {
const decoded = Buffer.from(heartbeatData).toString();
return decoded.includes(expectedContent);
} catch {
return false;
}
}

export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvironment = t.context.env): Helpers {
return configurableHelpers(t, t.context.workflowBundle, testEnv);
}
109 changes: 33 additions & 76 deletions packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,9 @@ import { encode } from '@temporalio/common/lib/encoding';
import { signalSchedulingWorkflow } from './activities/helpers';
import { activityStartedSignal } from './workflows/definitions';
import * as workflows from './workflows';
import {
Context,
createLocalTestEnvironment,
hasActivityHeartbeat,
helpers,
makeTestFunction,
setActivityState,
} from './helpers-integration';
import { Context, createLocalTestEnvironment, helpers, makeTestFunction } from './helpers-integration';
import { overrideSdkInternalFlag } from './mock-internal-flags';
import { heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details';
import { ActivityState, heartbeatCancellationDetailsActivity } from './activities/heartbeat-cancellation-details';
import { loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers';

const test = makeTestFunction({
Expand Down Expand Up @@ -1441,51 +1434,33 @@ test('Workflow can return root workflow', async (t) => {
});
});

export async function heartbeatPauseWorkflow(
activityId: string,
catchErr: boolean,
maximumAttempts: number
export async function heartbeatCancellationWorkflow(
state: ActivityState
): Promise<ActivityCancellationDetails | undefined> {
const { heartbeatCancellationDetailsActivity } = workflow.proxyActivities({
startToCloseTimeout: '5s',
activityId,
retry: {
maximumAttempts,
maximumAttempts: 2,
},
heartbeatTimeout: '1s',
});

return await heartbeatCancellationDetailsActivity(catchErr);
return await heartbeatCancellationDetailsActivity(state);
}

test('Activity pause returns expected cancellation details', async (t) => {
const { createWorker, startWorkflow } = helpers(t);

const { createWorker, executeWorkflow } = helpers(t);
const worker = await createWorker({
activities: {
heartbeatCancellationDetailsActivity,
},
});

await worker.runUntil(async () => {
const testActivityId = randomUUID();
const handle = await startWorkflow(heartbeatPauseWorkflow, {
args: [testActivityId, true, 1],
const result = await executeWorkflow(heartbeatCancellationWorkflow, {
args: [{ pause: true }],
});

// Wait for activity to appear in pending activities AND start heartbeating
await waitUntil(async () => {
const { raw } = await handle.describe();
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId);
// Check both: activity exists and has heartbeated
return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated')));
}, 10000);

// Now pause the activity
await setActivityState(handle, testActivityId, 'pause');
// Get the result - should contain pause cancellation details
const result = await handle.result();

t.deepEqual(result, {
cancelRequested: false,
notFound: false,
Expand All @@ -1498,7 +1473,7 @@ test('Activity pause returns expected cancellation details', async (t) => {
});

test('Activity can be cancelled via pause and retry after unpause', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const { createWorker, executeWorkflow } = helpers(t);

const worker = await createWorker({
activities: {
Expand All @@ -1507,46 +1482,25 @@ test('Activity can be cancelled via pause and retry after unpause', async (t) =>
});

await worker.runUntil(async () => {
const testActivityId = randomUUID();
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, false, 2] });

// Wait for it to exist and heartbeat
await waitUntil(async () => {
const { raw } = await handle.describe();
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId);
return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated')));
}, 10000);

await setActivityState(handle, testActivityId, 'pause');
await waitUntil(async () => hasActivityHeartbeat(handle, testActivityId, 'finally-complete'), 10000);
await setActivityState(handle, testActivityId, 'unpause');

const result = await handle.result();
const result = await executeWorkflow(heartbeatCancellationWorkflow, {
args: [{ pause: true, unpause: true, shouldRetry: true }],
});
// Note that we expect the result to be null because unpausing an activity
// resets the activity context (akin to starting the activity anew)
t.true(result == null);
});
});

test('Activity reset returns expected cancellation details', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
test('Activity reset without retry returns expected cancellation details', async (t) => {
const { createWorker, executeWorkflow } = helpers(t);
const worker = await createWorker({
activities: {
heartbeatCancellationDetailsActivity,
},
});

await worker.runUntil(async () => {
const testActivityId = randomUUID();
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] });

// Wait for it to exist and heartbeat
await waitUntil(async () => {
const { raw } = await handle.describe();
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId);
return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated')));
}, 10000);

await setActivityState(handle, testActivityId, 'reset');
const result = await handle.result();
const result = await executeWorkflow(heartbeatCancellationWorkflow, { args: [{ reset: true }] });
t.deepEqual(result, {
cancelRequested: false,
notFound: false,
Expand All @@ -1558,27 +1512,30 @@ test('Activity reset returns expected cancellation details', async (t) => {
});
});

test('Activity set as both paused and reset returns expected cancellation details', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
test('Activity reset with retry returns expected cancellation details', async (t) => {
const { createWorker, executeWorkflow } = helpers(t);
const worker = await createWorker({
activities: {
heartbeatCancellationDetailsActivity,
},
});

await worker.runUntil(async () => {
const testActivityId = randomUUID();
const handle = await startWorkflow(heartbeatPauseWorkflow, { args: [testActivityId, true, 1] });
const result = await executeWorkflow(heartbeatCancellationWorkflow, { args: [{ reset: true, shouldRetry: true }] });
t.true(result == null);
});
});

// Wait for it to exist and heartbeat
await waitUntil(async () => {
const { raw } = await handle.describe();
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === testActivityId);
return !!(activityInfo && (await hasActivityHeartbeat(handle, testActivityId, 'heartbeated')));
}, 10000);
test('Activity paused and reset returns expected cancellation details', async (t) => {
const { createWorker, executeWorkflow } = helpers(t);
const worker = await createWorker({
activities: {
heartbeatCancellationDetailsActivity,
},
});

await setActivityState(handle, testActivityId, 'pause & reset');
const result = await handle.result();
await worker.runUntil(async () => {
const result = await executeWorkflow(heartbeatCancellationWorkflow, { args: [{ pause: true, reset: true }] });
t.deepEqual(result, {
cancelRequested: false,
notFound: false,
Expand Down
Loading