Skip to content

Commit 71ab7a0

Browse files
skynetigor and KodeRad
authored and committed
feat: Implement workflow cancellation (elastic#233884)
## Summary Closes: elastic/security-team#13502 https://github.com/user-attachments/assets/b8d4b5b0-71df-4395-9ae2-6fd9d52dc73c ### Checklist Check the PR satisfies following conditions. Reviewers should verify this PR satisfies this list as well. - [ ] Any text added follows [EUI's writing guidelines](https://elastic.github.io/eui/#/guidelines/writing), uses sentence case text and includes [i18n support](https://github.com/elastic/kibana/blob/main/src/platform/packages/shared/kbn-i18n/README.md) - [ ] [Documentation](https://www.elastic.co/guide/en/kibana/master/development-documentation.html) was added for features that require explanation or tutorials - [ ] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios - [ ] If a plugin configuration key changed, check if it needs to be allowlisted in the cloud and added to the [docker list](https://github.com/elastic/kibana/blob/main/src/dev/build/tasks/os_packages/docker_generator/resources/base/bin/kibana-docker) - [ ] This was checked for breaking HTTP API changes, and any breaking changes have been approved by the breaking-change committee. The `release_note:breaking` label should be applied in these situations. - [ ] [Flaky Test Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was used on any tests changed - [ ] The PR description includes the appropriate Release Notes section, and the correct `release_note:*` label is applied per the [guidelines](https://www.elastic.co/guide/en/kibana/master/contributing.html#kibana-release-notes-process) - [ ] Review the [backport guidelines](https://docs.google.com/document/d/1VyN5k91e5OVumlc0Gb9RPa3h1ewuPE705nRtioPiTvY/edit?usp=sharing) and apply applicable `backport:*` labels. ### Identify risks Does this PR introduce any risks? For example, consider risks like hard to test bugs, performance regression, potential of data loss. 
Describe the risk, its severity, and mitigation for each identified risk. Invite stakeholders and evaluate how to proceed before merging. - [ ] [See some risk examples](https://github.com/elastic/kibana/blob/main/RISK_MATRIX.mdx) - [ ] ...
1 parent 48ec46a commit 71ab7a0

File tree

13 files changed

+473
-117
lines changed

13 files changed

+473
-117
lines changed
Lines changed: 15 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,15 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
/**
 * Error thrown when a workflow execution cannot be found for a given id.
 *
 * The id is kept on the instance as `executionId` so callers can handle the
 * error programmatically instead of parsing it back out of the message.
 */
export class WorkflowExecutionNotFoundError extends Error {
  /**
   * @param executionId - id of the workflow execution that was looked up
   */
  constructor(public readonly executionId: string) {
    super(`Workflow execution with id "${executionId}" not found.`);
    // Set explicitly so the name stays stable for logging and error matching.
    this.name = 'WorkflowExecutionNotFoundError';
  }
}

src/platform/packages/shared/kbn-workflows/types/v1.ts

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,9 @@ export interface EsWorkflowExecution {
4141
createdBy: string;
4242
startedAt: string;
4343
finishedAt: string;
44+
cancelRequested: boolean;
45+
cancelledAt?: string;
46+
cancelledBy?: string;
4447
duration: number;
4548
triggeredBy?: string; // 'manual' or 'scheduled'
4649
traceId?: string; // APM trace ID for observability

src/platform/plugins/shared/workflows_execution_engine/server/plugin.ts

Lines changed: 77 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -17,6 +17,7 @@ import type {
1717
import type { EsWorkflowExecution, WorkflowExecutionEngineModel } from '@kbn/workflows';
1818
import { ExecutionStatus } from '@kbn/workflows';
1919
import { convertToWorkflowGraph } from '@kbn/workflows/graph';
20+
import { WorkflowExecutionNotFoundError } from '@kbn/workflows/common/errors';
2021

2122
import type { Client } from '@elastic/elasticsearch';
2223
import type { PluginStartContract as ActionsPluginStartContract } from '@kbn/actions-plugin/server';
@@ -84,19 +85,25 @@ export class WorkflowsExecutionEnginePlugin
8485
const esClient = coreStart.elasticsearch.client.asInternalUser as Client;
8586
const workflowExecutionRepository = new WorkflowExecutionRepository(esClient);
8687

87-
const { workflowRuntime, workflowLogger, nodesFactory } = await createContainer(
88-
workflowRunId,
89-
spaceId,
90-
actions,
91-
taskManager,
92-
esClient,
93-
logger,
94-
config,
95-
workflowExecutionRepository
96-
);
97-
await workflowRuntime.start();
88+
const { workflowRuntime, workflowExecutionState, workflowLogger, nodesFactory } =
89+
await createContainer(
90+
workflowRunId,
91+
spaceId,
92+
actions,
93+
taskManager,
94+
esClient,
95+
logger,
96+
config,
97+
workflowExecutionRepository
98+
);
9899

99-
await workflowExecutionLoop(workflowRuntime, workflowLogger, nodesFactory);
100+
await workflowRuntime.start();
101+
await workflowExecutionLoop(
102+
workflowRuntime,
103+
workflowExecutionState,
104+
workflowLogger,
105+
nodesFactory
106+
);
100107
},
101108
async cancel() {
102109
// Cancel function for the task
@@ -124,19 +131,25 @@ export class WorkflowsExecutionEnginePlugin
124131
const esClient = coreStart.elasticsearch.client.asInternalUser as Client;
125132
const workflowExecutionRepository = new WorkflowExecutionRepository(esClient);
126133

127-
const { workflowRuntime, workflowLogger, nodesFactory } = await createContainer(
128-
workflowRunId,
129-
spaceId,
130-
actions,
131-
taskManager,
132-
esClient,
133-
logger,
134-
config,
135-
workflowExecutionRepository
136-
);
134+
const { workflowRuntime, workflowExecutionState, workflowLogger, nodesFactory } =
135+
await createContainer(
136+
workflowRunId,
137+
spaceId,
138+
actions,
139+
taskManager,
140+
esClient,
141+
logger,
142+
config,
143+
workflowExecutionRepository
144+
);
137145
await workflowRuntime.resume();
138146

139-
await workflowExecutionLoop(workflowRuntime, workflowLogger, nodesFactory);
147+
await workflowExecutionLoop(
148+
workflowRuntime,
149+
workflowExecutionState,
150+
workflowLogger,
151+
nodesFactory
152+
);
140153
},
141154
async cancel() {},
142155
};
@@ -199,8 +212,48 @@ export class WorkflowsExecutionEnginePlugin
199212
};
200213
};
201214

