Skip to content

Commit 1cfd491

Browse files
authored
Reset workflow api (#866)
Reset workflow api
1 parent 5021e6e commit 1cfd491

File tree

7 files changed

+248
-0
lines changed

7 files changed

+248
-0
lines changed
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import { type NextRequest } from 'next/server';
2+
3+
import { resetWorkflow } from '@/route-handlers/reset-workflow/reset-workflow';
4+
import { type RouteParams } from '@/route-handlers/reset-workflow/reset-workflow.types';
5+
import { routeHandlerWithMiddlewares } from '@/utils/route-handlers-middleware';
6+
import routeHandlersDefaultMiddlewares from '@/utils/route-handlers-middleware/config/route-handlers-default-middlewares.config';
7+
8+
export async function POST(
9+
request: NextRequest,
10+
options: { params: RouteParams }
11+
) {
12+
return routeHandlerWithMiddlewares(
13+
resetWorkflow,
14+
request,
15+
options,
16+
routeHandlersDefaultMiddlewares
17+
);
18+
}
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
import { NextRequest } from 'next/server';
2+
3+
import { GRPCError } from '@/utils/grpc/grpc-error';
4+
import { mockGrpcClusterMethods } from '@/utils/route-handlers-middleware/middlewares/__mocks__/grpc-cluster-methods';
5+
6+
import { resetWorkflow } from '../reset-workflow';
7+
import { type Context } from '../reset-workflow.types';
8+
9+
const defaultRequestBody = {
10+
reason: 'Resetting workflow from cadence-web UI',
11+
decisionFinishEventId: 4,
12+
requestId: '',
13+
skipSignalReapply: false,
14+
};
15+
16+
describe(resetWorkflow.name, () => {
17+
beforeEach(() => {
18+
jest.clearAllMocks();
19+
});
20+
21+
it('calls resetWorkflow and returns valid response', async () => {
22+
const { res, mockResetWorkflow } = await setup({});
23+
24+
expect(mockResetWorkflow).toHaveBeenCalledWith({
25+
domain: 'mock-domain',
26+
workflowExecution: {
27+
workflowId: 'mock-wfid',
28+
runId: 'mock-runid',
29+
},
30+
...defaultRequestBody,
31+
});
32+
33+
const responseJson = await res.json();
34+
expect(responseJson).toEqual({ runId: 'mock-runid-new' });
35+
});
36+
37+
it('calls resetWorkflow with passed request body', async () => {
38+
const { mockResetWorkflow } = await setup({
39+
requestBody: JSON.stringify({
40+
reason: 'This workflow needs to be reset',
41+
decisionFinishEventId: 123,
42+
requestId: 'test-request-id',
43+
skipSignalReapply: true,
44+
}),
45+
});
46+
47+
expect(mockResetWorkflow).toHaveBeenCalledWith(
48+
expect.objectContaining({
49+
reason: 'This workflow needs to be reset',
50+
decisionFinishEventId: 123,
51+
requestId: 'test-request-id',
52+
skipSignalReapply: true,
53+
})
54+
);
55+
});
56+
57+
it('returns an error if something went wrong in the backend', async () => {
58+
const { res, mockResetWorkflow } = await setup({
59+
error: true,
60+
});
61+
62+
expect(mockResetWorkflow).toHaveBeenCalled();
63+
64+
expect(res.status).toEqual(500);
65+
const responseJson = await res.json();
66+
expect(responseJson).toEqual(
67+
expect.objectContaining({
68+
message: 'Could not reset workflow',
69+
})
70+
);
71+
});
72+
73+
it('returns an error if the request body is not in an expected format', async () => {
74+
const { res, mockResetWorkflow } = await setup({
75+
requestBody: JSON.stringify({
76+
reason: 5, // should be a string
77+
decisionFinishEventId: 'not-a-number', // should be a number
78+
}),
79+
});
80+
81+
expect(mockResetWorkflow).not.toHaveBeenCalled();
82+
83+
const responseJson = await res.json();
84+
expect(responseJson).toEqual(
85+
expect.objectContaining({
86+
message: 'Invalid values provided for workflow reset',
87+
})
88+
);
89+
});
90+
});
91+
92+
async function setup({
93+
requestBody = JSON.stringify(defaultRequestBody),
94+
error,
95+
}: {
96+
requestBody?: string;
97+
error?: true;
98+
}) {
99+
const mockResetWorkflow = jest
100+
.spyOn(mockGrpcClusterMethods, 'resetWorkflow')
101+
.mockImplementationOnce(async () => {
102+
if (error) {
103+
throw new GRPCError('Could not reset workflow');
104+
}
105+
return { runId: 'mock-runid-new' };
106+
});
107+
108+
const res = await resetWorkflow(
109+
new NextRequest('http://localhost', {
110+
method: 'POST',
111+
body: requestBody ?? '{}',
112+
}),
113+
{
114+
params: {
115+
domain: 'mock-domain',
116+
cluster: 'mock-cluster',
117+
workflowId: 'mock-wfid',
118+
runId: 'mock-runid',
119+
},
120+
},
121+
{
122+
grpcClusterMethods: mockGrpcClusterMethods,
123+
} as Context
124+
);
125+
126+
return { res, mockResetWorkflow };
127+
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { type NextRequest, NextResponse } from 'next/server';
2+
3+
import decodeUrlParams from '@/utils/decode-url-params';
4+
import { getHTTPStatusCode, GRPCError } from '@/utils/grpc/grpc-error';
5+
import logger, { type RouteHandlerErrorPayload } from '@/utils/logger';
6+
7+
import { type Context, type RequestParams } from './reset-workflow.types';
8+
import resetWorkflowRequestBodySchema from './schemas/reset-workflow-request-body-schema';
9+
10+
export async function resetWorkflow(
11+
request: NextRequest,
12+
requestParams: RequestParams,
13+
ctx: Context
14+
) {
15+
const requestBody = await request.json();
16+
const { data, error } = resetWorkflowRequestBodySchema.safeParse(requestBody);
17+
18+
if (error) {
19+
return NextResponse.json(
20+
{
21+
message: 'Invalid values provided for workflow reset',
22+
validationErrors: error.errors,
23+
},
24+
{ status: 400 }
25+
);
26+
}
27+
28+
const decodedParams = decodeUrlParams(requestParams.params);
29+
30+
try {
31+
const response = await ctx.grpcClusterMethods.resetWorkflow({
32+
domain: decodedParams.domain,
33+
workflowExecution: {
34+
workflowId: decodedParams.workflowId,
35+
runId: decodedParams.runId,
36+
},
37+
reason: data.reason,
38+
decisionFinishEventId: data.decisionFinishEventId,
39+
requestId: data.requestId,
40+
skipSignalReapply: data.skipSignalReapply,
41+
// TODO: add user identity
42+
});
43+
44+
return NextResponse.json(response);
45+
} catch (e) {
46+
logger.error<RouteHandlerErrorPayload>(
47+
{ requestParams: decodedParams, error: e },
48+
'Error resetting workflow'
49+
);
50+
51+
return NextResponse.json(
52+
{
53+
message:
54+
e instanceof GRPCError ? e.message : 'Error resetting workflow',
55+
cause: e,
56+
},
57+
{ status: getHTTPStatusCode(e) }
58+
);
59+
}
60+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { type ResetWorkflowExecutionResponse } from '@/__generated__/proto-ts/uber/cadence/api/v1/ResetWorkflowExecutionResponse';
2+
import { type DefaultMiddlewaresContext } from '@/utils/route-handlers-middleware';
3+
4+
export type RouteParams = {
5+
domain: string;
6+
cluster: string;
7+
workflowId: string;
8+
runId: string;
9+
};
10+
11+
export type RequestParams = {
12+
params: RouteParams;
13+
};
14+
15+
export type ResetWorkflowResponse = ResetWorkflowExecutionResponse;
16+
17+
export type Context = DefaultMiddlewaresContext;
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { z } from 'zod';
2+
3+
const resetWorkflowRequestBodySchema = z.object({
4+
reason: z
5+
.string()
6+
.optional()
7+
.default('Resetting workflow from cadence-web UI'),
8+
decisionFinishEventId: z.union([z.string(), z.number()]),
9+
requestId: z.string().optional(),
10+
skipSignalReapply: z.boolean().optional(),
11+
});
12+
13+
export default resetWorkflowRequestBodySchema;

src/utils/grpc/grpc-client.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ import { type QueryWorkflowRequest__Input } from '@/__generated__/proto-ts/uber/
2424
import { type QueryWorkflowResponse } from '@/__generated__/proto-ts/uber/cadence/api/v1/QueryWorkflowResponse';
2525
import { type RequestCancelWorkflowExecutionRequest__Input } from '@/__generated__/proto-ts/uber/cadence/api/v1/RequestCancelWorkflowExecutionRequest';
2626
import { type RequestCancelWorkflowExecutionResponse } from '@/__generated__/proto-ts/uber/cadence/api/v1/RequestCancelWorkflowExecutionResponse';
27+
import { type ResetWorkflowExecutionRequest__Input } from '@/__generated__/proto-ts/uber/cadence/api/v1/ResetWorkflowExecutionRequest';
28+
import { type ResetWorkflowExecutionResponse } from '@/__generated__/proto-ts/uber/cadence/api/v1/ResetWorkflowExecutionResponse';
2729
import { type RestartWorkflowExecutionRequest__Input } from '@/__generated__/proto-ts/uber/cadence/api/v1/RestartWorkflowExecutionRequest';
2830
import { type RestartWorkflowExecutionResponse } from '@/__generated__/proto-ts/uber/cadence/api/v1/RestartWorkflowExecutionResponse';
2931
import { type SignalWorkflowExecutionRequest__Input } from '@/__generated__/proto-ts/uber/cadence/api/v1/SignalWorkflowExecutionRequest';
@@ -102,6 +104,9 @@ export type GRPCClusterMethods = {
102104
restartWorkflow: (
103105
payload: RestartWorkflowExecutionRequest__Input
104106
) => Promise<RestartWorkflowExecutionResponse>;
107+
resetWorkflow: (
108+
payload: ResetWorkflowExecutionRequest__Input
109+
) => Promise<ResetWorkflowExecutionResponse>;
105110
};
106111

107112
// cache services instances
@@ -295,6 +300,13 @@ const getClusterServicesMethods = async (
295300
method: 'RestartWorkflowExecution',
296301
metadata: metadata,
297302
}),
303+
resetWorkflow: workflowService.request<
304+
ResetWorkflowExecutionRequest__Input,
305+
ResetWorkflowExecutionResponse
306+
>({
307+
method: 'ResetWorkflowExecution',
308+
metadata: metadata,
309+
}),
298310
};
299311
};
300312

src/utils/route-handlers-middleware/middlewares/__mocks__/grpc-cluster-methods.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,5 @@ export const mockGrpcClusterMethods: GRPCClusterMethods = {
1919
terminateWorkflow: jest.fn(),
2020
requestCancelWorkflow: jest.fn(),
2121
restartWorkflow: jest.fn(),
22+
resetWorkflow: jest.fn(),
2223
};

0 commit comments

Comments
 (0)