Skip to content

Commit 53d2b03

Browse files
ymao1pmuellr
andauthored
[Response Ops][Task Manager] Allow schedule updates when ensureScheduled API is used (#238026)
Resolves #222089 ## Summary Updates `ensureScheduled` API to update task schedule if task already exists using the existing `bulkUpdateSchedules` method. Currently `bulkUpdateSchedules` only handle schedule intervals (not rrule schedules) so this only works for intervals, which I believe is ok because rrule is not widely used yet. --------- Co-authored-by: Patrick Mueller <[email protected]>
1 parent 609f415 commit 53d2b03

File tree

5 files changed

+164
-5
lines changed

5 files changed

+164
-5
lines changed

x-pack/platform/plugins/shared/task_manager/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,8 @@ The danger is that in such a situation, a Task with that same `id` might already
462462

463463
To achieve this you should use the `ensureScheduled` api which has the exact same behavior as `schedule`, except it allows the scheduling of a Task with an `id` that's already in assigned to another Task and it will assume that the existing Task is the one you wished to `schedule`, treating this as a successful operation.
464464

465+
The only exception to this is if you use `ensureScheduled` to schedule a task with a recurring schedule interval. In this case, if a task with the same `id` already exists, the API will attempt to update the schedule of the existing task if it has changed.
466+
465467
#### runSoon
466468

467469
Use `runSoon` to instruct TaskManager to run an existing task as soon as possible by updating the next scheduled run date to be `now`.

x-pack/platform/plugins/shared/task_manager/server/task_scheduling.test.ts

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import moment from 'moment';
1010

1111
import { TaskScheduling } from './task_scheduling';
1212
import { asOk } from './lib/result_type';
13+
import type { ConcreteTaskInstance } from './task';
1314
import { TaskStatus } from './task';
1415
import { createInitialMiddleware } from './lib/middleware';
1516
import { taskStoreMock } from './task_store.mock';
@@ -35,6 +36,22 @@ jest.mock('elastic-apm-node', () => ({
3536
},
3637
}));
3738

