Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [

{
"type": "node",
"request": "launch",
"name": "Jest All",
"program": "${workspaceFolder}/node_modules/.bin/jest",
"args": [
"--runInBand",
"--detectOpenHandles",
"--config",
"jest.config.json",
],
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen",
"disableOptimisticBPs": true,
"windows": {
"program": "${workspaceFolder}/node_modules/jest/bin/jest",
}
},
{
"type": "node",
"request": "launch",
"name": "Jest Current File",
"program": "${workspaceFolder}/node_modules/.bin/jest",
"args": [
"${fileBasenameNoExtension}",
"--config",
"jest.config.json",
],
"console": "integratedTerminal",
"internalConsoleOptions": "neverOpen",
"disableOptimisticBPs": true,
"windows": {
"program": "${workspaceFolder}/node_modules/jest/bin/jest",
},
},

]
}
2 changes: 1 addition & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@

module.exports = {
workerFactory: require('./src/workerFactory'),
Router : require('./src/Router')
Router: require('./src/Router')
};
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"dependencies": {
"aws-sdk": "^2.286.2",
"bluebird": "^3.5.1",
"jaeger-tracer": "git+https://github.com/infrasrc/jaeger-tracer.git",
"joi": "^13.5.2",
"lodash": ">=4.17.11",
"sqs-consumer": "^5.2.0"
Expand Down
7 changes: 4 additions & 3 deletions src/Router.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@

const Joi = require('joi');

const _ = require('lodash/fp');

class Router {
constructor() {
constructor(config) {
this.trace = _.getOr(false, 'trace')(config);
this._controllers = new Map();
}

Expand All @@ -21,7 +22,7 @@ class Router {
}

const validationResult = Joi.validate(config, Joi.object({
handler : Joi.func().maxArity(2).required(),
handler: Joi.func().maxArity(3).required(),
validation: Joi.object({
schema: Joi.object({
isJoi: Joi.valid(true).error(new Error('schema joi can be Joi schema only'))
Expand Down
71 changes: 64 additions & 7 deletions src/Worker.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,36 @@
'use strict';

const EventEmitter = require('events');
const _ = require('lodash/fp');
const Joi = require('joi');

const _ = require('lodash/fp');
const Joi = require('joi');
const Tracer = require('jaeger-tracer');
const { Tags, FORMAT_TEXT_MAP, globalTracer } = Tracer.opentracing;
const TWO_HOURS_IN_MS = 2 * 60 * 60 * 1000;

const messageStatuses = {
processing: 'PROCESSING',
proceed : 'PROCEED'
proceed: 'PROCEED'
};

const timeout = (span, endTrace) => {
span.timeout = setTimeout(() => {
span.setTag("span.timeout", true);
endTrace(span);
}, TWO_HOURS_IN_MS);
return span;
}

class Worker extends EventEmitter {
constructor(consumer, router) {
super();
this._consumer = consumer;
this.router = router;
this.tracer = globalTracer();
}

get messageSchema() {
return Joi.object({
type : Joi.any().valid(_.values([...this.router.getRegisteredTypes()])).required(),
type: Joi.any().valid(_.values([...this.router.getRegisteredTypes()])).required(),
content: Joi.any().required()
}).required().unknown(false);
}
Expand All @@ -47,19 +57,66 @@ class Worker extends EventEmitter {
this._queue.stop();
}

startTrace(traceId) {
const carrier = {
"uber-trace-id": traceId
};
let spanOptions = {
tags: { [Tags.SPAN_KIND]: Tags.SPAN_KIND_MESSAGING_CONSUMER }
}

if (traceId) {
spanOptions.childOf = this.tracer.extract(FORMAT_TEXT_MAP, carrier);
}

return timeout(this.tracer.startSpan('handleMessage', spanOptions), this.endTrace);
}

endTrace(span) {
if (span) {
if (span.finish instanceof Function) span.finish();
if (span.timeout) clearTimeout(span.timeout);
}
}

logEvent(span, event, value) {
if ((!span) || (!span.log instanceof Function)) return;
span.log({ event, value });
}

logError(span, errorObject, message, stack) {
if ((!span) || (!span.setTag instanceof Function)) return;
span.setTag(Tags.ERROR, true);
if (!span.log instanceof Function) return;
span.log({ 'event': 'error', 'error.object': errorObject, 'message': message, 'stack': stack });
}



async _handleMessage(message, attributes) {
let span = null;
try {
const jsonMessage = JSON.parse(message);
if (this._validateMessage(jsonMessage, this.messageSchema).error) return;

const controller = this.router.get(jsonMessage.type);
const controller = this.router.get(jsonMessage.type);
const contentValidationResult = this._validateMessage(jsonMessage.content, controller.validation.schema);
if (contentValidationResult.error) return;

this.emit('message', { type: jsonMessage.type, status: messageStatuses.processing });
await controller.handler(contentValidationResult.value, attributes);

if (this.router.trace) {
const traceId = _.getOr(null, 'traceId.StringValue')(attributes);
span = this.startTrace(traceId);
this.logEvent(span, 'handleMessage Request', { messageContent: jsonMessage, attributes });
}
await controller.handler(contentValidationResult.value, attributes, span);
this.endTrace(span);
this.emit('message', { type: jsonMessage.type, status: messageStatuses.proceed });

} catch (error) {
this.logError(span, error, error.message, error.stack);
this.endTrace(span);
this.emit('error', error);
}
}
Expand Down
16 changes: 8 additions & 8 deletions src/consumers/SqsConsumer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict';

const AWS = require('aws-sdk');
const AWS = require('aws-sdk');
const { Consumer } = require('sqs-consumer');
const BaseConsumer = require('./BaseConsumer');

Expand All @@ -18,23 +18,23 @@ class SqsConsumer extends BaseConsumer {
super();

AWS.config.update({
region : options.aws.credentials.region || process.env.AWS_REGION,
accessKeyId : options.aws.credentials.accessKeyId || process.env.AWS_ACCESS_KEY_ID,
region: options.aws.credentials.region || process.env.AWS_REGION,
accessKeyId: options.aws.credentials.accessKeyId || process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: options.aws.credentials.secretAccessKey || process.env.AWS_SECRET_ACCESS_KEY
});

this.batchSize = options.aws.batchSize || 10;
this.sqsClient = options.aws.sqsClient;
this.queueUrl = options.aws.queueUrl;
this.queueUrl = options.aws.queueUrl;
}

createConsumer(handler) {
return Consumer.create({
messageAttributeNames : ['All'],
queueUrl : this.queueUrl,
sqs : this.sqsClient,
messageAttributeNames: ['All'],
queueUrl: this.queueUrl,
sqs: this.sqsClient,
handleMessage: async message => await handler(message.Body, message.MessageAttributes),
batchSize : this.batchSize
batchSize: this.batchSize
}).on('error', error => this.emit('error', error))
.on('timeout_error', error => this.emit('error', error))
.on('processing_error', error => this.emit('message_error', error))
Expand Down
8 changes: 4 additions & 4 deletions test/spec/unit-tests/Router.ut.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const Router = require('../../../src/Router');


test('add - router contains 0 registered types - should add new type', async () => {
const router = new Router();
const router = new Router();
const routerConfig = {
handler: (msg, attributes) => {
}
Expand All @@ -16,7 +16,7 @@ test('add - router contains 0 registered types - should add new type', async ()
});

test('add - trying to add route without handler - should throw Error', (done) => {
const router = new Router();
const router = new Router();
const routerConfig = {};
try {
router.add('TEST_ROUTE_1', routerConfig);
Expand All @@ -26,7 +26,7 @@ test('add - trying to add route without handler - should throw Error', (done) =>
});

test('add - trying to add route twice - should throw Error', (done) => {
const router = new Router();
const router = new Router();
const routerConfig = {
handler: (msg, attributes) => {
}
Expand All @@ -44,7 +44,7 @@ test('add - trying to add route twice - should throw Error', (done) => {
});

test('keys - router contains 1 registered types - should return 1 type', async () => {
const router = new Router();
const router = new Router();
const routerConfig = {
handler: (msg, attributes) => {
}
Expand Down
42 changes: 21 additions & 21 deletions test/spec/unit-tests/Worker.ut.spec.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
/* eslint no-unused-vars:off */
'use strict';

const Promise = require('bluebird');
const Joi = require('joi');
const Worker = require('../../../src/Worker');
const Router = require('../../../src/Router');
const Promise = require('bluebird');
const Joi = require('joi');
const Worker = require('../../../src/Worker');
const Router = require('../../../src/Router');
const ConsumerMock = require('./mocks/Consumer');


test('start - 1 valid message and undefined atttributes with existing controller - should call TEST_CONTROLLER1 handler', async (done) => {
const consumerStub = new ConsumerMock();
const consumerStub = new ConsumerMock();
const expectedMessage = {
type : 'TEST_CONTROLLER1',
type: 'TEST_CONTROLLER1',
content: {
age: 19
}
};
consumerStub.injectFakeResponseData([JSON.stringify(expectedMessage)]);
const router = new Router();
router.add('TEST_CONTROLLER1', {
handler : (msg, attributes) => {
handler: (msg, attributes) => {
expect(msg).toEqual(expectedMessage.content);
expect(attributes).toEqual(undefined);
done();
Expand All @@ -36,9 +36,9 @@ test('start - 1 valid message and undefined atttributes with existing controller
});

test('start - 1 valid message without existing controller - should not call TEST_CONTROLLER1 handler & emit error', async (done) => {
const consumerStub = new ConsumerMock();
const consumerStub = new ConsumerMock();
const expectedMessage = {
type : 'TEST_CONTROLLER1',
type: 'TEST_CONTROLLER1',
content: {
age: 19
}
Expand All @@ -58,17 +58,17 @@ test('start - 1 valid message without existing controller - should not call TEST
});

test('start - 1 invalid message with existing controller - should not call TEST_CONTROLLER1 handler', async (done) => {
const consumerStub = new ConsumerMock();
const consumerStub = new ConsumerMock();
const expectedMessage = {
type : 'TEST_CONTROLLER1',
type: 'TEST_CONTROLLER1',
content: {
age: 19
}
};
consumerStub.injectFakeResponseData([JSON.stringify(expectedMessage)]);
const router = new Router();
router.add('TEST_CONTROLLER1', {
handler : (msg, attributes) => {
handler: (msg, attributes) => {
done('should not be called');
}, validation: {
schema: Joi.string()
Expand All @@ -82,36 +82,36 @@ test('start - 1 invalid message with existing controller - should not call TEST_
});

test('start - 1 valid message and atttributes array with existing controller - should call TEST_CONTROLLER1 handler', async (done) => {
const consumerStub = new ConsumerMock();
const consumerStub = new ConsumerMock();
const expectedMessage = {
type : 'TEST_CONTROLLER1',
type: 'TEST_CONTROLLER1',
content: {
age: 19
}
};

const expectedMessageAttributes = {
sender : {
StringValue : 'test',
sender: {
StringValue: 'test',
StringListValues: [],
BinaryListValues: [],
DataType : 'String'
DataType: 'String'
},
version: {
StringValue : '1',
StringValue: '1',
StringListValues: [],
BinaryListValues: [],
DataType : 'Number'
DataType: 'Number'
}
};



consumerStub.injectFakeResponseData([JSON.stringify(expectedMessage)], expectedMessageAttributes);

const router = new Router();
router.add('TEST_CONTROLLER1', {
handler : (msg, attributes) => {
handler: (msg, attributes) => {
expect(msg).toEqual(expectedMessage.content);
expect(attributes).toEqual(expectedMessageAttributes.content);
done();
Expand Down
4 changes: 2 additions & 2 deletions test/spec/unit-tests/mocks/Consumer.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
'use strict';

const _ = require('lodash/fp');
const _ = require('lodash/fp');
const EventEmitter = require('events');

class ConsumerStub {
constructor() {
this.injectedData = [];
this.queueNumber = 0;
this.queueNumber = 0;
}

createConsumer(handler) {
Expand Down