Skip to content

Commit ab2645a

Browse files
committed
Nexus workflow client
1 parent 64cee58 commit ab2645a

File tree

14 files changed

+466
-23
lines changed

14 files changed

+466
-23
lines changed

package-lock.json

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/nexus/src/context.ts

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
import * as nexus from 'nexus-rpc';
22
import { HandlerContext as BaseHandlerContext, getHandlerContext, handlerLinks } from 'nexus-rpc/lib/handler';
33
import { Logger, LogLevel, LogMetadata, Workflow } from '@temporalio/common';
4-
import { Client, WorkflowStartOptions } from '@temporalio/client';
4+
import { Client, WorkflowStartOptions as ClientWorkflowStartOptions } from '@temporalio/client';
55
import { temporal } from '@temporalio/proto';
66
import { InternalWorkflowStartOptionsKey, InternalWorkflowStartOptions } from '@temporalio/client/lib/internal';
77
import { generateWorkflowRunOperationToken, loadWorkflowRunOperationToken } from './token';
88
import { convertNexusLinkToWorkflowEventLink, convertWorkflowEventLinkToNexusLink } from './link-converter';
9+
import { Replace } from '@temporalio/common/src/type-helpers';
910

1011
// Context used internally in the SDK to propagate information from the worker to the Temporal Nexus helpers.
1112
export interface HandlerContext extends BaseHandlerContext {
1213
log: Logger;
1314
client: Client;
1415
namespace: string;
16+
taskQueue: string;
1517
}
1618

