diff --git a/packages/client/src/interceptor-adapters.ts b/packages/client/src/interceptor-adapters.ts new file mode 100644 index 000000000..6a06b500f --- /dev/null +++ b/packages/client/src/interceptor-adapters.ts @@ -0,0 +1,41 @@ +import { WorkflowClientInterceptor, WorkflowStartInput, WorkflowStartOutput } from './interceptors'; + +export function adaptWorkflowClientInterceptor(i: WorkflowClientInterceptor): WorkflowClientInterceptor { + return adaptLegacyStartInterceptor(i); +} + +// Adapt legacy `start` interceptors to the new `startWithDetails` interceptor. +function adaptLegacyStartInterceptor(i: WorkflowClientInterceptor): WorkflowClientInterceptor { + // If it already has the new method, or doesn't have the legacy one, no adaptation is needed. + // eslint-disable-next-line deprecation/deprecation + if (i.startWithDetails || !i.start) { + return i; + } + + // This interceptor has a legacy `start` but not `startWithDetails`. We'll adapt it. + return { + ...i, + startWithDetails: async (input, next): Promise => { + let downstreamOut: WorkflowStartOutput | undefined; + + // Patched `next` for legacy `start` interceptors. + // Captures the full `WorkflowStartOutput` while returning `runId` as a string. + const patchedNext = async (patchedInput: WorkflowStartInput): Promise => { + downstreamOut = await next(patchedInput); + return downstreamOut.runId; + }; + + const runIdFromLegacyInterceptor = await i.start!(input, patchedNext); // eslint-disable-line deprecation/deprecation + + // If the interceptor short-circuited (didn't call `next`), `downstreamOut` will be undefined. + // In that case, we can't have an eager start. + if (downstreamOut === undefined) { + return { runId: runIdFromLegacyInterceptor, eagerlyStarted: false }; + } + + // If `next` was called, honor the `runId` from the legacy interceptor but preserve + // the `eagerlyStarted` status from the actual downstream call. + return { ...downstreamOut, runId: runIdFromLegacyInterceptor }; + }, + }; +} diff --git a/packages/client/src/interceptors.ts b/packages/client/src/interceptors.ts index 632a6ed71..c4626fffe 100644 --- a/packages/client/src/interceptors.ts +++ b/packages/client/src/interceptors.ts @@ -35,6 +35,12 @@ export interface WorkflowStartUpdateInput { readonly options: WorkflowUpdateOptions; } +/** Output for WorkflowClientInterceptor.startWithDetails */ +export interface WorkflowStartOutput { + readonly runId: string; + readonly eagerlyStarted: boolean; +} + /** Output for WorkflowClientInterceptor.startUpdate */ export interface WorkflowStartUpdateOutput { readonly updateId: string; @@ -118,8 +124,21 @@ export interface WorkflowClientInterceptor { * * If you implement this method, * {@link signalWithStart} most likely needs to be implemented too + * + * @deprecated in favour of {@link startWithDetails} */ start?: (input: WorkflowStartInput, next: Next) => Promise; + + /** + * Intercept a service call to startWorkflowExecution + * + * This method returns start details via {@link WorkflowStartOutput}. + * + * If you implement this method, + * {@link signalWithStart} most likely needs to be implemented too + */ + startWithDetails?: (input: WorkflowStartInput, next: Next) => Promise; + /** * Intercept a service call to updateWorkflowExecution */ diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index ea1ffbc71..c478222fd 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -164,6 +164,17 @@ export interface ConnectionLike { withAbortSignal(abortSignal: AbortSignal, fn: () => Promise): Promise; } +export const InternalConnectionLikeSymbol = Symbol('__temporal_internal_connection_like'); +export type InternalConnectionLike = ConnectionLike & { + [InternalConnectionLikeSymbol]?: { + /** + * Capability flag that determines whether the connection supports eager workflow start. + * This will only be true if the underlying connection is a {@link NativeConnection}. + */ + readonly supportsEagerStart?: boolean; + }; +}; + export const QueryRejectCondition = { NONE: 'NONE', NOT_OPEN: 'NOT_OPEN', diff --git a/packages/client/src/workflow-client.ts b/packages/client/src/workflow-client.ts index 727497316..a27aa6837 100644 --- a/packages/client/src/workflow-client.ts +++ b/packages/client/src/workflow-client.ts @@ -61,12 +61,15 @@ import { WorkflowStartUpdateOutput, WorkflowStartUpdateWithStartInput, WorkflowStartUpdateWithStartOutput, + WorkflowStartOutput, } from './interceptors'; import { CountWorkflowExecution, DescribeWorkflowExecutionResponse, encodeQueryRejectCondition, GetWorkflowExecutionHistoryRequest, + InternalConnectionLike, + InternalConnectionLikeSymbol, QueryRejectCondition, RequestCancelWorkflowExecutionResponse, StartWorkflowExecutionRequest, @@ -94,6 +97,7 @@ import { import { mapAsyncIterable } from './iterators-utils'; import { WorkflowUpdateStage, encodeWorkflowUpdateStage } from './workflow-update-stage'; import { InternalWorkflowStartOptionsSymbol, InternalWorkflowStartOptions } from './internal'; +import { adaptWorkflowClientInterceptor } from './interceptor-adapters'; const UpdateWorkflowExecutionLifecycleStage = temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage; @@ -263,6 +267,15 @@ export interface WorkflowHandleWithFirstExecutionRunId + extends WorkflowHandleWithFirstExecutionRunId { + readonly eagerlyStarted: boolean; +} + /** * This interface is exactly the same as {@link WorkflowHandle} except it * includes the `signaledRunId` returned from `signalWithStart`. @@ -514,14 +527,19 @@ export class WorkflowClient extends BaseClient { workflowTypeOrFunc: string | T, options: WorkflowStartOptions, interceptors: WorkflowClientInterceptor[] - ): Promise { + ): Promise { const workflowType = extractWorkflowType(workflowTypeOrFunc); assertRequiredWorkflowOptions(options); const compiledOptions = compileWorkflowOptions(ensureArgs(options)); + const adaptedInterceptors = interceptors.map((i) => adaptWorkflowClientInterceptor(i)); - const start = composeInterceptors(interceptors, 'start', this._startWorkflowHandler.bind(this)); + const startWithDetails = composeInterceptors( + adaptedInterceptors, + 'startWithDetails', + this._startWorkflowHandler.bind(this) + ); - return start({ + return startWithDetails({ options: compiledOptions, headers: {}, workflowType, @@ -537,7 +555,6 @@ export class WorkflowClient extends BaseClient { const { signal, signalArgs, ...rest } = options; assertRequiredWorkflowOptions(rest); const compiledOptions = compileWorkflowOptions(ensureArgs(rest)); - const signalWithStart = composeInterceptors( interceptors, 'signalWithStart', @@ -561,22 +578,25 @@ export class WorkflowClient extends BaseClient { public async start( workflowTypeOrFunc: string | T, options: WorkflowStartOptions - ): Promise> { + ): Promise> { const { workflowId } = options; const interceptors = this.getOrMakeInterceptors(workflowId); - const runId = await this._start(workflowTypeOrFunc, { ...options, workflowId }, interceptors); + const wfStartOutput = await this._start(workflowTypeOrFunc, { ...options, workflowId }, interceptors); // runId is not used in handles created with `start*` calls because these // handles should allow interacting with the workflow if it continues as new. - const handle = this._createWorkflowHandle({ + const baseHandle = this._createWorkflowHandle({ workflowId, runId: undefined, - firstExecutionRunId: runId, - runIdForResult: runId, + firstExecutionRunId: wfStartOutput.runId, + runIdForResult: wfStartOutput.runId, interceptors, followRuns: options.followRuns ?? true, - }) as WorkflowHandleWithFirstExecutionRunId; // Cast is safe because we know we add the firstExecutionRunId below - (handle as any) /* readonly */.firstExecutionRunId = runId; - return handle; + }); + return { + ...baseHandle, + firstExecutionRunId: wfStartOutput.runId, + eagerlyStarted: wfStartOutput.eagerlyStarted, + }; } /** @@ -1246,7 +1266,7 @@ export class WorkflowClient extends BaseClient { * * Used as the final function of the start interceptor chain */ - protected async _startWorkflowHandler(input: WorkflowStartInput): Promise { + protected async _startWorkflowHandler(input: WorkflowStartInput): Promise { const req = await this.createStartWorkflowRequest(input); const { options: opts, workflowType } = input; const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol]; @@ -1255,7 +1275,10 @@ export class WorkflowClient extends BaseClient { if (internalOptions != null) { internalOptions.backLink = response.link ?? undefined; } - return response.runId; + return { + runId: response.runId, + eagerlyStarted: response.eagerWorkflowTask != null, + }; } catch (err: any) { if (err.code === grpcStatus.ALREADY_EXISTS) { throw new WorkflowExecutionAlreadyStartedError( @@ -1272,6 +1295,15 @@ export class WorkflowClient extends BaseClient { const { options: opts, workflowType, headers } = input; const { identity, namespace } = this.options; const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol]; + const supportsEagerStart = (this.connection as InternalConnectionLike)?.[InternalConnectionLikeSymbol] + ?.supportsEagerStart; + + if (opts.requestEagerStart && !supportsEagerStart) { + throw new Error( + 'Eager workflow start requires a NativeConnection shared between client and worker. ' + + 'Pass a NativeConnection via ClientOptions.connection, or disable requestEagerStart.' + ); + } return { namespace, @@ -1303,6 +1335,7 @@ export class WorkflowClient extends BaseClient { userMetadata: await encodeUserMetadata(this.dataConverter, opts.staticSummary, opts.staticDetails), priority: opts.priority ? compilePriority(opts.priority) : undefined, versioningOverride: opts.versioningOverride ?? undefined, + requestEagerExecution: opts.requestEagerStart, ...filterNullAndUndefined(internalOptions ?? {}), }; } diff --git a/packages/client/src/workflow-options.ts b/packages/client/src/workflow-options.ts index 4d71cbec4..d40589ccc 100644 --- a/packages/client/src/workflow-options.ts +++ b/packages/client/src/workflow-options.ts @@ -54,6 +54,12 @@ export interface WorkflowOptions extends CommonWorkflowOptions { * @experimental Deployment based versioning is experimental and may change in the future. */ versioningOverride?: VersioningOverride; + + /** + * Potentially reduce the latency to start this workflow by requesting that the server + * start it on a local worker running with this same client. + */ + requestEagerStart?: boolean; } export type WithCompiledWorkflowOptions = Replace< @@ -97,7 +103,8 @@ export type WorkflowSignalWithStartOptions = Sign ? WorkflowSignalWithStartOptionsWithArgs : WorkflowSignalWithStartOptionsWithoutArgs; -export interface WorkflowSignalWithStartOptionsWithoutArgs extends WorkflowOptions { +export interface WorkflowSignalWithStartOptionsWithoutArgs + extends Omit { /** * SignalDefinition or name of signal */ @@ -109,7 +116,8 @@ export interface WorkflowSignalWithStartOptionsWithoutArgs extends WorkflowOptions { +export interface WorkflowSignalWithStartOptionsWithArgs + extends Omit { /** * SignalDefinition or name of signal */ diff --git a/packages/core-bridge/src/worker.rs b/packages/core-bridge/src/worker.rs index 9c8384288..ca6131f92 100644 --- a/packages/core-bridge/src/worker.rs +++ b/packages/core-bridge/src/worker.rs @@ -672,10 +672,8 @@ mod config { self.local_activity_task_slot_supplier .into_slot_supplier(&mut rbo), ); - tuner_holder.nexus_slot_options( - self.nexus_task_slot_supplier - .into_slot_supplier(&mut rbo) - ); + tuner_holder + .nexus_slot_options(self.nexus_task_slot_supplier.into_slot_supplier(&mut rbo)); if let Some(rbo) = rbo { tuner_holder.resource_based_options(rbo); } diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 976849482..d217b341c 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -3,7 +3,7 @@ import { randomUUID } from 'crypto'; import asyncRetry from 'async-retry'; import { ExecutionContext } from 'ava'; import { firstValueFrom, Subject } from 'rxjs'; -import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client'; +import { Client, WorkflowClient, WorkflowFailedError, WorkflowHandle } from '@temporalio/client'; import * as activity from '@temporalio/activity'; import { msToNumber, tsToMs } from '@temporalio/common/lib/time'; import { TestWorkflowEnvironment } from '@temporalio/testing'; @@ -1731,3 +1731,49 @@ test('Default handlers fail given reserved prefix', async (t) => { await handle.terminate(); }); }); + +export async function helloWorkflow(name: string): Promise { + return `Hello, ${name}!`; +} + +test('Workflow can be started eagerly with shared NativeConnection', async (t) => { + const { createWorker, taskQueue } = helpers(t); + const client = new Client({ + connection: t.context.env.nativeConnection, + namespace: t.context.env.client.options.namespace, + }); + + const worker = await createWorker(); + await worker.runUntil(async () => { + const handle = await client.workflow.start(helloWorkflow, { + args: ['Temporal'], + workflowId: `eager-workflow-${randomUUID()}`, + taskQueue, + requestEagerStart: true, + workflowTaskTimeout: '1h', // hang if retry needed + }); + + t.true(handle.eagerlyStarted); + + const result = await handle.result(); + t.is(result, 'Hello, Temporal!'); + }); +}); + +test('Error thrown when requestEagerStart is used with regular Connection', async (t) => { + const { taskQueue } = helpers(t); + + const client = new WorkflowClient({ connection: t.context.env.connection }); + + await t.throwsAsync( + client.start(helloWorkflow, { + args: ['Temporal'], + workflowId: `eager-workflow-error-${randomUUID()}`, + taskQueue, + requestEagerStart: true, + }), + { + message: /Eager workflow start requires a NativeConnection/, + } + ); +}); diff --git a/packages/worker/src/connection.ts b/packages/worker/src/connection.ts index 952d00422..b69ced2ac 100644 --- a/packages/worker/src/connection.ts +++ b/packages/worker/src/connection.ts @@ -11,6 +11,7 @@ import { OperatorService, HealthService, TestService, + InternalConnectionLikeSymbol, } from '@temporalio/client'; import { InternalConnectionOptions, InternalConnectionOptionsSymbol } from '@temporalio/client/lib/connection'; import { TransportError } from './errors'; @@ -93,6 +94,14 @@ export class NativeConnection implements ConnectionLike { false ); } + + // Set internal capability flag - not part of public API + Object.defineProperty(this, InternalConnectionLikeSymbol, { + value: { supportsEagerStart: true }, + writable: false, + enumerable: false, + configurable: false, + }); } /**