215+
/**
 * Requests cancellation of a workflow execution.
 *
 * This function only persists the cancellation request (`cancelRequested`,
 * `cancelledAt`, `cancelledBy`) on the execution document; it does not stop
 * the run itself.
 * NOTE(review): presumably the execution loop observes `cancelRequested` and
 * transitions the execution to CANCELLED - confirm against the loop.
 *
 * The call is a no-op when the execution is already in a terminal status
 * (CANCELLED, COMPLETED, FAILED) or when cancellation was already requested,
 * so repeated calls do not overwrite the original `cancelledAt` / `cancelledBy`.
 *
 * @param workflowExecutionId - id of the execution to cancel
 * @param spaceId - space forwarded to the repository lookup
 * @throws WorkflowExecutionNotFoundError when the repository returns no execution
 */
const cancelWorkflowExecution = async (
  workflowExecutionId: string,
  spaceId: string
): Promise<void> => {
  const esClient = core.elasticsearch.client.asInternalUser as Client;
  const workflowExecutionRepository = new WorkflowExecutionRepository(esClient);
  const workflowExecution = await workflowExecutionRepository.getWorkflowExecutionById(
    workflowExecutionId,
    spaceId
  );

  if (!workflowExecution) {
    throw new WorkflowExecutionNotFoundError(workflowExecutionId);
  }

  const terminalStatuses = [
    ExecutionStatus.CANCELLED,
    ExecutionStatus.COMPLETED,
    ExecutionStatus.FAILED,
  ];

  if (terminalStatuses.includes(workflowExecution.status)) {
    // Already in a terminal state: nothing left to cancel.
    return;
  }

  if (workflowExecution.cancelRequested) {
    // Cancellation is already in flight; keep the first request's
    // cancelledAt / cancelledBy instead of overwriting them.
    return;
  }

  // Request cancellation
  await workflowExecutionRepository.updateWorkflowExecution({
    id: workflowExecution.id,
    cancelRequested: true,
    cancelledAt: new Date().toISOString(),
    cancelledBy: 'system', // TODO: set user if available
  });

  // TODO: handle WAITING / WAITING_FOR_INPUT states
  // It should clean up resume tasks, etc
};
253+
202254
return {
203255
executeWorkflow,
256+
cancelWorkflowExecution,
204257
};
205258
}
206259

