Skip to content

Commit 3d63ae1

Browse files
committed
feat: triggering workflows using cloud events
1 parent 7d7c169 commit 3d63ae1

File tree

11 files changed

+265
-23
lines changed

11 files changed

+265
-23
lines changed

workspaces/orchestrator/entities/users.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
apiVersion: backstage.io/v1alpha1
33
kind: User
44
metadata:
5-
name: mareklibra
5+
name: lholmquist
66
spec:
77
profile:
8-
displayName: Marek Libra
9-
email: foo@xyzbar.cz
8+
displayName: Lucas Holmquist
9+
email: lholmqui@redhat.com
1010
memberOf: []

workspaces/orchestrator/plugins/orchestrator-backend/package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,11 +76,13 @@
7676
"@red-hat-developer-hub/backstage-plugin-orchestrator-common": "workspace:^",
7777
"@urql/core": "^4.1.4",
7878
"ajv-formats": "^2.1.1",
79-
"cloudevents": "^8.0.0",
79+
"cloudevents": "^10.0.0",
8080
"express": "^4.21.2",
8181
"express-promise-router": "^4.1.1",
8282
"fs-extra": "^10.1.0",
8383
"isomorphic-git": "^1.23.0",
84+
"js-yaml": "^4.1.0",
85+
"kafkajs": "^2.2.4",
8486
"lodash": "^4.17.21",
8587
"luxon": "^3.7.2",
8688
"openapi-backend": "^5.10.5",

workspaces/orchestrator/plugins/orchestrator-backend/src/service/DataIndexService.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,7 @@ export class DataIndexService {
307307
});
308308

309309
this.logger.debug(`GraphQL query: ${graphQlQuery}`);
310+
console.log(`GraphQL query: ${graphQlQuery}`);
310311

311312
const result = await this.client.query<{
312313
ProcessInstances: ProcessInstance[];

workspaces/orchestrator/plugins/orchestrator-backend/src/service/Helper.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2024 The Backstage Authors
2+
* Copyright Red Hat, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16+
1617
import type { LoggerService } from '@backstage/backend-plugin-api';
1718
import type { Config } from '@backstage/config';
1819

workspaces/orchestrator/plugins/orchestrator-backend/src/service/OrchestratorService.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,19 @@ export class OrchestratorService {
139139
});
140140
}
141141

