Skip to content

Commit 3db5623

Browse files
committed
Using catch/then for async jobs / bug fixes
1 parent b866938 commit 3db5623

File tree

2 files changed

+53
-28
lines changed

2 files changed

+53
-28
lines changed

package/server/imports/actions/execute/process.js

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,44 +2,60 @@ import { Utilities } from "../../utilities"
22
import { toolbelt } from "./toolbelt.js"
33
import { reschedule } from "../reschedule/"
44

5+
56
var process = async function (doc, callback) {
67
// Goals:
78
// 1- Execute the job
89
// 2- Update the document in database
910
// 3- Capture the result (if any)
1011

1112
var Toolbelt = new toolbelt(doc);
12-
var jobResult;
1313
var jobFunc = Utilities.registry.data[doc.name];
1414
var isAsync = jobFunc.constructor.name === "AsyncFunction";
1515

16-
try {
17-
if (isAsync) {
18-
jobResult = await jobFunc.apply(Toolbelt, doc.arguments);
19-
} else {
20-
jobResult = jobFunc.apply(Toolbelt, doc.arguments);
21-
}
22-
23-
24-
var resolution = Toolbelt.checkForResolution(jobResult);
25-
26-
if (typeof callback === "function") {
27-
return callback(undefined, jobResult);
28-
} else {
29-
return jobResult;
16+
if (isAsync) {
17+
jobResult = await jobFunc.apply(Toolbelt, doc.arguments).catch(function (error) {
18+
var failure = Toolbelt.failure(error);
19+
20+
Utilities.logger("Job failed to run due to code error: " + doc.name)
21+
console.log(error);
22+
23+
if (typeof callback === "function") {
24+
return callback(true, undefined);
25+
}
26+
}).then(function (jobResult) {
27+
var resolution = Toolbelt.checkForResolution(jobResult);
28+
29+
if (typeof callback === "function") {
30+
return callback(undefined, jobResult);
31+
} else {
32+
return jobResult;
33+
}
34+
});
35+
36+
} else {
37+
try {
38+
const jobResult = jobFunc.apply(Toolbelt, doc.arguments);
39+
var resolution = Toolbelt.checkForResolution(jobResult);
40+
41+
if (typeof callback === "function") {
42+
return callback(undefined, jobResult);
43+
} else {
44+
return jobResult;
45+
}
3046
}
31-
}
3247

33-
catch (e) {
34-
var failure = Toolbelt.failure(e.stack);
35-
36-
Utilities.logger("Job failed to run due to code error: " + doc.name)
37-
console.log(e);
48+
catch (e) {
49+
var failure = Toolbelt.failure(e.stack);
50+
51+
Utilities.logger("Job failed to run due to code error: " + doc.name)
52+
console.log(e);
3853

39-
if (typeof callback === "function") {
40-
return callback(true, undefined);
54+
if (typeof callback === "function") {
55+
return callback(true, undefined);
56+
}
4157
}
42-
}
58+
}
4359
}
4460

4561
export { process }

package/server/imports/actions/execute/toolbelt.js

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import { remove } from "../remove/"
66

77
var toolbelt = function (jobDoc) {
88
this.document = jobDoc;
9-
109
this.resolved = false;
1110

1211
this.set = function (key, value) {
@@ -152,7 +151,9 @@ var toolbelt = function (jobDoc) {
152151

153152
this.failure = function (result) {
154153
var docId = this.document._id;
155-
154+
var queueName = this.document.name;
155+
156+
// Update the document
156157
var update = Utilities.collection.update(docId, {
157158
$set: {
158159
state: "failure",
@@ -167,6 +168,14 @@ var toolbelt = function (jobDoc) {
167168
}
168169
})
169170

171+
// Stop the queue
172+
Utilities.logger([
173+
"Job has failed: " + queueName + ", " + docId,
174+
"Queue was stopped; please correct your job function and restart the server"
175+
]);
176+
177+
Operator.manager.queues[queueName].stop();
178+
170179
this.resolved = true;
171180

172181
return update;
@@ -225,9 +234,9 @@ var toolbelt = function (jobDoc) {
225234

226235
this.checkForResolution = function (result) {
227236
var docId = this.document._id;
228-
var queueName = this.document.name;
229237
var resolution = this.resolved;
230-
238+
239+
231240
if (!resolution) this.success(result)
232241
}
233242
}

0 commit comments

Comments
 (0)