Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 121 additions & 25 deletions __tests__/plugins/rabbitmq_plugin.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,128 @@
import { bigIntUtils } from '@hathor/wallet-lib';
import { eventHandlerFactory, getSettings } from '../../src/plugins/hathor_rabbitmq';

test('settings', () => {
const oldArgs = process.argv;
process.argv = [
'node', // not used but a value is required at this index
'a_script_file.js', // not used but a value is required at this index
'--plugin_rabbitmq_url', 'test-url',
'--plugin_rabbitmq_queue', 'test-queue',
];
const settings = getSettings();
expect(settings).toMatchObject({
url: 'test-url',
queue: 'test-queue',
});

// Restore original argv state
process.argv = oldArgs;
describe('RabbitMQ plugin settings', () => {
it('should throw an error if no settings are provided', () => {
const oldArgs = process.argv;
process.argv = [
'node', // not used but a value is required at this index
'a_script_file.js', // not used but a value is required at this index
];
expect(() => getSettings()).toThrow('You must provide a RabbitMQ URL');
process.argv = oldArgs;
});

it('should throw an error if no queue or exchange is provided', () => {
const oldArgs = process.argv;
process.argv = [
'node', // not used but a value is required at this index
'a_script_file.js', // not used but a value is required at this index
'--plugin_rabbitmq_url', 'test-url',
];
expect(() => getSettings()).toThrow('You must provide either a RabbitMQ queue or exchange');
process.argv = oldArgs;
});

it('should throw an error if both queue and exchange are provided', () => {
const oldArgs = process.argv;
process.argv = [
'node', // not used but a value is required at this index
'a_script_file.js', // not used but a value is required at this index
'--plugin_rabbitmq_url', 'test-url',
'--plugin_rabbitmq_queue', 'test-queue',
'--plugin_rabbitmq_exchange', 'test-exchange',
];
expect(() => getSettings()).toThrow('You must provide either a RabbitMQ queue or exchange, not both');
process.argv = oldArgs;
});

it('should throw an error if exchange is provided without routing key', () => {
const oldArgs = process.argv;
process.argv = [
'node', // not used but a value is required at this index
'a_script_file.js', // not used but a value is required at this index
'--plugin_rabbitmq_url', 'test-url',
'--plugin_rabbitmq_exchange', 'test-exchange',
];
expect(() => getSettings()).toThrow('You must provide a RabbitMQ routing key if you provide exchange. A blank routing key is acceptable though.');
process.argv = oldArgs;
});

it('should return the settings if everything is correct with queue configuration', () => {
const oldArgs = process.argv;
process.argv = [
'node', // not used but a value is required at this index
'a_script_file.js', // not used but a value is required at this index
'--plugin_rabbitmq_url', 'test-url',
'--plugin_rabbitmq_queue', 'test-queue',
];
const settings = getSettings();
expect(settings).toMatchObject({
url: 'test-url',
queue: 'test-queue',
});
process.argv = oldArgs;
});

it('should return the settings if everything is correct with exchange and routing key', () => {
const oldArgs = process.argv;
process.argv = [
'node', // not used but a value is required at this index
'a_script_file.js', // not used but a value is required at this index
'--plugin_rabbitmq_url', 'test-url',
'--plugin_rabbitmq_exchange', 'test-exchange',
'--plugin_rabbitmq_routing_key', 'test-routing-key',
];
const settings = getSettings();
expect(settings).toMatchObject({
url: 'test-url',
exchange: 'test-exchange',
routingKey: 'test-routing-key',
});
process.argv = oldArgs;
});

it('should return the settings if everything is correct with exchange and a blank routing key', () => {
const oldArgs = process.argv;
process.argv = [
'node', // not used but a value is required at this index
'a_script_file.js', // not used but a value is required at this index
'--plugin_rabbitmq_url', 'test-url',
'--plugin_rabbitmq_exchange', 'test-exchange',
'--plugin_rabbitmq_routing_key', '',
];
const settings = getSettings();
expect(settings).toMatchObject({
url: 'test-url',
exchange: 'test-exchange',
routingKey: '',
});
process.argv = oldArgs;
});
});

test('event handler', () => {
const channelMock = {
sendToQueue: jest.fn(),
};
const mockedSettings = { queue: 'test-queue' };
const evHandler = eventHandlerFactory(channelMock, mockedSettings);
const data = { test: 'event' };
describe('RabbitMQ plugin event handler', () => {
it('should return a function that sends to a queue', () => {
const channelMock = {
sendToQueue: jest.fn(),
};
const mockedSettings = { queue: 'test-queue' };
const evHandler = eventHandlerFactory(channelMock, mockedSettings);
const data = { test: 'event' };

evHandler(data);
expect(channelMock.sendToQueue).toHaveBeenCalledWith('test-queue', Buffer.from(bigIntUtils.JSONBigInt.stringify(data)));
evHandler(data);
expect(channelMock.sendToQueue).toHaveBeenCalledWith('test-queue', Buffer.from(bigIntUtils.JSONBigInt.stringify(data)));
});

it('should return a function that publishes to an exchange', () => {
const channelMock = {
publish: jest.fn(),
};
const mockedSettings = { exchange: 'test-exchange', routingKey: 'test-routing-key' };
const evHandler = eventHandlerFactory(channelMock, mockedSettings);
const data = { test: 'event' };

evHandler(data);
expect(channelMock.publish).toHaveBeenCalledWith('test-exchange', 'test-routing-key', Buffer.from(bigIntUtils.JSONBigInt.stringify(data)));
});
});
35 changes: 34 additions & 1 deletion src/plugins/hathor_rabbitmq.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,45 @@ export function getSettings() {
|| process.env.HEADLESS_PLUGIN_RABBITMQ_URL;
const queue = argv.plugin_rabbitmq_queue
|| process.env.HEADLESS_PLUGIN_RABBITMQ_QUEUE;
const exchange = argv.plugin_rabbitmq_exchange
|| process.env.HEADLESS_PLUGIN_RABBITMQ_EXCHANGE;
const routingKey = argv.plugin_rabbitmq_routing_key !== undefined
? argv.plugin_rabbitmq_routing_key
: process.env.HEADLESS_PLUGIN_RABBITMQ_ROUTING_KEY;

return { url, queue };
if (url === undefined) {
throw new Error('You must provide a RabbitMQ URL');
}

if (queue === undefined && exchange === undefined) {
throw new Error('You must provide either a RabbitMQ queue or exchange');
}

if (queue !== undefined && exchange !== undefined) {
throw new Error('You must provide either a RabbitMQ queue or exchange, not both');
}

if (exchange !== undefined && routingKey === undefined) {
throw new Error('You must provide a RabbitMQ routing key if you provide exchange. A blank routing key is acceptable though.');
}

return { url, queue, exchange, routingKey };
}

export function eventHandlerFactory(channel, settings) {
if (settings.exchange !== undefined) {
return data => {
// Check https://amqp-node.github.io/amqplib/channel_api.html#channel_publish
channel.publish(
settings.exchange,
settings.routingKey,
Buffer.from(bigIntUtils.JSONBigInt.stringify(data))
);
};
}

return data => {
// Check https://amqp-node.github.io/amqplib/channel_api.html#channel_sendToQueue
channel.sendToQueue(settings.queue, Buffer.from(bigIntUtils.JSONBigInt.stringify(data)));
};
}
Expand Down