Skip to content

Commit 1ac9aca

Browse files
committed
addressing pr feedback:
- symbol usage to mark if connection supports ews - separate file for interceptor adapters, adapt wf client interceptors on wf client creation - deprecate `start` wf client interceptor in favor of `startWithDetails`
1 parent b22143a commit 1ac9aca

File tree

6 files changed

+77
-46
lines changed

6 files changed

+77
-46
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import { WorkflowClientInterceptor, WorkflowStartInput, WorkflowStartOutput } from './interceptors';
2+
3+
// Apply adapters to workflow client interceptor.
4+
export function adaptWorkflowClientInterceptor(i: WorkflowClientInterceptor): WorkflowClientInterceptor {
5+
return adaptLegacyStartInterceptor(i);
6+
}
7+
8+
// Adapt legacy `start` interceptors to the new `startWithDetails` interceptor.
9+
function adaptLegacyStartInterceptor(i: WorkflowClientInterceptor): WorkflowClientInterceptor {
10+
// If it already has the new method, or doesn't have the legacy one, no adaptation is needed.
11+
// eslint-disable-next-line deprecation/deprecation
12+
if (i.startWithDetails || !i.start) {
13+
return i;
14+
}
15+
16+
// This interceptor has a legacy `start` but not `startWithDetails`. We'll adapt it.
17+
return {
18+
...i,
19+
startWithDetails: async (input, next): Promise<WorkflowStartOutput> => {
20+
let downstreamOut: WorkflowStartOutput | undefined;
21+
22+
// Patched `next` for legacy `start` interceptors.
23+
// Captures the full `WorkflowStartOutput` while returning `runId` as a string.
24+
const patchedNext = async (patchedInput: WorkflowStartInput): Promise<string> => {
25+
downstreamOut = await next(patchedInput);
26+
return downstreamOut.runId;
27+
};
28+
29+
const runIdFromLegacyInterceptor = await i.start!(input, patchedNext); // eslint-disable-line deprecation/deprecation
30+
31+
// If the interceptor short-circuited (didn't call `next`), `downstreamOut` will be undefined.
32+
// In that case, we can't have an eager start.
33+
if (downstreamOut === undefined) {
34+
return { runId: runIdFromLegacyInterceptor, eagerlyStarted: false };
35+
}
36+
37+
// If `next` was called, honor the `runId` from the legacy interceptor but preserve
38+
// the `eagerlyStarted` status from the actual downstream call.
39+
return { ...downstreamOut, runId: runIdFromLegacyInterceptor };
40+
},
41+
};
42+
}

