Skip to content

Commit c92cbb1

Browse files
committed
Add requestEagerStart
- add startWithDetails to WorkflowClientInterceptor to support return type of WorkflowStartOutput, inject if not defined at _start call
1 parent a1d91be commit c92cbb1

File tree

7 files changed

+135
-19
lines changed

7 files changed

+135
-19
lines changed

packages/client/src/interceptors.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,12 @@ export interface WorkflowStartUpdateInput {
3535
readonly options: WorkflowUpdateOptions;
3636
}
3737

38+
/** Output for WorkflowClientInterceptor.startWithDetails */
39+
export interface WorkflowStartOutput {
40+
readonly runId: string;
41+
readonly eagerlyStarted: boolean;
42+
}
43+
3844
/** Output for WorkflowClientInterceptor.startUpdate */
3945
export interface WorkflowStartUpdateOutput {
4046
readonly updateId: string;
@@ -120,6 +126,18 @@ export interface WorkflowClientInterceptor {
120126
* {@link signalWithStart} most likely needs to be implemented too
121127
*/
122128
start?: (input: WorkflowStartInput, next: Next<this, 'start'>) => Promise<string /* runId */>;
129+
130+
/**
131+
* Intercept a service call to startWorkflowExecution
132+
*
133+
* Successor to {@link start}. Unlike {@link start}, this method returns
134+
* start details via {@link WorkflowStartOutput}.
135+
*
136+
* If you implement this method,
137+
* {@link signalWithStart} most likely needs to be implemented too
138+
*/
139+
startWithDetails?: (input: WorkflowStartInput, next: Next<this, 'startWithDetails'>) => Promise<WorkflowStartOutput>;
140+
123141
/**
124142
* Intercept a service call to updateWorkflowExecution
125143
*/

packages/client/src/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,12 @@ export interface ConnectionLike {
147147
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
148148
*/
149149
withAbortSignal<R>(abortSignal: AbortSignal, fn: () => Promise<R>): Promise<R>;
150+
151+
/**
152+
* Capability flag that determines whether the connection supports eager workflow start.
153+
* This will only be true if the underlying connection is a {@link NativeConnection}.
154+
*/
155+
readonly supportsEagerStart?: boolean;
150156
}
151157

152158
export const QueryRejectCondition = {

packages/client/src/workflow-client.ts

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ import {
5959
WorkflowStartUpdateOutput,
6060
WorkflowStartUpdateWithStartInput,
6161
WorkflowStartUpdateWithStartOutput,
62+
WorkflowStartOutput,
6263
} from './interceptors';
6364
import {
6465
CountWorkflowExecution,
@@ -260,6 +261,15 @@ export interface WorkflowHandleWithFirstExecutionRunId<T extends Workflow = Work
260261
readonly firstExecutionRunId: string;
261262
}
262263

264+
/**
265+
* This interface is exactly the same as {@link WorkflowHandleWithFirstExecutionRunId} except it
266+
* includes the `eagerlyStarted` returned from {@link WorkflowClient.start}.
267+
*/
268+
export interface WorkflowHandleWithStartDetails<T extends Workflow = Workflow>
269+
extends WorkflowHandleWithFirstExecutionRunId<T> {
270+
readonly eagerlyStarted: boolean;
271+
}
272+
263273
/**
264274
* This interface is exactly the same as {@link WorkflowHandle} except it
265275
* includes the `signaledRunId` returned from `signalWithStart`.
@@ -511,14 +521,22 @@ export class WorkflowClient extends BaseClient {
511521
workflowTypeOrFunc: string | T,
512522
options: WithWorkflowArgs<T, WorkflowOptions>,
513523
interceptors: WorkflowClientInterceptor[]
514-
): Promise<string> {
524+
): Promise<WorkflowStartOutput> {
515525
const workflowType = extractWorkflowType(workflowTypeOrFunc);
516526
assertRequiredWorkflowOptions(options);
517527
const compiledOptions = compileWorkflowOptions(ensureArgs(options));
518528

519-
const start = composeInterceptors(interceptors, 'start', this._startWorkflowHandler.bind(this));
529+
const adaptedInterceptors: WorkflowClientInterceptor[] = interceptors.map((i) =>
530+
i.startWithDetails ? i : { ...i, startWithDetails: (input, next) => next(input) }
531+
);
532+
533+
const startWithDetails = composeInterceptors(
534+
adaptedInterceptors,
535+
'startWithDetails',
536+
this._startWorkflowHandler.bind(this)
537+
);
520538

521-
return start({
539+
return startWithDetails({
522540
options: compiledOptions,
523541
headers: {},
524542
workflowType,
@@ -534,7 +552,6 @@ export class WorkflowClient extends BaseClient {
534552
const { signal, signalArgs, ...rest } = options;
535553
assertRequiredWorkflowOptions(rest);
536554
const compiledOptions = compileWorkflowOptions(ensureArgs(rest));
537-
538555
const signalWithStart = composeInterceptors(
539556
interceptors,
540557
'signalWithStart',
@@ -558,22 +575,25 @@ export class WorkflowClient extends BaseClient {
558575
public async start<T extends Workflow>(
559576
workflowTypeOrFunc: string | T,
560577
options: WorkflowStartOptions<T>
561-
): Promise<WorkflowHandleWithFirstExecutionRunId<T>> {
578+
): Promise<WorkflowHandleWithStartDetails<T>> {
562579
const { workflowId } = options;
563580
const interceptors = this.getOrMakeInterceptors(workflowId);
564-
const runId = await this._start(workflowTypeOrFunc, { ...options, workflowId }, interceptors);
581+
const wfStartOutput = await this._start(workflowTypeOrFunc, { ...options, workflowId }, interceptors);
565582
// runId is not used in handles created with `start*` calls because these
566583
// handles should allow interacting with the workflow if it continues as new.
567-
const handle = this._createWorkflowHandle({
584+
const baseHandle = this._createWorkflowHandle({
568585
workflowId,
569586
runId: undefined,
570-
firstExecutionRunId: runId,
571-
runIdForResult: runId,
587+
firstExecutionRunId: wfStartOutput.runId,
588+
runIdForResult: wfStartOutput.runId,
572589
interceptors,
573590
followRuns: options.followRuns ?? true,
574-
}) as WorkflowHandleWithFirstExecutionRunId<T>; // Cast is safe because we know we add the firstExecutionRunId below
575-
(handle as any) /* readonly */.firstExecutionRunId = runId;
576-
return handle;
591+
});
592+
return {
593+
...baseHandle,
594+
firstExecutionRunId: wfStartOutput.runId,
595+
eagerlyStarted: wfStartOutput.eagerlyStarted,
596+
};
577597
}
578598

