Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 67 additions & 85 deletions packages/activity/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,100 +240,82 @@ export class Context {
}

/**
* Holds information about the current executing Activity.
*/
public readonly info: Info;

/**
* A Promise that fails with a {@link CancelledFailure} when cancellation of this activity is requested. The promise
* is guaranteed to never successfully resolve. Await this promise in an Activity to get notified of cancellation.
*
* Note that to get notified of cancellation, an activity must _also_ {@link Context.heartbeat}.
* **Not** meant to instantiated by Activity code, used by the worker.
*
* @see [Cancellation](/api/namespaces/activity#cancellation)
* @ignore
*/
public readonly cancelled: Promise<never>;
constructor(
/**
* Holds information about the current executing Activity.
*/
public readonly info: Info,

/**
* An {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} that can be used to react to
* Activity cancellation.
*
* This can be passed in to libraries such as
* {@link https://www.npmjs.com/package/node-fetch#request-cancellation-with-abortsignal | fetch} to abort an
* in-progress request and
* {@link https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options child_process}
* to abort a child process, as well as other built-in node modules and modules found on npm.
*
* Note that to get notified of cancellation, an activity must _also_ {@link Context.heartbeat}.
*
* @see [Cancellation](/api/namespaces/activity#cancellation)
*/
public readonly cancellationSignal: AbortSignal;
/**
* A Promise that fails with a {@link CancelledFailure} when cancellation of this activity is requested. The promise
* is guaranteed to never successfully resolve. Await this promise in an Activity to get notified of cancellation.
*
* Note that to get notified of cancellation, an activity must _also_ {@link Context.heartbeat}.
*
* @see [Cancellation](/api/namespaces/activity#cancellation)
*/
public readonly cancelled: Promise<never>,

/**
* The heartbeat implementation, injected via the constructor.
*/
protected readonly heartbeatFn: (details?: any) => void;
/**
* An {@link https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal | `AbortSignal`} that can be used to react to
* Activity cancellation.
*
* This can be passed in to libraries such as
* {@link https://www.npmjs.com/package/node-fetch#request-cancellation-with-abortsignal | fetch} to abort an
* in-progress request and
* {@link https://nodejs.org/api/child_process.html#child_process_child_process_spawn_command_args_options child_process}
* to abort a child process, as well as other built-in node modules and modules found on npm.
*
* Note that to get notified of cancellation, an activity must _also_ {@link Context.heartbeat}.
*
* @see [Cancellation](/api/namespaces/activity#cancellation)
*/
public readonly cancellationSignal: AbortSignal,

/**
* The Worker's client, passed down through Activity context.
*/
protected readonly _client: Client | undefined;
/**
* The heartbeat implementation, injected via the constructor.
*/
protected readonly heartbeatFn: (details?: any) => void,

/**
* The logger for this Activity.
*
* This defaults to the `Runtime`'s Logger (see {@link Runtime.logger}). Attributes from the current Activity context
* are automatically included as metadata on every log entries. An extra `sdkComponent` metadata attribute is also
* added, with value `activity`; this can be used for fine-grained filtering of log entries further downstream.
*
* To customize log attributes, register a {@link ActivityOutboundCallsInterceptor} that intercepts the
* `getLogAttributes()` method.
*
* Modifying the context logger (eg. `context.log = myCustomLogger` or by an {@link ActivityInboundLogInterceptor}
* with a custom logger as argument) is deprecated. Doing so will prevent automatic inclusion of custom log attributes
* through the `getLogAttributes()` interceptor. To customize _where_ log messages are sent, set the
* {@link Runtime.logger} property instead.
*/
public log: Logger;
/**
* The Worker's client, passed down through Activity context.
*/
protected readonly _client: Client | undefined,

/**
* Get the metric meter for this activity with activity-specific tags.
*
* To add custom tags, register a {@link ActivityOutboundCallsInterceptor} that
* intercepts the `getMetricTags()` method.
*/
public readonly metricMeter: MetricMeter;
/**
* The logger for this Activity.
*
* This defaults to the `Runtime`'s Logger (see {@link Runtime.logger}). Attributes from the current Activity context
* are automatically included as metadata on every log entries. An extra `sdkComponent` metadata attribute is also
* added, with value `activity`; this can be used for fine-grained filtering of log entries further downstream.
*
* To customize log attributes, register a {@link ActivityOutboundCallsInterceptor} that intercepts the
* `getLogAttributes()` method.
*
* Modifying the context logger (eg. `context.log = myCustomLogger` or by an {@link ActivityInboundLogInterceptor}
* with a custom logger as argument) is deprecated. Doing so will prevent automatic inclusion of custom log attributes
* through the `getLogAttributes()` interceptor. To customize _where_ log messages are sent, set the
* {@link Runtime.logger} property instead.
*/
public log: Logger,

/**
* Holder object for activity cancellation details
*/
private readonly _cancellationDetails: ActivityCancellationDetailsHolder;
/**
* Get the metric meter for this activity with activity-specific tags.
*
* To add custom tags, register a {@link ActivityOutboundCallsInterceptor} that
* intercepts the `getMetricTags()` method.
*/
public readonly metricMeter: MetricMeter,

/**
* **Not** meant to instantiated by Activity code, used by the worker.
*
* @ignore
*/
constructor(
info: Info,
cancelled: Promise<never>,
cancellationSignal: AbortSignal,
heartbeat: (details?: any) => void,
client: Client | undefined,
log: Logger,
metricMeter: MetricMeter,
details: ActivityCancellationDetailsHolder
) {
this.info = info;
this.cancelled = cancelled;
this.cancellationSignal = cancellationSignal;
this.heartbeatFn = heartbeat;
this._client = client;
this.log = log;
this.metricMeter = metricMeter;
this._cancellationDetails = details;
}
/**
* Holder object for activity cancellation details
*/
protected readonly _cancellationDetails: ActivityCancellationDetailsHolder
) {}

