diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..f22457a --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,45 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + + { + "type": "node", + "request": "launch", + "name": "Jest All", + "program": "${workspaceFolder}/node_modules/.bin/jest", + "args": [ + "--runInBand", + "--detectOpenHandles", + "--config", + "jest.config.json", + ], + "console": "integratedTerminal", + "internalConsoleOptions": "neverOpen", + "disableOptimisticBPs": true, + "windows": { + "program": "${workspaceFolder}/node_modules/jest/bin/jest", + } + }, + { + "type": "node", + "request": "launch", + "name": "Jest Current File", + "program": "${workspaceFolder}/node_modules/.bin/jest", + "args": [ + "${fileBasenameNoExtension}", + "--config", + "jest.config.json", + ], + "console": "integratedTerminal", + "internalConsoleOptions": "neverOpen", + "disableOptimisticBPs": true, + "windows": { + "program": "${workspaceFolder}/node_modules/jest/bin/jest", + }, + }, + + ] +} \ No newline at end of file diff --git a/index.js b/index.js index b2cdca5..62eadbd 100644 --- a/index.js +++ b/index.js @@ -2,5 +2,5 @@ module.exports = { workerFactory: require('./src/workerFactory'), - Router : require('./src/Router') + Router: require('./src/Router') }; diff --git a/package.json b/package.json index 136c438..71912b0 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,7 @@ "dependencies": { "aws-sdk": "^2.286.2", "bluebird": "^3.5.1", + "jaeger-tracer": "git+https://github.com/infrasrc/jaeger-tracer.git", "joi": "^13.5.2", "lodash": ">=4.17.11", "sqs-consumer": "^5.2.0" diff --git a/src/Router.js b/src/Router.js index 38102af..8a04c43 100644 --- a/src/Router.js +++ b/src/Router.js @@ -1,9 +1,10 @@ const Joi = require('joi'); - +const _ = require('lodash/fp'); class Router { - constructor() { + constructor(config) { + this.trace = _.getOr(false, 'trace')(config); this._controllers = new Map(); } @@ -21,7 +22,7 @@ class Router { } const validationResult = Joi.validate(config, Joi.object({ - handler : Joi.func().maxArity(2).required(), + handler: Joi.func().maxArity(3).required(), validation: Joi.object({ schema: Joi.object({ isJoi: Joi.valid(true).error(new Error('schema joi can be Joi schema only')) diff --git a/src/Worker.js b/src/Worker.js index 26322a4..65a636c 100644 --- a/src/Worker.js +++ b/src/Worker.js @@ -1,26 +1,36 @@ 'use strict'; const EventEmitter = require('events'); -const _ = require('lodash/fp'); -const Joi = require('joi'); - +const _ = require('lodash/fp'); +const Joi = require('joi'); +const Tracer = require('jaeger-tracer'); +const { Tags, FORMAT_TEXT_MAP, globalTracer } = Tracer.opentracing; +const TWO_HOURS_IN_MS = 2 * 60 * 60 * 1000; const messageStatuses = { processing: 'PROCESSING', - proceed : 'PROCEED' + proceed: 'PROCEED' }; +const timeout = (span, endTrace) => { + span.timeout = setTimeout(() => { + span.setTag("span.timeout", true); + endTrace(span); + }, TWO_HOURS_IN_MS); + return span; +} class Worker extends EventEmitter { constructor(consumer, router) { super(); this._consumer = consumer; this.router = router; + this.tracer = globalTracer(); } get messageSchema() { return Joi.object({ - type : Joi.any().valid(_.values([...this.router.getRegisteredTypes()])).required(), + type: Joi.any().valid(_.values([...this.router.getRegisteredTypes()])).required(), content: Joi.any().required() }).required().unknown(false); } @@ -47,19 +57,66 @@ class Worker extends EventEmitter { this._queue.stop(); } + startTrace(traceId) { + const carrier = { + "uber-trace-id": traceId + }; + let spanOptions = { + tags: { [Tags.SPAN_KIND]: Tags.SPAN_KIND_MESSAGING_CONSUMER } + } + + if (traceId) { + spanOptions.childOf = this.tracer.extract(FORMAT_TEXT_MAP, carrier); + } + + return timeout(this.tracer.startSpan('handleMessage', spanOptions), this.endTrace); + } + + endTrace(span) { + if (span) { + if (span.finish instanceof Function) span.finish(); + if (span.timeout) clearTimeout(span.timeout); + } + } + + logEvent(span, event, value) { + if ((!span) || (!span.log instanceof Function)) return; + span.log({ event, value }); + } + + logError(span, errorObject, message, stack) { + if ((!span) || (!span.setTag instanceof Function)) return; + span.setTag(Tags.ERROR, true); + if (!span.log instanceof Function) return; + span.log({ 'event': 'error', 'error.object': errorObject, 'message': message, 'stack': stack }); + } + + + async _handleMessage(message, attributes) { + let span = null; try { const jsonMessage = JSON.parse(message); if (this._validateMessage(jsonMessage, this.messageSchema).error) return; - const controller = this.router.get(jsonMessage.type); + const controller = this.router.get(jsonMessage.type); const contentValidationResult = this._validateMessage(jsonMessage.content, controller.validation.schema); if (contentValidationResult.error) return; this.emit('message', { type: jsonMessage.type, status: messageStatuses.processing }); - await controller.handler(contentValidationResult.value, attributes); + + if (this.router.trace) { + const traceId = _.getOr(null, 'traceId.StringValue')(attributes); + span = this.startTrace(traceId); + this.logEvent(span, 'handleMessage Request', { messageContent: jsonMessage, attributes }); + } + await controller.handler(contentValidationResult.value, attributes, span); + this.endTrace(span); this.emit('message', { type: jsonMessage.type, status: messageStatuses.proceed }); + } catch (error) { + this.logError(span, error, error.message, error.stack); + this.endTrace(span); this.emit('error', error); } } diff --git a/src/consumers/SqsConsumer.js b/src/consumers/SqsConsumer.js index 89e6d57..39d1b85 100644 --- a/src/consumers/SqsConsumer.js +++ b/src/consumers/SqsConsumer.js @@ -1,6 +1,6 @@ 'use strict'; -const AWS = require('aws-sdk'); +const AWS = require('aws-sdk'); const { Consumer } = require('sqs-consumer'); const BaseConsumer = require('./BaseConsumer'); @@ -18,23 +18,23 @@ class SqsConsumer extends BaseConsumer { super(); AWS.config.update({ - region : options.aws.credentials.region || process.env.AWS_REGION, - accessKeyId : options.aws.credentials.accessKeyId || process.env.AWS_ACCESS_KEY_ID, + region: options.aws.credentials.region || process.env.AWS_REGION, + accessKeyId: options.aws.credentials.accessKeyId || process.env.AWS_ACCESS_KEY_ID, secretAccessKey: options.aws.credentials.secretAccessKey || process.env.AWS_SECRET_ACCESS_KEY }); this.batchSize = options.aws.batchSize || 10; this.sqsClient = options.aws.sqsClient; - this.queueUrl = options.aws.queueUrl; + this.queueUrl = options.aws.queueUrl; } createConsumer(handler) { return Consumer.create({ - messageAttributeNames : ['All'], - queueUrl : this.queueUrl, - sqs : this.sqsClient, + messageAttributeNames: ['All'], + queueUrl: this.queueUrl, + sqs: this.sqsClient, handleMessage: async message => await handler(message.Body, message.MessageAttributes), - batchSize : this.batchSize + batchSize: this.batchSize }).on('error', error => this.emit('error', error)) .on('timeout_error', error => this.emit('error', error)) .on('processing_error', error => this.emit('message_error', error)) diff --git a/test/spec/unit-tests/Router.ut.spec.js b/test/spec/unit-tests/Router.ut.spec.js index 58c35bb..fafe826 100644 --- a/test/spec/unit-tests/Router.ut.spec.js +++ b/test/spec/unit-tests/Router.ut.spec.js @@ -5,7 +5,7 @@ const Router = require('../../../src/Router'); test('add - router contains 0 registered types - should add new type', async () => { - const router = new Router(); + const router = new Router(); const routerConfig = { handler: (msg, attributes) => { } @@ -16,7 +16,7 @@ test('add - router contains 0 registered types - should add new type', async () }); test('add - trying to add route without handler - should throw Error', (done) => { - const router = new Router(); + const router = new Router(); const routerConfig = {}; try { router.add('TEST_ROUTE_1', routerConfig); @@ -26,7 +26,7 @@ test('add - trying to add route without handler - should throw Error', (done) => }); test('add - trying to add route twice - should throw Error', (done) => { - const router = new Router(); + const router = new Router(); const routerConfig = { handler: (msg, attributes) => { } @@ -44,7 +44,7 @@ test('add - trying to add route twice - should throw Error', (done) => { }); test('keys - router contains 1 registered types - should return 1 type', async () => { - const router = new Router(); + const router = new Router(); const routerConfig = { handler: (msg, attributes) => { } diff --git a/test/spec/unit-tests/Worker.ut.spec.js b/test/spec/unit-tests/Worker.ut.spec.js index ee4d5ac..7944cdd 100644 --- a/test/spec/unit-tests/Worker.ut.spec.js +++ b/test/spec/unit-tests/Worker.ut.spec.js @@ -1,17 +1,17 @@ /* eslint no-unused-vars:off */ 'use strict'; -const Promise = require('bluebird'); -const Joi = require('joi'); -const Worker = require('../../../src/Worker'); -const Router = require('../../../src/Router'); +const Promise = require('bluebird'); +const Joi = require('joi'); +const Worker = require('../../../src/Worker'); +const Router = require('../../../src/Router'); const ConsumerMock = require('./mocks/Consumer'); test('start - 1 valid message and undefined atttributes with existing controller - should call TEST_CONTROLLER1 handler', async (done) => { - const consumerStub = new ConsumerMock(); + const consumerStub = new ConsumerMock(); const expectedMessage = { - type : 'TEST_CONTROLLER1', + type: 'TEST_CONTROLLER1', content: { age: 19 } @@ -19,7 +19,7 @@ test('start - 1 valid message and undefined atttributes with existing controller consumerStub.injectFakeResponseData([JSON.stringify(expectedMessage)]); const router = new Router(); router.add('TEST_CONTROLLER1', { - handler : (msg, attributes) => { + handler: (msg, attributes) => { expect(msg).toEqual(expectedMessage.content); expect(attributes).toEqual(undefined); done(); @@ -36,9 +36,9 @@ test('start - 1 valid message and undefined atttributes with existing controller }); test('start - 1 valid message without existing controller - should not call TEST_CONTROLLER1 handler & emit error', async (done) => { - const consumerStub = new ConsumerMock(); + const consumerStub = new ConsumerMock(); const expectedMessage = { - type : 'TEST_CONTROLLER1', + type: 'TEST_CONTROLLER1', content: { age: 19 } @@ -58,9 +58,9 @@ test('start - 1 valid message without existing controller - should not call TEST }); test('start - 1 invalid message with existing controller - should not call TEST_CONTROLLER1 handler', async (done) => { - const consumerStub = new ConsumerMock(); + const consumerStub = new ConsumerMock(); const expectedMessage = { - type : 'TEST_CONTROLLER1', + type: 'TEST_CONTROLLER1', content: { age: 19 } @@ -68,7 +68,7 @@ test('start - 1 invalid message with existing controller - should not call TEST_ consumerStub.injectFakeResponseData([JSON.stringify(expectedMessage)]); const router = new Router(); router.add('TEST_CONTROLLER1', { - handler : (msg, attributes) => { + handler: (msg, attributes) => { done('should not be called'); }, validation: { schema: Joi.string() @@ -82,36 +82,36 @@ test('start - 1 invalid message with existing controller - should not call TEST_ }); test('start - 1 valid message and atttributes array with existing controller - should call TEST_CONTROLLER1 handler', async (done) => { - const consumerStub = new ConsumerMock(); + const consumerStub = new ConsumerMock(); const expectedMessage = { - type : 'TEST_CONTROLLER1', + type: 'TEST_CONTROLLER1', content: { age: 19 } }; const expectedMessageAttributes = { - sender : { - StringValue : 'test', + sender: { + StringValue: 'test', StringListValues: [], BinaryListValues: [], - DataType : 'String' + DataType: 'String' }, version: { - StringValue : '1', + StringValue: '1', StringListValues: [], BinaryListValues: [], - DataType : 'Number' + DataType: 'Number' } }; - + consumerStub.injectFakeResponseData([JSON.stringify(expectedMessage)], expectedMessageAttributes); const router = new Router(); router.add('TEST_CONTROLLER1', { - handler : (msg, attributes) => { + handler: (msg, attributes) => { expect(msg).toEqual(expectedMessage.content); expect(attributes).toEqual(expectedMessageAttributes.content); done(); diff --git a/test/spec/unit-tests/mocks/Consumer.js b/test/spec/unit-tests/mocks/Consumer.js index 0890230..feb4166 100644 --- a/test/spec/unit-tests/mocks/Consumer.js +++ b/test/spec/unit-tests/mocks/Consumer.js @@ -1,12 +1,12 @@ 'use strict'; -const _ = require('lodash/fp'); +const _ = require('lodash/fp'); const EventEmitter = require('events'); class ConsumerStub { constructor() { this.injectedData = []; - this.queueNumber = 0; + this.queueNumber = 0; } createConsumer(handler) {