Skip to content

Commit 92afc1b

Browse files
pmuellrkibanamachineelasticmachine
authored andcommitted
[ResponseOps] Allow tasks to indicate they should disable themselves after run (elastic#230998)
resolves elastic#228306 resolves elastic#227894 Adds the capability of a task to disable itself after running, by returning an indication in the task response. We then use this with alerting to allow a rule's task to disable itself if it ran but found the rule itself was disabled. Specifically: - A rule throwing an error with a new reason `RuleExecutionStatusErrorReasons.Disabled` will cause the alerting task runner to return to task manager with a new flag `shouldDisableTask: true`. - During the post processing of the task, if the task indicated it should disable itself via the new flag, it will log a warning that it's going to do that, and then update the task with `enabled: false`. Task store has been updated as well, to use `overwrite: true` when doing `bulkSchedule`, as this code currently assumes that - at this point - the task doc does not exist, so the bulk update fails with a conflict if one does. --------- Co-authored-by: kibanamachine <[email protected]> Co-authored-by: Elastic Machine <[email protected]>
1 parent 7ea3b3f commit 92afc1b

File tree

13 files changed

+206
-2
lines changed

13 files changed

+206
-2
lines changed

x-pack/platform/plugins/shared/alerting/server/task_runner/task_runner.test.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import sinon from 'sinon';
99
import { usageCountersServiceMock } from '@kbn/usage-collection-plugin/server/usage_counters/usage_counters_service.mock';
10+
import type { SavedObject } from '@kbn/core/server';
1011
import type {
1112
RuleExecutorOptions,
1213
RuleTypeParams,
@@ -21,6 +22,7 @@ import {
2122
RuleExecutionStatusWarningReasons,
2223
DEFAULT_FLAPPING_SETTINGS,
2324
DEFAULT_QUERY_DELAY_SETTINGS,
25+
type RawRule,
2426
} from '../types';
2527
import type { ConcreteTaskInstance } from '@kbn/task-manager-plugin/server';
2628
import {
@@ -3425,6 +3427,37 @@ describe('Task Runner', () => {
34253427
});
34263428
});
34273429

3430+
test('should return shouldDisableTask when task is enabled but rule is not', async () => {
3431+
const taskRunner = new TaskRunner({
3432+
ruleType,
3433+
internalSavedObjectsRepository,
3434+
taskInstance: { ...mockedTaskInstance },
3435+
context: taskRunnerFactoryInitializerParams,
3436+
inMemoryMetrics,
3437+
});
3438+
expect(AlertingEventLogger).toHaveBeenCalled();
3439+
const mockedRuleTypeSavedObjectDisabled = {
3440+
...(mockedRuleTypeSavedObject as Rule),
3441+
enabled: false,
3442+
};
3443+
const mockedRawRuleSODisabled: SavedObject<RawRule> = {
3444+
...mockedRawRuleSO,
3445+
};
3446+
mockedRawRuleSODisabled.attributes.enabled = false;
3447+
3448+
mockGetRuleFromRaw.mockReturnValue(mockedRuleTypeSavedObjectDisabled);
3449+
encryptedSavedObjectsClient.getDecryptedAsInternalUser.mockResolvedValue(mockedRawRuleSO);
3450+
3451+
const result = await taskRunner.run();
3452+
3453+
expect(ruleType.executor).not.toHaveBeenCalled();
3454+
3455+
expect(result.shouldDisableTask).toEqual(true);
3456+
expect(result.taskRunError?.message).toBe(
3457+
'Rule failed to execute because rule ran after it was disabled.'
3458+
);
3459+
});
3460+
34283461
function testAlertingEventLogCalls({
34293462
ruleContext = alertingEventLoggerInitializer,
34303463
ruleTypeDef = ruleType,

x-pack/platform/plugins/shared/alerting/server/task_runner/task_runner.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -687,6 +687,7 @@ export class TaskRunner<
687687

688688
let runRuleResult: Result<RunRuleResult, Error>;
689689
let schedule: Result<IntervalSchedule, Error>;
690+
let shouldDisableTask = false;
690691
try {
691692
const validatedRuleData = await this.prepareToRun();
692693

@@ -705,6 +706,7 @@ export class TaskRunner<
705706

706707
runRuleResult = asErr(err);
707708
schedule = asErr(err);
709+
shouldDisableTask = err.reason === RuleExecutionStatusErrorReasons.Disabled;
708710
}
709711

710712
await withAlertingSpan('alerting:process-run-results-and-update-rule', () =>
@@ -736,6 +738,8 @@ export class TaskRunner<
736738
ruleTypeId: this.ruleType.id,
737739
ruleId,
738740
}),
741+
// added this way so we don't add shouldDisableTask: false explicitly
742+
...(shouldDisableTask ? { shouldDisableTask } : {}),
739743
};
740744
}
741745

