|
| 1 | +/* eslint-disable no-restricted-syntax,semi,comma-dangle,class-methods-use-this */ |
| 2 | + |
| 3 | +const { AttachmentProcessor } = require('@elastic.io/component-commons-library') |
| 4 | +const { Writable } = require('stream'); |
| 5 | +const { messages } = require('elasticio-node') |
| 6 | +const stream = require('stream') |
| 7 | +const util = require('util') |
| 8 | +const papa = require('papaparse') |
| 9 | + |
| 10 | +const pipeline = util.promisify(stream.pipeline); |
| 11 | +const attachmentProcessor = new AttachmentProcessor() |
| 12 | + |
| 13 | +// transform array to obj, for example: |
| 14 | +// ['aa', 'bb', 'cc'] => {column0: 'aa', column1: 'bb', column2: 'cc'} |
| 15 | +function arrayToObj(arr) { |
| 16 | + let columns = {} |
| 17 | + arr.forEach((value, index) => { |
| 18 | + columns = { ...columns, ...{ [`column${index}`]: value } } |
| 19 | + }) |
| 20 | + return columns |
| 21 | +} |
| 22 | + |
| 23 | +async function errHelper(text) { |
| 24 | + await this.logger.error(text) |
| 25 | + await this.emit('error', text) |
| 26 | + await this.emit('end') |
| 27 | +} |
| 28 | + |
| 29 | +async function readCSV(msg, cfg) { |
| 30 | + const that = this |
| 31 | + const emitAll = cfg.emitAll === true || cfg.emitAll === 'true' |
| 32 | + const { body } = msg |
| 33 | + |
| 34 | + // check if url provided in msg |
| 35 | + if (body.url && body.url.length > 0) { |
| 36 | + this.logger.info('URL found') |
| 37 | + } else { |
| 38 | + await errHelper.call(this, 'URL of the CSV is missing') |
| 39 | + return |
| 40 | + } |
| 41 | + |
| 42 | + if (body.header !== undefined |
| 43 | + && body.header !== '' |
| 44 | + && (typeof body.header) !== 'boolean') { |
| 45 | + await errHelper.call(this, 'Non-boolean values are not supported by "Contains headers" field') |
| 46 | + return |
| 47 | + } |
| 48 | + |
| 49 | + if (body.dynamicTyping !== undefined |
| 50 | + && body.dynamicTyping !== '' |
| 51 | + && (typeof body.dynamicTyping) !== 'boolean') { |
| 52 | + await errHelper.call(this, 'Non-boolean values are not supported by "Convert Data types" field') |
| 53 | + return |
| 54 | + } |
| 55 | + |
| 56 | + const parseOptions = { |
| 57 | + header: body.header, |
| 58 | + dynamicTyping: body.dynamicTyping, |
| 59 | + delimiter: body.delimiter |
| 60 | + } |
| 61 | + |
| 62 | + // if set "Fetch All" create object with results |
| 63 | + const outputMsg = { |
| 64 | + result: [], |
| 65 | + } |
| 66 | + |
| 67 | + let dataStream |
| 68 | + const parseStream = papa.parse(papa.NODE_STREAM_INPUT, parseOptions) |
| 69 | + |
| 70 | + try { |
| 71 | + dataStream = await attachmentProcessor.getAttachment(body.url, 'stream') |
| 72 | + this.logger.info('File received, trying to parse CSV') |
| 73 | + } catch (err) { |
| 74 | + this.logger.error(`URL - "${body.url}" unreachable: ${err}`); |
| 75 | + this.emit('error', `URL - "${body.url}" unreachable: ${err}`) |
| 76 | + this.emit('end') |
| 77 | + return |
| 78 | + } |
| 79 | + // control of node data stream |
| 80 | + class CsvWriter extends Writable { |
| 81 | + async write(chunk) { |
| 82 | + let data = {} |
| 83 | + if (parseOptions.header) { |
| 84 | + data = chunk |
| 85 | + } else { |
| 86 | + data = arrayToObj(chunk) |
| 87 | + } |
| 88 | + if (emitAll) { |
| 89 | + outputMsg.result.push(data) |
| 90 | + } else { |
| 91 | + parseStream.pause() |
| 92 | + await that.emit('data', messages.newMessageWithBody(data)) |
| 93 | + parseStream.resume() |
| 94 | + } |
| 95 | + } |
| 96 | + } |
| 97 | + const writerStream = new CsvWriter() |
| 98 | + writerStream.logger = this.logger |
| 99 | + |
| 100 | + try { |
| 101 | + await pipeline( |
| 102 | + dataStream.data, |
| 103 | + parseStream, |
| 104 | + writerStream |
| 105 | + ) |
| 106 | + this.logger.info('File parsed successfully') |
| 107 | + } catch (err) { |
| 108 | + this.logger.error(`error during file parse: ${err}`); |
| 109 | + this.emit('error', `error during file parse: ${err}`) |
| 110 | + this.emit('end') |
| 111 | + return |
| 112 | + } |
| 113 | + |
| 114 | + if (emitAll) { |
| 115 | + await this.emit('data', messages.newMessageWithBody(outputMsg)) |
| 116 | + } |
| 117 | + this.logger.info(`Complete, memory used: ${process.memoryUsage().heapUsed / 1024 / 1024} Mb`) |
| 118 | +} |
| 119 | + |
| 120 | +module.exports.process = readCSV |
0 commit comments