From 138517714cc343e75fd1634b3203b1bcbfe7b9e2 Mon Sep 17 00:00:00 2001 From: Rotem Levi Date: Mon, 3 Feb 2020 11:47:53 +0200 Subject: [PATCH 1/6] add tracing --- package.json | 1 + src/Router.js | 3 ++- src/Worker.js | 50 +++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index 136c438..71912b0 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "dependencies": { "aws-sdk": "^2.286.2", "bluebird": "^3.5.1", + "jaeger-tracer": "git+https://github.com/infrasrc/jaeger-tracer.git", "joi": "^13.5.2", "lodash": ">=4.17.11", "sqs-consumer": "^5.2.0" diff --git a/src/Router.js b/src/Router.js index 38102af..d72db2f 100644 --- a/src/Router.js +++ b/src/Router.js @@ -3,7 +3,8 @@ const Joi = require('joi'); class Router { - constructor() { + constructor(config) { + this.trace = _.getOr(false, 'trace')(config); this._controllers = new Map(); } diff --git a/src/Worker.js b/src/Worker.js index 26322a4..78074eb 100644 --- a/src/Worker.js +++ b/src/Worker.js @@ -3,7 +3,8 @@ const EventEmitter = require('events'); const _ = require('lodash/fp'); const Joi = require('joi'); - +const Tracer = require('jaeger-tracer'); +const { Tags, FORMAT_TEXT_MAP, globalTracer } = Tracer.opentracing; const messageStatuses = { processing: 'PROCESSING', @@ -16,6 +17,7 @@ class Worker extends EventEmitter { super(); this._consumer = consumer; this.router = router; + this.tracer = globalTracer(); } get messageSchema() { @@ -47,7 +49,39 @@ class Worker extends EventEmitter { this._queue.stop(); } + startTrace(traceId){ + const carrier = { + "uber-trace-id": traceId + }; + let spanOptions = { + tags: { [Tags.SPAN_KIND]: Tags.SPAN_KIND_MESSAGING_CONSUMER } + } + + if (traceId) { + spanOptions.childOf = this.tracer.extract(FORMAT_TEXT_MAP, carrier); + } + + return this.tracer.startSpan('handleMessage', spanOptions); + } + + endTrace(span){ + if((span) && (span.finish instanceof Function)) span.finish(); + } + + logEvent(span, event, value) { + if((!span) || (!span.log instanceof Function)) return; + span.log({ event, value }); + } + + logError(span, errorObject, message, stack) { + if((!span) || (!span.setTag instanceof Function)) return; + span.setTag(Tags.ERROR, true); + if(!span.log instanceof Function) return; + span.log({ 'event': 'error', 'error.object': errorObject, 'message': message, 'stack': stack }); + } + async _handleMessage(message, attributes) { + let span = null; try { const jsonMessage = JSON.parse(message); if (this._validateMessage(jsonMessage, this.messageSchema).error) return; @@ -57,9 +91,19 @@ class Worker extends EventEmitter { if (contentValidationResult.error) return; this.emit('message', { type: jsonMessage.type, status: messageStatuses.processing }); - await controller.handler(contentValidationResult.value, attributes); + + if(this.router.trace) { + const traceId = _.getOr(null, 'traceId.StringValue')(attributes); + span = this.startTrace(traceId); + this.logEvent(span, 'handleMessage Request', { messageContent: jsonMessage, attributes }); + } + await controller.handler(contentValidationResult.value, attributes, span); + this.endTrace(span); this.emit('message', { type: jsonMessage.type, status: messageStatuses.proceed }); - } catch (error) { + + } catch (error) { + this.logError(span, error, error.message, error.stack); + this.endTrace(span); this.emit('error', error); } } From 96336ed1d57ef67200fdd5a2159d812696dd2621 Mon Sep 17 00:00:00 2001 From: Rotem Levi Date: Mon, 3 Feb 2020 11:52:56 +0200 Subject: [PATCH 2/6] routerhandler gets span --- src/Router.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Router.js b/src/Router.js index d72db2f..a8fd5e2 100644 --- a/src/Router.js +++ b/src/Router.js @@ -22,7 +22,7 @@ class Router { } const validationResult = Joi.validate(config, Joi.object({ - handler : Joi.func().maxArity(2).required(), + handler : Joi.func().maxArity(3).required(), validation: Joi.object({ schema: Joi.object({ isJoi: Joi.valid(true).error(new Error('schema joi can be Joi schema only')) From cdf472e5cb327d3b5764561735ccf2c85723c4ac Mon Sep 17 00:00:00 2001 From: Rotem Levi Date: Mon, 3 Feb 2020 12:01:58 +0200 Subject: [PATCH 3/6] missing loadash from Router.js --- src/Router.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Router.js b/src/Router.js index a8fd5e2..8bfd234 100644 --- a/src/Router.js +++ b/src/Router.js @@ -1,6 +1,6 @@ const Joi = require('joi'); - +const _ = require('lodash/fp'); class Router { constructor(config) { From d0d1fc18940d7a5db5fd743bed7752855107a6d9 Mon Sep 17 00:00:00 2001 From: Rotem Levi Date: Mon, 3 Feb 2020 12:04:30 +0200 Subject: [PATCH 4/6] add lunch.json --- .vscode/launch.json | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 .vscode/launch.json diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..f22457a --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,45 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + + { + "type": "node", + "request": "launch", + "name": "Jest All", + "program": "${workspaceFolder}/node_modules/.bin/jest", + "args": [ + "--runInBand", + "--detectOpenHandles", + "--config", + "jest.config.json", + ], + "console": "integratedTerminal", + "internalConsoleOptions": "neverOpen", + "disableOptimisticBPs": true, + "windows": { + "program": "${workspaceFolder}/node_modules/jest/bin/jest", + } + }, + { + "type": "node", + "request": "launch", + "name": "Jest Current File", + "program": "${workspaceFolder}/node_modules/.bin/jest", + "args": [ + "${fileBasenameNoExtension}", + "--config", + "jest.config.json", + ], + "console": "integratedTerminal", + "internalConsoleOptions": "neverOpen", + "disableOptimisticBPs": true, + "windows": { + "program": "${workspaceFolder}/node_modules/jest/bin/jest", + }, + }, + + ] +} \ No newline at end of file From 750ad21e4220caf0a06b90f5f21a397d62b42252 Mon Sep 17 00:00:00 2001 From: Rotem Levi Date: Mon, 17 Feb 2020 12:24:05 +0200 Subject: [PATCH 5/6] format indent, add two hours timeout to close span after opening a span. --- index.js | 2 +- src/Router.js | 6 ++-- src/Worker.js | 49 ++++++++++++++++---------- src/consumers/SqsConsumer.js | 16 ++++----- test/spec/unit-tests/Router.ut.spec.js | 8 ++--- test/spec/unit-tests/Worker.ut.spec.js | 42 +++++++++++----------- test/spec/unit-tests/mocks/Consumer.js | 4 +-- 7 files changed, 70 insertions(+), 57 deletions(-) diff --git a/index.js b/index.js index b2cdca5..62eadbd 100644 --- a/index.js +++ b/index.js @@ -2,5 +2,5 @@ module.exports = { workerFactory: require('./src/workerFactory'), - Router : require('./src/Router') + Router: require('./src/Router') }; diff --git a/src/Router.js b/src/Router.js index 8bfd234..8a04c43 100644 --- a/src/Router.js +++ b/src/Router.js @@ -1,10 +1,10 @@ const Joi = require('joi'); -const _ = require('lodash/fp'); +const _ = require('lodash/fp'); class Router { constructor(config) { - this.trace = _.getOr(false, 'trace')(config); + this.trace = _.getOr(false, 'trace')(config); this._controllers = new Map(); } @@ -22,7 +22,7 @@ class Router { } const validationResult = Joi.validate(config, Joi.object({ - handler : Joi.func().maxArity(3).required(), + handler: Joi.func().maxArity(3).required(), validation: Joi.object({ schema: Joi.object({ isJoi: Joi.valid(true).error(new Error('schema joi can be Joi schema only')) diff --git a/src/Worker.js b/src/Worker.js index 78074eb..75fe4a9 100644 --- a/src/Worker.js +++ b/src/Worker.js @@ -1,16 +1,24 @@ 'use strict'; const EventEmitter = require('events'); -const _ = require('lodash/fp'); -const Joi = require('joi'); +const _ = require('lodash/fp'); +const Joi = require('joi'); const Tracer = require('jaeger-tracer'); const { Tags, FORMAT_TEXT_MAP, globalTracer } = Tracer.opentracing; +const TWO_HOURS_IN_MS = 2 * 60 * 60 * 1000; const messageStatuses = { processing: 'PROCESSING', - proceed : 'PROCEED' + proceed: 'PROCEED' }; +const timeout = (span) => { + span.timeout = setTimeout(() => { + span.finish(); + span.setTag("span.timeout", true); + }, TWO_HOURS_IN_MS); + return span; +} class Worker extends EventEmitter { constructor(consumer, router) { @@ -22,7 +30,7 @@ class Worker extends EventEmitter { get messageSchema() { return Joi.object({ - type : Joi.any().valid(_.values([...this.router.getRegisteredTypes()])).required(), + type: Joi.any().valid(_.values([...this.router.getRegisteredTypes()])).required(), content: Joi.any().required() }).required().unknown(false); } @@ -49,11 +57,11 @@ class Worker extends EventEmitter { this._queue.stop(); } - startTrace(traceId){ + startTrace(traceId) { const carrier = { "uber-trace-id": traceId }; - let spanOptions = { + let spanOptions = { tags: { [Tags.SPAN_KIND]: Tags.SPAN_KIND_MESSAGING_CONSUMER } } @@ -61,38 +69,43 @@ class Worker extends EventEmitter { spanOptions.childOf = this.tracer.extract(FORMAT_TEXT_MAP, carrier); } - return this.tracer.startSpan('handleMessage', spanOptions); + return timeout(this.tracer.startSpan('handleMessage', spanOptions)); } - endTrace(span){ - if((span) && (span.finish instanceof Function)) span.finish(); + endTrace(span) { + if (span) { + if (span.finish instanceof Function) span.finish(); + if (span.timeout) clearTimeout(span.timeout); + } } logEvent(span, event, value) { - if((!span) || (!span.log instanceof Function)) return; + if ((!span) || (!span.log instanceof Function)) return; span.log({ event, value }); } logError(span, errorObject, message, stack) { - if((!span) || (!span.setTag instanceof Function)) return; + if ((!span) || (!span.setTag instanceof Function)) return; span.setTag(Tags.ERROR, true); - if(!span.log instanceof Function) return; + if (!span.log instanceof Function) return; span.log({ 'event': 'error', 'error.object': errorObject, 'message': message, 'stack': stack }); } - + + + async _handleMessage(message, attributes) { let span = null; try { const jsonMessage = JSON.parse(message); if (this._validateMessage(jsonMessage, this.messageSchema).error) return; - const controller = this.router.get(jsonMessage.type); + const controller = this.router.get(jsonMessage.type); const contentValidationResult = this._validateMessage(jsonMessage.content, controller.validation.schema); if (contentValidationResult.error) return; this.emit('message', { type: jsonMessage.type, status: messageStatuses.processing }); - - if(this.router.trace) { + + if (this.router.trace) { const traceId = _.getOr(null, 'traceId.StringValue')(attributes); span = this.startTrace(traceId); this.logEvent(span, 'handleMessage Request', { messageContent: jsonMessage, attributes }); @@ -100,8 +113,8 @@ class Worker extends EventEmitter { await controller.handler(contentValidationResult.value, attributes, span); this.endTrace(span); this.emit('message', { type: jsonMessage.type, status: messageStatuses.proceed }); - - } catch (error) { + + } catch (error) { this.logError(span, error, error.message, error.stack); this.endTrace(span); this.emit('error', error); diff --git a/src/consumers/SqsConsumer.js b/src/consumers/SqsConsumer.js index 89e6d57..39d1b85 100644 --- a/src/consumers/SqsConsumer.js +++ b/src/consumers/SqsConsumer.js @@ -1,6 +1,6 @@ 'use strict'; -const AWS = require('aws-sdk'); +const AWS = require('aws-sdk'); const { Consumer } = require('sqs-consumer'); const BaseConsumer = require('./BaseConsumer'); @@ -18,23 +18,23 @@ class SqsConsumer extends BaseConsumer { super(); AWS.config.update({ - region : options.aws.credentials.region || process.env.AWS_REGION, - accessKeyId : options.aws.credentials.accessKeyId || process.env.AWS_ACCESS_KEY_ID, + region: options.aws.credentials.region || process.env.AWS_REGION, + accessKeyId: options.aws.credentials.accessKeyId || process.env.AWS_ACCESS_KEY_ID, secretAccessKey: options.aws.credentials.secretAccessKey || process.env.AWS_SECRET_ACCESS_KEY }); this.batchSize = options.aws.batchSize || 10; this.sqsClient = options.aws.sqsClient; - this.queueUrl = options.aws.queueUrl; + this.queueUrl = options.aws.queueUrl; } createConsumer(handler) { return Consumer.create({ - messageAttributeNames : ['All'], - queueUrl : this.queueUrl, - sqs : this.sqsClient, + messageAttributeNames: ['All'], + queueUrl: this.queueUrl, + sqs: this.sqsClient, handleMessage: async message => await handler(message.Body, message.MessageAttributes), - batchSize : this.batchSize + batchSize: this.batchSize }).on('error', error => this.emit('error', error)) .on('timeout_error', error => this.emit('error', error)) .on('processing_error', error => this.emit('message_error', error)) diff --git a/test/spec/unit-tests/Router.ut.spec.js b/test/spec/unit-tests/Router.ut.spec.js index 58c35bb..fafe826 100644 --- a/test/spec/unit-tests/Router.ut.spec.js +++ b/test/spec/unit-tests/Router.ut.spec.js @@ -5,7 +5,7 @@ const Router = require('../../../src/Router'); test('add - router contains 0 registered types - should add new type', async () => { - const router = new Router(); + const router = new Router(); const routerConfig = { handler: (msg, attributes) => { } @@ -16,7 +16,7 @@ test('add - router contains 0 registered types - should add new type', async () }); test('add - trying to add route without handler - should throw Error', (done) => { - const router = new Router(); + const router = new Router(); const routerConfig = {}; try { router.add('TEST_ROUTE_1', routerConfig); @@ -26,7 +26,7 @@ test('add - trying to add route without handler - should throw Error', (done) => }); test('add - trying to add route twice - should throw Error', (done) => { - const router = new Router(); + const router = new Router(); const routerConfig = { handler: (msg, attributes) => { } @@ -44,7 +44,7 @@ test('add - trying to add route twice - should throw Error', (done) => { }); test('keys - router contains 1 registered types - should return 1 type', async () => { - const router = new Router(); + const router = new Router(); const routerConfig = { handler: (msg, attributes) => { } diff --git a/test/spec/unit-tests/Worker.ut.spec.js b/test/spec/unit-tests/Worker.ut.spec.js index ee4d5ac..7944cdd 100644 --- a/test/spec/unit-tests/Worker.ut.spec.js +++ b/test/spec/unit-tests/Worker.ut.spec.js @@ -1,17 +1,17 @@ /* eslint no-unused-vars:off */ 'use strict'; -const Promise = require('bluebird'); -const Joi = require('joi'); -const Worker = require('../../../src/Worker'); -const Router = require('../../../src/Router'); +const Promise = require('bluebird'); +const Joi = require('joi'); +const Worker = require('../../../src/Worker'); +const Router = require('../../../src/Router'); const ConsumerMock = require('./mocks/Consumer'); test('start - 1 valid message and undefined atttributes with existing controller - should call TEST_CONTROLLER1 handler', async (done) => { - const consumerStub = new ConsumerMock(); + const consumerStub = new ConsumerMock(); const expectedMessage = { - type : 'TEST_CONTROLLER1', + type: 'TEST_CONTROLLER1', content: { age: 19 } @@ -19,7 +19,7 @@ test('start - 1 valid message and undefined atttributes with existing controller consumerStub.injectFakeResponseData([JSON.stringify(expectedMessage)]); const router = new Router(); router.add('TEST_CONTROLLER1', { - handler : (msg, attributes) => { + handler: (msg, attributes) => { expect(msg).toEqual(expectedMessage.content); expect(attributes).toEqual(undefined); done(); @@ -36,9 +36,9 @@ test('start - 1 valid message and undefined atttributes with existing controller }); test('start - 1 valid message without existing controller - should not call TEST_CONTROLLER1 handler & emit error', async (done) => { - const consumerStub = new ConsumerMock(); + const consumerStub = new ConsumerMock(); const expectedMessage = { - type : 'TEST_CONTROLLER1', + type: 'TEST_CONTROLLER1', content: { age: 19 } @@ -58,9 +58,9 @@ test('start - 1 valid message without existing controller - should not call TEST }); test('start - 1 invalid message with existing controller - should not call TEST_CONTROLLER1 handler', async (done) => { - const consumerStub = new ConsumerMock(); + const consumerStub = new ConsumerMock(); const expectedMessage = { - type : 'TEST_CONTROLLER1', + type: 'TEST_CONTROLLER1', content: { age: 19 } @@ -68,7 +68,7 @@ test('start - 1 invalid message with existing controller - should not call TEST_ consumerStub.injectFakeResponseData([JSON.stringify(expectedMessage)]); const router = new Router(); router.add('TEST_CONTROLLER1', { - handler : (msg, attributes) => { + handler: (msg, attributes) => { done('should not be called'); }, validation: { schema: Joi.string() @@ -82,36 +82,36 @@ test('start - 1 invalid message with existing controller - should not call TEST_ }); test('start - 1 valid message and atttributes array with existing controller - should call TEST_CONTROLLER1 handler', async (done) => { - const consumerStub = new ConsumerMock(); + const consumerStub = new ConsumerMock(); const expectedMessage = { - type : 'TEST_CONTROLLER1', + type: 'TEST_CONTROLLER1', content: { age: 19 } }; const expectedMessageAttributes = { - sender : { - StringValue : 'test', + sender: { + StringValue: 'test', StringListValues: [], BinaryListValues: [], - DataType : 'String' + DataType: 'String' }, version: { - StringValue : '1', + StringValue: '1', StringListValues: [], BinaryListValues: [], - DataType : 'Number' + DataType: 'Number' } }; - + consumerStub.injectFakeResponseData([JSON.stringify(expectedMessage)], expectedMessageAttributes); const router = new Router(); router.add('TEST_CONTROLLER1', { - handler : (msg, attributes) => { + handler: (msg, attributes) => { expect(msg).toEqual(expectedMessage.content); expect(attributes).toEqual(expectedMessageAttributes.content); done(); diff --git a/test/spec/unit-tests/mocks/Consumer.js b/test/spec/unit-tests/mocks/Consumer.js index 0890230..feb4166 100644 --- a/test/spec/unit-tests/mocks/Consumer.js +++ b/test/spec/unit-tests/mocks/Consumer.js @@ -1,12 +1,12 @@ 'use strict'; -const _ = require('lodash/fp'); +const _ = require('lodash/fp'); const EventEmitter = require('events'); class ConsumerStub { constructor() { this.injectedData = []; - this.queueNumber = 0; + this.queueNumber = 0; } createConsumer(handler) { From 194fb1289899f1ca83255a23208ab429f2fa9a79 Mon Sep 17 00:00:00 2001 From: Rotem Levi Date: Mon, 17 Feb 2020 12:37:21 +0200 Subject: [PATCH 6/6] fix bug in timeout --- src/Worker.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Worker.js b/src/Worker.js index 75fe4a9..65a636c 100644 --- a/src/Worker.js +++ b/src/Worker.js @@ -12,10 +12,10 @@ const messageStatuses = { proceed: 'PROCEED' }; -const timeout = (span) => { - span.timeout = setTimeout(() => { - span.finish(); +const timeout = (span, endTrace) => { + span.timeout = setTimeout(() => { span.setTag("span.timeout", true); + endTrace(span); }, TWO_HOURS_IN_MS); return span; } @@ -69,7 +69,7 @@ class Worker extends EventEmitter { spanOptions.childOf = this.tracer.extract(FORMAT_TEXT_MAP, carrier); } - return timeout(this.tracer.startSpan('handleMessage', spanOptions)); + return timeout(this.tracer.startSpan('handleMessage', spanOptions), this.endTrace); } endTrace(span) {