x-pack/platform/plugins/shared/alerting/server/task_runner/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ export interface RuleTaskRunResult {
6161
schedule: IntervalSchedule | undefined;
6262
taskRunError?: DecoratedError;
6363
shouldDeleteTask?: boolean;
64+
shouldDisableTask?: boolean;
6465
}
6566

6667
export const getDeleteRuleTaskRunResult = (): RuleTaskRunResult => ({

x-pack/platform/plugins/shared/task_manager/server/task.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ export type SuccessfulRunResult = {
7878
taskRunError?: DecoratedError;
7979
shouldValidate?: boolean;
8080
shouldDeleteTask?: boolean;
81+
shouldDisableTask?: boolean;
8182
} & (
8283
| // ensure a SuccessfulRunResult can either specify a new `runAt` or a new `schedule`, but not both
8384
{

x-pack/platform/plugins/shared/task_manager/server/task_running/task_runner.test.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2722,6 +2722,43 @@ describe('TaskManagerRunner', () => {
27222722
tags: ['task:end', 'foo', 'bar'],
27232723
});
27242724
});
2725+
2726+
test('Disables task if shouldDisableTask: true is returned', async () => {
2727+
const { runner, store, logger } = await readyToRunStageSetup({
2728+
instance: {
2729+
schedule: {
2730+
interval: `1m`,
2731+
},
2732+
},
2733+
definitions: {
2734+
bar: {
2735+
title: 'Bar!',
2736+
createTaskRunner: () => ({
2737+
async run() {
2738+
return {
2739+
state: {},
2740+
shouldDisableTask: true,
2741+
};
2742+
},
2743+
}),
2744+
},
2745+
},
2746+
});
2747+
await runner.run();
2748+
expect(store.partialUpdate).toHaveBeenCalledWith(
2749+
expect.objectContaining({
2750+
enabled: false,
2751+
id: 'foo',
2752+
status: 'idle',
2753+
}),
2754+
expect.anything()
2755+
);
2756+
expect(logger.warn).toBeCalledTimes(1);
2757+
expect(logger.warn).toBeCalledWith(
2758+
'Disabling task bar:foo as it indicated it should disable itself',
2759+
{ tags: ['bar'] }
2760+
);
2761+
});
27252762
});
27262763

27272764
describe('isAdHocTaskAndOutOfAttempts', () => {

x-pack/platform/plugins/shared/task_manager/server/task_running/task_runner.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,7 @@ export class TaskManagerRunner implements TaskRunner {
664664
result: Result<SuccessfulRunResult, FailedRunResult>
665665
): Promise<TaskRunResult> {
666666
const hasTaskRunFailed = isOk(result);
667+
let shouldTaskBeDisabled = false;
667668
const fieldUpdates: Partial<ConcreteTaskInstance> & Pick<ConcreteTaskInstance, 'status'> = flow(
668669
// if running the task has failed ,try to correct by scheduling a retry in the near future
669670
mapErr(this.rescheduleFailedRun),
@@ -675,12 +676,18 @@ export class TaskManagerRunner implements TaskRunner {
675676
state,
676677
attempts = 0,
677678
shouldDeleteTask,
679+
shouldDisableTask,
678680
}: SuccessfulRunResult & { attempts: number }) => {
679681
if (shouldDeleteTask) {
680682
// set the status to failed so task will get deleted
681683
return asOk({ status: TaskStatus.ShouldDelete });
682684
}
683685

686+
if (shouldDisableTask) {
687+
shouldTaskBeDisabled = true;
688+
return asOk({ status: TaskStatus.Idle });
689+
}
690+
684691
const updatedTaskSchedule = reschedule ?? this.instance.task.schedule;
685692
return asOk({
686693
runAt:
@@ -744,13 +751,22 @@ export class TaskManagerRunner implements TaskRunner {
744751
}
745752
} else {
746753
shouldUpdateTask = true;
754+
755+
if (shouldTaskBeDisabled) {
756+
const label = `${this.taskType}:${this.instance.task.id}`;
757+
this.logger.warn(`Disabling task ${label} as it indicated it should disable itself`, {
758+
tags: [this.taskType],
759+
});
760+
}
761+
747762
partialTask = {
748763
...partialTask,
749764
...fieldUpdates,
750765
// reset fields that track the lifecycle of the concluded `task run`
751766
startedAt: null,
752767
retryAt: null,
753768
ownerId: null,
769+
...(shouldTaskBeDisabled ? { enabled: false } : {}),
754770
};
755771
}
756772

x-pack/platform/plugins/shared/task_manager/server/task_store.test.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2464,7 +2464,10 @@ describe('TaskStore', () => {
24642464
},
24652465
},
24662466
],
2467-
{ refresh: false }
2467+
{
2468+
overwrite: true,
2469+
refresh: false,
2470+
}
24682471
);
24692472

24702473
expect(result).toEqual([
@@ -2913,7 +2916,10 @@ describe('TaskStore', () => {
29132916
},
29142917
},
29152918
],
2916-
{ refresh: false }
2919+
{
2920+
overwrite: true,
2921+
refresh: false,
2922+
}
29172923
);
29182924
});
29192925
});