1719
function getLogger() {
@@ -61,16 +63,24 @@ export interface WorkflowHandle<_T> {
6163
readonly runId: string;
6264
}
6365

66+
/**
67+
* Options for starting a workflow using {@link startWorkflow}, this type is identical to the client's
68+
* `WorkflowStartOptions` with the exception that `taskQueue` is optional and defaults to the current worker's task
69+
* queue.
70+
*/
71+
export type WorkflowStartOptions<T extends Workflow> = Replace<ClientWorkflowStartOptions<T>, { taskQueue?: string }>;
72+
6473
/**
6574
* Starts a workflow run for a {@link WorkflowRunOperationHandler}, linking the execution chain to a Nexus Operation
6675
* (subsequent runs started from continue-as-new and retries). Automatically propagates the callback, request ID, and
6776
* back and forward links from the Nexus options to the Workflow.
6877
*/
6978
export async function startWorkflow<T extends Workflow>(
7079
workflowTypeOrFunc: string | T,
71-
workflowOptions: WorkflowStartOptions<T>,
72-
nexusOptions: nexus.StartOperationOptions
80+
nexusOptions: nexus.StartOperationOptions,
81+
workflowOptions: WorkflowStartOptions<T>
7382
): Promise<WorkflowHandle<T>> {
83+
const { client, taskQueue } = getHandlerContext<HandlerContext>();
7484
const links = Array<temporal.api.common.v1.ILink>();
7585
if (nexusOptions.links?.length > 0) {
7686
for (const l of nexusOptions.links) {
@@ -101,8 +111,13 @@ export async function startWorkflow<T extends Workflow>(
101111
},
102112
];
103113
}
104-
(workflowOptions as any)[InternalWorkflowStartOptionsKey] = internalOptions;
105-
const handle = await getClient().workflow.start<T>(workflowTypeOrFunc, workflowOptions);
114+
const { taskQueue: userSpeficiedTaskQueue, ...rest } = workflowOptions;
115+
const startOptions: ClientWorkflowStartOptions = {
116+
...rest,
117+
taskQueue: userSpeficiedTaskQueue || taskQueue,
118+
[InternalWorkflowStartOptionsKey]: internalOptions,
119+
};
120+
const handle = await client.workflow.start(workflowTypeOrFunc, startOptions);
106121
if (internalOptions.backLink?.workflowEvent != null) {
107122
try {
108123
handlerLinks().push(convertWorkflowEventLinkToNexusLink(internalOptions.backLink.workflowEvent));
@@ -125,7 +140,7 @@ export type WorkflowRunOperationHandler<I, O> = (
125140
* A Nexus Operation implementation that is backed by a Workflow run.
126141
*/
127142
export class WorkflowRunOperation<I, O> implements nexus.OperationHandler<I, O> {
128-
constructor(readonly handler: WorkflowRunOperationHandler<I, O>) {}
143+
constructor(readonly handler: WorkflowRunOperationHandler<I, O>) { }
129144

130145
async start(input: I, options: nexus.StartOperationOptions): Promise<nexus.HandlerStartOperationResult<O>> {
131146
const { namespace } = getHandlerContext<HandlerContext>();

packages/nexus/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ export {
55
WorkflowHandle,
66
WorkflowRunOperation,
77
WorkflowRunOperationHandler,
8+
WorkflowStartOptions,
89
} from './context';

packages/test/src/helpers-integration.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,15 +126,15 @@ export function makeConfigurableEnvironmentTestFn<T>(opts: {
126126
return test;
127127
}
128128

129-
export function makeTestFunction(opts: {
129+
export interface TestFunctionOptions {
130130
workflowsPath: string;
131131
workflowEnvironmentOpts?: LocalTestWorkflowEnvironmentOptions;
132132
workflowInterceptorModules?: string[];
133133
recordedLogs?: { [workflowId: string]: LogEntry[] };
134-
}): TestFn<Context> {
135-
return makeConfigurableEnvironmentTestFn<Context>({
136-
recordedLogs: opts.recordedLogs,
137-
createTestContext: async (_t: ExecutionContext): Promise<Context> => {
134+
}
135+
136+
export function makeDefaultTestContextFunction(opts: TestFunctionOptions) {
137+
return async (_t: ExecutionContext): Promise<Context> => {
138138
let env: TestWorkflowEnvironment;
139139
if (process.env.TEMPORAL_SERVICE_ADDRESS) {
140140
env = await TestWorkflowEnvironment.createFromExistingServer({
@@ -150,7 +150,13 @@ export function makeTestFunction(opts: {
150150
}),
151151
env,
152152
};
153-
},
153+
}
154+
}
155+
156+
export function makeTestFunction(opts: TestFunctionOptions): TestFn<Context> {
157+
return makeConfigurableEnvironmentTestFn<Context>({
158+
recordedLogs: opts.recordedLogs,
159+
createTestContext: makeDefaultTestContextFunction(opts),
154160
teardown: async (c: Context) => {
155161
await c.env.teardown();
156162
},

packages/test/src/test-nexus-handler.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -613,13 +613,12 @@ test('WorkflowRunOperation attaches callback, link, and request ID', async (t) =
613613
testOp: new temporalnexus.WorkflowRunOperation<void, void>(async (_, options) => {
614614
return await temporalnexus.startWorkflow(
615615
'some-workflow',
616+
options,
616617
{
617618
workflowId,
618-
taskQueue,
619619
// To test attaching multiple callers to the same operation.
620620
workflowIdConflictPolicy: 'USE_EXISTING',
621621
},
622-
options
623622
);
624623
}),
625624
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import * as nexus from 'nexus-rpc';
2+
import { WorkflowFailedError } from '@temporalio/client';
3+
import * as temporalnexus from '@temporalio/nexus';
4+
import * as workflow from '@temporalio/workflow';
5+
import { helpers, makeTestFunction } from './helpers-integration';
6+
import { randomUUID } from 'crypto';
7+
import { ApplicationFailure, CancelledFailure, NexusOperationFailure } from '@temporalio/common';
8+
9+
const service = nexus.service("test", {
10+
syncOp: nexus.operation<string, string>({ name: 'my-sync-op' }),
11+
asyncOp: nexus.operation<string, string>(),
12+
});
13+
14+
const test = makeTestFunction({
15+
workflowsPath: __filename,
16+
workflowInterceptorModules: [__filename],
17+
});
18+
19+
export async function caller(endpoint: string, op: keyof typeof service.operations, action: string): Promise<string> {
20+
const client = workflow.createNexusClient({
21+
endpoint,
22+
service,
23+
});
24+
return await workflow.CancellationScope.cancellable(async () => {
25+
const handle = await client.startOperation(op, action);
26+
if (action === 'waitForCancel') {
27+
workflow.CancellationScope.current().cancel();
28+
}
29+
return await handle.result();
30+
});
31+
}
32+
33+
export async function handler(action: string): Promise<string> {
34+
if (action === 'failWorkflow') {
35+
throw ApplicationFailure.create({
36+
nonRetryable: true,
37+
message: 'test asked to fail',
38+
type: 'IntentionalError',
39+
details: ['a detail'],
40+
});
41+
}
42+
if (action === 'waitForCancel') {
43+
await workflow.CancellationScope.current().cancelRequested;
44+
}
45+
return action;
46+
}
47+
48+
test('Nexus Operation from a Workflow', async (t) => {
49+
const { createWorker, executeWorkflow, taskQueue } = helpers(t);
50+
const endpoint = t.title.replaceAll(/[\s,]/g, '-') + '-' + randomUUID();
51+
await t.context.env.connection.operatorService.createNexusEndpoint({
52+
spec: {
53+
name: endpoint,
54+
target: {
55+
worker: {
56+
namespace: 'default',
57+
taskQueue,
58+
},
59+
},
60+
},
61+
});
62+
const worker = await createWorker({
63+
nexusServices: [
64+
nexus.serviceHandler(service, {
65+
async syncOp(action) {
66+
if (action === 'pass') {
67+
return action;
68+
}
69+
if (action === 'throwHandlerError') {
70+
throw new nexus.HandlerError({
71+
type: 'INTERNAL',
72+
retryable: false,
73+
message: 'test asked to fail',
74+
});
75+
}
76+
throw new nexus.HandlerError({
77+
type: 'BAD_REQUEST',
78+
message: 'invalid action',
79+
});
80+
},
81+
asyncOp: new temporalnexus.WorkflowRunOperation<string, string>(async (action, options) => {
82+
if (action === 'throwOperationError') {
83+
throw new nexus.OperationError({
84+
state: 'failed',
85+
message: 'some message',
86+
});
87+
}
88+
if (action === 'throwApplicationFailure') {
89+
throw ApplicationFailure.create({
90+
nonRetryable: true,
91+
message: 'test asked to fail',
92+
type: 'IntentionalError',
93+
details: ['a detail'],
94+
});
95+
}
96+
return await temporalnexus.startWorkflow(
97+
handler,
98+
options,
99+
{
100+
workflowId: randomUUID(),
101+
args: [action],
102+
},
103+
);
104+
}),
105+
}),
106+
],
107+
});
108+
await worker.runUntil(async () => {
109+
let res = await executeWorkflow(caller, {
110+
args: [endpoint, 'syncOp', 'pass'],
111+
});
112+
t.is(res, 'pass');
113+
let err = await t.throwsAsync(() => executeWorkflow(caller, {
114+
args: [endpoint, 'syncOp', 'throwHandlerError'],
115+
}), {
116+
instanceOf: WorkflowFailedError,
117+
});
118+
t.true(
119+
err instanceof WorkflowFailedError &&
120+
err.cause instanceof NexusOperationFailure &&
121+
err.cause.cause instanceof nexus.HandlerError &&
122+
err.cause.cause.type === 'INTERNAL'
123+
);
124+
125+
res = await executeWorkflow(caller, {
126+
args: [endpoint, 'asyncOp', 'pass'],
127+
});
128+
t.is(res, 'pass');
129+
err = await t.throwsAsync(() => executeWorkflow(caller, {
130+
args: [endpoint, 'asyncOp', 'waitForCancel'],
131+
}), {
132+
instanceOf: WorkflowFailedError,
133+
});
134+
t.true(
135+
err instanceof WorkflowFailedError &&
136+
err.cause instanceof NexusOperationFailure &&
137+
err.cause.cause instanceof CancelledFailure
138+
);
139+
140+
err = await t.throwsAsync(() => executeWorkflow(caller, {
141+
args: [endpoint, 'asyncOp', 'throwOperationError'],
142+
}), {
143+
instanceOf: WorkflowFailedError,
144+
});
145+
t.true(
146+
err instanceof WorkflowFailedError &&
147+
err.cause instanceof NexusOperationFailure &&
148+
err.cause.cause instanceof ApplicationFailure
149+
);
150+
151+
err = await t.throwsAsync(() => executeWorkflow(caller, {
152+
args: [endpoint, 'asyncOp', 'throwApplicationFailure'],
153+
}), {
154+
instanceOf: WorkflowFailedError,
155+
});
156+
t.true(
157+
err instanceof WorkflowFailedError &&
158+
err.cause instanceof NexusOperationFailure &&
159+
err.cause.cause instanceof nexus.HandlerError &&
160+
err.cause.cause.cause instanceof ApplicationFailure &&
161+
err.cause.cause.cause.message === 'test asked to fail' &&
162+
err.cause.cause.cause.details?.length === 1 &&
163+
err.cause.cause.cause.details[0] === 'a detail'
164+
);
165+
166+
err = await t.throwsAsync(() => executeWorkflow(caller, {
167+
args: [endpoint, 'asyncOp', 'failWorkflow'],
168+
}), {
169+
instanceOf: WorkflowFailedError,
170+
});
171+
t.true(
172+
err instanceof WorkflowFailedError &&
173+
err.cause instanceof NexusOperationFailure &&
174+
err.cause.cause instanceof ApplicationFailure &&
175+
err.cause.cause.message === 'test asked to fail' &&
176+
err.cause.cause.details?.length === 1 &&
177+
err.cause.cause.details[0] === 'a detail'
178+
);
179+
});
180+
});

packages/worker/src/nexus/index.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ export async function handlerErrorToProto(
6464
// TODO: this messes up the call stack and creates unnecessary nesting.
6565
//
6666
// Create an error without capturing a stack trace.
67-
const wrapped = Object.create(Error.prototype);
67+
const wrapped = Object.create(ApplicationFailure.prototype);
6868
wrapped.message = err.message;
6969
wrapped.stack = err.stack;
7070
cause = wrapped;
@@ -80,6 +80,7 @@ export class NexusHandler {
8080
constructor(
8181
public readonly taskToken: Uint8Array,
8282
public readonly namespace: string,
83+
public readonly taskQueue: string,
8384
public readonly info: nexus.HandlerInfo,
8485
public readonly client: Client,
8586
public readonly abortController: AbortController,
@@ -106,9 +107,10 @@ export class NexusHandler {
106107
let { cause } = err;
107108
if (cause == null) {
108109
// Create an error without capturing a stack trace.
109-
const wrapped = Object.create(Error.prototype);
110+
const wrapped = Object.create(ApplicationFailure.prototype);
110111
wrapped.message = err.message;
111112
wrapped.stack = err.stack;
113+
wrapped.nonRetryable = true;
112114
cause = wrapped;
113115
}
114116
return {
@@ -282,6 +284,7 @@ export class NexusHandler {
282284
info: this.info,
283285
client: this.client,
284286
namespace: this.namespace,
287+
taskQueue: this.taskQueue,
285288
links: [],
286289
log: withMetadata(this.workerLogger, { sdkComponent: SdkComponent.nexus }),
287290
};

packages/worker/src/worker.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1194,6 +1194,7 @@ export class Worker {
11941194
const nexusHandler = new NexusHandler(
11951195
taskToken,
11961196
this.options.namespace,
1197+
this.options.taskQueue,
11971198
info,
11981199
this.client!, // Must be defined if we are handling Nexus tasks.
11991200
ctrl,

packages/workflow/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
"scripts": {},
2424
"dependencies": {
2525
"@temporalio/common": "file:../common",
26-
"@temporalio/proto": "file:../proto"
26+
"@temporalio/proto": "file:../proto",
27+
"nexus-rpc": "file:../../../nexus-sdk-typescript"
2728
},
2829
"devDependencies": {
2930
"source-map": "^0.7.4"

packages/workflow/src/index.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ export {
9999
RootWorkflowInfo,
100100
StackTraceSDKInfo,
101101
StackTrace,
102+
StartNexusOperationOptions,
102103
UnsafeWorkflowInfo,
103104
WorkflowInfo,
104105
} from './interfaces';
@@ -119,3 +120,9 @@ export {
119120
// eslint-disable-next-line deprecation/deprecation
120121
LoggerSinksDeprecated as LoggerSinks,
121122
} from './logs';
123+
124+
export {
125+
createNexusClient,
126+
NexusClient,
127+
NexusOperationHandle,
128+
}from './nexus';

0 commit comments

Comments
 (0)