Skip to content

Commit 49df572

Browse files
authored
Eager Workflow Start (#1757)
1 parent fcda0b7 commit 49df572

File tree

7 files changed

+184
-17
lines changed

7 files changed

+184
-17
lines changed
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { WorkflowClientInterceptor, WorkflowStartInput, WorkflowStartOutput } from './interceptors';
2+
3+
export function adaptWorkflowClientInterceptor(i: WorkflowClientInterceptor): WorkflowClientInterceptor {
4+
return adaptLegacyStartInterceptor(i);
5+
}
6+
7+
// Adapt legacy `start` interceptors to the new `startWithDetails` interceptor.
8+
function adaptLegacyStartInterceptor(i: WorkflowClientInterceptor): WorkflowClientInterceptor {
9+
// If it already has the new method, or doesn't have the legacy one, no adaptation is needed.
10+
// eslint-disable-next-line deprecation/deprecation
11+
if (i.startWithDetails || !i.start) {
12+
return i;
13+
}
14+
15+
// This interceptor has a legacy `start` but not `startWithDetails`. We'll adapt it.
16+
return {
17+
...i,
18+
startWithDetails: async (input, next): Promise<WorkflowStartOutput> => {
19+
let downstreamOut: WorkflowStartOutput | undefined;
20+
21+
// Patched `next` for legacy `start` interceptors.
22+
// Captures the full `WorkflowStartOutput` while returning `runId` as a string.
23+
const patchedNext = async (patchedInput: WorkflowStartInput): Promise<string> => {
24+
downstreamOut = await next(patchedInput);
25+
return downstreamOut.runId;
26+
};
27+
28+
const runIdFromLegacyInterceptor = await i.start!(input, patchedNext); // eslint-disable-line deprecation/deprecation
29+
30+
// If the interceptor short-circuited (didn't call `next`), `downstreamOut` will be undefined.
31+
// In that case, we can't have an eager start.
32+
if (downstreamOut === undefined) {
33+
return { runId: runIdFromLegacyInterceptor, eagerlyStarted: false };
34+
}
35+
36+
// If `next` was called, honor the `runId` from the legacy interceptor but preserve
37+
// the `eagerlyStarted` status from the actual downstream call.
38+
return { ...downstreamOut, runId: runIdFromLegacyInterceptor };
39+
},
40+
};
41+
}

packages/client/src/interceptors.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ export interface WorkflowStartUpdateInput {
3535
readonly options: WorkflowUpdateOptions;
3636
}
3737

38+
/** Output for WorkflowClientInterceptor.startWithDetails */
39+
export interface WorkflowStartOutput {
40+
readonly runId: string;
41+
readonly eagerlyStarted: boolean;
42+
}
43+
3844
/** Output for WorkflowClientInterceptor.startUpdate */
3945
export interface WorkflowStartUpdateOutput {
4046
readonly updateId: string;
@@ -118,8 +124,21 @@ export interface WorkflowClientInterceptor {
118124
*
119125
* If you implement this method,
120126
* {@link signalWithStart} most likely needs to be implemented too
127+
*
128+
* @deprecated in favour of {@link startWithDetails}
121129
*/
122130
start?: (input: WorkflowStartInput, next: Next<this, 'start'>) => Promise<string /* runId */>;
131+
132+
/**
133+
* Intercept a service call to startWorkflowExecution
134+
*
135+
* This method returns start details via {@link WorkflowStartOutput}.
136+
*
137+
* If you implement this method,
138+
* {@link signalWithStart} most likely needs to be implemented too
139+
*/
140+
startWithDetails?: (input: WorkflowStartInput, next: Next<this, 'startWithDetails'>) => Promise<WorkflowStartOutput>;
141+
123142
/**
124143
* Intercept a service call to updateWorkflowExecution
125144
*/

packages/client/src/types.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,17 @@ export interface ConnectionLike {
164164
withAbortSignal<R>(abortSignal: AbortSignal, fn: () => Promise<R>): Promise<R>;
165165
}
166166

167+
export const InternalConnectionLikeSymbol = Symbol('__temporal_internal_connection_like');
168+
export type InternalConnectionLike = ConnectionLike & {
169+
[InternalConnectionLikeSymbol]?: {
170+
/**
171+
* Capability flag that determines whether the connection supports eager workflow start.
172+
* This will only be true if the underlying connection is a {@link NativeConnection}.
173+
*/
174+
readonly supportsEagerStart?: boolean;
175+
};
176+
};
177+
167178
export const QueryRejectCondition = {
168179
NONE: 'NONE',
169180
NOT_OPEN: 'NOT_OPEN',

packages/client/src/workflow-client.ts

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,15 @@ import {
6161
WorkflowStartUpdateOutput,
6262
WorkflowStartUpdateWithStartInput,
6363
WorkflowStartUpdateWithStartOutput,
64+
WorkflowStartOutput,
6465
} from './interceptors';
6566
import {
6667
CountWorkflowExecution,
6768
DescribeWorkflowExecutionResponse,
6869
encodeQueryRejectCondition,
6970
GetWorkflowExecutionHistoryRequest,
71+
InternalConnectionLike,
72+
InternalConnectionLikeSymbol,
7073
QueryRejectCondition,
7174
RequestCancelWorkflowExecutionResponse,
7275
StartWorkflowExecutionRequest,
@@ -94,6 +97,7 @@ import {
9497
import { mapAsyncIterable } from './iterators-utils';
9598
import { WorkflowUpdateStage, encodeWorkflowUpdateStage } from './workflow-update-stage';
9699
import { InternalWorkflowStartOptionsSymbol, InternalWorkflowStartOptions } from './internal';
100+
import { adaptWorkflowClientInterceptor } from './interceptor-adapters';
97101

98102
const UpdateWorkflowExecutionLifecycleStage = temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage;
99103

@@ -263,6 +267,15 @@ export interface WorkflowHandleWithFirstExecutionRunId<T extends Workflow = Work
263267
readonly firstExecutionRunId: string;
264268
}
265269

270+
/**
271+
* This interface is exactly the same as {@link WorkflowHandleWithFirstExecutionRunId} except it
272+
* includes the `eagerlyStarted` returned from {@link WorkflowClient.start}.
273+
*/
274+
export interface WorkflowHandleWithStartDetails<T extends Workflow = Workflow>
275+
extends WorkflowHandleWithFirstExecutionRunId<T> {
276+
readonly eagerlyStarted: boolean;
277+
}
278+
266279
/**
267280
* This interface is exactly the same as {@link WorkflowHandle} except it
268281
* includes the `signaledRunId` returned from `signalWithStart`.
@@ -514,14 +527,19 @@ export class WorkflowClient extends BaseClient {
514527
workflowTypeOrFunc: string | T,
515528
options: WorkflowStartOptions<T>,
516529
interceptors: WorkflowClientInterceptor[]
517-
): Promise<string> {
530+
): Promise<WorkflowStartOutput> {
518531
const workflowType = extractWorkflowType(workflowTypeOrFunc);
519532
assertRequiredWorkflowOptions(options);
520533
const compiledOptions = compileWorkflowOptions(ensureArgs(options));
534+
const adaptedInterceptors = interceptors.map((i) => adaptWorkflowClientInterceptor(i));
521535

522-
const start = composeInterceptors(interceptors, 'start', this._startWorkflowHandler.bind(this));
536+
const startWithDetails = composeInterceptors(
537+
adaptedInterceptors,
538+
'startWithDetails',
539+
this._startWorkflowHandler.bind(this)
540+
);
523541

524-
return start({
542+
return startWithDetails({
525543
options: compiledOptions,
526544
headers: {},
527545
workflowType,
@@ -537,7 +555,6 @@ export class WorkflowClient extends BaseClient {
537555
const { signal, signalArgs, ...rest } = options;
538556
assertRequiredWorkflowOptions(rest);
539557
const compiledOptions = compileWorkflowOptions(ensureArgs(rest));
540-
541558
const signalWithStart = composeInterceptors(
542559
interceptors,
543560
'signalWithStart',
@@ -561,22 +578,25 @@ export class WorkflowClient extends BaseClient {
561578
public async start<T extends Workflow>(
562579
workflowTypeOrFunc: string | T,
563580
options: WorkflowStartOptions<T>
564-
): Promise<WorkflowHandleWithFirstExecutionRunId<T>> {
581+
): Promise<WorkflowHandleWithStartDetails<T>> {
565582
const { workflowId } = options;
566583
const interceptors = this.getOrMakeInterceptors(workflowId);
567-
const runId = await this._start(workflowTypeOrFunc, { ...options, workflowId }, interceptors);
584+
const wfStartOutput = await this._start(workflowTypeOrFunc, { ...options, workflowId }, interceptors);
568585
// runId is not used in handles created with `start*` calls because these
569586
// handles should allow interacting with the workflow if it continues as new.
570-
const handle = this._createWorkflowHandle({
587+
const baseHandle = this._createWorkflowHandle({
571588
workflowId,
572589
runId: undefined,
573-
firstExecutionRunId: runId,
574-
runIdForResult: runId,
590+
firstExecutionRunId: wfStartOutput.runId,
591+
runIdForResult: wfStartOutput.runId,
575592
interceptors,
576593
followRuns: options.followRuns ?? true,
577-
}) as WorkflowHandleWithFirstExecutionRunId<T>; // Cast is safe because we know we add the firstExecutionRunId below
578-
(handle as any) /* readonly */.firstExecutionRunId = runId;
579-
return handle;
594+
});
595+
return {
596+
...baseHandle,
597+
firstExecutionRunId: wfStartOutput.runId,
598+
eagerlyStarted: wfStartOutput.eagerlyStarted,
599+
};
580600
}
581601

582602
/**
@@ -1246,7 +1266,7 @@ export class WorkflowClient extends BaseClient {
12461266
*
12471267
* Used as the final function of the start interceptor chain
12481268
*/
1249-
protected async _startWorkflowHandler(input: WorkflowStartInput): Promise<string> {
1269+
protected async _startWorkflowHandler(input: WorkflowStartInput): Promise<WorkflowStartOutput> {
12501270
const req = await this.createStartWorkflowRequest(input);
12511271
const { options: opts, workflowType } = input;
12521272
const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol];
@@ -1255,7 +1275,10 @@ export class WorkflowClient extends BaseClient {
12551275
if (internalOptions != null) {
12561276
internalOptions.backLink = response.link ?? undefined;
12571277
}
1258-
return response.runId;
1278+
return {
1279+
runId: response.runId,
1280+
eagerlyStarted: response.eagerWorkflowTask != null,
1281+
};
12591282
} catch (err: any) {
12601283
if (err.code === grpcStatus.ALREADY_EXISTS) {
12611284
throw new WorkflowExecutionAlreadyStartedError(
@@ -1272,6 +1295,15 @@ export class WorkflowClient extends BaseClient {
12721295
const { options: opts, workflowType, headers } = input;
12731296
const { identity, namespace } = this.options;
12741297
const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol];
1298+
const supportsEagerStart = (this.connection as InternalConnectionLike)?.[InternalConnectionLikeSymbol]
1299+
?.supportsEagerStart;
1300+
1301+
if (opts.requestEagerStart && !supportsEagerStart) {
1302+
throw new Error(
1303+
'Eager workflow start requires a NativeConnection shared between client and worker. ' +
1304+
'Pass a NativeConnection via ClientOptions.connection, or disable requestEagerStart.'
1305+
);
1306+
}
12751307

12761308
return {
12771309
namespace,
@@ -1303,6 +1335,7 @@ export class WorkflowClient extends BaseClient {
13031335
userMetadata: await encodeUserMetadata(this.dataConverter, opts.staticSummary, opts.staticDetails),
13041336
priority: opts.priority ? compilePriority(opts.priority) : undefined,
13051337
versioningOverride: opts.versioningOverride ?? undefined,
1338+
requestEagerExecution: opts.requestEagerStart,
13061339
...filterNullAndUndefined(internalOptions ?? {}),
13071340
};
13081341
}

packages/client/src/workflow-options.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ export interface WorkflowOptions extends CommonWorkflowOptions {
5454
* @experimental Deployment based versioning is experimental and may change in the future.
5555
*/
5656
versioningOverride?: VersioningOverride;
57+
58+
/**
59+
* Potentially reduce the latency to start this workflow by requesting that the server
60+
* start it on a local worker running with this same client.
61+
*/
62+
requestEagerStart?: boolean;
5763
}
5864

5965
export type WithCompiledWorkflowOptions<T extends WorkflowOptions> = Replace<
@@ -97,7 +103,8 @@ export type WorkflowSignalWithStartOptions<SignalArgs extends any[] = []> = Sign
97103
? WorkflowSignalWithStartOptionsWithArgs<SignalArgs>
98104
: WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs>;
99105

100-
export interface WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs extends any[]> extends WorkflowOptions {
106+
export interface WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs extends any[]>
107+
extends Omit<WorkflowOptions, 'requestEagerStart'> {
101108
/**
102109
* SignalDefinition or name of signal
103110
*/
@@ -109,7 +116,8 @@ export interface WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs extends an
109116
signalArgs?: SignalArgs;
110117
}
111118

112-
export interface WorkflowSignalWithStartOptionsWithArgs<SignalArgs extends any[]> extends WorkflowOptions {
119+
export interface WorkflowSignalWithStartOptionsWithArgs<SignalArgs extends any[]>
120+
extends Omit<WorkflowOptions, 'requestEagerStart'> {
113121
/**
114122
* SignalDefinition or name of signal
115123
*/

packages/test/src/test-integration-workflows.ts

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { randomUUID } from 'crypto';
33
import asyncRetry from 'async-retry';
44
import { ExecutionContext } from 'ava';
55
import { firstValueFrom, Subject } from 'rxjs';
6-
import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
6+
import { Client, WorkflowClient, WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
77
import * as activity from '@temporalio/activity';
88
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
99
import { TestWorkflowEnvironment } from '@temporalio/testing';
@@ -1795,3 +1795,49 @@ test('Default handlers fail given reserved prefix', async (t) => {
17951795
await handle.terminate();
17961796
});
17971797
});
1798+
1799+
export async function helloWorkflow(name: string): Promise<string> {
1800+
return `Hello, ${name}!`;
1801+
}
1802+
1803+
test('Workflow can be started eagerly with shared NativeConnection', async (t) => {
1804+
const { createWorker, taskQueue } = helpers(t);
1805+
const client = new Client({
1806+
connection: t.context.env.nativeConnection,
1807+
namespace: t.context.env.client.options.namespace,
1808+
});
1809+
1810+
const worker = await createWorker();
1811+
await worker.runUntil(async () => {
1812+
const handle = await client.workflow.start(helloWorkflow, {
1813+
args: ['Temporal'],
1814+
workflowId: `eager-workflow-${randomUUID()}`,
1815+
taskQueue,
1816+
requestEagerStart: true,
1817+
workflowTaskTimeout: '1h', // hang if retry needed
1818+
});
1819+
1820+
t.true(handle.eagerlyStarted);
1821+
1822+
const result = await handle.result();
1823+
t.is(result, 'Hello, Temporal!');
1824+
});
1825+
});
1826+
1827+
test('Error thrown when requestEagerStart is used with regular Connection', async (t) => {
1828+
const { taskQueue } = helpers(t);
1829+
1830+
const client = new WorkflowClient({ connection: t.context.env.connection });
1831+
1832+
await t.throwsAsync(
1833+
client.start(helloWorkflow, {
1834+
args: ['Temporal'],
1835+
workflowId: `eager-workflow-error-${randomUUID()}`,
1836+
taskQueue,
1837+
requestEagerStart: true,
1838+
}),
1839+
{
1840+
message: /Eager workflow start requires a NativeConnection/,
1841+
}
1842+
);
1843+
});

packages/worker/src/connection.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
OperatorService,
1212
HealthService,
1313
TestService,
14+
InternalConnectionLikeSymbol,
1415
} from '@temporalio/client';
1516
import { InternalConnectionOptions, InternalConnectionOptionsSymbol } from '@temporalio/client/lib/connection';
1617
import { TransportError } from './errors';
@@ -93,6 +94,14 @@ export class NativeConnection implements ConnectionLike {
9394
false
9495
);
9596
}
97+
98+
// Set internal capability flag - not part of public API
99+
Object.defineProperty(this, InternalConnectionLikeSymbol, {
100+
value: { supportsEagerStart: true },
101+
writable: false,
102+
enumerable: false,
103+
configurable: false,
104+
});
96105
}
97106

98107
/**

0 commit comments

Comments
 (0)