|
1 | 1 | 'use strict';
|
2 | 2 |
|
3 |
| -const { Readable, PassThrough, Stream } = require('readable-stream'); |
4 |
| -const { parse } = require('JSONStream'); |
5 |
| -const mergeStream = require('merge-stream'); |
6 |
| -const { dedupe, sort, setOrder } = require('./util'); |
7 | 3 | const assert = require('assert');
|
8 | 4 |
|
9 |
| -module.exports = class Reader extends Readable { |
10 |
| - constructor(streams) { |
11 |
| - super(); |
12 |
| - |
13 |
| - assert( |
14 |
| - streams instanceof Stream || Array.isArray(streams), |
15 |
| - `Expected first argument to new Reader() to be a stream or array of streams. |
16 |
| - Instead got ${typeof stream}` |
17 |
| - ); |
18 |
| - |
19 |
| - if (!Array.isArray(streams)) { |
20 |
| - streams = [streams]; |
21 |
| - } |
22 |
| - |
23 |
| - assert( |
24 |
| - streams.every(stream => stream instanceof Stream), |
25 |
| - 'Expected any/all arguments given to Reader constructor to be subclasses of Stream.' |
26 |
| - ); |
27 |
| - |
28 |
| - assert( |
29 |
| - streams.length, |
30 |
| - 'Expected at least one stream to be provided to new Reader(). Got none.' |
31 |
| - ); |
32 |
| - |
33 |
| - const merged = mergeStream(); |
34 |
| - |
35 |
| - let count = 0; |
36 |
| - streams.forEach((readStream, index) => { |
37 |
| - const tmpStream = new PassThrough({ objectMode: true }); |
38 |
| - merged.add(tmpStream); |
39 |
| - |
40 |
| - readStream.on('file found', file => { |
41 |
| - this.emit('file found', file); |
42 |
| - readStream |
43 |
| - .pipe(parse('*')) |
44 |
| - .on('error', err => { |
45 |
| - this.emit('error', err); |
46 |
| - }) |
47 |
| - .pipe(setOrder(index)) |
48 |
| - .pipe(tmpStream); |
49 |
| - |
50 |
| - count++; |
51 |
| - if (count === streams.length) { |
52 |
| - this.emit('pipeline ready'); |
53 |
| - } |
54 |
| - }); |
55 |
| - |
56 |
| - readStream.on('file not found', file => { |
57 |
| - this.emit('file not found', file); |
58 |
| - tmpStream.end(); |
59 |
| - |
60 |
| - count++; |
61 |
| - if (count === streams.length) { |
62 |
| - this.emit('pipeline ready'); |
63 |
| - } |
64 |
| - }); |
65 |
| - }); |
66 |
| - |
67 |
| - this.data = merged.pipe(sort()).pipe(dedupe()); |
68 |
| - this.data.pause(); |
69 |
| - |
70 |
| - this.data.on('data', chunk => { |
71 |
| - this.push(`${chunk.content.trim()}\n\n`); |
72 |
| - }); |
73 |
| - |
74 |
| - this.data.on('end', () => { |
75 |
| - this.push(null); |
76 |
| - }); |
77 |
| - } |
78 |
| - |
79 |
| - _read() { |
80 |
| - return this.data.resume(); |
81 |
| - } |
/**
 * Merge one or more feeds into a single text document.
 *
 * Each feed is an array of entry objects shaped `{ id, content }`.
 * Entries from all feeds are concatenated in feed order, deduplicated
 * by `id` — the LAST occurrence of an id wins and keeps that
 * occurrence's position — and their trimmed `content` strings are
 * joined with a blank line between entries.
 *
 * @param {Array<Array<{id: *, content: string}>>} [feeds=[]] - feeds to merge
 * @returns {Promise<string>} the merged document
 * @throws {assert.AssertionError} if no feeds are given, or any feed is not an array
 */
async function reader(feeds = []) {
    assert(
        feeds.length,
        `Expected at least 1 feed to be given. Instead got "${feeds.length}"`
    );
    assert(
        feeds.every(Array.isArray),
        `Expected every feed to be an array. Instead got "${feeds.join(', ')}"`
    );

    // Flatten all feeds into one list in a single pass, without
    // reassigning the `feeds` parameter.
    const entries = [].concat(...feeds);

    // Map preserves insertion order, so to make a duplicate id take the
    // position of its LAST occurrence we delete any earlier entry for
    // that id before re-inserting, which appends it at the end.
    const byId = new Map();
    for (const entry of entries) {
        byId.delete(entry.id);
        byId.set(entry.id, entry);
    }

    return Array.from(byId.values())
        .map(entry => entry.content.trim())
        .join('\n\n');
}

// CommonJS export; guarded so the definition also loads where `module`
// is not defined (e.g. when evaluated in an ES-module context). In this
// file's CommonJS environment the guard is always true.
if (typeof module !== 'undefined' && typeof module.exports !== 'undefined') {
    module.exports = reader;
}
|
0 commit comments