x-pack/platform/plugins/shared/task_manager/server/task_store.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -417,6 +417,7 @@ export class TaskStore {
417417
try {
418418
savedObjects = await soClient.bulkCreate<SerializedConcreteTaskInstance>(objects, {
419419
refresh: false,
420+
overwrite: true,
420421
});
421422
this.adHocTaskCounter.increment(
422423
taskInstances.filter((task) => {

x-pack/platform/test/alerting_api_integration/common/lib/task_manager_utils.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,4 +162,16 @@ export class TaskManagerUtils {
162162
}
163163
});
164164
}
165+
166+
async setTaskEnabled(id: string, enabled: boolean) {
167+
await this.es.update({
168+
id: `task:${id}`,
169+
index: '.kibana_task_manager',
170+
doc: {
171+
task: {
172+
enabled,
173+
},
174+
},
175+
});
176+
}
165177
}

x-pack/platform/test/alerting_api_integration/spaces_only/tests/alerting/group1/disable.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import expect from '@kbn/expect';
99
import { RULE_SAVED_OBJECT_TYPE } from '@kbn/alerting-plugin/server';
10+
import type { RawRule } from '@kbn/alerting-plugin/server/types';
1011
import { ES_TEST_INDEX_NAME } from '@kbn/alerting-api-integration-helpers';
1112
import { ALERT_STATUS } from '@kbn/rule-data-utils';
1213
import { Spaces } from '../../../scenarios';
@@ -19,6 +20,7 @@ import {
1920
getTestRuleData,
2021
ObjectRemover,
2122
getEventLog,
23+
TaskManagerUtils,
2224
} from '../../../../common/lib';
2325
import { validateEvent } from './event_log';
2426

@@ -29,6 +31,7 @@ export default function createDisableRuleTests({ getService }: FtrProviderContex
2931
const supertestWithoutAuth = getService('supertestWithoutAuth');
3032
const retry = getService('retry');
3133
const supertest = getService('supertest');
34+
const taskManagerUtils = new TaskManagerUtils(es, retry);
3235

3336
describe('disable', function () {
3437
this.tags('skipFIPS');
@@ -55,6 +58,14 @@ export default function createDisableRuleTests({ getService }: FtrProviderContex
5558
return scheduledTask._source!;
5659
}
5760

61+
async function getRule(id: string): Promise<RawRule> {
62+
const ruleDoc = await es.get<{ alert: RawRule }>({
63+
id: `alert:${id}`,
64+
index: '.kibana_alerting_cases',
65+
});
66+
return ruleDoc._source!.alert;
67+
}
68+
5869
it('should handle disable rule request appropriately', async () => {
5970
const { body: createdRule } = await supertestWithoutAuth
6071
.post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`)
@@ -265,5 +276,46 @@ export default function createDisableRuleTests({ getService }: FtrProviderContex
265276
id: createdRule.id,
266277
});
267278
});
279+
280+
it('should disable task if task is run but rule is disabled', async () => {
281+
const { body: createdRule } = await supertestWithoutAuth
282+
.post(`${getUrlPrefix(Spaces.space1.id)}/api/alerting/rule`)
283+
.set('kbn-xsrf', 'foo')
284+
.send(
285+
getTestRuleData({
286+
enabled: true,
287+
schedule: {
288+
interval: '3s',
289+
},
290+
})
291+
)
292+
.expect(200);
293+
objectRemover.add(Spaces.space1.id, createdRule.id, 'rule', 'alerting');
294+
295+
// wait for rule to run once
296+
await retry.try(async () => {
297+
const rule = await getRule(createdRule.id);
298+
expect(rule?.monitoring?.run?.last_run?.timestamp).not.to.be(undefined);
299+
});
300+
301+
// disable rule, and implicitly task
302+
await ruleUtils.disable(createdRule.id);
303+
304+
// wait for the task to be disabled
305+
await waitForDisabledTask(createdRule.scheduled_task_id);
306+
307+
// manually enable task
308+
await taskManagerUtils.setTaskEnabled(createdRule.scheduled_task_id, true);
309+
310+
// wait for the task to be disabled
311+
await waitForDisabledTask(createdRule.scheduled_task_id);
312+
});
313+
314+
async function waitForDisabledTask(taskId: string) {
315+
await retry.try(async () => {
316+
const taskDoc = await getScheduledTask(taskId);
317+
expect(taskDoc.task.enabled).to.be(false);
318+
});
319+
}
268320
});
269321
}

0 commit comments

Comments
 (0)