Skip to content

Commit 8d972ca

Browse files
committed
Add requestEagerStart
- add startWithDetails to WorkflowClientInterceptor to support return type of WorkflowStartOutput, inject if not defined at _start call
1 parent 9089bc6 commit 8d972ca

File tree

6 files changed

+133
-17
lines changed

6 files changed

+133
-17
lines changed

packages/client/src/interceptors.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ export interface WorkflowStartUpdateInput {
3535
readonly options: WorkflowUpdateOptions;
3636
}
3737

38+
/** Output for WorkflowClientInterceptor.startWithDetails */
39+
export interface WorkflowStartOutput {
40+
readonly runId: string;
41+
readonly eagerlyStarted: boolean;
42+
}
43+
3844
/** Output for WorkflowClientInterceptor.startUpdate */
3945
export interface WorkflowStartUpdateOutput {
4046
readonly updateId: string;
@@ -120,6 +126,18 @@ export interface WorkflowClientInterceptor {
120126
* {@link signalWithStart} most likely needs to be implemented too
121127
*/
122128
start?: (input: WorkflowStartInput, next: Next<this, 'start'>) => Promise<string /* runId */>;
129+
130+
/**
131+
* Intercept a service call to startWorkflowExecution
132+
*
133+
* Successor to {@link start}. Unlike {@link start}, this method returns
134+
* start details via {@link WorkflowStartOutput}.
135+
*
136+
* If you implement this method,
137+
* {@link signalWithStart} most likely needs to be implemented too
138+
*/
139+
startWithDetails?: (input: WorkflowStartInput, next: Next<this, 'startWithDetails'>) => Promise<WorkflowStartOutput>;
140+
123141
/**
124142
* Intercept a service call to updateWorkflowExecution
125143
*/

packages/client/src/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,12 @@ export interface ConnectionLike {
162162
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
163163
*/
164164
withAbortSignal<R>(abortSignal: AbortSignal, fn: () => Promise<R>): Promise<R>;
165+
166+
/**
167+
* Capability flag that determines whether the connection supports eager workflow start.
168+
* This will only be true if the underlying connection is a {@link NativeConnection}.
169+
*/
170+
readonly supportsEagerStart?: boolean;
165171
}
166172

167173
export const QueryRejectCondition = {

packages/client/src/workflow-client.ts

Lines changed: 45 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ import {
6161
WorkflowStartUpdateOutput,
6262
WorkflowStartUpdateWithStartInput,
6363
WorkflowStartUpdateWithStartOutput,
64+
WorkflowStartOutput,
6465
} from './interceptors';
6566
import {
6667
CountWorkflowExecution,
@@ -263,6 +264,15 @@ export interface WorkflowHandleWithFirstExecutionRunId<T extends Workflow = Work
263264
readonly firstExecutionRunId: string;
264265
}
265266

267+
/**
268+
* This interface is exactly the same as {@link WorkflowHandleWithFirstExecutionRunId} except it
269+
* includes the `eagerlyStarted` returned from {@link WorkflowClient.start}.
270+
*/
271+
export interface WorkflowHandleWithStartDetails<T extends Workflow = Workflow>
272+
extends WorkflowHandleWithFirstExecutionRunId<T> {
273+
readonly eagerlyStarted: boolean;
274+
}
275+
266276
/**
267277
* This interface is exactly the same as {@link WorkflowHandle} except it
268278
* includes the `signaledRunId` returned from `signalWithStart`.
@@ -514,14 +524,22 @@ export class WorkflowClient extends BaseClient {
514524
workflowTypeOrFunc: string | T,
515525
options: WorkflowStartOptions<T>,
516526
interceptors: WorkflowClientInterceptor[]
517-
): Promise<string> {
527+
): Promise<WorkflowStartOutput> {
518528
const workflowType = extractWorkflowType(workflowTypeOrFunc);
519529
assertRequiredWorkflowOptions(options);
520530
const compiledOptions = compileWorkflowOptions(ensureArgs(options));
521531

522-
const start = composeInterceptors(interceptors, 'start', this._startWorkflowHandler.bind(this));
532+
const adaptedInterceptors: WorkflowClientInterceptor[] = interceptors.map((i) =>
533+
i.startWithDetails ? i : { ...i, startWithDetails: (input, next) => next(input) }
534+
);
535+
536+
const startWithDetails = composeInterceptors(
537+
adaptedInterceptors,
538+
'startWithDetails',
539+
this._startWorkflowHandler.bind(this)
540+
);
523541

524-
return start({
542+
return startWithDetails({
525543
options: compiledOptions,
526544
headers: {},
527545
workflowType,
@@ -537,7 +555,6 @@ export class WorkflowClient extends BaseClient {
537555
const { signal, signalArgs, ...rest } = options;
538556
assertRequiredWorkflowOptions(rest);
539557
const compiledOptions = compileWorkflowOptions(ensureArgs(rest));
540-
541558
const signalWithStart = composeInterceptors(
542559
interceptors,
543560
'signalWithStart',
@@ -561,22 +578,25 @@ export class WorkflowClient extends BaseClient {
561578
public async start<T extends Workflow>(
562579
workflowTypeOrFunc: string | T,
563580
options: WorkflowStartOptions<T>
564-
): Promise<WorkflowHandleWithFirstExecutionRunId<T>> {
581+
): Promise<WorkflowHandleWithStartDetails<T>> {
565582
const { workflowId } = options;
566583
const interceptors = this.getOrMakeInterceptors(workflowId);
567-
const runId = await this._start(workflowTypeOrFunc, { ...options, workflowId }, interceptors);
584+
const wfStartOutput = await this._start(workflowTypeOrFunc, { ...options, workflowId }, interceptors);
568585
// runId is not used in handles created with `start*` calls because these
569586
// handles should allow interacting with the workflow if it continues as new.
570-
const handle = this._createWorkflowHandle({
587+
const baseHandle = this._createWorkflowHandle({
571588
workflowId,
572589
runId: undefined,
573-
firstExecutionRunId: runId,
574-
runIdForResult: runId,
590+
firstExecutionRunId: wfStartOutput.runId,
591+
runIdForResult: wfStartOutput.runId,
575592
interceptors,
576593
followRuns: options.followRuns ?? true,
577-
}) as WorkflowHandleWithFirstExecutionRunId<T>; // Cast is safe because we know we add the firstExecutionRunId below
578-
(handle as any) /* readonly */.firstExecutionRunId = runId;
579-
return handle;
594+
});
595+
return {
596+
...baseHandle,
597+
firstExecutionRunId: wfStartOutput.runId,
598+
eagerlyStarted: wfStartOutput.eagerlyStarted,
599+
};
580600
}
581601