142+
// TODO: Execute as CE
143+
public async executeWorkflowAsCloudEvent(args: {
144+
definitionId: string;
145+
workflowSource: string;
146+
workflowEventType: string;
147+
contextAttribute: string;
148+
inputData?: ProcessInstanceVariables;
149+
authTokens?: Array<AuthToken>;
150+
backstageToken?: string | undefined;
151+
}) {
152+
return await this.sonataFlowService.executeWorkflowAsCloudEvent(args);
153+
}
154+
142155
public async executeWorkflow(args: {
143156
definitionId: string;
144157
serviceUrl: string;

workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
import { LoggerService } from '@backstage/backend-plugin-api';
1818

19+
import { CloudEvent, Kafka as KafkaCE } from 'cloudevents';
20+
import { Kafka, logLevel } from 'kafkajs';
1921
import capitalize from 'lodash/capitalize';
2022

2123
import {
@@ -31,13 +33,22 @@ import {
3133
WorkflowOverview,
3234
} from '@red-hat-developer-hub/backstage-plugin-orchestrator-common';
3335

36+
import { randomUUID } from 'node:crypto';
37+
3438
import { Pagination } from '../types/pagination';
3539
import { DataIndexService } from './DataIndexService';
3640

41+
export type OrchestratorKafkaService = {
42+
clientId: string;
43+
brokers: string[];
44+
logLevel?: string;
45+
};
46+
3747
export class SonataFlowService {
3848
constructor(
3949
private readonly dataIndexService: DataIndexService,
4050
private readonly logger: LoggerService,
51+
private readonly kafkaService: OrchestratorKafkaService,
4152
) {}
4253

4354
public async fetchWorkflowInfoOnService(args: {
@@ -103,6 +114,72 @@ export class SonataFlowService {
103114
return items.filter((item): item is WorkflowOverview => !!item);
104115
}
105116

117+
public async executeWorkflowAsCloudEvent(args: {
118+
definitionId: string;
119+
workflowSource: string;
120+
workflowEventType: string;
121+
contextAttribute: string;
122+
inputData?: ProcessInstanceVariables;
123+
authTokens?: Array<AuthToken>;
124+
backstageToken?: string | undefined;
125+
}): Promise<WorkflowExecutionResponse | undefined> {
126+
const contextAttributeId = randomUUID();
127+
// TODO: something about adding a header as an extension
128+
const triggeringCloudEvent = new CloudEvent({
129+
datacontenttype: 'application/json',
130+
type: args.workflowEventType,
131+
source: args.workflowSource,
132+
[args.contextAttribute]: contextAttributeId,
133+
data: {
134+
...args.inputData,
135+
[args.contextAttribute]: contextAttributeId, // Need to be able to correlate the workflow run somehow
136+
},
137+
});
138+
139+
// Put the CE in the format needed to send as a Kafka message
140+
const lockEventBinding = KafkaCE.binary(triggeringCloudEvent);
141+
// Create the message event that will be sent to the kafka topic
142+
const messageEvent = {
143+
key: '',
144+
value: JSON.stringify(KafkaCE.toEvent(lockEventBinding)),
145+
};
146+
147+
// TODO: look at the community plugins kafka backend plugin to see how to do the ssl type stuff
148+
// It looks like that plugin just passes the whole options from the app-config to the kafkajs constructor
149+
const kafkaConnectionBinding = {
150+
logLevel: logLevel.DEBUG, // TODO: need to figure out this with Typing
151+
brokers: this.kafkaService.brokers,
152+
clientId: this.kafkaService.clientId,
153+
};
154+
155+
const kfk = new Kafka(kafkaConnectionBinding);
156+
157+
try {
158+
const producer = kfk.producer();
159+
// Connect the producer
160+
await producer.connect();
161+
162+
// Send the message
163+
await producer.send({
164+
topic: args.workflowEventType,
165+
messages: [messageEvent],
166+
});
167+
168+
// Disconnect the producer
169+
await producer.disconnect();
170+
} catch (error) {
171+
throw new Error(
172+
`Error with Kafka client with connection options: ${kafkaConnectionBinding}`,
173+
);
174+
}
175+
176+
// Since sending to kafka doesn't return anything, send back the contextAttributeId here
177+
// Then we will query the workflow instances to see if it showed up yet
178+
return {
179+
id: contextAttributeId,
180+
};
181+
}
182+
106183
public async executeWorkflow(args: {
107184
definitionId: string;
108185
serviceUrl: string;

workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts

Lines changed: 122 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
/* eslint-disable no-else-return */
12
/*
23
* Copyright Red Hat, Inc.
34
*
@@ -14,6 +15,7 @@
1415
* limitations under the License.
1516
*/
1617

18+
import { load } from 'js-yaml';
1719
import { ParsedRequest } from 'openapi-backend';
1820

1921
import {
@@ -26,6 +28,7 @@ import {
2628
ProcessInstanceState,
2729
RetriggerInstanceRequestDTO,
2830
WorkflowDTO,
31+
WorkflowExecutionResponse,
2932
WorkflowInfo,
3033
WorkflowOverviewDTO,
3134
WorkflowOverviewListResultDTO,
@@ -195,17 +198,125 @@ export class V2 {
195198
if (!definition.serviceUrl) {
196199
throw new Error(`ServiceURL is not defined for workflow ${workflowId}`);
197200
}
198-
const executionResponse = await this.orchestratorService.executeWorkflow({
199-
definitionId: workflowId,
200-
inputData: {
201-
workflowdata: executeWorkflowRequestDTO.inputData,
202-
initiatorEntity: initiatorEntity,
203-
targetEntity: executeWorkflowRequestDTO.targetEntity,
204-
},
205-
authTokens: executeWorkflowRequestDTO.authTokens as Array<AuthToken>,
206-
serviceUrl: definition.serviceUrl,
207-
backstageToken,
208-
});
201+
202+
let executionResponse: WorkflowExecutionResponse | undefined;
203+
204+
// Figure out if we are sending as cloudevent are regular
205+
// The frontend will send the isEvent=true parameter in the inputData if it running as an Event
206+
const isEventType =
207+
(executeWorkflowRequestDTO.inputData as any)?.isEvent || false;
208+
209+
// definition.source will be the yaml
210+
const parsedDefinitionSource: any = load(definition.source as string);
211+
212+
if (isEventType) {
213+
// This is where we call the cloud event execute
214+
// All workflows should have this?
215+
const start = parsedDefinitionSource.start;
216+
217+
// Find the start state from the list of states
218+
const startState = parsedDefinitionSource.states.filter(
219+
(val: { name: any }) => {
220+
return val.name === start;
221+
},
222+
);
223+
224+
if (startState.length < 1) {
225+
// Remove this:
226+
console.log('No States that match the start state');
227+
throw new Error(
228+
'Error executing workflow with id ${workflowId}, No States that match the start state',
229+
);
230+
}
231+
232+
// Look at the onEvents to get what it responds to
233+
// What happens if there are multiple events and event refs?
234+
// The kafka topic name will be: ${eventName}`
235+
const eventName = startState[0].onEvents[0].eventRefs[0];
236+
237+
const workflowEventToUse = parsedDefinitionSource.events.filter(
238+
(val: { name: any }) => {
239+
return val.name === eventName;
240+
},
241+
);
242+
243+
if (workflowEventToUse.length < 1) {
244+
throw new Error(
245+
'Error executing workflow with id ${workflowId}, No Events that match the start state eventRef',
246+
);
247+
}
248+
249+
// This will be the key value for event correlation: ${correlationContextAttributeName}
250+
const correlationContextAttributeName =
251+
workflowEventToUse[0].correlation[0].contextAttributeName;
252+
253+
executionResponse =
254+
await this.orchestratorService.executeWorkflowAsCloudEvent({
255+
definitionId: workflowId,
256+
workflowSource: workflowEventToUse[0].source,
257+
workflowEventType: eventName,
258+
contextAttribute: correlationContextAttributeName,
259+
inputData: {
260+
workflowdata: executeWorkflowRequestDTO.inputData,
261+
initiatorEntity: initiatorEntity,
262+
targetEntity: executeWorkflowRequestDTO.targetEntity,
263+
},
264+
authTokens: executeWorkflowRequestDTO.authTokens as Array<AuthToken>,
265+
backstageToken,
266+
});
267+
268+
// We need to return the workflow instance ID
269+
// This is what is returned when executing a "normal" workflow
270+
// Wait a small amount so the workflow has a chance to get triggered
271+
// There is a very good possibility that the workflow will not be ready yet when we query here
272+
273+
let currentInstanceToReturn: string | any[] = [];
274+
for (let i = 0; i < FETCH_INSTANCE_MAX_ATTEMPTS; i++) {
275+
const response = await this.orchestratorService.fetchInstances({
276+
workflowIds: [workflowId],
277+
});
278+
279+
// eslint-disable-next-line no-loop-func
280+
currentInstanceToReturn = response.filter((val: any) => {
281+
return (
282+
val.variables.workflowdata[correlationContextAttributeName] ===
283+
executionResponse?.id
284+
);
285+
});
286+
287+
if (currentInstanceToReturn.length > 0) {
288+
break;
289+
}
290+
}
291+
292+
let currentInstanceIDToReturn;
293+
if (currentInstanceToReturn.length < 1) {
294+
// nothing returned yet,
295+
// doesn't mean this is an error since it might take time for the trigger to happen
296+
// return something else so the front-end knows to just show the list of workflow runs
297+
currentInstanceIDToReturn = 'kafkaEvent';
298+
} else {
299+
currentInstanceIDToReturn = currentInstanceToReturn[0]?.id;
300+
}
301+
302+
// Return just the id of the response, which will be the instanceID or some identifier
303+
// to let the front end know the workflow isn't ready yet
304+
return {
305+
id: currentInstanceIDToReturn,
306+
};
307+
} else {
308+
executionResponse = await this.orchestratorService.executeWorkflow({
309+
definitionId: workflowId,
310+
inputData: {
311+
workflowdata: executeWorkflowRequestDTO.inputData,
312+
initiatorEntity: initiatorEntity,
313+
targetEntity: executeWorkflowRequestDTO.targetEntity,
314+
},
315+
authTokens: executeWorkflowRequestDTO.authTokens as Array<AuthToken>,
316+
serviceUrl: definition.serviceUrl,
317+
backstageToken,
318+
});
319+
}
209320

210321
if (!executionResponse) {
211322
throw new Error(`Couldn't execute workflow ${workflowId}`);

workspaces/orchestrator/plugins/orchestrator-backend/src/service/router.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,10 @@ import { V2 } from './api/v2';
6060
import { DataIndexService } from './DataIndexService';
6161
import { DataInputSchemaService } from './DataInputSchemaService';
6262
import { OrchestratorService } from './OrchestratorService';
63-
import { SonataFlowService } from './SonataFlowService';
63+
import {
64+
OrchestratorKafkaService,
65+
SonataFlowService,
66+
} from './SonataFlowService';
6467
import { WorkflowCacheService } from './WorkflowCacheService';
6568

6669
interface PublicServices {
@@ -250,8 +253,15 @@ function initPublicServices(
250253
scheduler: SchedulerService,
251254
): PublicServices {
252255
const dataIndexUrl = config.getString('orchestrator.dataIndexService.url');
256+
// TODO: what happens when this isn't there?
257+
const orchestratorKafka: OrchestratorKafkaService =
258+
config.get('orchestrator.kafka');
253259
const dataIndexService = new DataIndexService(dataIndexUrl, logger);
254-
const sonataFlowService = new SonataFlowService(dataIndexService, logger);
260+
const sonataFlowService = new SonataFlowService(
261+
dataIndexService,
262+
logger,
263+
orchestratorKafka,
264+
);
255265

256266
const workflowCacheService = new WorkflowCacheService(
257267
logger,

0 commit comments

Comments
 (0)