-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathworker.js
More file actions
238 lines (197 loc) · 6.67 KB
/
worker.js
File metadata and controls
238 lines (197 loc) · 6.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
const { Worker, QueueScheduler } = require('bullmq');
const Redis = require('ioredis');
const fs = require('fs-extra');
const path = require('path');
const puppeteer = require('puppeteer');
const { uploadPosterToCloud } = require('./cloudService');
const { addEvent, getPoster, updatePoster } = require('./store');
const { REDIS_CONNECTION_STRING } = require('../constants');
// Base URL of the client page that renders the component; props are passed in its query string.
const CLIENT_URL = 'http://localhost:5000';
// Timeout in milliseconds (24 hours) used for page navigation, the ready-wait and the overall render race.
const RENDER_TIMEOUT = 24 * 60 * 60 * 1000;
// Number of render attempts made before the poster is marked as FAILED.
const MAX_RENDER_ATTEMPTS = 3;
// Factor applied to the measured element size and to the PDF scale
// (presumably CSS pixels at 96/in vs. points at 72/in — TODO confirm).
const SCALE = 96 / 72;
// Shared Puppeteer browser instance; null until initialize() has run and again after a disconnect.
let browser = null;
// Id of the poster this worker is currently generating, or null when idle.
let currentJob = null;
// Directory for generated PDFs: the `output` folder one level above this file.
const outputPath = path.join(__dirname, '..', 'output');
// Builds the PDF file path for the given poster id.
const pdfPath = (id) => path.join(outputPath, `${id}.pdf`);
/**
 * Launches a Puppeteer browser and stores it in the module-level `browser` variable.
 *
 * The `disconnected` listener clears the shared reference only while it still points at
 * the instance launched by this call, so a late event from an older browser cannot wipe
 * out the reference to a newer one.
 * @returns {Promise<void>}
 */
async function initialize() {
  const launched = await puppeteer.launch({ args: ['--no-sandbox', '--disable-setuid-sandbox'] });
  launched.on('disconnected', () => {
    if (browser === launched) {
      browser = null;
    }
  });
  browser = launched;
}
/**
 * Closes a Puppeteer page or browser, reporting (not throwing) a failed close.
 * @param {{ close: Function }} target - Page or browser to close.
 * @param {Function} onInfo - Receives a message when closing fails.
 * @returns {Promise<void>}
 */
async function closeQuietly(target, onInfo) {
  try {
    await target.close();
  } catch (closeError) {
    // The target may already be gone (crash, cancellation); cleanup must not mask the real error.
    onInfo(`Cleanup failed: ${closeError.message}`);
  }
}

/**
 * Renders the component to a PDF file and uploads the file to the cloud.
 *
 * Opens a page at CLIENT_URL with the props in the query string, waits until the page sets
 * `window.renderStatus`, prints the `rootImageElement` to a single-page PDF under the output
 * directory and passes that file to `uploadPosterToCloud`. The page is always closed before
 * the function settles.
 *
 * @param {Object} options
 * @param {string} options.id - Poster id; added to the props and used as the PDF file name.
 * @param {Object} options.props - Props passed to the client page (not mutated).
 * @param {Function} options.onInfo - Called with informational messages.
 * @param {Function} options.onError - Called with errors raised by the page.
 * @returns {Promise<void>}
 * @throws {Error} When the page reports an error, a step times out, or PDF generation/upload fails.
 */
