Skip to content

Commit 544e2b2

Browse files
authored
Activity pause/unpause (#1729)
1 parent 0c58746 commit 544e2b2

File tree

12 files changed

+331
-22
lines changed

12 files changed

+331
-22
lines changed

packages/activity/src/index.ts

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
*
4949
* 1. `await` on {@link Context.cancelled | `Context.current().cancelled`} or
5050
* {@link Context.sleep | `Context.current().sleep()`}, which each throw a {@link CancelledFailure}.
51-
* 1. Pass the context's {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} at
51+
* 2. Pass the context's {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} at
5252
* {@link Context.cancellationSignal | `Context.current().cancellationSignal`} to a library that supports it.
5353
*
5454
* ### Examples
@@ -70,9 +70,18 @@
7070
*/
7171

7272
import { AsyncLocalStorage } from 'node:async_hooks';
73-
import { Logger, Duration, LogLevel, LogMetadata, MetricMeter, Priority } from '@temporalio/common';
73+
import {
74+
Logger,
75+
Duration,
76+
LogLevel,
77+
LogMetadata,
78+
MetricMeter,
79+
Priority,
80+
ActivityCancellationDetails,
81+
} from '@temporalio/common';
7482
import { msToNumber } from '@temporalio/common/lib/time';
7583
import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers';
84+
import { ActivityCancellationDetailsHolder } from '@temporalio/common/lib/activity-cancellation-details';
7685

7786
export {
7887
ActivityFunction,
@@ -289,6 +298,11 @@ export class Context {
289298
*/
290299
public readonly metricMeter: MetricMeter;
291300

301+
/**
302+
* Holder object for activity cancellation details
303+
*/
304+
private readonly _cancellationDetails: ActivityCancellationDetailsHolder;
305+
292306
/**
293307
* **Not** meant to instantiated by Activity code, used by the worker.
294308
*
@@ -300,14 +314,16 @@ export class Context {
300314
cancellationSignal: AbortSignal,
301315
heartbeat: (details?: any) => void,
302316
log: Logger,
303-
metricMeter: MetricMeter
317+
metricMeter: MetricMeter,
318+
details: ActivityCancellationDetailsHolder
304319
) {
305320
this.info = info;
306321
this.cancelled = cancelled;
307322
this.cancellationSignal = cancellationSignal;
308323
this.heartbeatFn = heartbeat;
309324
this.log = log;
310325
this.metricMeter = metricMeter;
326+
this._cancellationDetails = details;
311327
}
312328

313329
/**
@@ -347,6 +363,16 @@ export class Context {
347363
});
348364
return Promise.race([this.cancelled.finally(() => clearTimeout(handle)), timer]);
349365
};
366+
367+
/**
368+
* Return the cancellation details for this activity, if any.
369+
* @returns an object with boolean properties that describes the reason for cancellation, or undefined if not cancelled.
370+
*
371+
* @experimental Activity cancellation details include usage of experimental features such as activity pause, and may be subject to change.
372+
*/
373+
public get cancellationDetails(): ActivityCancellationDetails | undefined {
374+
return this._cancellationDetails.details;
375+
}
350376
}
351377

352378
/**
@@ -427,6 +453,16 @@ export function cancelled(): Promise<never> {
427453
return Context.current().cancelled;
428454
}
429455

456+
/**
457+
* Return the cancellation details for this activity, if any.
458+
* @returns an object with boolean properties that describes the reason for cancellation, or undefined if not cancelled.
459+
*
460+
* @experimental Activity cancellation details include usage of experimental features such as activity pause, and may be subject to change.
461+
*/
462+
export function cancellationDetails(): ActivityCancellationDetails | undefined {
463+
return Context.current().cancellationDetails;
464+
}
465+
430466
/**
431467
* Return an {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} that can be used to
432468
* react to Activity cancellation.

packages/client/src/async-completion-client.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,13 @@ export class ActivityCompletionError extends Error {}
3535
@SymbolBasedInstanceOfError('ActivityCancelledError')
3636
export class ActivityCancelledError extends Error {}
3737

38+
/**
39+
* Thrown by {@link AsyncCompletionClient.heartbeat} when the reporting Activity
40+
* has been paused.
41+
*/
42+
@SymbolBasedInstanceOfError('ActivityPausedError')
43+
export class ActivityPausedError extends Error {}
44+
3845
/**
3946
* Options used to configure {@link AsyncCompletionClient}
4047
*/
@@ -211,6 +218,7 @@ export class AsyncCompletionClient extends BaseClient {
211218
async heartbeat(taskTokenOrFullActivityId: Uint8Array | FullActivityId, details?: unknown): Promise<void> {
212219
const payloads = await encodeToPayloads(this.dataConverter, details);
213220
let cancelRequested = false;
221+
let paused = false;
214222
try {
215223
if (taskTokenOrFullActivityId instanceof Uint8Array) {
216224
const response = await this.workflowService.recordActivityTaskHeartbeat({
@@ -220,6 +228,7 @@ export class AsyncCompletionClient extends BaseClient {
220228
details: { payloads },
221229
});
222230
cancelRequested = !!response.cancelRequested;
231+
paused = !!response.activityPaused;
223232
} else {
224233
const response = await this.workflowService.recordActivityTaskHeartbeatById({
225234
identity: this.options.identity,
@@ -228,12 +237,16 @@ export class AsyncCompletionClient extends BaseClient {
228237
details: { payloads },
229238
});
230239
cancelRequested = !!response.cancelRequested;
240+
paused = !!response.activityPaused;
231241
}
232242
} catch (err) {
233243
this.handleError(err);
234244
}
235245
if (cancelRequested) {
236246
throw new ActivityCancelledError('cancelled');
237247
}
248+
if (paused) {
249+
throw new ActivityPausedError('paused');
250+
}
238251
}
239252
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
import type { coresdk } from '@temporalio/proto';
2+
3+
// ts-prune-ignore-next
4+
export interface ActivityCancellationDetailsHolder {
5+
details?: ActivityCancellationDetails;
6+
}
7+
8+
export interface ActivityCancellationDetailsOptions {
9+
notFound?: boolean;
10+
cancelRequested?: boolean;
11+
paused?: boolean;
12+
timedOut?: boolean;
13+
workerShutdown?: boolean;
14+
reset?: boolean;
15+
}
16+
17+
/**
18+
* Provides the reasons for the activity's cancellation. Cancellation details are set once and do not change once set.
19+
*/
20+
export class ActivityCancellationDetails {
21+
readonly notFound: boolean;
22+
readonly cancelRequested: boolean;
23+
readonly paused: boolean;
24+
readonly timedOut: boolean;
25+
readonly workerShutdown: boolean;
26+
readonly reset: boolean;
27+
28+
public constructor(options: ActivityCancellationDetailsOptions = {}) {
29+
this.notFound = options.notFound ?? false;
30+
this.cancelRequested = options.cancelRequested ?? false;
31+
this.paused = options.paused ?? false;
32+
this.timedOut = options.timedOut ?? false;
33+
this.workerShutdown = options.workerShutdown ?? false;
34+
this.reset = options.reset ?? false;
35+
}
36+
37+
static fromProto(
38+
proto: coresdk.activity_task.IActivityCancellationDetails | null | undefined
39+
): ActivityCancellationDetails {
40+
if (proto == null) {
41+
return new ActivityCancellationDetails();
42+
}
43+
return new ActivityCancellationDetails({
44+
notFound: proto.isNotFound ?? false,
45+
cancelRequested: proto.isCancelled ?? false,
46+
paused: proto.isPaused ?? false,
47+
timedOut: proto.isTimedOut ?? false,
48+
workerShutdown: proto.isWorkerShutdown ?? false,
49+
reset: proto.isReset ?? false,
50+
});
51+
}
52+
}

packages/common/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import * as encoding from './encoding';
88
import * as helpers from './type-helpers';
99

1010
export * from './activity-options';
11+
export { ActivityCancellationDetailsOptions, ActivityCancellationDetails } from './activity-cancellation-details';
1112
export * from './converter/data-converter';
1213
export * from './converter/failure-converter';
1314
export * from './converter/payload-codec';

packages/common/src/internal-non-workflow/codec-helpers.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Payload } from '../interfaces';
2-
import { arrayFromPayloads, fromPayloadsAtIndex, PayloadConverter, toPayloads } from '../converter/payload-converter';
2+
import { arrayFromPayloads, fromPayloadsAtIndex, toPayloads } from '../converter/payload-converter';
33
import { PayloadConverterError } from '../errors';
44
import { PayloadCodec } from '../converter/payload-codec';
55
import { ProtoFailure } from '../failure';
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import { ActivityCancellationDetails } from '@temporalio/common';
2+
import * as activity from '@temporalio/activity';
3+
4+
export async function heartbeatCancellationDetailsActivity(
5+
catchErr: boolean
6+
): Promise<ActivityCancellationDetails | undefined> {
7+
// Exit early if we've already run this activity.
8+
if (activity.activityInfo().heartbeatDetails === 'finally-complete') {
9+
return activity.cancellationDetails();
10+
}
11+
// eslint-disable-next-line no-constant-condition
12+
while (true) {
13+
try {
14+
activity.heartbeat('heartbeated');
15+
await activity.sleep(300);
16+
} catch (err) {
17+
if (err instanceof activity.CancelledFailure && catchErr) {
18+
return activity.cancellationDetails();
19+
}
20+
activity.heartbeat('finally-complete');
21+
throw err;
22+
}
23+
}
24+
}

packages/test/src/helpers-integration.ts

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,15 @@ import * as workflow from '@temporalio/workflow';
3030
import { temporal } from '@temporalio/proto';
3131
import { defineSearchAttributeKey, SearchAttributeType } from '@temporalio/common/lib/search-attributes';
3232
import { ConnectionInjectorInterceptor } from './activities/interceptors';
33-
import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions } from './helpers';
33+
import { Worker, TestWorkflowEnvironment, test as anyTest, bundlerOptions, waitUntil } from './helpers';
3434

3535
export interface Context {
3636
env: TestWorkflowEnvironment;
3737
workflowBundle: WorkflowBundle;
3838
}
3939

4040
const defaultDynamicConfigOptions = [
41+
'frontend.activityAPIsEnabled=true',
4142
'frontend.enableExecuteMultiOperation=true',
4243
'frontend.workerVersioningDataAPIs=true',
4344
'frontend.workerVersioningWorkflowAPIs=true',
@@ -284,6 +285,57 @@ export function configurableHelpers<T>(
284285
};
285286
}
286287

288+
export async function setActivityPauseState(handle: WorkflowHandle, activityId: string, pause: boolean): Promise<void> {
289+
const desc = await handle.describe();
290+
const req = {
291+
namespace: handle.client.options.namespace,
292+
execution: {
293+
workflowId: desc.raw.workflowExecutionInfo?.execution?.workflowId,
294+
runId: desc.raw.workflowExecutionInfo?.execution?.runId,
295+
},
296+
id: activityId,
297+
};
298+
if (pause) {
299+
await handle.client.workflowService.pauseActivity(req);
300+
} else {
301+
await handle.client.workflowService.unpauseActivity(req);
302+
}
303+
await waitUntil(async () => {
304+
const { raw } = await handle.describe();
305+
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId);
306+
// If we are pausing: success when either
307+
// • paused flag is true OR
308+
// • the activity vanished (it completed / retried)
309+
if (pause) return activityInfo ? activityInfo.paused ?? false : true;
310+
// If we are unpausing: success when either
311+
// • paused flag is false OR
312+
// • the activity vanished (already completed)
313+
return activityInfo ? !activityInfo.paused : true;
314+
}, 15000);
315+
}
316+
317+
// Helper function to check if an activity has heartbeated
318+
export async function hasActivityHeartbeat(
319+
handle: WorkflowHandle,
320+
activityId: string,
321+
expectedContent?: string
322+
): Promise<boolean> {
323+
const { raw } = await handle.describe();
324+
const activityInfo = raw.pendingActivities?.find((act) => act.activityId === activityId);
325+
const heartbeatData = activityInfo?.heartbeatDetails?.payloads?.[0]?.data;
326+
if (!heartbeatData) return false;
327+
328+
// If no expected content specified, just check that heartbeat data exists
329+
if (!expectedContent) return true;
330+
331+
try {
332+
const decoded = Buffer.from(heartbeatData).toString();
333+
return decoded.includes(expectedContent);
334+
} catch {
335+
return false;
336+
}
337+
}
338+
287339
export function helpers(t: ExecutionContext<Context>, testEnv: TestWorkflowEnvironment = t.context.env): Helpers {
288340
return configurableHelpers(t, t.context.workflowBundle, testEnv);
289341
}

0 commit comments

Comments
 (0)