packages/client/src/interceptors.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,8 @@ export interface WorkflowClientInterceptor {
124124
*
125125
* If you implement this method,
126126
* {@link signalWithStart} most likely needs to be implemented too
127+
*
128+
* @deprecated in favour of {@link startWithDetails}
127129
*/
128130
start?: (input: WorkflowStartInput, next: Next<this, 'start'>) => Promise<string /* runId */>;
129131

packages/client/src/types.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -162,14 +162,19 @@ export interface ConnectionLike {
162162
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal
163163
*/
164164
withAbortSignal<R>(abortSignal: AbortSignal, fn: () => Promise<R>): Promise<R>;
165-
166-
/**
167-
* Capability flag that determines whether the connection supports eager workflow start.
168-
* This will only be true if the underlying connection is a {@link NativeConnection}.
169-
*/
170-
readonly supportsEagerStart?: boolean;
171165
}
172166

167+
export const InternalConnectionLikeSymbol = Symbol('__temporal_internal_connection_like');
168+
export type InternalConnectionLike = ConnectionLike & {
169+
[InternalConnectionLikeSymbol]?: {
170+
/**
171+
* Capability flag that determines whether the connection supports eager workflow start.
172+
* This will only be true if the underlying connection is a {@link NativeConnection}.
173+
*/
174+
readonly supportsEagerStart?: boolean;
175+
};
176+
};
177+
173178
export const QueryRejectCondition = {
174179
NONE: 'NONE',
175180
NOT_OPEN: 'NOT_OPEN',

packages/client/src/workflow-client.ts

Lines changed: 12 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ import {
6868
DescribeWorkflowExecutionResponse,
6969
encodeQueryRejectCondition,
7070
GetWorkflowExecutionHistoryRequest,
71+
InternalConnectionLike,
72+
InternalConnectionLikeSymbol,
7173
QueryRejectCondition,
7274
RequestCancelWorkflowExecutionResponse,
7375
StartWorkflowExecutionRequest,
@@ -95,6 +97,7 @@ import {
9597
import { mapAsyncIterable } from './iterators-utils';
9698
import { WorkflowUpdateStage, encodeWorkflowUpdateStage } from './workflow-update-stage';
9799
import { InternalWorkflowStartOptionsSymbol, InternalWorkflowStartOptions } from './internal';
100+
import { adaptWorkflowClientInterceptor } from './interceptor-adapters';
98101

99102
const UpdateWorkflowExecutionLifecycleStage = temporal.api.enums.v1.UpdateWorkflowExecutionLifecycleStage;
100103

@@ -529,43 +532,8 @@ export class WorkflowClient extends BaseClient {
529532
assertRequiredWorkflowOptions(options);
530533
const compiledOptions = compileWorkflowOptions(ensureArgs(options));
531534

532-
// Adapt legacy `start` interceptors to the new `startWithDetails` interface.
533-
const adaptedInterceptors: WorkflowClientInterceptor[] = interceptors.map((i) => {
534-
// If it already has the new method, or doesn't have the legacy one, no adaptation is needed.
535-
if (i.startWithDetails || !i.start) {
536-
return i;
537-
}
538-
539-
// This interceptor has a legacy `start` but not `startWithDetails`. We'll adapt it.
540-
return {
541-
...i,
542-
startWithDetails: async (input, next): Promise<WorkflowStartOutput> => {
543-
let downstreamOut: WorkflowStartOutput | undefined;
544-
545-
// Patched `next` for legacy `start` interceptors.
546-
// Captures the full `WorkflowStartOutput` while returning `runId` as a string.
547-
const patchedNext = async (patchedInput: WorkflowStartInput): Promise<string> => {
548-
downstreamOut = await next(patchedInput);
549-
return downstreamOut.runId;
550-
};
551-
552-
const runIdFromLegacyInterceptor = await i.start!(input, patchedNext);
553-
554-
// If the interceptor short-circuited (didn't call `next`), `downstreamOut` will be undefined.
555-
// In that case, we can't have an eager start.
556-
if (downstreamOut === undefined) {
557-
return { runId: runIdFromLegacyInterceptor, eagerlyStarted: false };
558-
}
559-
560-
// If `next` was called, honor the `runId` from the legacy interceptor but preserve
561-
// the `eagerlyStarted` status from the actual downstream call.
562-
return { ...downstreamOut, runId: runIdFromLegacyInterceptor };
563-
},
564-
};
565-
});
566-
567535
const startWithDetails = composeInterceptors(
568-
adaptedInterceptors,
536+
interceptors,
569537
'startWithDetails',
570538
this._startWorkflowHandler.bind(this)
571539
);
@@ -1326,8 +1294,10 @@ export class WorkflowClient extends BaseClient {
13261294
const { options: opts, workflowType, headers } = input;
13271295
const { identity, namespace } = this.options;
13281296
const internalOptions = (opts as InternalWorkflowStartOptions)[InternalWorkflowStartOptionsSymbol];
1297+
const supportsEagerStart = (this.connection as InternalConnectionLike)?.[InternalConnectionLikeSymbol]
1298+
?.supportsEagerStart;
13291299

1330-
if (opts.requestEagerStart && !('supportsEagerStart' in this.connection && this.connection.supportsEagerStart)) {
1300+
if (opts.requestEagerStart && !supportsEagerStart) {
13311301
throw new Error(
13321302
'Eager workflow start requires a NativeConnection shared between client and worker. ' +
13331303
'Pass a NativeConnection via ClientOptions.connection, or disable requestEagerStart.'
@@ -1681,7 +1651,11 @@ export class WorkflowClient extends BaseClient {
16811651
const factories = (this.options.interceptors as WorkflowClientInterceptors).calls ?? [];
16821652
return factories.map((ctor) => ctor({ workflowId, runId }));
16831653
}
1684-
return Array.isArray(this.options.interceptors) ? (this.options.interceptors as WorkflowClientInterceptor[]) : [];
1654+
const interceptors = Array.isArray(this.options.interceptors)
1655+
? (this.options.interceptors as WorkflowClientInterceptor[])
1656+
: [];
1657+
// Apply adapters to workflow client interceptors.
1658+
return interceptors.map((i) => adaptWorkflowClientInterceptor(i));
16851659
}
16861660
}
16871661

packages/test/src/test-integration-workflows.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { randomUUID } from 'crypto';
33
import asyncRetry from 'async-retry';
44
import { ExecutionContext } from 'ava';
55
import { firstValueFrom, Subject } from 'rxjs';
6-
import { Client, Connection, WorkflowClient, WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
6+
import { Client, WorkflowClient, WorkflowFailedError, WorkflowHandle } from '@temporalio/client';
77
import * as activity from '@temporalio/activity';
88
import { msToNumber, tsToMs } from '@temporalio/common/lib/time';
99
import { TestWorkflowEnvironment } from '@temporalio/testing';

packages/worker/src/connection.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
OperatorService,
1212
HealthService,
1313
TestService,
14+
InternalConnectionLikeSymbol
1415
} from '@temporalio/client';
1516
import { InternalConnectionOptions, InternalConnectionOptionsSymbol } from '@temporalio/client/lib/connection';
1617
import { TransportError } from './errors';
@@ -25,7 +26,6 @@ import { Runtime } from './runtime';
2526
* This class can be used to power `@temporalio/client`'s Client objects.
2627
*/
2728
export class NativeConnection implements ConnectionLike {
28-
public readonly supportsEagerStart = true;
2929
/**
3030
* referenceHolders is used internally by the framework, it can be accessed with `extractReferenceHolders` (below)
3131
*/
@@ -94,6 +94,14 @@ export class NativeConnection implements ConnectionLike {
9494
false
9595
);
9696
}
97+
98+
// Set internal capability flag - not part of public API
99+
Object.defineProperty(this, InternalConnectionLikeSymbol, {
100+
value: { supportsEagerStart: true },
101+
writable: false,
102+
enumerable: false,
103+
configurable: false,
104+
});
97105
}
98106

99107
/**

0 commit comments

Comments
 (0)