async function renderComponent(options) {
  const { id, props, onInfo, onError } = options;
  // Copy instead of mutating the caller's object.
  const renderProps = { ...props, id };
  const page = await browser.newPage();

  try {
    // Page crash: report it first so the error is never lost, then tear everything down.
    page.on('error', async (error) => {
      onError(error);
      if (!page.isClosed()) {
        await closeQuietly(page, onInfo);
      }
      if (browser) {
        await closeQuietly(browser, onInfo);
      }
    });

    // Puppeteer error logs.
    page.on('pageerror', (error) => {
      onError(error);
    });

    // Puppeteer console.
    page.on('console', (msg) => {
      if (['warning', 'log'].includes(msg.type())) {
        onInfo(`Console(${msg.type()}): ${msg.text()}`);
      }
    });

    const encodedProps = encodeURIComponent(JSON.stringify(renderProps));
    const renderUrl = `${CLIENT_URL}/?props=${encodedProps}`;
    console.log(renderUrl);

    await page.goto(renderUrl, { timeout: RENDER_TIMEOUT });

    // Wait until the page informs to be ready or having error.
    await page.waitForFunction('window.renderStatus !== undefined', {
      timeout: RENDER_TIMEOUT,
    });

    // Check if there was an error in the page. Note! This captures only properly handled errors in the page.
    const wasError = await page.evaluate(() => window.renderStatus === 'error');
    if (wasError) {
      throw new Error('Rendering process failed due to an error on the page.');
    }

    // Get the dimensions of the page
    const { width, height } = await page.evaluate(() => {
      const root = document.getElementById('rootImageElement');
      return { width: root.offsetWidth, height: root.offsetHeight };
    });

    await page.emulateMediaType('screen');

    const printOptions = {
      printBackground: true,
      width: width * SCALE,
      height: height * SCALE,
      pageRanges: '1',
      scale: SCALE,
      timeout: 5 * 60000,
    };

    try {
      const contents = await page.pdf(printOptions);
      await fs.outputFile(pdfPath(id), contents);
      // Release the page before the upload starts.
      await page.close();
      await uploadPosterToCloud(pdfPath(id));
    } catch (e) {
      // Keep the underlying failure reachable through `cause`.
      throw new Error('PDF Generation failed', { cause: e });
    }
  } finally {
    // Failure paths would otherwise leave the page open in the shared browser.
    if (!page.isClosed()) {
      await closeQuietly(page, onInfo);
    }
  }
}
/**
 * Runs renderComponent up to MAX_RENDER_ATTEMPTS times.
 *
 * Before every attempt the poster is re-read from the store; a missing poster or one whose
 * status is already 'FAILED' (e.g. manually cancelled) stops the loop without rendering.
 * Each attempt is raced against a RENDER_TIMEOUT timer, which is cleared as soon as the
 * attempt settles. Render errors are reported through `onError` and trigger the next attempt.
 *
 * @param {Object} options - Options forwarded unchanged to renderComponent.
 * @param {string} options.id - Poster id.
 * @param {Function} options.onInfo - Called with progress messages.
 * @param {Function} options.onError - Called with the error of each failed attempt.
 * @returns {Promise<{success: boolean}>} Whether one of the attempts succeeded.
 */
async function renderComponentRetry(options) {
  const { onInfo, onError } = options;

  for (let i = 0; i < MAX_RENDER_ATTEMPTS; i++) {
    /* eslint-disable no-await-in-loop */
    // Check if the job was manually cancelled. Do not restart the rendering process.
    const poster = await getPoster({ id: options.id });
    // Test for a missing poster first; reading `.status` of null would throw.
    if (!poster || poster.status === 'FAILED') {
      onInfo('Failed or canceled');
      return { success: false };
    }

    let timer;
    try {
      onInfo(i > 0 ? 'Retrying' : 'Rendering');

      if (!browser) {
        onInfo('Creating new browser instance');
        await initialize();
      }

      const timeout = new Promise((resolve, reject) => {
        timer = setTimeout(reject, RENDER_TIMEOUT, new Error('Render timeout'));
      });

      await Promise.race([renderComponent(options), timeout]);
      onInfo('Rendered successfully');
      return { success: true };
    } catch (error) {
      onError(error);
    } finally {
      // Without this every attempt would leave a RENDER_TIMEOUT-long timer pending.
      clearTimeout(timer);
    }
    /* eslint-enable no-await-in-loop */
  }

  return { success: false };
}
/**
 * Renders one poster and records the outcome in the store.
 *
 * Marks the poster as the current job, runs the render (with retries) while forwarding
 * progress and errors to the console and to the event store, then sets the poster status to
 * 'READY' or 'FAILED'. The status write is awaited so the caller only continues once it is
 * done, and the current-job marker is always cleared, even when something throws.
 *
 * @param {Object} options
 * @param {string} options.id - Unique id
 * @param {Object} options.props - Props to pass to component
 * @returns {Promise<void>}
 */