582602
/**
@@ -1246,7 +1266,7 @@ export class WorkflowClient extends BaseClient {
12461266
*
12471267
* Used as the final function of the start interceptor chain
12481268
*/
1249-
protected async _startWorkflowHandler(input: WorkflowStartInput): Promise<string> {
1269+
protected async _startWorkflowHandler(input: WorkflowStartInput): Promise<WorkflowStartOutput> {
12501270
const req = await this.createStartWorkflowRequest(input);
12511271
const { options: opts, workflowType } = input;
12521272
const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol];
@@ -1255,7 +1275,10 @@ export class WorkflowClient extends BaseClient {
12551275
if (internalOptions != null) {
12561276
internalOptions.backLink = response.link ?? undefined;
12571277
}
1258-
return response.runId;
1278+
return {
1279+
runId: response.runId,
1280+
eagerlyStarted: response.eagerWorkflowTask != null,
1281+
};
12591282
} catch (err: any) {
12601283
if (err.code === grpcStatus.ALREADY_EXISTS) {
12611284
throw new WorkflowExecutionAlreadyStartedError(
@@ -1273,6 +1296,13 @@ export class WorkflowClient extends BaseClient {
12731296
const { identity, namespace } = this.options;
12741297
const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol];
12751298

1299+
if (opts.requestEagerStart && !('supportsEagerStart' in this.connection && this.connection.supportsEagerStart)) {
1300+
throw new Error(
1301+
'Eager workflow start requires a NativeConnection shared between client and worker. ' +
1302+
'Pass a NativeConnection via ClientOptions.connection, or disable requestEagerStart.'
1303+
);
1304+
}
1305+
12761306
return {
12771307
namespace,
12781308
identity,
@@ -1303,6 +1333,7 @@ export class WorkflowClient extends BaseClient {
13031333
userMetadata: await encodeUserMetadata(this.dataConverter, opts.staticSummary, opts.staticDetails),
13041334
priority: opts.priority ? compilePriority(opts.priority) : undefined,
13051335
versioningOverride: opts.versioningOverride ?? undefined,
1336+
requestEagerExecution: opts.requestEagerStart,
13061337
...filterNullAndUndefined(internalOptions ?? {}),
13071338
};
13081339
}

packages/client/src/workflow-options.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ export interface WorkflowOptions extends CommonWorkflowOptions {
5454
* @experimental Deployment based versioning is experimental and may change in the future.
5555
*/
5656
versioningOverride?: VersioningOverride;
57+
58+
/**
59+
* Potentially reduce the latency to start this workflow by encouraging the server to
60+
* start it on a local worker running with this same client.
61+
*/
62+
requestEagerStart?: boolean;
5763
}
5864

5965
export type WithCompiledWorkflowOptions<T extends WorkflowOptions> = Replace<
@@ -97,7 +103,8 @@ export type WorkflowSignalWithStartOptions<SignalArgs extends any[] = []> = Sign
97103
? WorkflowSignalWithStartOptionsWithArgs<SignalArgs>
98104
: WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs>;
99105

100-
export interface WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs extends any[]> extends WorkflowOptions {
106+
export interface WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs extends any[]>
107+
extends Omit<WorkflowOptions, 'requestEagerStart'> {
101108
/**
102109
* SignalDefinition or name of signal
103110
*/
@@ -109,7 +116,8 @@ export interface WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs extends an
109116
signalArgs?: SignalArgs;
110117
}
111118

112-
export interface WorkflowSignalWithStartOptionsWithArgs<SignalArgs extends any[]> extends WorkflowOptions {
119+
export interface WorkflowSignalWithStartOptionsWithArgs<SignalArgs extends any[]>
120+
extends Omit<WorkflowOptions, 'requestEagerStart'> {
113121
/**
114122
* SignalDefinition or name of signal
115123
*/

packages/test/src/test-integration-workflows.ts

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { randomUUID } from 'crypto';
33
import asyncRetry from 'async-retry';
44
import { ExecutionContext } from 'ava';
55
import { firstValueFrom, Subject } from 'rxjs';
6-
import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
6+
import { Client, Connection, WorkflowClient, WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
77
import * as activity from '@temporalio/activity';
88
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
99
import { TestWorkflowEnvironment } from '@temporalio/testing';
@@ -1731,3 +1731,55 @@ test('Default handlers fail given reserved prefix', async (t) => {
17311731
await handle.terminate();
17321732
});
17331733
});
1734+
1735+
export async function helloWorkflow(name: string): Promise<string> {
1736+
return `Hello, ${name}!`;
1737+
}
1738+
1739+
test('Workflow can be started eagerly with shared NativeConnection', async (t) => {
1740+
const { createWorker, taskQueue } = helpers(t);
1741+
const client = new Client({
1742+
connection: t.context.env.nativeConnection,
1743+
namespace: t.context.env.client.options.namespace,
1744+
});
1745+
1746+
const worker = await createWorker();
1747+
await worker.runUntil(async () => {
1748+
const handle = await client.workflow.start(helloWorkflow, {
1749+
args: ['Temporal'],
1750+
workflowId: `eager-workflow-${randomUUID()}`,
1751+
taskQueue,
1752+
requestEagerStart: true,
1753+
workflowTaskTimeout: '1h', // hang if retry needed
1754+
});
1755+
1756+
t.true(handle.eagerlyStarted);
1757+
1758+
const result = await handle.result();
1759+
t.is(result, 'Hello, Temporal!');
1760+
});
1761+
});
1762+
1763+
test('Error thrown when requestEagerStart is used with regular Connection', async (t) => {
1764+
const { taskQueue } = helpers(t);
1765+
1766+
// Create a regular connection instead of native
1767+
const regularConnection = await Connection.connect();
1768+
const client = new WorkflowClient({ connection: regularConnection });
1769+
1770+
try {
1771+
await t.throwsAsync(
1772+
client.start(helloWorkflow, {
1773+
args: ['Temporal'],
1774+
workflowId: `eager-workflow-error-${randomUUID()}`,
1775+
taskQueue,
1776+
requestEagerStart: true,
1777+
}),
1778+
{
1779+
message: /Eager workflow start requires a NativeConnection/,
1780+
}
1781+
);
1782+
} finally {
1783+
await regularConnection.close();
1784+
}
1785+
});

packages/worker/src/connection.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import { Runtime } from './runtime';
2525
* This class can be used to power `@temporalio/client`'s Client objects.
2626
*/
2727
export class NativeConnection implements ConnectionLike {
28+
public readonly supportsEagerStart = true;
2829
/**
2930
* referenceHolders is used internally by the framework, it can be accessed with `extractReferenceHolders` (below)
3031
*/

0 commit comments

Comments
 (0)