@@ -281,6 +334,7 @@ async function createContainer(
281334

282335
return {
283336
workflowRuntime,
337+
workflowExecutionState,
284338
contextManager,
285339
connectorExecutor,
286340
workflowLogger,

src/platform/plugins/shared/workflows_execution_engine/server/types.ts

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@ export interface WorkflowsExecutionEnginePluginStart {
2525
workflow: WorkflowExecutionEngineModel,
2626
context: Record<string, any>
2727
): Promise<ExecuteWorkflowResponse>;
28+
cancelWorkflowExecution(workflowExecutionId: string, spaceId: string): Promise<void>;
2829
}
2930

3031
export interface WorkflowsExecutionEnginePluginSetupDeps {

src/platform/plugins/shared/workflows_execution_engine/server/workflow_context_manager/tests/workflow_execution_state.test.ts

Lines changed: 53 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -154,6 +154,12 @@ describe('WorkflowExecutionState', () => {
154154
});
155155

156156
describe('flush', () => {
157+
beforeEach(() => {
158+
workflowExecutionRepository.getWorkflowExecutionById = jest
159+
.fn()
160+
.mockResolvedValue({} as EsWorkflowExecution);
161+
});
162+
157163
it('should flush workflow execution changes', async () => {
158164
const updatedWorkflowExecution = {
159165
id: 'test-workflow-execution-id',
@@ -172,6 +178,20 @@ describe('WorkflowExecutionState', () => {
172178
);
173179
});
174180

181+
it('should flush workflow execution changes with execution id even if execution id is not in change', async () => {
182+
const updatedWorkflowExecution = {} as EsWorkflowExecution;
183+
184+
underTest.updateWorkflowExecution(updatedWorkflowExecution);
185+
186+
await underTest.flush();
187+
188+
expect(workflowExecutionRepository.updateWorkflowExecution).toHaveBeenCalledWith(
189+
expect.objectContaining({
190+
id: 'test-workflow-execution-id',
191+
})
192+
);
193+
});
194+
175195
it('should flush new step executions', async () => {
176196
const mockUuid = 'fake-uuid';
177197
jest.requireMock('uuid').v4.mockImplementation(() => mockUuid);
@@ -188,12 +208,10 @@ describe('WorkflowExecutionState', () => {
188208
expect(stepExecutionRepository.createStepExecution).toHaveBeenCalledWith(
189209
expect.objectContaining({
190210
id: mockUuid,
191-
workflowRunId: 'test-workflow-execution-id',
192-
workflowId: 'test-workflow-id',
193211
stepId: 'test-step-execution-id',
194212
status: ExecutionStatus.RUNNING,
195213
startedAt: '2025-08-05T20:00:00.000Z',
196-
})
214+
} as EsWorkflowStepExecution)
197215
);
198216
});
199217

@@ -221,19 +239,17 @@ describe('WorkflowExecutionState', () => {
221239
await underTest.flush();
222240

223241
expect(stepExecutionRepository.updateStepExecutions).toHaveBeenCalledWith([
224-
expect.objectContaining({
242+
{
225243
id: fakeUuid,
226-
workflowRunId: 'test-workflow-execution-id',
227-
workflowId: 'test-workflow-id',
228244
stepId: 'test-step-execution-id',
229245
status: ExecutionStatus.COMPLETED,
230246
completedAt: '2025-08-05T20:01:00.000Z',
231247
executionTimeMs: 60000,
232-
}),
248+
} as EsWorkflowStepExecution,
233249
]);
234250
});
235251

236-
it('should be able to create step executions that changed multiple times', async () => {
252+
it('should be able to create step executions that changed multiple times by merging changes', async () => {
237253
// create initial step execution
238254
const fakeUuid = 'fake-uuid-1';
239255
jest.requireMock('uuid').v4.mockImplementationOnce(() => fakeUuid);
@@ -249,21 +265,20 @@ describe('WorkflowExecutionState', () => {
249265
stepId: 'test-step-execution-id',
250266
status: ExecutionStatus.COMPLETED,
251267
completedAt: '2025-08-05T20:01:00.000Z',
252-
executionTimeMs: 60000,
268+
executionTimeMs: 2000,
253269
} as EsWorkflowStepExecution);
254270

255271
await underTest.flush();
256272

257273
expect(stepExecutionRepository.createStepExecution).toHaveBeenCalledWith(
258274
expect.objectContaining({
259275
id: fakeUuid,
260-
workflowRunId: 'test-workflow-execution-id',
261-
workflowId: 'test-workflow-id',
262276
stepId: 'test-step-execution-id',
263277
status: ExecutionStatus.COMPLETED,
264278
completedAt: '2025-08-05T20:01:00.000Z',
265-
executionTimeMs: 60000,
266-
})
279+
startedAt: '2025-08-05T20:00:00.000Z',
280+
executionTimeMs: 2000,
281+
} as EsWorkflowStepExecution)
267282
);
268283
});
269284

@@ -312,6 +327,31 @@ describe('WorkflowExecutionState', () => {
312327
expect(stepExecutionRepository.createStepExecution).toHaveBeenCalledTimes(2); // create the first step execution and the second one
313328
expect(stepExecutionRepository.updateStepExecutions).toHaveBeenCalledTimes(1);
314329
});
330+
331+
it('should sync workflow execution with latest from repository', async () => {
332+
workflowExecutionRepository.getWorkflowExecutionById = jest.fn().mockResolvedValue({
333+
id: 'test-workflow-execution-id',
334+
status: ExecutionStatus.CANCELLED,
335+
cancelledAt: '2025-08-05T20:02:00.000Z',
336+
cancelledBy: 'user-123',
337+
} as EsWorkflowExecution);
338+
underTest.updateWorkflowExecution({
339+
status: ExecutionStatus.SKIPPED,
340+
});
341+
342+
await underTest.flush();
343+
344+
expect(workflowExecutionRepository.getWorkflowExecutionById).toHaveBeenCalledWith(
345+
'test-workflow-execution-id',
346+
undefined
347+
);
348+
expect(underTest.getWorkflowExecution()).toEqual({
349+
id: 'test-workflow-execution-id',
350+
status: ExecutionStatus.CANCELLED,
351+
cancelledAt: '2025-08-05T20:02:00.000Z',
352+
cancelledBy: 'user-123',
353+
} as EsWorkflowExecution);
354+
});
315355
});
316356

317357
describe('getStepExecutionsByStepId', () => {

0 commit comments

Comments
 (0)