async function generate(options) {
  const { id } = options;
  currentJob = id;

  // Event writes are fire-and-forget; a failed write must not turn into an unhandled rejection.
  const recordEvent = (event) => {
    Promise.resolve(addEvent(event)).catch((storeError) => {
      const date = new Date().toUTCString();
      console.error(`${date} ${id}: Failed to store event: ${storeError.message}`); // eslint-disable-line no-console
    });
  };

  const onInfo = (message) => {
    const date = new Date().toUTCString();
    console.log(`${date} ${id}: ${message}`); // eslint-disable-line no-console
    recordEvent({ posterId: id, type: 'INFO', message });
  };

  const onError = (error) => {
    const date = new Date().toUTCString();
    console.error(`${date} ${id}: ${error.message} ${error.stack}`); // eslint-disable-line no-console
    recordEvent({ posterId: id, type: 'ERROR', message: error.message });
  };

  try {
    const { success } = await renderComponentRetry({
      ...options,
      onInfo,
      onError,
    });
    await updatePoster({ id, status: success ? 'READY' : 'FAILED' });
  } finally {
    // Never leave a stale id behind; the cancel listener compares against it.
    currentJob = null;
  }
}
// Redis connection shared by the queue scheduler and the worker of the 'generator' queue.
const bullRedisConnection = new Redis(REDIS_CONNECTION_STRING, {
maxRetriesPerRequest: null,
enableReadyCheck: false,
});
// Queue scheduler to restart stopped jobs.
// TODO: If there are multiple services, move this to a separate microservice! Only a few instances at most are needed for the cluster.
// Not needed for local dev environment.
const queueScheduler = new QueueScheduler('generator', { connection: bullRedisConnection });
// Worker: takes jobs from the 'generator' queue and runs generate() with each job's options.
const worker = new Worker(
  'generator',
  async ({ data }) => {
    await generate(data.options);
  },
  { connection: bullRedisConnection },
);

console.log('Worker ready for jobs!');

// Log the lifecycle of every job handled by this worker.
worker.on('active', ({ id }) => {
  console.log(`Started to process ${id}`);
});

worker.on('completed', ({ id }) => {
  console.log(`${id} has completed!`);
});

worker.on('failed', ({ id }, { message }) => {
  console.log(`${id} has failed with ${message}`);
});

worker.on('drained', () => {
  console.log('Job queue empty! Waiting for new jobs...');
});
// While bullmq doesn't support cancelling the jobs, this helper will do it by closing the browser.
// Cancellation requests arrive as messages on the 'cancel' Redis channel; the message is the job id.
const cancelSignalRedis = new Redis(REDIS_CONNECTION_STRING);

cancelSignalRedis.subscribe('cancel', (err) => {
  if (err) {
    console.error('Failed to start listening to cancellation signals: %s', err.message);
  } else {
    console.log('Listening to cancellation signals.');
  }
});

cancelSignalRedis.on('message', (channel, message) => {
  if (channel !== 'cancel') {
    return;
  }
  console.log(`Received cancellation signal for id ${message}`);

  // Only act when the cancelled job is the one this worker is processing right now.
  if (message !== currentJob) {
    return;
  }
  console.log('The job was in progress on this worker! Terminating it...');

  if (browser) {
    // close() is asynchronous; handle its failure here so it cannot surface as an unhandled rejection.
    browser.close().catch((closeError) => {
      console.error('Failed to close the browser for cancelled job %s: %s', message, closeError.message);
    });
  }
});
// Make sure all the connections will be removed.
// On SIGINT: close the worker and the queue scheduler, drop the cancel-signal Redis
// connection and exit with code 0.
// NOTE(review): none of the close()/quit() results are awaited and process.exit(0) runs right
// after them, so any asynchronous cleanup they start may not finish — confirm this is acceptable.
// NOTE(review): quit() is issued after disconnect() on the same client; presumably redundant — verify.
// NOTE(review): bullRedisConnection and the browser are not closed explicitly here.
process.on('SIGINT', () => {
console.log('Shutting down worker...');
worker.close(true);
queueScheduler.close();
cancelSignalRedis.disconnect();
cancelSignalRedis.quit();
process.exit(0);
});