Skip to content

Commit 364628e

Browse files
committed
rabbit mq event bus
1 parent c733cf6 commit 364628e

File tree

12 files changed

+138
-9
lines changed

12 files changed

+138
-9
lines changed

package-lock.json

Lines changed: 15 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
"dev": "NODE_ENV=dev ts-node-dev --ignore-watch node_modules --inspect=0.0.0.0:9267 ./src/apps/mooc_backend/server.ts",
1515
"dev:backoffice:frontend": "NODE_ENV=dev ts-node-dev --ignore-watch node_modules ./src/apps/backoffice/frontend/server.ts",
1616
"test": "npm run test:unit && npm run test:features",
17-
"test:unit": "NODE_ENV=test jest",
17+
"test:unit": "NODE_ENV=test jest --watch",
1818
"test:features": "NODE_ENV=test cucumber-js -p default",
1919
"lint": "tslint src/**/*.ts{,x}",
2020
"start": "NODE_ENV=production node dist/src/apps/mooc_backend/server",
@@ -25,6 +25,8 @@
2525
"build:clean": "rm -r dist; exit 0"
2626
},
2727
"dependencies": {
28+
"@types/amqplib": "^0.5.16",
29+
"@types/bluebird": "^3.5.33",
2830
"@types/bson": "^4.0.2",
2931
"@types/compression": "^1.7.0",
3032
"@types/convict": "^5.2.1",

src/Contexts/Mooc/Courses/domain/CourseCreatedDomainEvent.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import { DomainEvent } from '../../../Shared/domain/DomainEvent';
33
type CreateCourseDomainEventBody = {
44
readonly duration: string;
55
readonly name: string;
6+
readonly eventName: string;
67
};
78

89
export class CourseCreatedDomainEvent extends DomainEvent {
@@ -33,7 +34,8 @@ export class CourseCreatedDomainEvent extends DomainEvent {
3334
const { name, duration } = this;
3435
return {
3536
name,
36-
duration
37+
duration,
38+
eventName: CourseCreatedDomainEvent.EVENT_NAME
3739
};
3840
}
3941

src/Contexts/Shared/domain/EventBus.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ import { DomainEventSubscriber } from './DomainEventSubscriber';
44
export interface EventBus {
55
publish(events: Array<DomainEvent>): Promise<void>;
66
addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void;
7+
start(): Promise<void>;
78
}

src/Contexts/Shared/infrastructure/EventBus/InMemoryAsyncEventBus.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ export class InMemoryAsyncEventBus implements EventBus {
1010
this.bus = new EventEmitterBus(subscribers);
1111
}
1212

13+
async start(): Promise<void> {}
14+
1315
async publish(events: DomainEvent[]): Promise<void> {
1416
this.bus.publish(events);
1517
}

src/Contexts/Shared/infrastructure/EventBus/InMemorySyncEventBus.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ export class InMemorySyncEventBus implements EventBus {
1414
this.subscriptions = new Map();
1515
}
1616

17+
async start(): Promise<void> {}
18+
1719
async publish(events: Array<DomainEvent>): Promise<void> {
1820
const executions: any = [];
1921
events.map(event => {
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
import { Connection, Message, Exchange, Queue } from 'amqp-ts';
2+
import config from '../../../../../apps/mooc_backend/config/config';
3+
import { EventBus } from '../../../domain/EventBus';
4+
import { DomainEvent } from '../../../domain/DomainEvent';
5+
import { DomainEventSubscriber } from '../../../domain/DomainEventSubscriber';
6+
import { DomainEventJsonDeserializer } from '../DomainEventJsonDeserializer';
7+
8+
export default class RabbitMqEventbus implements EventBus {
9+
private connection: Connection;
10+
private exchange: Exchange;
11+
private queue: Queue;
12+
private deserializer: DomainEventJsonDeserializer;
13+
private subscribers: Map<string, Array<DomainEventSubscriber<DomainEvent>>>;
14+
15+
constructor(deserializer: DomainEventJsonDeserializer) {
16+
const rabbitMQConfig = config.get('rabbitMQ');
17+
this.connection = new Connection(`amqp://${rabbitMQConfig.user}:${rabbitMQConfig.password}@${rabbitMQConfig.host}`);
18+
this.exchange = this.connection.declareExchange('domain_events_fanout', 'fanout', { durable: false });
19+
this.queue = this.connection.declareQueue('coursesTest', { exclusive: true });
20+
this.deserializer = deserializer;
21+
this.subscribers = new Map();
22+
}
23+
24+
async start(): Promise<void> {
25+
this.queue.bind(this.exchange);
26+
this.queue.activateConsumer(
27+
async message => {
28+
const event = this.deserializer.deserialize(message.content.toString());
29+
const subscribers = this.subscribers.get(event.eventName);
30+
const subscribersExecutions = subscribers!.map(subscriber => subscriber.on(event));
31+
await Promise.all(subscribersExecutions);
32+
},
33+
{ noAck: true }
34+
);
35+
}
36+
37+
async publish(events: Array<DomainEvent>): Promise<void> {
38+
const executions: any = [];
39+
events.map(event => {
40+
const message = new Message(event.toPrimitive());
41+
executions.push(this.exchange.send(message));
42+
});
43+
44+
await Promise.all(executions);
45+
}
46+
47+
addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void {
48+
subscribers.map(subscriber => {
49+
this.addSubscriber(subscriber);
50+
});
51+
}
52+
53+
private addSubscriber(subscriber: DomainEventSubscriber<DomainEvent>): void {
54+
subscriber.subscribedTo().map(event => {
55+
const eventName = event.EVENT_NAME;
56+
if (this.subscribers.has(eventName)) {
57+
this.subscribers.get(eventName)!.push(subscriber);
58+
} else {
59+
this.subscribers.set(eventName, [subscriber]);
60+
}
61+
});
62+
}
63+
}

src/apps/mooc_backend/config/config.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,26 @@ const convictConfig = convict({
1414
env: 'MONGO_URL',
1515
default: 'mongodb://localhost:27017/dev'
1616
}
17+
},
18+
rabbitMQ: {
19+
host: {
20+
doc: 'The RabbitMQ connection host',
21+
format: String,
22+
env: 'RABBITMQ_HOST',
23+
default: 'localhost'
24+
},
25+
user: {
26+
doc: 'The RabbitMQ connection user',
27+
format: String,
28+
env: 'RABBITMQ_DEFAULT_USER',
29+
default: 'guest'
30+
},
31+
password: {
32+
doc: 'The RabbitMQ connection password',
33+
format: String,
34+
env: 'RABBITMQ_DEFAULT_PASS',
35+
default: 'guest'
36+
}
1737
}
1838
});
1939

src/apps/mooc_backend/config/dependency-injection/Shared/application.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ services:
1010
arguments: []
1111

1212
Mooc.shared.EventBus:
13-
class: ../../../../../Contexts/Shared/infrastructure/EventBus/InMemoryAsyncEventBus
14-
arguments: []
13+
class: ../../../../../Contexts/Shared/infrastructure/EventBus/RabbitMq/RabbitMqEventBus
14+
arguments: ['@Mooc.shared.EventBus.DomainEventJsonDeserializer']
1515

1616
Mooc.shared.CommandHandlersInformation:
1717
class: ../../../../../Contexts/Shared/infrastructure/CommandBus/CommandHandlersInformation
Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
import container from './config/dependency-injection';
2-
import { InMemoryAsyncEventBus } from '../../Contexts/Shared/infrastructure/EventBus/InMemoryAsyncEventBus';
32
import { Definition } from 'node-dependency-injection';
43
import { DomainEventSubscriber } from '../../Contexts/Shared/domain/DomainEventSubscriber';
54
import { DomainEvent } from '../../Contexts/Shared/domain/DomainEvent';
5+
import { EventBus } from '../../Contexts/Shared/domain/EventBus';
66

77
export function registerSubscribers() {
8-
const eventBus = container.get('Mooc.shared.EventBus') as InMemoryAsyncEventBus;
8+
const eventBus = container.get('Mooc.shared.EventBus') as EventBus;
99
const subscriberDefinitions = container.findTaggedServiceIds('domainEventSubscriber') as Map<String, Definition>;
1010
const subscribers: Array<DomainEventSubscriber<DomainEvent>> = [];
1111

1212
subscriberDefinitions.forEach((value: any, key: any) => subscribers.push(container.get(key)));
1313
eventBus.addSubscribers(subscribers);
14+
eventBus.start();
1415
}

0 commit comments

Comments
 (0)