diff --git a/workspaces/orchestrator/entities/users.yaml b/workspaces/orchestrator/entities/users.yaml index cfe6e440c0..a5d19f5503 100644 --- a/workspaces/orchestrator/entities/users.yaml +++ b/workspaces/orchestrator/entities/users.yaml @@ -2,9 +2,9 @@ apiVersion: backstage.io/v1alpha1 kind: User metadata: - name: mareklibra + name: lholmquist spec: profile: - displayName: Marek Libra - email: foo@xyzbar.cz + displayName: Lucas Holmquist + email: lholmqui@redhat.com memberOf: [] diff --git a/workspaces/orchestrator/plugins/orchestrator-backend/package.json b/workspaces/orchestrator/plugins/orchestrator-backend/package.json index c1328b6468..19e877fc58 100644 --- a/workspaces/orchestrator/plugins/orchestrator-backend/package.json +++ b/workspaces/orchestrator/plugins/orchestrator-backend/package.json @@ -76,11 +76,13 @@ "@red-hat-developer-hub/backstage-plugin-orchestrator-common": "workspace:^", "@urql/core": "^4.1.4", "ajv-formats": "^2.1.1", - "cloudevents": "^8.0.0", + "cloudevents": "^10.0.0", "express": "^4.21.2", "express-promise-router": "^4.1.1", "fs-extra": "^10.1.0", "isomorphic-git": "^1.23.0", + "js-yaml": "^4.1.0", + "kafkajs": "^2.2.4", "lodash": "^4.17.21", "luxon": "^3.7.2", "openapi-backend": "^5.10.5", diff --git a/workspaces/orchestrator/plugins/orchestrator-backend/src/service/DataIndexService.ts b/workspaces/orchestrator/plugins/orchestrator-backend/src/service/DataIndexService.ts index 22231c52a5..f487db32ae 100644 --- a/workspaces/orchestrator/plugins/orchestrator-backend/src/service/DataIndexService.ts +++ b/workspaces/orchestrator/plugins/orchestrator-backend/src/service/DataIndexService.ts @@ -307,6 +307,7 @@ export class DataIndexService { }); this.logger.debug(`GraphQL query: ${graphQlQuery}`); + console.log(`GraphQL query: ${graphQlQuery}`); const result = await this.client.query<{ ProcessInstances: ProcessInstance[]; diff --git a/workspaces/orchestrator/plugins/orchestrator-backend/src/service/Helper.ts b/workspaces/orchestrator/plugins/orchestrator-backend/src/service/Helper.ts index 73fd4cb74d..8453415d7c 100644 --- a/workspaces/orchestrator/plugins/orchestrator-backend/src/service/Helper.ts +++ b/workspaces/orchestrator/plugins/orchestrator-backend/src/service/Helper.ts @@ -1,5 +1,5 @@ /* - * Copyright 2024 The Backstage Authors + * Copyright Red Hat, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,6 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + import type { LoggerService } from '@backstage/backend-plugin-api'; import type { Config } from '@backstage/config'; diff --git a/workspaces/orchestrator/plugins/orchestrator-backend/src/service/OrchestratorService.ts b/workspaces/orchestrator/plugins/orchestrator-backend/src/service/OrchestratorService.ts index 714112149e..46f9474274 100644 --- a/workspaces/orchestrator/plugins/orchestrator-backend/src/service/OrchestratorService.ts +++ b/workspaces/orchestrator/plugins/orchestrator-backend/src/service/OrchestratorService.ts @@ -139,6 +139,19 @@ export class OrchestratorService { }); } + // TODO: Execute as CE + public async executeWorkflowAsCloudEvent(args: { + definitionId: string; + workflowSource: string; + workflowEventType: string; + contextAttribute: string; + inputData?: ProcessInstanceVariables; + authTokens?: Array; + backstageToken?: string | undefined; + }) { + return await this.sonataFlowService.executeWorkflowAsCloudEvent(args); + } + public async executeWorkflow(args: { definitionId: string; serviceUrl: string; diff --git a/workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts b/workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts index 5408c88e1b..3f7fc9ee01 100644 --- a/workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts +++ b/workspaces/orchestrator/plugins/orchestrator-backend/src/service/SonataFlowService.ts @@ -16,6 +16,8 @@ import { LoggerService } from '@backstage/backend-plugin-api'; +import { CloudEvent, Kafka as KafkaCE } from 'cloudevents'; +import { Kafka, logLevel } from 'kafkajs'; import capitalize from 'lodash/capitalize'; import { @@ -31,13 +33,22 @@ import { WorkflowOverview, } from '@red-hat-developer-hub/backstage-plugin-orchestrator-common'; +import { randomUUID } from 'node:crypto'; + import { Pagination } from '../types/pagination'; import { DataIndexService } from './DataIndexService'; +export type OrchestratorKafkaService = { + clientId: string; + brokers: string[]; + logLevel?: string; +}; + export class SonataFlowService { constructor( private readonly dataIndexService: DataIndexService, private readonly logger: LoggerService, + private readonly kafkaService: OrchestratorKafkaService, ) {} public async fetchWorkflowInfoOnService(args: { @@ -103,6 +114,72 @@ export class SonataFlowService { return items.filter((item): item is WorkflowOverview => !!item); } + public async executeWorkflowAsCloudEvent(args: { + definitionId: string; + workflowSource: string; + workflowEventType: string; + contextAttribute: string; + inputData?: ProcessInstanceVariables; + authTokens?: Array; + backstageToken?: string | undefined; + }): Promise { + const contextAttributeId = randomUUID(); + // TODO: something about adding a header as an extension + const triggeringCloudEvent = new CloudEvent({ + datacontenttype: 'application/json', + type: args.workflowEventType, + source: args.workflowSource, + [args.contextAttribute]: contextAttributeId, + data: { + ...args.inputData, + [args.contextAttribute]: contextAttributeId, // Need to be able to correlate the workflow run somehow + }, + }); + + // Put the CE in the format needed to send as a Kafka message + const lockEventBinding = KafkaCE.binary(triggeringCloudEvent); + // Create the message event that will be sent to the kafka topic + const messageEvent = { + key: '', + value: JSON.stringify(KafkaCE.toEvent(lockEventBinding)), + }; + + // TODO: look at the community plugins kafka backend plugin to see how to do the ssl type stuff + // It looks like that plugin just passes the whole options from the app-config to the kafkajs constructor + const kafkaConnectionBinding = { + logLevel: logLevel.DEBUG, // TODO: need to figure out this with Typing + brokers: this.kafkaService.brokers, + clientId: this.kafkaService.clientId, + }; + + const kfk = new Kafka(kafkaConnectionBinding); + + try { + const producer = kfk.producer(); + // Connect the producer + await producer.connect(); + + // Send the message + await producer.send({ + topic: args.workflowEventType, + messages: [messageEvent], + }); + + // Disconnect the producer + await producer.disconnect(); + } catch (error) { + throw new Error( + `Error with Kafka client with connection options: ${kafkaConnectionBinding}`, + ); + } + + // Since sending to kafka doesn't return anything, send back the contextAttributeId here + // Then we will query the workflow instances to see if it showed up yet + return { + id: contextAttributeId, + }; + } + public async executeWorkflow(args: { definitionId: string; serviceUrl: string; diff --git a/workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts b/workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts index 9793cf76ee..fa59ea1536 100644 --- a/workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts +++ b/workspaces/orchestrator/plugins/orchestrator-backend/src/service/api/v2.ts @@ -1,3 +1,4 @@ +/* eslint-disable no-else-return */ /* * Copyright Red Hat, Inc. * @@ -14,6 +15,7 @@ * limitations under the License. */ +import { load } from 'js-yaml'; import { ParsedRequest } from 'openapi-backend'; import { @@ -26,6 +28,7 @@ import { ProcessInstanceState, RetriggerInstanceRequestDTO, WorkflowDTO, + WorkflowExecutionResponse, WorkflowInfo, WorkflowOverviewDTO, WorkflowOverviewListResultDTO, @@ -195,17 +198,125 @@ export class V2 { if (!definition.serviceUrl) { throw new Error(`ServiceURL is not defined for workflow ${workflowId}`); } - const executionResponse = await this.orchestratorService.executeWorkflow({ - definitionId: workflowId, - inputData: { - workflowdata: executeWorkflowRequestDTO.inputData, - initiatorEntity: initiatorEntity, - targetEntity: executeWorkflowRequestDTO.targetEntity, - }, - authTokens: executeWorkflowRequestDTO.authTokens as Array, - serviceUrl: definition.serviceUrl, - backstageToken, - }); + + let executionResponse: WorkflowExecutionResponse | undefined; + + // Figure out if we are sending as cloudevent are regular + // The frontend will send the isEvent=true parameter in the inputData if it running as an Event + const isEventType = + (executeWorkflowRequestDTO.inputData as any)?.isEvent || false; + + // definition.source will be the yaml + const parsedDefinitionSource: any = load(definition.source as string); + + if (isEventType) { + // This is where we call the cloud event execute + // All workflows should have this? + const start = parsedDefinitionSource.start; + + // Find the start state from the list of states + const startState = parsedDefinitionSource.states.filter( + (val: { name: any }) => { + return val.name === start; + }, + ); + + if (startState.length < 1) { + // Remove this: + console.log('No States that match the start state'); + throw new Error( + 'Error executing workflow with id ${workflowId}, No States that match the start state', + ); + } + + // Look at the onEvents to get what it responds to + // What happens if there are multiple events and event refs? + // The kafka topic name will be: ${eventName}` + const eventName = startState[0].onEvents[0].eventRefs[0]; + + const workflowEventToUse = parsedDefinitionSource.events.filter( + (val: { name: any }) => { + return val.name === eventName; + }, + ); + + if (workflowEventToUse.length < 1) { + throw new Error( + 'Error executing workflow with id ${workflowId}, No Events that match the start state eventRef', + ); + } + + // This will be the key value for event correlation: ${correlationContextAttributeName} + const correlationContextAttributeName = + workflowEventToUse[0].correlation[0].contextAttributeName; + + executionResponse = + await this.orchestratorService.executeWorkflowAsCloudEvent({ + definitionId: workflowId, + workflowSource: workflowEventToUse[0].source, + workflowEventType: eventName, + contextAttribute: correlationContextAttributeName, + inputData: { + workflowdata: executeWorkflowRequestDTO.inputData, + initiatorEntity: initiatorEntity, + targetEntity: executeWorkflowRequestDTO.targetEntity, + }, + authTokens: executeWorkflowRequestDTO.authTokens as Array, + backstageToken, + }); + + // We need to return the workflow instance ID + // This is what is returned when executing a "normal" workflow + // Wait a small amount so the workflow has a chance to get triggered + // There is a very good possibility that the workflow will not be ready yet when we query here + + let currentInstanceToReturn: string | any[] = []; + for (let i = 0; i < FETCH_INSTANCE_MAX_ATTEMPTS; i++) { + const response = await this.orchestratorService.fetchInstances({ + workflowIds: [workflowId], + }); + + // eslint-disable-next-line no-loop-func + currentInstanceToReturn = response.filter((val: any) => { + return ( + val.variables.workflowdata[correlationContextAttributeName] === + executionResponse?.id + ); + }); + + if (currentInstanceToReturn.length > 0) { + break; + } + } + + let currentInstanceIDToReturn; + if (currentInstanceToReturn.length < 1) { + // nothing returned yet, + // doesn't mean this is an error since it might take time for the trigger to happen + // return something else so the front-end knows to just show the list of workflow runs + currentInstanceIDToReturn = 'kafkaEvent'; + } else { + currentInstanceIDToReturn = currentInstanceToReturn[0]?.id; + } + + // Return just the id of the response, which will be the instanceID or some identifier + // to let the front end know the workflow isn't ready yet + return { + id: currentInstanceIDToReturn, + }; + } else { + executionResponse = await this.orchestratorService.executeWorkflow({ + definitionId: workflowId, + inputData: { + workflowdata: executeWorkflowRequestDTO.inputData, + initiatorEntity: initiatorEntity, + targetEntity: executeWorkflowRequestDTO.targetEntity, + }, + authTokens: executeWorkflowRequestDTO.authTokens as Array, + serviceUrl: definition.serviceUrl, + backstageToken, + }); + } if (!executionResponse) { throw new Error(`Couldn't execute workflow ${workflowId}`); diff --git a/workspaces/orchestrator/plugins/orchestrator-backend/src/service/router.ts b/workspaces/orchestrator/plugins/orchestrator-backend/src/service/router.ts index eac669444e..7a1935131f 100644 --- a/workspaces/orchestrator/plugins/orchestrator-backend/src/service/router.ts +++ b/workspaces/orchestrator/plugins/orchestrator-backend/src/service/router.ts @@ -60,7 +60,10 @@ import { V2 } from './api/v2'; import { DataIndexService } from './DataIndexService'; import { DataInputSchemaService } from './DataInputSchemaService'; import { OrchestratorService } from './OrchestratorService'; -import { SonataFlowService } from './SonataFlowService'; +import { + OrchestratorKafkaService, + SonataFlowService, +} from './SonataFlowService'; import { WorkflowCacheService } from './WorkflowCacheService'; interface PublicServices { @@ -250,8 +253,15 @@ function initPublicServices( scheduler: SchedulerService, ): PublicServices { const dataIndexUrl = config.getString('orchestrator.dataIndexService.url'); + // TODO: what happens when this isn't there? + const orchestratorKafka: OrchestratorKafkaService = + config.get('orchestrator.kafka'); const dataIndexService = new DataIndexService(dataIndexUrl, logger); - const sonataFlowService = new SonataFlowService(dataIndexService, logger); + const sonataFlowService = new SonataFlowService( + dataIndexService, + logger, + orchestratorKafka, + ); const workflowCacheService = new WorkflowCacheService( logger, diff --git a/workspaces/orchestrator/plugins/orchestrator/src/components/ExecuteWorkflowPage/ExecuteWorkflowPage.tsx b/workspaces/orchestrator/plugins/orchestrator/src/components/ExecuteWorkflowPage/ExecuteWorkflowPage.tsx index 483d8cf4c9..bf4a5c47f4 100644 --- a/workspaces/orchestrator/plugins/orchestrator/src/components/ExecuteWorkflowPage/ExecuteWorkflowPage.tsx +++ b/workspaces/orchestrator/plugins/orchestrator/src/components/ExecuteWorkflowPage/ExecuteWorkflowPage.tsx @@ -51,6 +51,7 @@ import { entityInstanceRouteRef, executeWorkflowRouteRef, workflowInstanceRouteRef, + workflowRunsRouteRef, } from '../../routes'; import { getErrorObject } from '../../utils/ErrorUtils'; import { BaseOrchestratorPage } from '../ui/BaseOrchestratorPage'; @@ -70,6 +71,7 @@ export const ExecuteWorkflowPage = () => { const navigate = useNavigate(); const instanceLink = useRouteRef(workflowInstanceRouteRef); const entityInstanceLink = useRouteRef(entityInstanceRouteRef); + const workflowRunsLink = useRouteRef(workflowRunsRouteRef); const { value, loading, @@ -108,7 +110,10 @@ export const ExecuteWorkflowPage = () => { const [kind, namespace, name] = targetEntity?.split(/[:\/]/) || []; + // The Kafka triggered workflows might not be available for a period of time, so it would be good to just go to that workflows runs page + // the return value would have to have some identifier that it is not an error, but no workflow instance run id yet const handleExecute = useCallback( + // eslint-disable-next-line consistent-return async (parameters: JsonObject) => { setUpdateError(undefined); try { @@ -120,6 +125,12 @@ export const ExecuteWorkflowPage = () => { authTokens, targetEntity: targetEntity ?? undefined, }); + + // Response Data id will be "kafkaEvent" if this was run as a CloudEvent and the workflow isnt ready yet + // If this happens, just navigate to the main workflow runs page. + if (response.data.id === 'kafkaEvent') { + return navigate(workflowRunsLink({ workflowId: workflowId })); + } const url = targetEntity ? entityInstanceLink({ namespace, @@ -148,6 +159,7 @@ export const ExecuteWorkflowPage = () => { kind, namespace, name, + workflowRunsLink, ], ); diff --git a/workspaces/orchestrator/plugins/orchestrator/src/components/ExecuteWorkflowPage/MissingSchemaNotice.tsx b/workspaces/orchestrator/plugins/orchestrator/src/components/ExecuteWorkflowPage/MissingSchemaNotice.tsx index ee8a971253..5b6499b864 100644 --- a/workspaces/orchestrator/plugins/orchestrator/src/components/ExecuteWorkflowPage/MissingSchemaNotice.tsx +++ b/workspaces/orchestrator/plugins/orchestrator/src/components/ExecuteWorkflowPage/MissingSchemaNotice.tsx @@ -51,6 +51,12 @@ const MissingSchemaNotice = ({ > Run + handleExecute({ isEvent: true })} + > + Run As Event + ); diff --git a/workspaces/orchestrator/yarn.lock b/workspaces/orchestrator/yarn.lock index 383775a99d..438395c914 100644 --- a/workspaces/orchestrator/yarn.lock +++ b/workspaces/orchestrator/yarn.lock @@ -12700,11 +12700,13 @@ __metadata: "@types/luxon": ^3.7.1 "@urql/core": ^4.1.4 ajv-formats: ^2.1.1 - cloudevents: ^8.0.0 + cloudevents: ^10.0.0 express: ^4.21.2 express-promise-router: ^4.1.1 fs-extra: ^10.1.0 isomorphic-git: ^1.23.0 + js-yaml: ^4.1.0 + kafkajs: ^2.2.4 lodash: ^4.17.21 luxon: ^3.7.2 openapi-backend: ^5.10.5 @@ -19397,9 +19399,9 @@ __metadata: languageName: node linkType: hard -"cloudevents@npm:^8.0.0": - version: 8.0.2 - resolution: "cloudevents@npm:8.0.2" +"cloudevents@npm:^10.0.0": + version: 10.0.0 + resolution: "cloudevents@npm:10.0.0" dependencies: ajv: ^8.11.0 ajv-formats: ^2.1.1 @@ -19407,7 +19409,7 @@ __metadata: process: ^0.11.10 util: ^0.12.4 uuid: ^8.3.2 - checksum: acec1dde5bf21c6ce88da4e94a0976c5967f0184b6a9470b1b9917be34ca85862c02655ed71ec32b5229f7d818676f9360fb88e10115650861ae31f473b3332f + checksum: 6afaaa7e3af9f951344b3567321eb5745f6b8ad8b864282fc6eb57fa67bfbc4f9b4ab10798301755dbb4eb410aa3862239252a644aafcc29a55246c756c2b55b languageName: node linkType: hard @@ -27500,6 +27502,13 @@ __metadata: languageName: node linkType: hard +"kafkajs@npm:^2.2.4": + version: 2.2.4 + resolution: "kafkajs@npm:2.2.4" + checksum: 83e9e8bc50a09b142f4ff79f6a2bd88ecc21b83bcefe6621ab1716118d624886befb7371731274f67812ce35dd50b53140ff3b49a06e5d9169fe6b164d72fea5 + languageName: node + linkType: hard + "keygrip@npm:~1.1.0": version: 1.1.0 resolution: "keygrip@npm:1.1.0"