Skip to content

Commit f391650

Browse files
Saas 5485 - fixed race condition bug caused by tasks priority
* added tasks priority and sorted mechanism to avoid task creation and deletion race conditions. Create tasks, should always be executed and resolved before Delete tasks.
1 parent 987565f commit f391650

13 files changed

+111
-17
lines changed

constants.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,12 @@ const AGENT_MODES = {
33
IN_CLUSTER: 'InCluster',
44
};
55

6+
const TASK_PRIORITY = {
7+
HIGH: 1,
8+
MEDIUM: 2,
9+
LOW: 3
10+
};
11+
612
const LOGGER_MODES = {
713
// print logs more pretty
814
PRETTY: 'pretty',
@@ -23,4 +29,5 @@ module.exports = {
2329
LOGGER_NAMESPACES,
2430
LOGGER_MODES,
2531
CRON,
32+
TASK_PRIORITY,
2633
};

jobs/TaskPullerJob/TaskPuller.job.js

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ const Promise = require('bluebird');
22
const Chance = require('chance');
33
const _ = require('lodash');
44
const Base = require('../BaseJob');
5+
const { TASK_PRIORITY } = require('../../constants');
56
const CreatePod = require('./tasks/CreatePod.task');
67
const DeletePod = require('./tasks/DeletePod.task');
78
const CreatePvc = require('./tasks/CreatePvc.task');
@@ -12,33 +13,41 @@ const ERROR_MESSAGES = {
1213
};
1314

1415
class TaskPullerJob extends Base {
16+
constructor(...args) {
17+
super(...args);
18+
19+
this.typeToTaskMap = {
20+
'CreatePod': { executor: this._executeTask(CreatePod), priority: CreatePod.priority },
21+
'DeletePod': { executor: this._executeTask(DeletePod), priority: DeletePod.priority },
22+
'CreatePvc': { executor: this._executeTask(CreatePvc), priority: CreatePvc.priority },
23+
'DeletePvc': { executor: this._executeTask(DeletePvc), priority: DeletePvc.priority },
24+
'NOOP': { executor: _.noop, priority: TASK_PRIORITY.LOW },
25+
};
26+
}
27+
1528
run() {
1629
return this.codefreshAPI.pullTasks(this.logger)
1730
.catch((err) => {
1831
const message = `${ERROR_MESSAGES.FAILED_TO_EXECUTE_TASK} with message: ${err.message}`;
1932
this.logger.error(message);
2033
throw new Error(message);
2134
})
22-
.then((res = []) => {
35+
.then(async (res = []) => {
2336
this.logger.info(`Got ${res.length} tasks`);
24-
const promises = _.chain(res)
25-
.map((task) => {
26-
// TODO auto load all tasks
27-
const typeToTaskMap = {
28-
'CreatePod': this._executeTask(CreatePod),
29-
'DeletePod': this._executeTask(DeletePod),
30-
'CreatePvc': this._executeTask(CreatePvc),
31-
'DeletePvc': this._executeTask(DeletePvc),
32-
};
37+
38+
const tasks = _.chain(res)
39+
.map((task) => {
3340
const type = _.get(task, 'type');
3441
this.logger.info(`Got request to run task with type: ${type}`);
35-
const fn = typeToTaskMap[type] || _.noop;
36-
return fn(task);
42+
const { executor, priority } = _.get(this.typeToTaskMap, type, this.typeToTaskMap.NOOP);
43+
return { task, priority, executor };
3744
})
38-
.compact()
39-
.flattenDeep()
45+
.filter(({ executor }) => executor !== _.noop)
46+
.sortBy(({ priority }) => priority)
4047
.value();
41-
return Promise.all(promises);
48+
49+
// resolves each promise sequentially, in sorted order
50+
return await Promise.mapSeries(tasks, ({ task, executor }) => executor(task));
4251
});
4352
}
4453

@@ -57,5 +66,6 @@ class TaskPullerJob extends Base {
5766
return;
5867
}
5968
}
69+
6070
TaskPullerJob.Errors = ERROR_MESSAGES;
6171
module.exports = TaskPullerJob;

jobs/TaskPullerJob/__tests__/TaskPuller.job.spec.js

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,17 @@
11
const _ = require('lodash');
2+
const Promise = require('bluebird');
23
const { create: createLogger } = require('../../../services/Logger');
34
const TaskPullerJob = require('../TaskPuller.job');
45
const CreatePodTask = require('../tasks/CreatePod.task');
6+
const DeletePodTask = require('../tasks/DeletePod.task');
7+
const CreatePvcTask = require('../tasks/CreatePvc.task');
8+
const DeletePvcTask = require('../tasks/DeletePvc.task');
59

610
jest.mock('./../../../services/Logger');
711
jest.mock('./../tasks/CreatePod.task');
12+
jest.mock('./../tasks/DeletePod.task');
13+
jest.mock('./../tasks/CreatePvc.task');
14+
jest.mock('./../tasks/DeletePvc.task');
815

916
describe('TaskPullerJob unit tests', () => {
1017
it('Should throw an error when codefresh service call failed', () => {
@@ -78,4 +85,42 @@ describe('TaskPullerJob unit tests', () => {
7885
}, _.noop(), logger);
7986
return expect(task.exec()).resolves.toEqual([]);
8087
});
88+
89+
it('Should always execute and resolve the HIGH priority task first', async () => {
90+
const tasks = [
91+
{
92+
type: 'DeletePod'
93+
},
94+
{
95+
type: 'DeletePvc'
96+
},
97+
{
98+
type: 'CreatePod'
99+
},
100+
{
101+
type: 'CreatePvc'
102+
},
103+
];
104+
105+
DeletePodTask.mockImplementationOnce(() => ({
106+
exec: jest.fn(async () => Promise.delay(10,'DeletePod'))
107+
}));
108+
DeletePvcTask.mockImplementationOnce(() => ({
109+
exec: jest.fn(async () => Promise.delay(10,'DeletePvc'))
110+
}));
111+
CreatePodTask.mockImplementationOnce(() => ({
112+
exec: jest.fn(async () => Promise.delay(400, 'CreatePod'))
113+
}));
114+
CreatePvcTask.mockImplementationOnce(() => ({
115+
exec: jest.fn(async () => Promise.delay(400, 'CreatePvc'))
116+
}));
117+
118+
const logger = createLogger();
119+
const job = new TaskPullerJob({
120+
pullTasks: jest.fn().mockResolvedValueOnce(tasks)
121+
}, _.noop, logger);
122+
123+
const results = await job.exec();
124+
expect(results).toEqual(['CreatePod', 'CreatePvc', 'DeletePod', 'DeletePvc']);
125+
});
81126
});

jobs/TaskPullerJob/tasks/CreatePod.task.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const Joi = require('joi');
22
const Base = require('../../BaseJob');
3+
const { TASK_PRIORITY } = require('../../../constants');
34

45
const ERROR_MESSAGES = {
56
FAILED_TO_EXECUTE_TASK: 'Failed to run task CreatePod',
@@ -22,6 +23,8 @@ class CreatePodTask extends Base {
2223
return Joi.validate(task, CreatePodTask.validationSchema);
2324
}
2425
}
26+
27+
CreatePodTask.priority = TASK_PRIORITY.HIGH;
2528
CreatePodTask.Errors = ERROR_MESSAGES;
2629
CreatePodTask.validationSchema = Joi.object().keys({
2730
spec: Joi.object().required(),

jobs/TaskPullerJob/tasks/CreatePvc.task.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const Joi = require('joi');
22
const Base = require('../../BaseJob');
3+
const { TASK_PRIORITY } = require('../../../constants');
34

45
const ERROR_MESSAGES = {
56
FAILED_TO_EXECUTE_TASK: 'Failed to run task CreatePvc',
@@ -22,8 +23,11 @@ class CreatePvcTask extends Base {
2223
return Joi.validate(task, CreatePvcTask.validationSchema);
2324
}
2425
}
26+
27+
CreatePvcTask.priority = TASK_PRIORITY.HIGH;
2528
CreatePvcTask.Errors = ERROR_MESSAGES;
2629
CreatePvcTask.validationSchema = Joi.object().keys({
2730
spec: Joi.object().required(),
2831
}).options({ stripUnknown: true });
32+
2933
module.exports = CreatePvcTask;

jobs/TaskPullerJob/tasks/DeletePod.task.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const _ = require('lodash');
22
const Joi = require('joi');
33
const Base = require('../../BaseJob');
4+
const { TASK_PRIORITY } = require('../../../constants');
45

56
const ERROR_MESSAGES = {
67
FAILED_TO_EXECUTE_TASK: 'Failed to run task DeletePod',
@@ -28,11 +29,13 @@ class DeletePodTask extends Base {
2829
}
2930
}
3031

32+
DeletePodTask.priority = TASK_PRIORITY.LOW;
3133
DeletePodTask.Errors = ERROR_MESSAGES;
3234
DeletePodTask.validationSchema = Joi.object().keys({
3335
spec: Joi.object().keys({
3436
namespace: Joi.string().required(),
3537
name: Joi.string().required()
3638
}).required(),
3739
}).options({ stripUnknown: true });
40+
3841
module.exports = DeletePodTask;

jobs/TaskPullerJob/tasks/DeletePvc.task.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const _ = require('lodash');
22
const Joi = require('joi');
33
const Base = require('../../BaseJob');
4+
const { TASK_PRIORITY } = require('../../../constants');
45

56
const ERROR_MESSAGES = {
67
FAILED_TO_EXECUTE_TASK: 'Failed to run task DeletePvc',
@@ -28,6 +29,7 @@ class DeletePvcTask extends Base {
2829
}
2930
}
3031

32+
DeletePvcTask.priority = TASK_PRIORITY.LOW;
3133
DeletePvcTask.Errors = ERROR_MESSAGES;
3234
DeletePvcTask.validationSchema = Joi.object().keys({
3335
spec: Joi.object().keys({

jobs/TaskPullerJob/tasks/__tests__/CreatePod.task.spec.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const _ = require('lodash');
22
const { create: createLogger } = require('../../../../services/Logger');
33
const CreatePodTask = require('../CreatePod.task');
4+
const { TASK_PRIORITY } = require('../../../../constants');
45

56
jest.mock('./../../../../services/Logger');
67

@@ -61,6 +62,10 @@ describe('CreatePod task unit tests', () => {
6162
const task = new CreatePodTask(_.noop(), kubernetesAPIMock, logger);
6263
return expect(task.run(taskDef)).resolves.toEqual(spyResult);
6364
});
65+
66+
it('Should have a HIGH priority', () => {
67+
expect(CreatePodTask.priority).toBe(TASK_PRIORITY.HIGH);
68+
});
6469
});
6570

6671
});

jobs/TaskPullerJob/tasks/__tests__/CreatePvc.task.spec.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const _ = require('lodash');
22
const { create: createLogger } = require('../../../../services/Logger');
33
const CreatePvcTask = require('../CreatePvc.task');
4+
const { TASK_PRIORITY } = require('../../../../constants');
45

56
jest.mock('./../../../../services/Logger');
67

@@ -71,6 +72,10 @@ describe('CreatePvc task unit tests', () => {
7172
const task = new CreatePvcTask(_.noop(), kubernetesAPIMock, logger);
7273
return expect(task.run(taskDef)).resolves.toEqual(spyResult);
7374
});
75+
76+
it('Should have a HIGH priority', () => {
77+
expect(CreatePvcTask.priority).toBe(TASK_PRIORITY.HIGH);
78+
});
7479
});
7580

7681
});

jobs/TaskPullerJob/tasks/__tests__/DeletePod.task.spec.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
const _ = require('lodash');
22
const { create: createLogger } = require('../../../../services/Logger');
33
const DeletePodTask = require('../DeletePod.task');
4+
const { TASK_PRIORITY } = require('../../../../constants');
45

56
jest.mock('./../../../../services/Logger');
67

@@ -114,5 +115,9 @@ describe('DeletePod task unit tests', () => {
114115
const task = new DeletePodTask(_.noop(), kubernetesAPIMock, logger);
115116
return expect(task.exec(taskDef)).resolves.toEqual('OK');
116117
});
118+
119+
it('Should have a LOW priority', () => {
120+
expect(DeletePodTask.priority).toBe(TASK_PRIORITY.LOW);
121+
});
117122
});
118123
});

0 commit comments

Comments
 (0)