579599
/**
@@ -1242,11 +1262,15 @@ export class WorkflowClient extends BaseClient {
12421262
*
12431263
* Used as the final function of the start interceptor chain
12441264
*/
1245-
protected async _startWorkflowHandler(input: WorkflowStartInput): Promise<string> {
1265+
protected async _startWorkflowHandler(input: WorkflowStartInput): Promise<WorkflowStartOutput> {
12461266
const req = await this.createStartWorkflowRequest(input);
12471267
const { options: opts, workflowType } = input;
12481268
try {
1249-
return (await this.workflowService.startWorkflowExecution(req)).runId;
1269+
const resp = await this.workflowService.startWorkflowExecution(req);
1270+
return {
1271+
runId: resp.runId,
1272+
eagerlyStarted: resp.eagerWorkflowTask != null,
1273+
};
12501274
} catch (err: any) {
12511275
if (err.code === grpcStatus.ALREADY_EXISTS) {
12521276
throw new WorkflowExecutionAlreadyStartedError(
@@ -1263,6 +1287,13 @@ export class WorkflowClient extends BaseClient {
12631287
const { options: opts, workflowType, headers } = input;
12641288
const { identity, namespace } = this.options;
12651289

1290+
if (opts.requestEagerStart && !('supportsEagerStart' in this.connection && this.connection.supportsEagerStart)) {
1291+
throw new Error(
1292+
'Eager workflow start requires a NativeConnection shared between client and worker. ' +
1293+
'Pass a NativeConnection via ClientOptions.connection, or disable requestEagerStart.'
1294+
);
1295+
}
1296+
12661297
return {
12671298
namespace,
12681299
identity,
@@ -1292,6 +1323,7 @@ export class WorkflowClient extends BaseClient {
12921323
header: { fields: headers },
12931324
priority: opts.priority ? compilePriority(opts.priority) : undefined,
12941325
versioningOverride: opts.versioningOverride ?? undefined,
1326+
requestEagerExecution: opts.requestEagerStart,
12951327
};
12961328
}
12971329

packages/client/src/workflow-options.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ export interface WorkflowOptions extends CommonWorkflowOptions {
5454
* @experimental Deployment based versioning is experimental and may change in the future.
5555
*/
5656
versioningOverride?: VersioningOverride;
57+
58+
/**
59+
* Potentially reduce the latency to start this workflow by encouraging the server to
60+
* start it on a local worker running with this same client.
61+
*/
62+
requestEagerStart?: boolean;
5763
}
5864

5965
export type WithCompiledWorkflowOptions<T extends WorkflowOptions> = Replace<
@@ -97,7 +103,8 @@ export type WorkflowSignalWithStartOptions<SignalArgs extends any[] = []> = Sign
97103
? WorkflowSignalWithStartOptionsWithArgs<SignalArgs>
98104
: WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs>;
99105

100-
export interface WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs extends any[]> extends WorkflowOptions {
106+
export interface WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs extends any[]>
107+
extends Omit<WorkflowOptions, 'requestEagerStart'> {
101108
/**
102109
* SignalDefinition or name of signal
103110
*/
@@ -109,7 +116,8 @@ export interface WorkflowSignalWithStartOptionsWithoutArgs<SignalArgs extends an
109116
signalArgs?: SignalArgs;
110117
}
111118

112-
export interface WorkflowSignalWithStartOptionsWithArgs<SignalArgs extends any[]> extends WorkflowOptions {
119+
export interface WorkflowSignalWithStartOptionsWithArgs<SignalArgs extends any[]>
120+
extends Omit<WorkflowOptions, 'requestEagerStart'> {
113121
/**
114122
* SignalDefinition or name of signal
115123
*/

packages/test/src/helpers.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@ import {
1818
} from '@temporalio/testing';
1919
import * as worker from '@temporalio/worker';
2020
import { Worker as RealWorker, WorkerOptions } from '@temporalio/worker';
21-
import { inWorkflowContext, WorkflowInfo } from '@temporalio/workflow';
22-
import { LoggerSinksInternal as DefaultLoggerSinks } from '@temporalio/workflow/lib/logs';
21+
import { inWorkflowContext } from '@temporalio/workflow';
2322

2423
export function u8(s: string): Uint8Array {
2524
// TextEncoder requires lib "dom"

packages/test/src/test-integration-workflows.ts

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { randomUUID } from 'crypto';
33
import asyncRetry from 'async-retry';
44
import { ExecutionContext } from 'ava';
55
import { firstValueFrom, Subject } from 'rxjs';
6-
import { WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
6+
import { Client, Connection, WorkflowClient, WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
77
import * as activity from '@temporalio/activity';
88
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
99
import { TestWorkflowEnvironment } from '@temporalio/testing';
@@ -1637,3 +1637,55 @@ test('Default handlers fail given reserved prefix', async (t) => {
16371637
await handle.terminate();
16381638
});
16391639
});
1640+
1641+
export async function helloWorkflow(name: string): Promise<string> {
1642+
return `Hello, ${name}!`;
1643+
}
1644+
1645+
test('Workflow can be started eagerly with shared NativeConnection', async (t) => {
1646+
const { createWorker, taskQueue } = helpers(t);
1647+
const client = new Client({
1648+
connection: t.context.env.nativeConnection,
1649+
namespace: t.context.env.client.options.namespace,
1650+
});
1651+
1652+
const worker = await createWorker();
1653+
await worker.runUntil(async () => {
1654+
const handle = await client.workflow.start(helloWorkflow, {
1655+
args: ['Temporal'],
1656+
workflowId: `eager-workflow-${randomUUID()}`,
1657+
taskQueue,
1658+
requestEagerStart: true,
1659+
workflowTaskTimeout: '1h', // hang if retry needed
1660+
});
1661+
1662+
t.true(handle.eagerlyStarted);
1663+
1664+
const result = await handle.result();
1665+
t.is(result, 'Hello, Temporal!');
1666+
});
1667+
});
1668+
1669+
test('Error thrown when requestEagerStart is used with regular Connection', async (t) => {
1670+
const { taskQueue } = helpers(t);
1671+
1672+
// Create a regular connection instead of native
1673+
const regularConnection = await Connection.connect();
1674+
const client = new WorkflowClient({ connection: regularConnection });
1675+
1676+
try {
1677+
await t.throwsAsync(
1678+
client.start(helloWorkflow, {
1679+
args: ['Temporal'],
1680+
workflowId: `eager-workflow-error-${randomUUID()}`,
1681+
taskQueue,
1682+
requestEagerStart: true,
1683+
}),
1684+
{
1685+
message: /Eager workflow start requires a NativeConnection/,
1686+
}
1687+
);
1688+
} finally {
1689+
await regularConnection.close();
1690+
}
1691+
});

packages/worker/src/connection.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { Runtime } from './runtime';
1616
* This class can be used to power `@temporalio/client`'s Client objects.
1717
*/
1818
export class NativeConnection implements ConnectionLike {
19+
public readonly supportsEagerStart = true;
1920
/**
2021
* referenceHolders is used internally by the framework, it can be accessed with `extractReferenceHolders` (below)
2122
*/

0 commit comments

Comments
 (0)