39+
const getTask = (overrides = {}): ConcreteTaskInstance => ({
40+
id: 'my-foo-id',
41+
taskType: 'foo',
42+
params: {},
43+
state: {},
44+
schedule: { interval: '1m' },
45+
scheduledAt: new Date(),
46+
attempts: 0,
47+
startedAt: new Date(),
48+
retryAt: new Date(Date.now() + 5 * 60 * 1000),
49+
ownerId: '123',
50+
status: TaskStatus.Idle,
51+
runAt: new Date(),
52+
...overrides,
53+
});
54+
3855
describe('TaskScheduling', () => {
3956
beforeAll(() => {
4057
fakeTimer = sinon.useFakeTimers();
@@ -149,6 +166,101 @@ describe('TaskScheduling', () => {
149166
expect(result.id).toEqual('my-foo-id');
150167
});
151168

169+
test('tries to updates schedule for tasks that have already been scheduled', async () => {
170+
const task = getTask();
171+
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
172+
const bulkUpdateScheduleSpy = jest
173+
.spyOn(taskScheduling, 'bulkUpdateSchedules')
174+
.mockResolvedValue({ tasks: [task], errors: [] });
175+
mockTaskStore.schedule.mockRejectedValueOnce({
176+
statusCode: 409,
177+
});
178+
179+
const result = await taskScheduling.ensureScheduled(task);
180+
181+
expect(bulkUpdateScheduleSpy).toHaveBeenCalledWith(['my-foo-id'], { interval: '1m' });
182+
183+
expect(result.id).toEqual('my-foo-id');
184+
});
185+
186+
test('does not try to update schedule for tasks that have already been scheduled if no schedule is provided', async () => {
187+
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
188+
const bulkUpdateScheduleSpy = jest.spyOn(taskScheduling, 'bulkUpdateSchedules');
189+
mockTaskStore.schedule.mockRejectedValueOnce({
190+
statusCode: 409,
191+
});
192+
193+
const result = await taskScheduling.ensureScheduled({
194+
id: 'my-foo-id',
195+
taskType: 'foo',
196+
params: {},
197+
state: {},
198+
});
199+
200+
expect(bulkUpdateScheduleSpy).not.toHaveBeenCalled();
201+
202+
expect(result.id).toEqual('my-foo-id');
203+
});
204+
205+
test('propagates error when trying to update schedule for tasks that have already been scheduled', async () => {
206+
const task = getTask();
207+
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
208+
const bulkUpdateScheduleSpy = jest
209+
.spyOn(taskScheduling, 'bulkUpdateSchedules')
210+
.mockResolvedValue({
211+
tasks: [],
212+
errors: [
213+
{
214+
id: 'my-foo-id',
215+
type: 'task',
216+
error: {
217+
error: 'error',
218+
message: 'Failed to update schedule for reasons',
219+
statusCode: 500,
220+
},
221+
},
222+
],
223+
});
224+
mockTaskStore.schedule.mockRejectedValueOnce({
225+
statusCode: 409,
226+
});
227+
228+
await expect(taskScheduling.ensureScheduled(task)).rejects.toMatchInlineSnapshot(
229+
`[Error: Tried to update schedule for existing task "my-foo-id" but failed with error: Failed to update schedule for reasons]`
230+
);
231+
232+
expect(bulkUpdateScheduleSpy).toHaveBeenCalledWith(['my-foo-id'], { interval: '1m' });
233+
});
234+
235+
test('handles VERSION_CONFLICT_STATUS errors when trying to update schedule for tasks that have already been scheduled', async () => {
236+
const task = getTask();
237+
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
238+
const bulkUpdateScheduleSpy = jest
239+
.spyOn(taskScheduling, 'bulkUpdateSchedules')
240+
.mockResolvedValue({
241+
tasks: [],
242+
errors: [
243+
{
244+
id: 'my-foo-id',
245+
type: 'task',
246+
error: {
247+
error: 'error',
248+
message: 'Failed to update schedule due to version conflict',
249+
statusCode: 409,
250+
},
251+
},
252+
],
253+
});
254+
mockTaskStore.schedule.mockRejectedValueOnce({
255+
statusCode: 409,
256+
});
257+
258+
const result = await taskScheduling.ensureScheduled(task);
259+
260+
expect(bulkUpdateScheduleSpy).toHaveBeenCalledWith(['my-foo-id'], { interval: '1m' });
261+
expect(result.id).toEqual('my-foo-id');
262+
});
263+
152264
test('doesnt ignore failure to scheduling existing tasks for reasons other than already being scheduled', async () => {
153265
const taskScheduling = new TaskScheduling(taskSchedulingOpts);
154266
mockTaskStore.schedule.mockRejectedValueOnce({

x-pack/platform/plugins/shared/task_manager/server/task_scheduling.ts

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,12 +284,26 @@ export class TaskScheduling {
284284
*/
285285
public async ensureScheduled(
286286
taskInstance: TaskInstanceWithId,
287-
options?: Record<string, unknown>
287+
options?: ScheduleOptions
288288
): Promise<TaskInstanceWithId> {
289289
try {
290290
return await this.schedule(taskInstance, options);
291291
} catch (err) {
292292
if (err.statusCode === VERSION_CONFLICT_STATUS) {
293+
// check if task specifies a schedule interval
294+
// if so,try to update the just the schedule
295+
// only works for interval schedule
296+
if (taskInstance.schedule && taskInstance.schedule.interval) {
297+
const result = await this.bulkUpdateSchedules([taskInstance.id], taskInstance.schedule);
298+
if (
299+
result.errors.length &&
300+
result.errors[0].error.statusCode !== VERSION_CONFLICT_STATUS
301+
) {
302+
throw new Error(
303+
`Tried to update schedule for existing task "${taskInstance.id}" but failed with error: ${result.errors[0].error.message}`
304+
);
305+
}
306+
}
293307
return taskInstance;
294308
}
295309
throw err;

x-pack/platform/test/plugin_api_integration/plugins/sample_task_plugin/server/init_routes.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,12 +315,13 @@ export function initRoutes(
315315
params: schema.object({}),
316316
state: schema.maybe(schema.object({})),
317317
id: schema.maybe(schema.string()),
318+
schedule: schema.maybe(schema.object({ interval: schema.string() })),
318319
}),
319320
}),
320321
},
321322
},
322323
async function (
323-
context: RequestHandlerContext,
324+
_: RequestHandlerContext,
324325
req: KibanaRequest<any, any, any, any>,
325326
res: KibanaResponseFactory
326327
): Promise<IKibanaResponse<any>> {

x-pack/platform/test/plugin_api_integration/test_suites/task_manager/task_management.ts

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ export default function ({ getService }: FtrProviderContext) {
208208
.then((response: { body: BulkUpdateTaskResult }) => response.body);
209209
}
210210

211-
function scheduleTaskIfNotExists(task: Partial<ConcreteTaskInstance>) {
211+
function ensureTaskScheduled(task: Partial<ConcreteTaskInstance>) {
212212
return supertest
213213
.post('/api/sample_tasks/ensure_scheduled')
214214
.set('kbn-xsrf', 'xxx')
@@ -426,15 +426,15 @@ export default function ({ getService }: FtrProviderContext) {
426426
});
427427

428428
it('should allow a task with a given ID to be scheduled multiple times', async () => {
429-
const result = await scheduleTaskIfNotExists({
429+
const result = await ensureTaskScheduled({
430430
id: 'test-task-to-reschedule-in-task-manager',
431431
taskType: 'sampleTask',
432432
params: {},
433433
});
434434

435435
expect(result.id).to.be('test-task-to-reschedule-in-task-manager');
436436

437-
const rescheduleResult = await scheduleTaskIfNotExists({
437+
const rescheduleResult = await ensureTaskScheduled({
438438
id: 'test-task-to-reschedule-in-task-manager',
439439
taskType: 'sampleTask',
440440
params: {},
@@ -921,6 +921,36 @@ export default function ({ getService }: FtrProviderContext) {
921921
});
922922
});
923923

924+
it('should update schedule for existing task when calling ensureScheduled with a different schedule', async () => {
925+
// schedule the task
926+
const taskId = 'sample-recurring-task-id';
927+
await scheduleTask({
928+
id: taskId,
929+
taskType: 'sampleRecurringTask',
930+
schedule: { interval: '1d' },
931+
params: {},
932+
});
933+
934+
await retry.try(async () => {
935+
expect((await historyDocs()).length).to.eql(1);
936+
const task = await currentTask(taskId);
937+
expect(task.schedule?.interval).to.eql('1d');
938+
});
939+
940+
// call ensureScheduled with a different schedule
941+
await ensureTaskScheduled({
942+
id: taskId,
943+
taskType: 'sampleRecurringTask',
944+
params: {},
945+
schedule: { interval: '5m' },
946+
});
947+
948+
await retry.try(async () => {
949+
const task = await currentTask(taskId);
950+
expect(task.schedule?.interval).to.eql('5m');
951+
});
952+
});
953+
924954
function expectReschedule(
925955
originalRunAt: number,
926956
task: SerializedConcreteTaskInstance<any, any>,

0 commit comments

Comments
 (0)