6 changes: 3 additions & 3 deletions workspaces/orchestrator/entities/users.yaml
@@ -2,9 +2,9 @@
 apiVersion: backstage.io/v1alpha1
 kind: User
 metadata:
-  name: mareklibra
+  name: lholmquist
 spec:
   profile:
-    displayName: Marek Libra
-    email: [email protected]
+    displayName: Lucas Holmquist
+    email: [email protected]
   memberOf: []

@@ -76,11 +76,13 @@
"@red-hat-developer-hub/backstage-plugin-orchestrator-common": "workspace:^",
"@urql/core": "^4.1.4",
"ajv-formats": "^2.1.1",
"cloudevents": "^8.0.0",
"cloudevents": "^10.0.0",
"express": "^4.21.2",
"express-promise-router": "^4.1.1",
"fs-extra": "^10.1.0",
"isomorphic-git": "^1.23.0",
"js-yaml": "^4.1.0",
"kafkajs": "^2.2.4",
"lodash": "^4.17.21",
"luxon": "^3.7.2",
"openapi-backend": "^5.10.5",

@@ -307,6 +307,7 @@ export class DataIndexService {
     });

     this.logger.debug(`GraphQL query: ${graphQlQuery}`);
+    console.log(`GraphQL query: ${graphQlQuery}`);

     const result = await this.client.query<{
       ProcessInstances: ProcessInstance[];
@@ -1,5 +1,5 @@
 /*
- * Copyright 2024 The Backstage Authors
+ * Copyright Red Hat, Inc.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */

 import type { LoggerService } from '@backstage/backend-plugin-api';
 import type { Config } from '@backstage/config';


@@ -139,6 +139,19 @@ export class OrchestratorService {
     });
   }

+  // TODO: Execute as CE
+  public async executeWorkflowAsCloudEvent(args: {
+    definitionId: string;
+    workflowSource: string;
+    workflowEventType: string;
+    contextAttribute: string;
+    inputData?: ProcessInstanceVariables;
+    authTokens?: Array<AuthToken>;
+    backstageToken?: string | undefined;
+  }) {
+    return await this.sonataFlowService.executeWorkflowAsCloudEvent(args);
+  }
+
   public async executeWorkflow(args: {
     definitionId: string;
     serviceUrl: string;

@@ -16,6 +16,8 @@

 import { LoggerService } from '@backstage/backend-plugin-api';

+import { CloudEvent, Kafka as KafkaCE } from 'cloudevents';
+import { Kafka, logLevel } from 'kafkajs';
 import capitalize from 'lodash/capitalize';

 import {
@@ -31,13 +33,22 @@ import {
   WorkflowOverview,
 } from '@red-hat-developer-hub/backstage-plugin-orchestrator-common';

+import { randomUUID } from 'node:crypto';
+
 import { Pagination } from '../types/pagination';
 import { DataIndexService } from './DataIndexService';

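+// Kafka connection settings read from app-config under `orchestrator.kafka`
+// (see initPublicServices in the router changes below), e.g.:
+//   orchestrator:
+//     kafka:
+//       clientId: backstage-orchestrator   # hypothetical values
+//       brokers: ['localhost:9092']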
+export type OrchestratorKafkaService = {
+  clientId: string;
+  brokers: string[];
+  logLevel?: string;
+};
+
 export class SonataFlowService {
   constructor(
     private readonly dataIndexService: DataIndexService,
     private readonly logger: LoggerService,
+    private readonly kafkaService: OrchestratorKafkaService,
   ) {}

   public async fetchWorkflowInfoOnService(args: {
@@ -103,6 +114,72 @@ export class SonataFlowService {
     return items.filter((item): item is WorkflowOverview => !!item);
   }

+  public async executeWorkflowAsCloudEvent(args: {
+    definitionId: string;
+    workflowSource: string;
+    workflowEventType: string;
+    contextAttribute: string;
+    inputData?: ProcessInstanceVariables;
+    authTokens?: Array<AuthToken>;
+    backstageToken?: string | undefined;
+  }): Promise<WorkflowExecutionResponse | undefined> {
+    const contextAttributeId = randomUUID();
+    // TODO: something about adding a header as an extension
+    const triggeringCloudEvent = new CloudEvent({
+      datacontenttype: 'application/json',
+      type: args.workflowEventType,
+      source: args.workflowSource,
+      [args.contextAttribute]: contextAttributeId,
+      data: {
+        ...args.inputData,
+        [args.contextAttribute]: contextAttributeId, // Need to be able to correlate the workflow run somehow
+      },
+    });
+
+    // Put the CE in the format needed to send as a Kafka message
+    const lockEventBinding = KafkaCE.binary(triggeringCloudEvent);
+    // Create the message event that will be sent to the kafka topic
+    const messageEvent = {
+      key: '',
+      value: JSON.stringify(KafkaCE.toEvent(lockEventBinding)),
+    };
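+    // Note (assumption): round-tripping the binary binding through toEvent()
+    // and JSON.stringify() publishes the event as structured-style JSON in the
+    // message value, rather than as binary-mode Kafka headers plus raw data.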
+
+    // TODO: look at the community plugins kafka backend plugin to see how to do the ssl type stuff
+    // It looks like that plugin just passes the whole options from the app-config to the kafkajs constructor
+    const kafkaConnectionBinding = {
+      logLevel: logLevel.DEBUG, // TODO: need to figure out this with Typing
+      brokers: this.kafkaService.brokers,
+      clientId: this.kafkaService.clientId,
+    };
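+    // Re the SSL TODO above: kafkajs accepts ssl/sasl keys in this same config
+    // object, so the whole app-config block could be passed through, e.g.
+    // (values hypothetical):
+    //   { clientId, brokers, ssl: true,
+    //     sasl: { mechanism: 'scram-sha-512', username: '...', password: '...' } }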
+
+    const kfk = new Kafka(kafkaConnectionBinding);
+
+    try {
+      const producer = kfk.producer();
+      // Connect the producer
+      await producer.connect();
+
+      // Send the message
+      await producer.send({
+        topic: args.workflowEventType,
+        messages: [messageEvent],
+      });
+
+      // Disconnect the producer
+      await producer.disconnect();
+    } catch (error) {
+      throw new Error(
+        `Error with Kafka client with connection options ${JSON.stringify(
+          kafkaConnectionBinding,
+        )}: ${error}`,
+      );
+    }
+
+    // Since sending to kafka doesn't return anything, send back the contextAttributeId here.
+    // Then we will query the workflow instances to see if it showed up yet.
+    return {
+      id: contextAttributeId,
+    };
+  }
+
   public async executeWorkflow(args: {
     definitionId: string;
     serviceUrl: string;
@@ -1,3 +1,4 @@
+/* eslint-disable no-else-return */
 /*
  * Copyright Red Hat, Inc.
  *
@@ -14,6 +15,7 @@
  * limitations under the License.
  */

+import { load } from 'js-yaml';
 import { ParsedRequest } from 'openapi-backend';

 import {
@@ -26,6 +28,7 @@ import {
   ProcessInstanceState,
   RetriggerInstanceRequestDTO,
   WorkflowDTO,
+  WorkflowExecutionResponse,
   WorkflowInfo,
   WorkflowOverviewDTO,
   WorkflowOverviewListResultDTO,
@@ -195,17 +198,125 @@ export class V2 {
     if (!definition.serviceUrl) {
       throw new Error(`ServiceURL is not defined for workflow ${workflowId}`);
     }
-    const executionResponse = await this.orchestratorService.executeWorkflow({
-      definitionId: workflowId,
-      inputData: {
-        workflowdata: executeWorkflowRequestDTO.inputData,
-        initiatorEntity: initiatorEntity,
-        targetEntity: executeWorkflowRequestDTO.targetEntity,
-      },
-      authTokens: executeWorkflowRequestDTO.authTokens as Array<AuthToken>,
-      serviceUrl: definition.serviceUrl,
-      backstageToken,
-    });
+
+    let executionResponse: WorkflowExecutionResponse | undefined;
+
+    // Figure out if we are sending as a cloud event or as a regular request.
+    // The frontend will send the isEvent=true parameter in the inputData if it is running as an Event
+    const isEventType =
+      (executeWorkflowRequestDTO.inputData as any)?.isEvent || false;
+
+    // definition.source will be the yaml
+    const parsedDefinitionSource: any = load(definition.source as string);
+
+    if (isEventType) {
+      // This is where we call the cloud event execute
+      // All workflows should have this?
+      const start = parsedDefinitionSource.start;
+
+      // Find the start state from the list of states
+      const startState = parsedDefinitionSource.states.filter(
+        (val: { name: any }) => {
+          return val.name === start;
+        },
+      );
+
+      if (startState.length < 1) {
+        // Remove this:
+        console.log('No States that match the start state');
+        throw new Error(
+          `Error executing workflow with id ${workflowId}, No States that match the start state`,
+        );
+      }
+
+      // Look at the onEvents to get what it responds to.
+      // What happens if there are multiple events and event refs?
+      // The kafka topic name will be the value of `eventName`
+      const eventName = startState[0].onEvents[0].eventRefs[0];
+
+      const workflowEventToUse = parsedDefinitionSource.events.filter(
+        (val: { name: any }) => {
+          return val.name === eventName;
+        },
+      );
+
+      if (workflowEventToUse.length < 1) {
+        throw new Error(
+          `Error executing workflow with id ${workflowId}, No Events that match the start state eventRef`,
+        );
+      }
+
+      // This will be the key used for event correlation (`correlationContextAttributeName`)
+      const correlationContextAttributeName =
+        workflowEventToUse[0].correlation[0].contextAttributeName;
+
+      executionResponse =
+        await this.orchestratorService.executeWorkflowAsCloudEvent({
+          definitionId: workflowId,
+          workflowSource: workflowEventToUse[0].source,
+          workflowEventType: eventName,
+          contextAttribute: correlationContextAttributeName,
+          inputData: {
+            workflowdata: executeWorkflowRequestDTO.inputData,
+            initiatorEntity: initiatorEntity,
+            targetEntity: executeWorkflowRequestDTO.targetEntity,
+          },
+          authTokens: executeWorkflowRequestDTO.authTokens as Array<AuthToken>,
+          backstageToken,
+        });
+
+      // We need to return the workflow instance ID.
+      // This is what is returned when executing a "normal" workflow.
+      // Wait a small amount between attempts so the workflow has a chance to get
+      // triggered; there is a very good possibility that the workflow will not be
+      // ready yet when we query here.
+
+      let currentInstanceToReturn: string | any[] = [];
+      for (let i = 0; i < FETCH_INSTANCE_MAX_ATTEMPTS; i++) {
+        const response = await this.orchestratorService.fetchInstances({
+          workflowIds: [workflowId],
+        });
+
+        // eslint-disable-next-line no-loop-func
+        currentInstanceToReturn = response.filter((val: any) => {
+          return (
+            val.variables.workflowdata[correlationContextAttributeName] ===
+            executionResponse?.id
+          );
+        });
+
+        if (currentInstanceToReturn.length > 0) {
+          break;
+        }
+
+        // Brief pause before retrying (500ms is an arbitrary choice)
+        await new Promise(resolve => setTimeout(resolve, 500));
+      }
+
+      let currentInstanceIDToReturn;
+      if (currentInstanceToReturn.length < 1) {
+        // Nothing returned yet. That doesn't mean an error, since it might take
+        // time for the trigger to happen; return a sentinel so the front-end
+        // knows to just show the list of workflow runs.
+        currentInstanceIDToReturn = 'kafkaEvent';
+      } else {
+        currentInstanceIDToReturn = currentInstanceToReturn[0]?.id;
+      }
+
+      // Return just the id of the response, which will be the instanceID or some identifier
+      // to let the front end know the workflow isn't ready yet
+      return {
+        id: currentInstanceIDToReturn,
+      };
+    } else {
+      executionResponse = await this.orchestratorService.executeWorkflow({
+        definitionId: workflowId,
+        inputData: {
+          workflowdata: executeWorkflowRequestDTO.inputData,
+          initiatorEntity: initiatorEntity,
+          targetEntity: executeWorkflowRequestDTO.targetEntity,
+        },
+        authTokens: executeWorkflowRequestDTO.authTokens as Array<AuthToken>,
+        serviceUrl: definition.serviceUrl,
+        backstageToken,
+      });
+    }

     if (!executionResponse) {
       throw new Error(`Couldn't execute workflow ${workflowId}`);
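
For reference, the event path above reads these fields from the parsed workflow
definition. A minimal sketch of a matching serverless workflow YAML (all names
and values hypothetical):

  id: example-workflow
  start: WaitForEvent
  events:
    - name: startEvent
      source: /example/source
      type: com.example.start
      correlation:
        - contextAttributeName: workflowcorrelationid
  states:
    - name: WaitForEvent
      type: event
      onEvents:
        - eventRefs:
            - startEvent
      end: true
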
@@ -60,7 +60,10 @@ import { V2 } from './api/v2';
 import { DataIndexService } from './DataIndexService';
 import { DataInputSchemaService } from './DataInputSchemaService';
 import { OrchestratorService } from './OrchestratorService';
-import { SonataFlowService } from './SonataFlowService';
+import {
+  OrchestratorKafkaService,
+  SonataFlowService,
+} from './SonataFlowService';
 import { WorkflowCacheService } from './WorkflowCacheService';

 interface PublicServices {
@@ -250,8 +253,15 @@ function initPublicServices(
   scheduler: SchedulerService,
 ): PublicServices {
   const dataIndexUrl = config.getString('orchestrator.dataIndexService.url');
+  // TODO: what happens when this isn't there?
+  const orchestratorKafka: OrchestratorKafkaService =
+    config.get('orchestrator.kafka');
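+  // Note on the TODO above: config.get throws when the key is missing;
+  // config.getOptional('orchestrator.kafka') would return undefined instead,
+  // which could make the Kafka/event path opt-in.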
   const dataIndexService = new DataIndexService(dataIndexUrl, logger);
-  const sonataFlowService = new SonataFlowService(dataIndexService, logger);
+  const sonataFlowService = new SonataFlowService(
+    dataIndexService,
+    logger,
+    orchestratorKafka,
+  );

   const workflowCacheService = new WorkflowCacheService(
     logger,
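
For reference, a sketch of the app-config block consumed above (URL, client id,
and broker values are hypothetical):

  orchestrator:
    dataIndexService:
      url: http://localhost:8180
    kafka:
      clientId: backstage-orchestrator
      brokers:
        - localhost:9092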