diff --git a/packages/worker/src/interceptors.ts b/packages/worker/src/interceptors.ts index d72e34ce9..31888f18d 100644 --- a/packages/worker/src/interceptors.ts +++ b/packages/worker/src/interceptors.ts @@ -1,21 +1,22 @@ -/** - * Definitions for Activity interceptors. - * - * (The Worker also accepts Workflow interceptors but those are passed as module names) - * - * @module - */ - import { Context as ActivityContext } from '@temporalio/activity'; import { ClientInterceptors } from '@temporalio/client'; import { Headers, MetricTags, Next } from '@temporalio/common'; export { Next, Headers }; -/** Input for ActivityInboundCallsInterceptor.execute */ -export interface ActivityExecuteInput { - readonly args: unknown[]; - readonly headers: Headers; +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Activity Interceptors +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +export type ActivityInterceptorsFactory = (ctx: ActivityContext) => ActivityInterceptors; + +/** + * A function that takes Activity Context and returns an interceptor + */ + +export interface ActivityInterceptors { + inbound?: ActivityInboundCallsInterceptor; + outbound?: ActivityOutboundCallsInterceptor; } /** @@ -27,24 +28,17 @@ export interface ActivityInboundCallsInterceptor { * * @return result of Activity function */ - execute?: (input: ActivityExecuteInput, next: Next) => Promise; + execute?: (input: ActivityExecuteInput, next: Next) => Promise; } /** - * A function that takes Activity Context and returns an interceptor - * - * @deprecated Use {@link ActivityInterceptorsFactory} instead + * Input for {@link ActivityInboundCallsInterceptor.execute} */ -export interface ActivityInboundCallsInterceptorFactory { - (ctx: ActivityContext): ActivityInboundCallsInterceptor; +export interface ActivityExecuteInput { + readonly args: unknown[]; + readonly headers: Headers; } -/** Input for ActivityOutboundCallsInterceptor.getLogAttributes */ -export type GetLogAttributesInput = Record; - -/** Input for ActivityOutboundCallsInterceptor.getMetricTags */ -export type GetMetricTagsInput = MetricTags; - /** * Implement any of these methods to intercept Activity outbound calls */ @@ -54,7 +48,10 @@ export interface ActivityOutboundCallsInterceptor { * * The attributes returned in this call are attached to every log message. */ - getLogAttributes?: (input: GetLogAttributesInput, next: Next) => Record; + getLogAttributes?: ( + input: GetLogAttributesInput, + next: Next + ) => Record; /** * Called once every time a metric is emitted from an Activity metric @@ -63,18 +60,34 @@ export interface ActivityOutboundCallsInterceptor { * Tags returned by this hook are _prepended_ to tags defined at the metric level and tags defined * on the emitter function itself. */ - getMetricTags?: (input: GetMetricTagsInput, next: Next) => MetricTags; + getMetricTags?: ( + input: GetMetricTagsInput, + next: Next + ) => MetricTags; } -export interface ActivityInterceptors { - inbound?: ActivityInboundCallsInterceptor; - outbound?: ActivityOutboundCallsInterceptor; -} +/** + * Input for {@link ActivityOutboundCallsInterceptor.getLogAttributes} + */ +export type GetLogAttributesInput = Record; + +/** + * Input for {@link ActivityOutboundCallsInterceptor.getMetricTags} + */ +export type GetMetricTagsInput = MetricTags; /** * A function that takes Activity Context and returns an interceptor + * + * @deprecated Use {@link ActivityInterceptorsFactory} instead */ -export type ActivityInterceptorsFactory = (ctx: ActivityContext) => ActivityInterceptors; +export interface ActivityInboundCallsInterceptorFactory { + (ctx: ActivityContext): ActivityInboundCallsInterceptor; +} + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +// Worker Interceptors Configuration +//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// /** * Structure for passing in Worker interceptors via {@link WorkerOptions} @@ -88,11 +101,13 @@ export interface WorkerInterceptors { * instead. Client doesn't support cancellation through a Signal. */ client?: ClientInterceptors; + /** * List of factory functions that instanciate {@link ActivityInboundCallsInterceptor}s and * {@link ActivityOutboundCallsInterceptor}s. */ activity?: ActivityInterceptorsFactory[]; + /** * List of factory functions returning {@link ActivityInboundCallsInterceptor}s. If both `activity` and * `activityInbound` is supplied, then entries from `activityInbound` will be prepended to inbound interceptors @@ -101,6 +116,7 @@ export interface WorkerInterceptors { * @deprecated Use {@link WorkerInterceptors.activity} instead. */ activityInbound?: ActivityInboundCallsInterceptorFactory[]; // eslint-disable-line deprecation/deprecation + /** * List of modules to search for Workflow interceptors in * - Modules should export an `interceptors` variable of type {@link WorkflowInterceptorsFactory} diff --git a/packages/workflow/src/interceptors.ts b/packages/workflow/src/interceptors.ts index ae2f91fa7..b7ce3395e 100644 --- a/packages/workflow/src/interceptors.ts +++ b/packages/workflow/src/interceptors.ts @@ -1,7 +1,5 @@ /** - * Type definitions and generic helpers for interceptors. - * - * The Workflow specific interceptors are defined here. + * Type definitions for Workflow interceptors. * * @module */ @@ -20,14 +18,83 @@ import { ChildWorkflowOptionsWithDefaults, ContinueAsNewOptions } from './interf export { Next, Headers }; -/** Input for WorkflowInboundCallsInterceptor.execute */ +/** + * A function that instantiates {@link WorkflowInterceptors}. + * + * Workflow interceptor modules should export an `interceptors` function of this type. + * + * @example + * + * ```ts + * export function interceptors(): WorkflowInterceptors { + * return { + * inbound: [], // Populate with list of inbound interceptor implementations + * outbound: [], // Populate with list of outbound interceptor implementations + * internals: [], // Populate with list of internals interceptor implementations + * }; + * } + * ``` + */ +export type WorkflowInterceptorsFactory = () => WorkflowInterceptors; + +/** + * A mapping from interceptor type to an optional list of interceptor implementations + */ +export interface WorkflowInterceptors { + inbound?: WorkflowInboundCallsInterceptor[]; + outbound?: WorkflowOutboundCallsInterceptor[]; + internals?: WorkflowInternalsInterceptor[]; +} + +// Workflow Inbound Calls Interceptors ///////////////////////////////////////////////////////////////////////////////// + +/** + * Implement any of these methods to intercept Workflow inbound calls like execution, and signal and query handling. + */ +export interface WorkflowInboundCallsInterceptor { + /** + * Called when Workflow execute method is called + * + * @return result of the Workflow execution + */ + execute?: (input: WorkflowExecuteInput, next: Next) => Promise; + + /** + * Called when Update handler is called + * + * @return result of the Update + */ + handleUpdate?: (input: UpdateInput, next: Next) => Promise; + + /** + * Called when update validator called + */ + validateUpdate?: (input: UpdateInput, next: Next) => void; + + /** + * Called when signal is delivered to a Workflow execution + */ + handleSignal?: (input: SignalInput, next: Next) => Promise; + + /** + * Called when a Workflow is queried + * + * @return result of the query + */ + handleQuery?: (input: QueryInput, next: Next) => Promise; +} + +/** + * Input for {@link WorkflowInboundCallsInterceptor.execute}. + */ export interface WorkflowExecuteInput { readonly args: unknown[]; readonly headers: Headers; } -/** Input for WorkflowInboundCallsInterceptor.handleUpdate and - * WorkflowInboundCallsInterceptor.validateUpdate */ +/** + * Input for {@link WorkflowInboundCallsInterceptor.handleUpdate} and {@link WorkflowInboundCallsInterceptor.validateUpdate}. + */ export interface UpdateInput { readonly updateId: string; readonly name: string; @@ -35,14 +102,18 @@ export interface UpdateInput { readonly headers: Headers; } -/** Input for WorkflowInboundCallsInterceptor.handleSignal */ +/** + * Input for {@link WorkflowInboundCallsInterceptor.handleSignal}. + */ export interface SignalInput { readonly signalName: string; readonly args: unknown[]; readonly headers: Headers; } -/** Input for WorkflowInboundCallsInterceptor.handleQuery */ +/** + * Input for {@link WorkflowInboundCallsInterceptor.handleQuery}. + */ export interface QueryInput { readonly queryId: string; readonly queryName: string; @@ -50,38 +121,109 @@ export interface QueryInput { readonly headers: Headers; } +// Workflow Outbound Calls Interceptors ///////////////////////////////////////////////////////////////////////////////// + /** - * Implement any of these methods to intercept Workflow inbound calls like execution, and signal and query handling. + * Implement any of these methods to intercept Workflow code calls to the Temporal APIs, like scheduling an activity + * and starting a timer. */ -export interface WorkflowInboundCallsInterceptor { +export interface WorkflowOutboundCallsInterceptor { /** - * Called when Workflow execute method is called + * Called when Workflow starts a timer. + */ + startTimer?: (input: TimerInput, next: Next) => Promise; + + /** + * Called when Workflow schedules an Activity. * - * @return result of the Workflow execution + * @return result of the activity execution */ - execute?: (input: WorkflowExecuteInput, next: Next) => Promise; + scheduleActivity?: ( + input: ActivityInput, + next: Next + ) => Promise; - /** Called when Update handler is called + /** + * Called when Workflow schedules a local Activity. * - * @return result of the Update + * @return result of the activity execution + */ + scheduleLocalActivity?: ( + input: LocalActivityInput, + next: Next + ) => Promise; + + /** + * Called when Workflow starts a child workflow execution. + * + * The interceptor function returns 2 promises: + * - The first resolves with the `runId` when the child workflow has started or rejects if failed to start. + * - The second resolves with the workflow result when the child workflow completes or rejects on failure. */ - handleUpdate?: (input: UpdateInput, next: Next) => Promise; + startChildWorkflowExecution?: ( + input: StartChildWorkflowExecutionInput, + next: Next + ) => Promise<[Promise, Promise]>; - /** Called when update validator called */ - validateUpdate?: (input: UpdateInput, next: Next) => void; + /** + * Called when Workflow signals a child or external Workflow. + */ + signalWorkflow?: ( + input: SignalWorkflowInput, + next: Next + ) => Promise; - /** Called when signal is delivered to a Workflow execution */ - handleSignal?: (input: SignalInput, next: Next) => Promise; + /** + * Called when Workflow calls continueAsNew. + */ + continueAsNew?: ( + input: ContinueAsNewInput, + next: Next + ) => Promise; /** - * Called when a Workflow is queried + * Called on each invocation of the `workflow.log` methods. * - * @return result of the query + * The attributes returned in this call are attached to every log message. */ - handleQuery?: (input: QueryInput, next: Next) => Promise; + getLogAttributes?: ( + input: GetLogAttributesInput, + next: Next + ) => Record; + + /** + * Called once every time a metric is emitted from a Workflow metric (ie. a metric created + * from {@link workflow.metricMeter}). + * + * Tags returned by this hook are _prepended_ to tags defined at the metric level and tags + * defined on the emitter function itself. + */ + getMetricTags?: ( + input: GetMetricTagsInput, + next: Next + ) => MetricTags; +} + +/** + * Input for {@link WorkflowOutboundCallsInterceptor.startTimer} + */ +export interface TimerInput { + readonly durationMs: number; + readonly seq: number; + readonly options?: TimerOptions; } -/** Input for WorkflowOutboundCallsInterceptor.scheduleActivity */ +/** + * Options for starting a timer (i.e. sleep) + */ +export interface TimerOptions { + /** @experimental A fixed, single line summary of the command's purpose */ + readonly summary?: string; +} + +/** + * Input for {@link WorkflowOutboundCallsInterceptor.scheduleActivity}. + */ export interface ActivityInput { readonly activityType: string; readonly args: unknown[]; @@ -90,7 +232,9 @@ export interface ActivityInput { readonly seq: number; } -/** Input for WorkflowOutboundCallsInterceptor.scheduleLocalActivity */ +/** + * Input for {@link WorkflowOutboundCallsInterceptor.scheduleLocalActivity}. + */ export interface LocalActivityInput { readonly activityType: string; readonly args: unknown[]; @@ -101,7 +245,9 @@ export interface LocalActivityInput { readonly attempt: number; } -/** Input for WorkflowOutboundCallsInterceptor.startChildWorkflowExecution */ +/** + * Input for {@link WorkflowOutboundCallsInterceptor.startChildWorkflowExecution}. + */ export interface StartChildWorkflowExecutionInput { readonly workflowType: string; readonly options: ChildWorkflowOptionsWithDefaults; @@ -109,32 +255,9 @@ export interface StartChildWorkflowExecutionInput { readonly seq: number; } -/** Input for WorkflowOutboundCallsInterceptor.startTimer */ -export interface TimerInput { - readonly durationMs: number; - readonly seq: number; - readonly options?: TimerOptions; -} - -/** Options for starting a timer (i.e. sleep) */ -export interface TimerOptions { - /** @experimental A fixed, single line summary of the command's purpose */ - readonly summary?: string; -} - /** - * Same as ContinueAsNewOptions but workflowType must be defined + * Input for {@link WorkflowOutboundCallsInterceptor.signalWorkflow}. */ -export type ContinueAsNewInputOptions = ContinueAsNewOptions & Required>; - -/** Input for WorkflowOutboundCallsInterceptor.continueAsNew */ -export interface ContinueAsNewInput { - readonly args: unknown[]; - readonly headers: Headers; - readonly options: ContinueAsNewInputOptions; -} - -/** Input for WorkflowOutboundCallsInterceptor.signalWorkflow */ export interface SignalWorkflowInput { readonly seq: number; readonly signalName: string; @@ -151,90 +274,31 @@ export interface SignalWorkflowInput { }; } -/** Input for WorkflowOutboundCallsInterceptor.getLogAttributes */ -export type GetLogAttributesInput = Record; - -/** Input for WorkflowOutboundCallsInterceptor.getMetricTags */ -export type GetMetricTagsInput = MetricTags; - /** - * Implement any of these methods to intercept Workflow code calls to the Temporal APIs, like scheduling an activity and starting a timer + * Input for {@link WorkflowOutboundCallsInterceptor.continueAsNew}. */ -export interface WorkflowOutboundCallsInterceptor { - /** - * Called when Workflow schedules an Activity - * - * @return result of the activity execution - */ - scheduleActivity?: (input: ActivityInput, next: Next) => Promise; - - /** - * Called when Workflow schedules a local Activity - * - * @return result of the activity execution - */ - scheduleLocalActivity?: (input: LocalActivityInput, next: Next) => Promise; - - /** - * Called when Workflow starts a timer - */ - startTimer?: (input: TimerInput, next: Next) => Promise; - - /** - * Called when Workflow calls continueAsNew - */ - continueAsNew?: (input: ContinueAsNewInput, next: Next) => Promise; - - /** - * Called when Workflow signals a child or external Workflow - */ - signalWorkflow?: (input: SignalWorkflowInput, next: Next) => Promise; - - /** - * Called when Workflow starts a child workflow execution, the interceptor function returns 2 promises: - * - * - The first resolves with the `runId` when the child workflow has started or rejects if failed to start. - * - The second resolves with the workflow result when the child workflow completes or rejects on failure. - */ - startChildWorkflowExecution?: ( - input: StartChildWorkflowExecutionInput, - next: Next - ) => Promise<[Promise, Promise]>; - - /** - * Called on each invocation of the `workflow.log` methods. - * - * The attributes returned in this call are attached to every log message. - */ - getLogAttributes?: (input: GetLogAttributesInput, next: Next) => Record; - - /** - * Called once every time a metric is emitted from a Workflow metric (ie. a metric created - * from {@link workflow.metricMeter}). - * - * Tags returned by this hook are _prepended_ to tags defined at the metric level and tags - * defined on the emitter function itself. - */ - getMetricTags?: (input: GetMetricTagsInput, next: Next) => MetricTags; +export interface ContinueAsNewInput { + readonly args: unknown[]; + readonly headers: Headers; + readonly options: ContinueAsNewInputOptions; } -/** Input for WorkflowInternalsInterceptor.concludeActivation */ -export interface ConcludeActivationInput { - commands: coresdk.workflow_commands.IWorkflowCommand[]; -} +/** + * Input for {@link WorkflowOutboundCallsInterceptor.continueAsNew}. + */ +export type ContinueAsNewInputOptions = ContinueAsNewOptions & Required>; -/** Output for WorkflowInternalsInterceptor.concludeActivation */ -export type ConcludeActivationOutput = ConcludeActivationInput; +/** + * Input for {@link WorkflowOutboundCallsInterceptor.getLogAttributes}. + */ +export type GetLogAttributesInput = Record; -/** Input for WorkflowInternalsInterceptor.activate */ -export interface ActivateInput { - activation: coresdk.workflow_activation.IWorkflowActivation; - batchIndex: number; -} +/** + * Input for {@link WorkflowOutboundCallsInterceptor.getMetricTags}. + */ +export type GetMetricTagsInput = MetricTags; -/** Input for WorkflowInternalsInterceptor.dispose */ -// eslint-disable-next-line @typescript-eslint/no-empty-object-type -export interface DisposeInput {} +// Workflow Internals Interceptors ///////////////////////////////////////////////////////////////////////////////////// /** * Interceptor for the internals of the Workflow runtime. @@ -265,29 +329,27 @@ export interface WorkflowInternalsInterceptor { } /** - * A mapping from interceptor type to an optional list of interceptor implementations + * Input for {@link WorkflowInternalsInterceptor.activate} */ -export interface WorkflowInterceptors { - inbound?: WorkflowInboundCallsInterceptor[]; - outbound?: WorkflowOutboundCallsInterceptor[]; - internals?: WorkflowInternalsInterceptor[]; +export interface ActivateInput { + activation: coresdk.workflow_activation.IWorkflowActivation; + batchIndex: number; } /** - * A function that returns {@link WorkflowInterceptors} and takes no arguments. - * - * Workflow interceptor modules should export an `interceptors` function of this type. - * - * @example - * - * ```ts - * export function interceptors(): WorkflowInterceptors { - * return { - * inbound: [], // Populate with list of interceptor implementations - * outbound: [], // Populate with list of interceptor implementations - * internals: [], // Populate with list of interceptor implementations - * }; - * } - * ``` + * Input for {@link WorkflowInternalsInterceptor.dispose} */ -export type WorkflowInterceptorsFactory = () => WorkflowInterceptors; +// eslint-disable-next-line @typescript-eslint/no-empty-object-type +export interface DisposeInput {} + +/** + * Input for {@link WorkflowInternalsInterceptor.concludeActivation} + */ +export interface ConcludeActivationInput { + commands: coresdk.workflow_commands.IWorkflowCommand[]; +} + +/** + * Output for {@link WorkflowInternalsInterceptor.concludeActivation} + */ +export type ConcludeActivationOutput = ConcludeActivationInput; diff --git a/packages/workflow/src/internals.ts b/packages/workflow/src/internals.ts index a3d008959..d180809e4 100644 --- a/packages/workflow/src/internals.ts +++ b/packages/workflow/src/internals.ts @@ -95,10 +95,9 @@ export interface PromiseStackStore { promiseToStack: Map, Stack>; } -export interface Completion { - resolve(val: unknown): unknown; - - reject(reason: unknown): unknown; +export interface Completion { + resolve(val: Success): void; + reject(reason: Error): void; } export interface Condition { @@ -127,6 +126,8 @@ interface MessageHandlerExecution { id?: string; } +type InferMapValue = T extends Map ? V : never; + /** * Keeps all of the Workflow runtime state like pending completions for activities and timers. * @@ -150,16 +151,17 @@ export class Activator implements ActivationHandler { * Cache for modules - referenced in reusable-vm.ts */ readonly moduleCache = new Map(); + /** * Map of task sequence to a Completion */ readonly completions = { - timer: new Map(), - activity: new Map(), - childWorkflowStart: new Map(), - childWorkflowComplete: new Map(), - signalWorkflow: new Map(), - cancelWorkflow: new Map(), + timer: new Map>(), + activity: new Map>(), + childWorkflowStart: new Map>(), + childWorkflowComplete: new Map>(), + signalWorkflow: new Map>(), + cancelWorkflow: new Map>(), }; /** @@ -519,7 +521,7 @@ export class Activator implements ActivationHandler { public async startWorkflowNextHandler({ args }: WorkflowExecuteInput): Promise { const { workflow } = this; - if (workflow === undefined) { + if (workflow == null) { throw new IllegalStateError('Workflow uninitialized'); } return await workflow(...args); @@ -583,12 +585,16 @@ export class Activator implements ActivationHandler { resolve(result); } else if (activation.result.failed) { const { failure } = activation.result.failed; - const err = failure ? this.failureToError(failure) : undefined; - reject(err); + if (failure == null) { + throw new TypeError('Got failed result with no failure attribute'); + } + reject(this.failureToError(failure)); } else if (activation.result.cancelled) { const { failure } = activation.result.cancelled; - const err = failure ? this.failureToError(failure) : undefined; - reject(err); + if (failure == null) { + throw new TypeError('Got cancelled result with no failure attribute'); + } + reject(this.failureToError(failure)); } else if (activation.result.backoff) { reject(new LocalActivityDoBackoff(activation.result.backoff)); } @@ -599,6 +605,9 @@ export class Activator implements ActivationHandler { ): void { const { resolve, reject } = this.consumeCompletion('childWorkflowStart', getSeq(activation)); if (activation.succeeded) { + if (!activation.succeeded.runId) { + throw new TypeError('Got ResolveChildWorkflowExecutionStart with no runId'); + } resolve(activation.succeeded.runId); } else if (activation.failed) { if (decodeStartChildWorkflowExecutionFailedCause(activation.failed.cause) !== 'WORKFLOW_ALREADY_EXISTS') { @@ -635,13 +644,13 @@ export class Activator implements ActivationHandler { resolve(result); } else if (activation.result.failed) { const { failure } = activation.result.failed; - if (failure === undefined || failure === null) { + if (failure == null) { throw new TypeError('Got failed result with no failure attribute'); } reject(this.failureToError(failure)); } else if (activation.result.cancelled) { const { failure } = activation.result.cancelled; - if (failure === undefined || failure === null) { + if (failure == null) { throw new TypeError('Got cancelled result with no failure attribute'); } reject(this.failureToError(failure)); @@ -749,7 +758,7 @@ export class Activator implements ActivationHandler { : null); // If we don't have an entry from either source, buffer and return - if (entry === null) { + if (entry == null) { this.bufferedUpdates.push(activation); return; } @@ -1152,16 +1161,22 @@ export class Activator implements ActivationHandler { } /** Consume a completion if it exists in Workflow state */ - private maybeConsumeCompletion(type: keyof Activator['completions'], taskSeq: number): Completion | undefined { + private maybeConsumeCompletion( + type: K, + taskSeq: number + ): InferMapValue | undefined { const completion = this.completions[type].get(taskSeq); if (completion !== undefined) { this.completions[type].delete(taskSeq); } - return completion; + return completion as InferMapValue | undefined; } /** Consume a completion if it exists in Workflow state, throws if it doesn't */ - private consumeCompletion(type: keyof Activator['completions'], taskSeq: number): Completion { + private consumeCompletion( + type: K, + taskSeq: number + ): InferMapValue { const completion = this.maybeConsumeCompletion(type, taskSeq); if (completion === undefined) { throw new IllegalStateError(`No completion for taskSeq ${taskSeq}`); @@ -1191,7 +1206,7 @@ export class Activator implements ActivationHandler { function getSeq(activation: T): number { const seq = activation.seq; - if (seq === undefined || seq === null) { + if (seq == null) { throw new TypeError(`Got activation with no seq attribute`); } return seq;