Skip to content

Commit 0a29eb2

Browse files
committed
impl
1 parent 1bcb339 commit 0a29eb2

File tree

6 files changed

+96
-17
lines changed

6 files changed

+96
-17
lines changed

packages/activity/src/index.ts

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
*
4949
* 1. `await` on {@link Context.cancelled | `Context.current().cancelled`} or
5050
* {@link Context.sleep | `Context.current().sleep()`}, which each throw a {@link CancelledFailure}.
51-
* 1. Pass the context's {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} at
51+
* 2. Pass the context's {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} at
5252
* {@link Context.cancellationSignal | `Context.current().cancellationSignal`} to a library that supports it.
5353
*
5454
* ### Examples
@@ -70,7 +70,16 @@
7070
*/
7171

7272
import { AsyncLocalStorage } from 'node:async_hooks';
73-
import { Logger, Duration, LogLevel, LogMetadata, MetricMeter, Priority } from '@temporalio/common';
73+
import {
74+
Logger,
75+
Duration,
76+
LogLevel,
77+
LogMetadata,
78+
MetricMeter,
79+
Priority,
80+
ActivityCancellationDetailsHolder,
81+
ActivityCancellationDetails,
82+
} from '@temporalio/common';
7483
import { msToNumber } from '@temporalio/common/lib/time';
7584
import { SymbolBasedInstanceOfError } from '@temporalio/common/lib/type-helpers';
7685