/**
* Send a {@link https://docs.temporal.io/concepts/what-is-an-activity-heartbeat | heartbeat} from an Activity.
Expand Down
78 changes: 47 additions & 31 deletions packages/worker/src/interceptors.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
/**
* Definitions for Activity interceptors.
*
* (The Worker also accepts Workflow interceptors but those are passed as module names)
*
* @module
*/

import { Context as ActivityContext } from '@temporalio/activity';
import { ClientInterceptors } from '@temporalio/client';
import { Headers, MetricTags, Next } from '@temporalio/common';

export { Next, Headers };

/** Input for ActivityInboundCallsInterceptor.execute */
export interface ActivityExecuteInput {
readonly args: unknown[];
readonly headers: Headers;
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Activity Interceptors
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

export type ActivityInterceptorsFactory = (ctx: ActivityContext) => ActivityInterceptors;

/**
* A function that takes Activity Context and returns an interceptor
*/

export interface ActivityInterceptors {
inbound?: ActivityInboundCallsInterceptor;
outbound?: ActivityOutboundCallsInterceptor;
}

/**
Expand All @@ -27,24 +28,17 @@ export interface ActivityInboundCallsInterceptor {
*
* @return result of Activity function
*/
execute?: (input: ActivityExecuteInput, next: Next<this, 'execute'>) => Promise<unknown>;
execute?: (input: ActivityExecuteInput, next: Next<ActivityInboundCallsInterceptor, 'execute'>) => Promise<unknown>;
}

/**
* A function that takes Activity Context and returns an interceptor
*
* @deprecated Use {@link ActivityInterceptorsFactory} instead
* Input for {@link ActivityInboundCallsInterceptor.execute}
*/
export interface ActivityInboundCallsInterceptorFactory {
(ctx: ActivityContext): ActivityInboundCallsInterceptor;
export interface ActivityExecuteInput {
readonly args: unknown[];
readonly headers: Headers;
}

/** Input for ActivityOutboundCallsInterceptor.getLogAttributes */
export type GetLogAttributesInput = Record<string, unknown>;

/** Input for ActivityOutboundCallsInterceptor.getMetricTags */
export type GetMetricTagsInput = MetricTags;

/**
* Implement any of these methods to intercept Activity outbound calls
*/
Expand All @@ -54,7 +48,10 @@ export interface ActivityOutboundCallsInterceptor {
*
* The attributes returned in this call are attached to every log message.
*/
getLogAttributes?: (input: GetLogAttributesInput, next: Next<this, 'getLogAttributes'>) => Record<string, unknown>;
getLogAttributes?: (
input: GetLogAttributesInput,
next: Next<ActivityOutboundCallsInterceptor, 'getLogAttributes'>
) => Record<string, unknown>;

/**
* Called once every time a metric is emitted from an Activity metric
Expand All @@ -63,18 +60,34 @@ export interface ActivityOutboundCallsInterceptor {
* Tags returned by this hook are _prepended_ to tags defined at the metric level and tags defined
* on the emitter function itself.
*/
getMetricTags?: (input: GetMetricTagsInput, next: Next<this, 'getMetricTags'>) => MetricTags;
getMetricTags?: (
input: GetMetricTagsInput,
next: Next<ActivityOutboundCallsInterceptor, 'getMetricTags'>
) => MetricTags;
}

export interface ActivityInterceptors {
inbound?: ActivityInboundCallsInterceptor;
outbound?: ActivityOutboundCallsInterceptor;
}
/**
* Input for {@link ActivityOutboundCallsInterceptor.getLogAttributes}
*/
export type GetLogAttributesInput = Record<string, unknown>;

/**
* Input for {@link ActivityOutboundCallsInterceptor.getMetricTags}
*/
export type GetMetricTagsInput = MetricTags;

/**
* A function that takes Activity Context and returns an interceptor
*
* @deprecated Use {@link ActivityInterceptorsFactory} instead
*/
export type ActivityInterceptorsFactory = (ctx: ActivityContext) => ActivityInterceptors;
export interface ActivityInboundCallsInterceptorFactory {
(ctx: ActivityContext): ActivityInboundCallsInterceptor;
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Worker Interceptors Configuration
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

/**
* Structure for passing in Worker interceptors via {@link WorkerOptions}
Expand All @@ -88,11 +101,13 @@ export interface WorkerInterceptors {
* instead. Client doesn't support cancellation through a Signal.
*/
client?: ClientInterceptors;

/**
* List of factory functions that instanciate {@link ActivityInboundCallsInterceptor}s and
* {@link ActivityOutboundCallsInterceptor}s.
*/
activity?: ActivityInterceptorsFactory[];

/**
* List of factory functions returning {@link ActivityInboundCallsInterceptor}s. If both `activity` and
* `activityInbound` is supplied, then entries from `activityInbound` will be prepended to inbound interceptors
Expand All @@ -101,6 +116,7 @@ export interface WorkerInterceptors {
* @deprecated Use {@link WorkerInterceptors.activity} instead.
*/
activityInbound?: ActivityInboundCallsInterceptorFactory[]; // eslint-disable-line deprecation/deprecation

/**
* List of modules to search for Workflow interceptors in
* - Modules should export an `interceptors` variable of type {@link WorkflowInterceptorsFactory}
Expand Down
Loading
Loading