Skip to content

Commit 34ab7c7

Browse files
committed
Use rabbitmq event bus in all applications
1 parent e28671f commit 34ab7c7

File tree

21 files changed

+210
-130
lines changed

21 files changed

+210
-130
lines changed

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
},
1313
"scripts": {
1414
"dev": "NODE_ENV=dev ts-node-dev --ignore-watch node_modules --inspect=0.0.0.0:9267 ./src/apps/mooc/backend/start.ts",
15-
"dev:backoffice:frontend": "NODE_ENV=dev ts-node-dev --ignore-watch node_modules ./src/apps/backoffice/frontend/server.ts",
16-
"dev:backoffice:backend": "NODE_ENV=dev ts-node-dev --ignore-watch node_modules ./src/apps/backoffice/backend/server.ts",
15+
"dev:backoffice:frontend": "NODE_ENV=dev ts-node-dev --ignore-watch node_modules ./src/apps/backoffice/frontend/start.ts",
16+
"dev:backoffice:backend": "NODE_ENV=dev ts-node-dev --ignore-watch node_modules ./src/apps/backoffice/backend/start.ts",
1717
"test": "npm run test:unit && npm run test:features && npm run cypress:run",
1818
"test:unit": "NODE_ENV=test jest",
1919
"test:features": "npm run test:mooc:backend:features && npm run test:backoffice:backend:features",

src/Contexts/Shared/domain/EventBus.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
1+
import { DomainEventMapping } from '../infrastructure/EventBus/DomainEventMapping';
12
import { DomainEvent } from './DomainEvent';
23
import { DomainEventSubscriber } from './DomainEventSubscriber';
34

45
export interface EventBus {
6+
setDomainEventMapping(domainEventMapping: DomainEventMapping): void;
57
publish(events: Array<DomainEvent>): Promise<void>;
68
addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>): void;
79
start(): Promise<void>;

src/Contexts/Shared/infrastructure/EventBus/InMemory/InMemoryAsyncEventBus.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { DomainEvent } from '../../../domain/DomainEvent';
22
import { DomainEventSubscriber } from '../../../domain/DomainEventSubscriber';
33
import { EventBus } from '../../../domain/EventBus';
4+
import { DomainEventMapping } from '../DomainEventMapping';
45
import { EventEmitterBus } from '../EventEmitterBus';
56

67
export class InMemoryAsyncEventBus implements EventBus {
@@ -19,4 +20,6 @@ export class InMemoryAsyncEventBus implements EventBus {
1920
addSubscribers(subscribers: Array<DomainEventSubscriber<DomainEvent>>) {
2021
this.bus.registerSubscribers(subscribers);
2122
}
23+
24+
setDomainEventMapping(domainEventMapping: DomainEventMapping): void {}
2225
}

src/Contexts/Shared/infrastructure/EventBus/InMemory/InMemorySyncEventBus.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { DomainEvent } from '../../../domain/DomainEvent';
22
import { DomainEventSubscriber } from '../../../domain/DomainEventSubscriber';
33
import { EventBus } from '../../../domain/EventBus';
4+
import { DomainEventMapping } from '../DomainEventMapping';
45

56
type Subscription = {
67
boundedCallback: Function;
@@ -34,6 +35,8 @@ export class InMemorySyncEventBus implements EventBus {
3435
);
3536
}
3637

38+
setDomainEventMapping(domainEventMapping: DomainEventMapping): void {}
39+
3740
private subscribe(topic: string, subscriber: DomainEventSubscriber<DomainEvent>): void {
3841
const currentSubscriptions = this.subscriptions.get(topic);
3942
const subscription = { boundedCallback: subscriber.on.bind(subscriber), originalCallback: subscriber.on };

src/Contexts/Shared/infrastructure/EventBus/RabbitMq/RabbitMqEventBus.ts

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,33 +4,38 @@ import { EventBus } from '../../../domain/EventBus';
44
import { DomainEvent } from '../../../domain/DomainEvent';
55
import { DomainEventSubscriber } from '../../../domain/DomainEventSubscriber';
66
import { DomainEventJsonDeserializer } from '../DomainEventJsonDeserializer';
7+
import { DomainEventMapping } from '../DomainEventMapping';
78

89
export default class RabbitMqEventbus implements EventBus {
910
private connection: Connection;
1011
private exchange: Exchange;
1112
private queue: Queue;
12-
private deserializer: DomainEventJsonDeserializer;
13+
private deserializer?: DomainEventJsonDeserializer;
1314
private subscribers: Map<string, Array<DomainEventSubscriber<DomainEvent>>>;
1415

15-
constructor(deserializer: DomainEventJsonDeserializer) {
16+
constructor() {
1617
const rabbitMQConfig = config.get('rabbitMQ');
1718
this.connection = new Connection(`amqp://${rabbitMQConfig.user}:${rabbitMQConfig.password}@${rabbitMQConfig.host}`);
1819
this.exchange = this.connection.declareExchange('domain_events_fanout', 'fanout', { durable: false });
1920
this.queue = this.connection.declareQueue('coursesTest', { exclusive: true });
20-
this.deserializer = deserializer;
2121
this.subscribers = new Map();
2222
}
2323

2424
async start(): Promise<void> {
25+
if (!this.deserializer) {
26+
throw new Error('RabbitMqEventbus has not being properly initialized, deserializer is missing');
27+
}
28+
2529
this.queue.bind(this.exchange);
2630
this.queue.activateConsumer(
2731
async message => {
28-
const event = this.deserializer.deserialize(message.content.toString());
32+
const event = this.deserializer!.deserialize(message.content.toString());
2933
const subscribers = this.subscribers.get(event.eventName);
3034
const subscribersExecutions = subscribers!.map(subscriber => subscriber.on(event));
3135
await Promise.all(subscribersExecutions);
36+
await message.ack();
3237
},
33-
{ noAck: true }
38+
{ noAck: false }
3439
);
3540
}
3641

@@ -50,6 +55,10 @@ export default class RabbitMqEventbus implements EventBus {
5055
});
5156
}
5257

58+
setDomainEventMapping(domainEventMapping: DomainEventMapping): void {
59+
this.deserializer = new DomainEventJsonDeserializer(domainEventMapping);
60+
}
61+
5362
private addSubscriber(subscriber: DomainEventSubscriber<DomainEvent>): void {
5463
subscriber.subscribedTo().map(event => {
5564
const eventName = event.EVENT_NAME;

src/apps/backoffice/backend/app.ts

Lines changed: 12 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,13 @@
1-
import bodyParser from 'body-parser';
2-
import express from 'express';
3-
import helmet from 'helmet';
4-
import compress from 'compression';
5-
import { registerRoutes } from './routes';
61
import { registerSubscribers } from './subscribers';
7-
8-
const app: express.Express = express();
9-
10-
app.set('port', process.env.PORT || 3000);
11-
12-
app.use(bodyParser.json());
13-
app.use(bodyParser.urlencoded({ extended: true }));
14-
app.use(helmet.xssFilter());
15-
app.use(helmet.noSniff());
16-
app.use(helmet.hidePoweredBy());
17-
app.use(helmet.frameguard({ action: 'deny' }));
18-
app.use(compress());
19-
20-
registerRoutes(app);
21-
registerSubscribers();
22-
23-
export default app;
2+
import { Server } from './server';
3+
4+
export class Application {
5+
private server?: Server;
6+
7+
async start() {
8+
const port = process.env.PORT || '3000';
9+
this.server = new Server(port);
10+
await registerSubscribers();
11+
return this.server.listen();
12+
}
13+
}

src/apps/backoffice/backend/config/dependency-injection/Shared/application.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ services:
2424
arguments: ['@Shared.QueryHandlersInformation']
2525

2626
Shared.EventBus:
27-
class: ../../../../../../Contexts/Shared/infrastructure/EventBus/InMemory/InMemorySyncEventBus
27+
class: ../../../../../../Contexts/Shared/infrastructure/EventBus/RabbitMq/RabbitMqEventBus
2828
arguments: []
2929

3030
Shared.EventBus.DomainEventMapping:

src/apps/backoffice/backend/config/dependency-injection/application_test.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,7 @@ services:
99
Backoffice.Backend.ElasticEnvironmentArranger:
1010
class: ../../../../../../tests/Contexts/Shared/infrastructure/elastic/ElasticEnvironmentArranger
1111
arguments: ['@Shared.ElasticConnectionManager']
12+
13+
Shared.EventBus:
14+
class: ../../../../../Contexts/Shared/infrastructure/EventBus/InMemory/InMemorySyncEventBus
15+
arguments: []
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{}
Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,47 @@
1-
import errorHandler from 'errorhandler';
2-
import app from './app';
31
import container from './config/dependency-injection';
2+
import errorHandler from 'errorhandler';
3+
import helmet from 'helmet';
4+
import compress from 'compression';
5+
import bodyParser from 'body-parser';
6+
import express from 'express';
7+
import * as http from 'http';
8+
import Logger from '../../../Contexts/Shared/domain/Logger';
9+
import { registerRoutes } from './routes';
410

5-
/**
6-
* Error Handler. Provides full stack - remove for production
7-
*/
8-
app.use(errorHandler());
11+
export class Server {
12+
private express: express.Express;
13+
private port: string;
14+
private logger: Logger;
15+
private httpServer?: http.Server;
916

10-
/**
11-
* Start Express server.
12-
*/
13-
const server = app.listen(app.get('port'), () => {
14-
// tslint:disable: no-console
15-
const logger = container.get('Shared.Logger');
17+
constructor(port: string) {
18+
this.port = port;
19+
this.logger = container.get('Shared.Logger');
20+
this.express = express();
21+
this.express.use(bodyParser.json());
22+
this.express.use(bodyParser.urlencoded({ extended: true }));
23+
this.express.use(helmet.xssFilter());
24+
this.express.use(helmet.noSniff());
25+
this.express.use(helmet.hidePoweredBy());
26+
this.express.use(helmet.frameguard({ action: 'deny' }));
27+
this.express.use(compress());
28+
registerRoutes(this.express);
29+
this.express.use(errorHandler());
30+
}
1631

17-
logger.info(` App is running at http://localhost:${app.get('port')} in ${app.get('env')} mode`);
18-
console.log(' Press CTRL-C to stop\n');
19-
});
32+
async listen(): Promise<void> {
33+
return new Promise(resolve => {
34+
this.httpServer = this.express.listen(this.port, () => {
35+
this.logger.info(` App is running at http://localhost:${this.port} in ${this.express.get('env')} mode`);
36+
this.logger.info(' Press CTRL-C to stop\n');
37+
resolve();
38+
});
39+
});
40+
}
2041

21-
export default server;
42+
stop() {
43+
if (this.httpServer) {
44+
this.httpServer.close();
45+
}
46+
}
47+
}

0 commit comments

Comments
 (0)