Skip to content

Commit d19a613

Browse files
Add new endpoint to manually trigger workflows (#1089)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 430d486 commit d19a613

File tree

2 files changed

+360
-0
lines changed

2 files changed

+360
-0
lines changed

packages/server/api/src/app/flows/flow/flow.controller.ts

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ import {
22
FastifyPluginAsyncTypebox,
33
Type,
44
} from '@fastify/type-provider-typebox';
5+
import { TriggerStrategy } from '@openops/blocks-framework';
56
import {
67
ApplicationError,
78
CountFlowsRequest,
89
CreateEmptyFlowRequest,
910
CreateFlowFromTemplateRequest,
1011
ErrorCode,
12+
ExecutionType,
1113
FlowOperationRequest,
14+
FlowRunTriggerSource,
1215
FlowTemplateWithoutProjectInformation,
1316
FlowVersionMetadata,
1417
GetFlowQueryParamsRequest,
@@ -17,19 +20,25 @@ import {
1720
ListFlowsRequest,
1821
ListFlowVersionRequest,
1922
OpenOpsId,
23+
openOpsId,
2024
Permission,
2125
PopulatedFlow,
2226
Principal,
2327
PrincipalType,
28+
ProgressUpdateType,
29+
RunEnvironment,
2430
SeekPage,
2531
SERVICE_KEY_SECURITY_OPENAPI,
32+
TriggerType,
2633
TriggerWithOptionalId,
2734
} from '@openops/shared';
2835
import dayjs from 'dayjs';
2936
import { StatusCodes } from 'http-status-codes';
3037
import { entitiesMustBeOwnedByCurrentProject } from '../../authentication/authorization';
3138
import { projectService } from '../../project/project-service';
39+
import { flowRunService } from '../flow-run/flow-run-service';
3240
import { flowVersionService } from '../flow-version/flow-version.service';
41+
import { triggerUtils } from '../trigger/hooks/trigger-utils';
3342
import { flowService } from './flow.service';
3443

3544
const DEFAULT_PAGE_SIZE = 10;
@@ -139,6 +148,69 @@ export const flowController: FastifyPluginAsyncTypebox = async (app) => {
139148
cursorRequest: request.query.cursor ?? null,
140149
});
141150
});
151+
152+
app.post('/:id/run', RunFlowRequestOptions, async (request, reply) => {
153+
try {
154+
const flow = await flowService.getOnePopulatedOrThrow({
155+
id: request.params.id,
156+
projectId: request.principal.projectId,
157+
});
158+
159+
if (!flow.publishedVersionId) {
160+
return await reply.status(StatusCodes.BAD_REQUEST).send({
161+
success: false,
162+
message:
163+
'Workflow must be published before it can be triggered manually',
164+
});
165+
}
166+
167+
const publishedFlow = await flowService.getOnePopulatedOrThrow({
168+
id: request.params.id,
169+
projectId: request.principal.projectId,
170+
versionId: flow.publishedVersionId,
171+
});
172+
173+
const validationResult = await validateTriggerType(
174+
publishedFlow,
175+
request.principal.projectId,
176+
);
177+
if (!validationResult.success) {
178+
return await reply
179+
.status(StatusCodes.BAD_REQUEST)
180+
.send(validationResult);
181+
}
182+
183+
const flowRun = await flowRunService.start({
184+
environment: RunEnvironment.PRODUCTION,
185+
flowVersionId: publishedFlow.version.id,
186+
projectId: request.principal.projectId,
187+
payload: {},
188+
executionType: ExecutionType.BEGIN,
189+
synchronousHandlerId: undefined,
190+
executionCorrelationId: openOpsId(),
191+
progressUpdateType: ProgressUpdateType.NONE,
192+
triggerSource: FlowRunTriggerSource.MANUAL_RUN,
193+
});
194+
195+
return await reply.status(StatusCodes.OK).send({
196+
success: true,
197+
flowRunId: flowRun.id,
198+
status: flowRun.status,
199+
message: 'Workflow execution started successfully',
200+
});
201+
} catch (error) {
202+
if (
203+
error instanceof ApplicationError &&
204+
error.error?.code === ErrorCode.ENTITY_NOT_FOUND
205+
) {
206+
return reply.status(StatusCodes.BAD_REQUEST).send({
207+
success: false,
208+
message: `Something went wrong while triggering the workflow execution manually. ${error.message}`,
209+
});
210+
}
211+
throw error;
212+
}
213+
});
142214
};
143215

144216
async function createFromTemplate(
@@ -197,6 +269,42 @@ async function extractUserIdFromPrincipal(
197269
return project.ownerId;
198270
}
199271

272+
async function validateTriggerType(
273+
flow: PopulatedFlow,
274+
projectId: string,
275+
): Promise<{ success: boolean; message: string }> {
276+
const blockTrigger = flow.version.trigger;
277+
278+
if (blockTrigger.type !== TriggerType.BLOCK) {
279+
return {
280+
success: false,
281+
message: `Trigger type is not a block: type: ${blockTrigger.type}`,
282+
};
283+
}
284+
285+
try {
286+
const metadata = await triggerUtils.getBlockTriggerOrThrow({
287+
trigger: blockTrigger,
288+
projectId,
289+
});
290+
291+
return {
292+
success: metadata.type === TriggerStrategy.POLLING,
293+
message:
294+
metadata.type === TriggerStrategy.POLLING
295+
? 'Trigger type validation successful'
296+
: 'Only polling workflows can be triggered manually',
297+
};
298+
} catch (error) {
299+
return {
300+
success: false,
301+
message: `Something went wrong while validating the trigger type. ${
302+
error instanceof Error ? error.message : 'Unknown error'
303+
}`,
304+
};
305+
}
306+
}
307+
200308
const CreateFlowRequestOptions = {
201309
config: {
202310
allowedPrincipals: [PrincipalType.USER, PrincipalType.SERVICE],
@@ -325,3 +433,32 @@ const DeleteFlowRequestOptions = {
325433
},
326434
},
327435
};
436+
437+
const RunFlowRequestOptions = {
438+
config: {
439+
allowedPrincipals: [PrincipalType.USER],
440+
permission: Permission.WRITE_FLOW,
441+
preSerializationHook: entitiesMustBeOwnedByCurrentProject,
442+
},
443+
schema: {
444+
tags: ['flows'],
445+
description:
446+
'Manually trigger a workflow execution. Only works for polling-type workflows.',
447+
security: [SERVICE_KEY_SECURITY_OPENAPI],
448+
params: Type.Object({
449+
id: OpenOpsId,
450+
}),
451+
response: {
452+
[StatusCodes.OK]: Type.Object({
453+
success: Type.Boolean(),
454+
flowRunId: Type.String(),
455+
status: Type.String(),
456+
message: Type.String(),
457+
}),
458+
[StatusCodes.BAD_REQUEST]: Type.Object({
459+
success: Type.Boolean(),
460+
message: Type.String(),
461+
}),
462+
},
463+
},
464+
};

0 commit comments

Comments
 (0)