forked from fippo/rtcstats-server
-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathdemux.js
More file actions
355 lines (296 loc) · 11.8 KB
/
demux.js
File metadata and controls
355 lines (296 loc) · 11.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
const fs = require('fs');
const sizeof = require('object-sizeof');
const { Writable } = require('stream');
const util = require('util');
const PromCollector = require('./metrics/PromCollector.js');
const utils = require('./utils/utils');
const { uuidV4 } = require('./utils/utils.js');
// we are using this because we want the regular file descriptors returned,
// not the FileHandle objects from fs.promises.open
const fsOpen = util.promisify(fs.open);
/**
*
*/
class RequestData {
/**
*
* @param {*} param0
*/
constructor({ timestamp, sequenceNumber }) {
this.timestamp = timestamp;
this.sequenceNumber = sequenceNumber;
}
}
/**
* Stream designed to handle API requests and write them to a sink (file WritableStream at this point)
*/
class DemuxSink extends Writable {
/**
*
* C'tor
*
* @param {string} dumpFolder - Path to where sink files will be temporarily stored.
* @param {Object} connectionInfo - Object containing information about the ws connection which stream the data.
* @param {Object} log - Log object.
* @param {boolean} persistDump - Flag used for generating a complete dump of the data coming to the stream.
* Required when creating mock tests.
*/
constructor({ tempPath, dumpFolder, dumpPath, connectionInfo, log, persistDump = false }) {
super({ objectMode: true });
this.dumpFolder = dumpFolder;
this.dumpPath = dumpPath;
this.connectionInfo = connectionInfo;
this.log = log;
this.timeoutId = -1;
this.sinkMap = new Map();
this.persistDump = persistDump;
this.lastSequenceNumber = 0;
this.lastTimestamp = -1;
this.tempPath = tempPath;
// TODO move this as a separate readable/writable stream so we don't pollute this class.
if (this.persistDump) {
this.persistDumpPath = `${dumpFolder}/stream-${uuidV4()}`;
this.streamDumpFile = fs.createWriteStream(this.persistDumpPath);
}
}
/**
* Close all the opened sinks and stop the timeout, this should be called once
* the stream has ended.
*
*/
_clearState() {
this.log.debug('[Demux] Clearing demux state');
clearTimeout(this.timeoutId);
for (const sinkData of this.sinkMap.values()) {
this._sinkClose(sinkData);
}
}
/**
* Stream level timeout, this will trigger if nothing gets written to stream for a predefined amount of time.
*/
_timeout() {
this.log.debug('[Demux] Timeout reached');
// The stream will eventually call _destroy which will properly handle the cleanup.
this.end();
}
/**
* Implementation of the stream api, will be called when stream closes regardless if it's because of an error
* or happy flow. We use this chance to clear the states of the demux.
*
* @param {Error} - Error or null if nothing went wrong.
* @param {Function} - Needs to be called in order to successfully end the state of the stream.
*/
_destroy(err, cb) {
this.log.debug('[Demux] Destroy called with err:', err);
this._clearState();
// Forward the state in which the stream closed, required by the stream api.
cb(err);
}
/**
* Implementation of the stream API. Receive inbound objects and direct them to the API implemented
* in _handSinkEvent.
*
* @param {Object} obj - Object and not buffer because of object mode.
* @param {string} encoding - Would be the string encoding, ignore because we're in object mode.
* @param {Function} cb - Needs to be called as dictated by the stream api.
*/
_write(obj, encoding, cb) {
if (this.persistDump) {
this.streamDumpFile.write(`${JSON.stringify(obj)}\n`);
}
// We received something so reset the timeout.
clearTimeout(this.timeoutId);
this.timeoutId = setTimeout(this._timeout.bind(this), 60000);
this._handleRequest(obj)
.then(cb)
.catch(cb);
}
/**
* Close the target sink.
*
* @param {string} id - UniqueId associated with the sink
* @param {WriteStream} sink - Opened file writable stream associated with the id
*/
_sinkClose({ id, sink }) {
this.log.info('[Demux] close-sink %s', id);
sink.end();
}
/**
* Once the sink has notified us that it finished writing we can notify the clients that they can now, process
* the generated file.
*
* @param {string} id - sink id as saved in the sinkMap
*/
_handleSinkClose(id) {
const sinkData = this.sinkMap.get(id);
// Sanity check, make sure the data is available if not log an error and just send the id such that any
// listening client has s chance to handle the sink.
if (sinkData) {
// we need to emit this on file stream finish
this.emit('close-sink', {
id: sinkData.id
});
} else {
this.log.error('[Demux] sink on close meta should be available id:', id);
this.emit('close-sink', { id });
}
this.sinkMap.delete(id);
}
/**
* Open a writable file stream and associate it with the provided unique id.
*
* @param {string} id - unique id.
* @returns {Object} - Associated metadata object that will be saved in the local map.
*/
async _sinkCreate(id) {
PromCollector.sessionCount.inc();
const resolvedId = id;
const isReconnect = fs.existsSync(this.dumpPath);
const fd = await fsOpen(this.dumpPath, 'a');
this.log.info('[Demux] open-sink id: %s; path %s; connection: %o', id, this.dumpPath, this.connectionInfo);
const sink = fs.createWriteStream(this.dumpPath, { fd });
// Add the associated data to a map in order to properly direct requests to the appropriate sink.
const sinkData = {
id: resolvedId,
sink,
meta: {
startDate: this.startDate,
dumpPath: this.dumpPath
}
};
this.sinkMap.set(id, sinkData);
sink.on('error', error => this.log.error('[Demux] sink on error id: ', id, ' error:', error));
// The close event should be emitted both on error and happy flow.
sink.on('close', this._handleSinkClose.bind(this, id));
if (!isReconnect) {
// Initialize the dump file by adding the connection metadata at the beginning. This data is usually used
// by visualizer tools for identifying the originating client (browser, jvb or other).
this._sinkWrite(
sink,
JSON.stringify([ 'connectionInfo',
null, JSON.stringify(this.connectionInfo), Date.now() ]));
}
return sinkData;
}
/**
* Update metadata in the local map and write it to the sink.
*
* @param {Object} sinkData - Current sink metadata
* @param {Object} data - New metadata.
*/
async _sinkUpdateMetadata(sinkData, data) {
// We expect metadata to be objects thus we need to stringify them before writing to the sink.
this._sinkWrite(sinkData.sink, JSON.stringify(data));
}
/**
* Self explanatory.
*
* @param {WritableStream} sink - Target sink.
* @param {string} data - Data to write as a string.
*/
_sinkWrite(sink, data) {
// TODO Add support for objects as well, in case we receive an object just serialize it.
if (data) {
sink.write(`${data}\n`);
}
}
/**
* Precondition that checks that a requests has the expected fields.
*
* @param {string} clientId
* @param {string} type
*/
_requestPrecondition({ statsSessionId, type }) {
if (!statsSessionId) {
throw new Error('[Demux] statsSessionId missing from request!');
}
if (!type) {
throw new Error('[Demux] type missing from request!');
}
}
/**
* Validates first if a previously connected sessionId's data has been processed
* and checks if there is no missing sequence number
*
* @param {*} statsSessionId
* @param {*} sequenceNumber
* @param {*} lastSequenceNumber
*/
_validateSequenceNumber(statsSessionId, sequenceNumber, lastSequenceNumber) {
if ((sequenceNumber - lastSequenceNumber > 1)
&& !fs.existsSync(utils.getDumpPath(this.tempPath, statsSessionId))) {
PromCollector.dataIsAlreadyProcessedCount.inc();
throw new Error(`[Demux] Session reconnected but file was already processed! sessionId: ${statsSessionId}`);
}
if (sequenceNumber - this.lastSequenceNumber > 1) {
this.log.error(`[Demux] sequence number is missing!
sessionId: ${statsSessionId}
sequenceNumber: ${sequenceNumber}
lastSequenceNumber: ${this.lastSequenceNumber}`
);
PromCollector.missingSequenceNumberCount.inc();
}
}
/**
*
* @param {*} data
* @returns {RequestData}
*/
_toRequestData(data) {
const jsonData = Array.isArray(data) ? data : JSON.parse(data);
const sequenceNumber = jsonData[4];
const timestamp = jsonData[3];
return new RequestData({ timestamp,
sequenceNumber });
}
/**
* Handle API requests.
*
* @param {Object} request - Request object
*/
async _handleRequest(request) {
this._requestPrecondition(request);
PromCollector.requestSizeBytes.observe(sizeof(request));
const { statsSessionId, type, data } = request;
// save the last sequence number to notify the frontend
if (data) {
const requestData = this._toRequestData(data);
this._validateSequenceNumber(statsSessionId, requestData.sequenceNumber, this.lastSequenceNumber);
this.lastTimestamp = requestData.timestamp;
this.lastSequenceNumber = requestData.sequenceNumber;
}
// If this is the first request coming from this client id ,create a new sink (file write stream in this case)
// and it's associated metadata.
// In case of reconnects the incremental sink naming convention described in _sinkCreate
// will take care of it.
const sinkData = this.sinkMap.get(statsSessionId) || await this._sinkCreate(statsSessionId);
if (!sinkData) {
this.log.warn('[Demux] Received data for already closed sink: ', statsSessionId);
return;
}
switch (type) {
// Close will be sent by a client when operations on a statsSessionId have been completed.
// Subsequent operations will be taken by services in the upper level, like upload to store and persist
// metadata do a db.
case 'close':
this.log.info('[Demux] sink closed');
return this._sinkClose(sinkData);
// Identity requests will update the local metadata and also write it to the sink.
// Metadata associated with a sink will be propagated through an event to listeners when the sink closes,
// either on an explicit close or when the timeout mechanism triggers.
case 'identity':
return this._sinkUpdateMetadata(sinkData, data);
// Generic request with stats data, simply write it to the sink.
case 'stats-entry':
return this._sinkWrite(sinkData.sink, data);
// Request sent by clients in order to keep the timeout from triggering.
case 'keepalive':
this.log.debug('[Demux] Keepalive received for :', statsSessionId);
return;
default:
this.log.warn('[Demux] Invalid API Request: ', request);
return;
}
}
}
module.exports = DemuxSink;