@@ -289,6 +298,11 @@ export class Context {
289298
*/
290299
public readonly metricMeter: MetricMeter;
291300

301+
/**
302+
* Holder object for activity cancellation details
303+
*/
304+
public readonly cancellationDetails: ActivityCancellationDetailsHolder;
305+
292306
/**
293307
* **Not** meant to instantiated by Activity code, used by the worker.
294308
*
@@ -300,14 +314,16 @@ export class Context {
300314
cancellationSignal: AbortSignal,
301315
heartbeat: (details?: any) => void,
302316
log: Logger,
303-
metricMeter: MetricMeter
317+
metricMeter: MetricMeter,
318+
details: ActivityCancellationDetailsHolder
304319
) {
305320
this.info = info;
306321
this.cancelled = cancelled;
307322
this.cancellationSignal = cancellationSignal;
308323
this.heartbeatFn = heartbeat;
309324
this.log = log;
310325
this.metricMeter = metricMeter;
326+
this.cancellationDetails = details;
311327
}
312328

313329
/**
@@ -427,6 +443,13 @@ export function cancelled(): Promise<never> {
427443
return Context.current().cancelled;
428444
}
429445

446+
/**
447+
* Returns the cancellation details for this activity, if any.
448+
*/
449+
export function cancellationDetails(): ActivityCancellationDetails | undefined {
450+
return Context.current().cancellationDetails.details;
451+
}
452+
430453
/**
431454
* Return an {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} that can be used to
432455
* react to Activity cancellation.
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import type { coresdk } from '@temporalio/proto';
2+
3+
export interface ActivityCancellationDetailsHolder {
4+
details?: ActivityCancellationDetails;
5+
}
6+
7+
/**
8+
* Provides the reasons for the activity's cancellation. Cancellation details are set once and do not change once set.
9+
*/
10+
export class ActivityCancellationDetails {
11+
readonly notFound: boolean;
12+
readonly cancelRequested: boolean;
13+
readonly paused: boolean;
14+
readonly timedOut: boolean;
15+
readonly workerShutdown: boolean;
16+
17+
private constructor(
18+
notFound: boolean = false,
19+
cancelRequested: boolean = false,
20+
paused: boolean = false,
21+
timedOut: boolean = false,
22+
workerShutdown: boolean = false
23+
) {
24+
this.notFound = notFound;
25+
this.cancelRequested = cancelRequested;
26+
this.paused = paused;
27+
this.timedOut = timedOut;
28+
this.workerShutdown = workerShutdown;
29+
}
30+
31+
static fromProto(
32+
proto: coresdk.activity_task.IActivityCancellationDetails | null | undefined
33+
): ActivityCancellationDetails {
34+
if (proto == null) {
35+
return new ActivityCancellationDetails();
36+
}
37+
return new ActivityCancellationDetails(
38+
proto.isNotFound ?? false,
39+
proto.isCancelled ?? false,
40+
proto.isPaused ?? false,
41+
proto.isTimedOut ?? false,
42+
proto.isWorkerShutdown ?? false
43+
);
44+
}
45+
}

packages/common/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import * as encoding from './encoding';
88
import * as helpers from './type-helpers';
99

1010
export * from './activity-options';
11+
export * from './activity-cancellation-details';
1112
export * from './converter/data-converter';
1213
export * from './converter/failure-converter';
1314
export * from './converter/payload-codec';

packages/core-bridge/src/worker.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -520,9 +520,7 @@ mod config {
520520
impl From<PollerBehavior> for CorePollerBehavior {
521521
fn from(val: PollerBehavior) -> Self {
522522
match val {
523-
PollerBehavior::SimpleMaximum { maximum } => {
524-
Self::SimpleMaximum(maximum)
525-
}
523+
PollerBehavior::SimpleMaximum { maximum } => Self::SimpleMaximum(maximum),
526524
PollerBehavior::Autoscaling {
527525
minimum,
528526
maximum,
@@ -771,10 +769,7 @@ mod custom_slot_supplier {
771769
slot_type: SK::kind().into(),
772770
task_queue: ctx.task_queue().to_string(),
773771
worker_identity: ctx.worker_identity().to_string(),
774-
worker_deployment_version: ctx
775-
.worker_deployment_version()
776-
.clone()
777-
.map(Into::into),
772+
worker_deployment_version: ctx.worker_deployment_version().clone().map(Into::into),
778773
is_sticky: ctx.is_sticky(),
779774
};
780775

packages/worker/src/activity.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import 'abort-controller/polyfill'; // eslint-disable-line import/no-unassigned-import
22
import { asyncLocalStorage, CompleteAsyncError, Context, Info } from '@temporalio/activity';
33
import {
4+
ActivityCancellationDetails,
5+
ActivityCancellationDetailsHolder,
46
ActivityFunction,
57
ApplicationFailure,
68
CancelledFailure,
@@ -34,8 +36,9 @@ export type CancelReason =
3436

3537
export class Activity {
3638
protected cancelReason?: CancelReason;
39+
protected cancellationDetails: ActivityCancellationDetailsHolder;
3740
public readonly context: Context;
38-
public cancel: (reason: CancelReason) => void = () => undefined;
41+
public cancel: (reason: CancelReason, details: ActivityCancellationDetails) => void = () => undefined;
3942
public readonly abortController: AbortController = new AbortController();
4043
public readonly interceptors: {
4144
inbound: ActivityInboundCallsInterceptor[];
@@ -65,10 +68,11 @@ export class Activity {
6568
) {
6669
this.workerLogger = LoggerWithComposedMetadata.compose(workerLogger, this.getLogAttributes.bind(this));
6770
this.metricMeter = MetricMeterWithComposedTags.compose(workerMetricMeter, this.getMetricTags.bind(this));
68-
71+
this.cancellationDetails = {};
6972
const promise = new Promise<never>((_, reject) => {
70-
this.cancel = (reason: CancelReason) => {
73+
this.cancel = (reason: CancelReason, details: ActivityCancellationDetails) => {
7174
this.cancelReason = reason;
75+
this.cancellationDetails.details = details;
7276
const err = new CancelledFailure(reason);
7377
this.abortController.abort(err);
7478
reject(err);
@@ -81,7 +85,8 @@ export class Activity {
8185
this.heartbeatCallback,
8286
// This is the activity context logger, to be used exclusively from user code
8387
LoggerWithComposedMetadata.compose(this.workerLogger, { sdkComponent: SdkComponent.activity }),
84-
this.metricMeter
88+
this.metricMeter,
89+
this.cancellationDetails
8590
);
8691
// Prevent unhandled rejection
8792
promise.catch(() => undefined);

packages/worker/src/worker.ts

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
TypedSearchAttributes,
3434
decodePriority,
3535
MetricMeter,
36+
ActivityCancellationDetails,
3637
} from '@temporalio/common';
3738
import {
3839
decodeArrayFromPayloads,
@@ -988,7 +989,12 @@ export class Worker {
988989
base64TaskToken,
989990
details,
990991
onError() {
991-
activity?.cancel('HEARTBEAT_DETAILS_CONVERSION_FAILED'); // activity must be defined
992+
// activity must be defined
993+
// empty cancellation details, not corresponding detail for heartbeat detail conversion failure
994+
activity?.cancel(
995+
'HEARTBEAT_DETAILS_CONVERSION_FAILED',
996+
ActivityCancellationDetails.fromProto(undefined)
997+
);
992998
},
993999
}),
9941000
this.logger,
@@ -1027,11 +1033,15 @@ export class Worker {
10271033
// NOTE: activity will not be considered cancelled until it confirms cancellation (by throwing a CancelledFailure)
10281034
this.logger.trace('Cancelling activity', activityLogAttributes(activity.info));
10291035
const reason = task.cancel?.reason;
1036+
const cancellationDetails = task.cancel?.details;
10301037
if (reason === undefined || reason === null) {
10311038
// Special case of Lang side cancellation during shutdown (see `activity.shutdown.evict` above)
1032-
activity.cancel('WORKER_SHUTDOWN');
1039+
activity.cancel('WORKER_SHUTDOWN', ActivityCancellationDetails.fromProto(cancellationDetails));
10331040
} else {
1034-
activity.cancel(coresdk.activity_task.ActivityCancelReason[reason] as CancelReason);
1041+
activity.cancel(
1042+
coresdk.activity_task.ActivityCancelReason[reason] as CancelReason,
1043+
ActivityCancellationDetails.fromProto(cancellationDetails)
1044+
);
10351045
}
10361046
break;
10371047
}

0 commit comments

Comments
 (0)