Skip to content

Commit 0c189fc

Browse files
authored
Merge pull request #478 from boostcampwm-2024/feat/rabbitmq-messaging-layer
✨Feat: RabbitMQ 클라이언트 레이어 구현
2 parents a2625ba + f8882c5 commit 0c189fc

16 files changed

+328
-26
lines changed

docker-compose/definitions.json

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
{
2+
"exchanges": [
3+
{
4+
"name": "email",
5+
"vhost": "/",
6+
"type": "direct",
7+
"durable": true
8+
},
9+
{
10+
"name": "crawling",
11+
"vhost": "/",
12+
"type": "topic",
13+
"durable": true
14+
},
15+
{
16+
"name": "dead_letter",
17+
"vhost": "/",
18+
"type": "topic",
19+
"durable": true
20+
}
21+
],
22+
"queues": [
23+
{
24+
"name": "email_send",
25+
"vhost": "/",
26+
"durable": true,
27+
"arguments": {
28+
"x-dead-letter-exchange": "dead_letter",
29+
"x-dead-letter-routing-key": "email.dead_letter"
30+
}
31+
},
32+
{
33+
"name": "crawling_full",
34+
"vhost": "/",
35+
"durable": true,
36+
"arguments": {
37+
"x-dead-letter-exchange": "dead_letter",
38+
"x-dead-letter-routing-key": "crawling.full.dead_letter"
39+
}
40+
},
41+
{
42+
"name": "email_dead_letter",
43+
"vhost": "/",
44+
"durable": true
45+
},
46+
{
47+
"name": "crawling_full_dead_letter",
48+
"vhost": "/",
49+
"durable": true
50+
}
51+
],
52+
"bindings": [
53+
{
54+
"source": "email",
55+
"vhost": "/",
56+
"destination": "email_send",
57+
"destination_type": "queue",
58+
"routing_key": "email.send",
59+
"arguments": {}
60+
},
61+
{
62+
"source": "crawling",
63+
"vhost": "/",
64+
"destination": "crawling_full",
65+
"destination_type": "queue",
66+
"routing_key": "crawling.full",
67+
"arguments": {}
68+
},
69+
{
70+
"source": "dead_letter",
71+
"vhost": "/",
72+
"destination": "email_dead_letter",
73+
"destination_type": "queue",
74+
"routing_key": "email.dead_letter",
75+
"arguments": {}
76+
},
77+
{
78+
"source": "dead_letter",
79+
"vhost": "/",
80+
"destination": "crawling_full_dead_letter",
81+
"destination_type": "queue",
82+
"routing_key": "crawling.full.dead_letter",
83+
"arguments": {}
84+
}
85+
]
86+
}

docker-compose/docker-compose.infra.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ services:
7676
RABBITMQ_DEFAULT_PASS: "denamu-rabbitmq-password"
7777
volumes:
7878
- denamu-rabbitmq:/var/lib/rabbitmq
79+
- ./definitions.json:/etc/rabbitmq/definitions.json:ro
80+
- ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
7981
healthcheck:
8082
test: [ "CMD", "rabbitmq-diagnostics", "ping" ]
8183
interval: 30s

docker-compose/docker-compose.local.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ services:
3636
condition: service_healthy
3737
redis:
3838
condition: service_healthy
39+
rabbitmq:
40+
condition: service_healthy
3941
volumes:
4042
- ../server/logs:/var/web05-Denamu/server/logs
4143
environment:
@@ -55,6 +57,8 @@ services:
5557
condition: service_healthy
5658
redis:
5759
condition: service_healthy
60+
rabbitmq:
61+
condition: service_healthy
5862
volumes:
5963
- ../feed-crawler/logs:/var/web05-Denamu/feed-crawler/logs
6064
environment:

docker-compose/docker-compose.prod.infra.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ services:
8787
- /var/prod_config/infra/.env.prod
8888
volumes:
8989
- denamu-rabbitmq:/var/lib/rabbitmq
90+
- ./definitions.json:/etc/rabbitmq/definitions.json:ro
91+
- ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf:ro
9092
healthcheck:
9193
test: [ "CMD", "rabbitmq-diagnostics", "ping" ]
9294
interval: 30s

docker-compose/rabbitmq.conf

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# Definitions 파일 로드
2+
management.load_definitions = /etc/rabbitmq/definitions.json
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
export const RMQ_EXCHANGES = {
2+
EMAIL: 'EmailExchange',
3+
CRAWLING: 'CrawlingExchange',
4+
DEAD_LETTER: 'DeadLetterExchange',
5+
};
6+
7+
export const RMQ_QUEUES = {
8+
EMAIL_SEND: 'email.send.queue',
9+
CRAWLING_FULL: 'crawling.full.queue',
10+
EMAIL_DEAD_LETTER: 'email.deadLetter.queue',
11+
CRAWLING_FULL_DEAD_LETTER: 'crawling.full.deadLetter.queue',
12+
};
13+
14+
export const RMQ_ROUTING_KEYS = {
15+
EMAIL_SEND: 'email.send',
16+
CRAWLING_FULL: 'crawling.full',
17+
EMAIL_DEAD_LETTER: 'email.deadLetter',
18+
CRAWLING_FULL_DEAD_LETTER: 'crawling.full.deadLetter',
19+
};
20+
21+
export const RMQ_EXCHANGE_TYPE = {
22+
DIRECT: 'direct',
23+
TOPIC: 'topic',
24+
FANOUT: 'fanout',
25+
};
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
import { injectable } from 'tsyringe';
2+
import { Channel, ChannelModel } from 'amqplib';
3+
import * as amqp from 'amqplib';
4+
5+
@injectable()
6+
export class RabbitMQManager {
7+
private connection: ChannelModel | null;
8+
private channel: Channel | null;
9+
private connectionPromise: Promise<ChannelModel> | null = null;
10+
private channelPromise: Promise<Channel> | null = null;
11+
12+
constructor() {
13+
this.connection = null;
14+
this.channel = null;
15+
}
16+
17+
async connect() {
18+
if (this.connection) return this.connection;
19+
if (this.connectionPromise) return this.connectionPromise;
20+
21+
this.connectionPromise = amqp.connect({
22+
protocol: 'amqp',
23+
hostname: process.env.RABBITMQ_HOST,
24+
port: Number.parseInt(process.env.RABBITMQ_PORT),
25+
username: process.env.RABBITMQ_DEFAULT_USER,
26+
password: process.env.RABBITMQ_DEFAULT_PASS,
27+
});
28+
29+
this.connection = await this.connectionPromise;
30+
this.connectionPromise = null;
31+
return this.connection;
32+
}
33+
34+
async getChannel() {
35+
if (this.channel) return this.channel;
36+
if (this.channelPromise) return this.channelPromise;
37+
38+
if (!this.connection) {
39+
await this.connect();
40+
}
41+
this.channelPromise = this.connection.createChannel();
42+
this.channel = await this.channelPromise;
43+
this.channelPromise = null;
44+
return this.channel;
45+
}
46+
47+
async disconnect() {
48+
if (this.channel) {
49+
await this.channel.close();
50+
this.channel = null;
51+
}
52+
53+
if (this.connection) {
54+
await this.connection.close();
55+
this.connection = null;
56+
}
57+
}
58+
}
Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,46 @@
1-
import { injectable } from 'tsyringe';
2-
import * as amqp from 'amqplib';
3-
import { Channel, ChannelModel } from 'amqplib';
1+
import { inject, injectable } from 'tsyringe';
2+
import { RabbitMQManager } from './rabbitmq.manager';
3+
import { ConsumeMessage } from 'amqplib/properties';
4+
import { DEPENDENCY_SYMBOLS } from '../types/dependency-symbols';
5+
import logger from './logger';
46

57
@injectable()
68
export class RabbitMQConnection {
7-
private connection: ChannelModel;
8-
private channel: Channel;
99
private nameTag: string;
1010

11-
constructor() {
11+
constructor(
12+
@inject(DEPENDENCY_SYMBOLS.RabbitMQManager)
13+
private readonly rabbitMQManager: RabbitMQManager,
14+
) {
1215
this.nameTag = '[RabbitMQ]';
13-
this.connect();
1416
}
1517

16-
async connect() {
17-
this.connection = await amqp.connect({
18-
protocol: 'amqp',
19-
hostname: process.env.RABBITMQ_HOST,
20-
port: Number.parseInt(process.env.RABBITMQ_PORT),
21-
username: process.env.RABBITMQ_DEFAULT_USER,
22-
password: process.env.RABBITMQ_DEFAULT_PASS,
18+
async sendMessage(exchange: string, routingKey: string, message: string) {
19+
const channel = await this.rabbitMQManager.getChannel();
20+
channel.publish(exchange, routingKey, Buffer.from(message));
21+
}
22+
23+
async consumeMessage(
24+
queue: string,
25+
onMessage: (msg: ConsumeMessage | null) => void | Promise<void>,
26+
) {
27+
const channel = await this.rabbitMQManager.getChannel();
28+
await channel.consume(queue, async (message) => {
29+
try {
30+
if (!message) return;
31+
32+
const parsedMessage = JSON.parse(message.content.toString());
33+
await onMessage(parsedMessage);
34+
35+
channel.ack(message);
36+
} catch (error) {
37+
logger.error(
38+
`${this.nameTag} 메시지 처리 중 오류 발생
39+
오류 메시지: ${error.message}
40+
스택 트레이스: ${error.stack}`,
41+
);
42+
channel.nack(message, false, false);
43+
}
2344
});
2445
}
2546
}

feed-crawler/src/container.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { Atom10Parser } from './common/parser/formats/atom10-parser';
1414
import { FeedCrawler } from './feed-crawler';
1515
import { FullFeedCrawlEventWorker } from './event_worker/workers/full-feed-crawl-event-worker';
1616
import { RabbitMQConnection } from './common/rmq-access';
17+
import { RabbitMQManager } from './common/rabbitmq.manager';
1718

1819
container.registerSingleton<DatabaseConnection>(
1920
DEPENDENCY_SYMBOLS.DatabaseConnection,
@@ -80,4 +81,9 @@ container.registerSingleton<RabbitMQConnection>(
8081
RabbitMQConnection,
8182
);
8283

84+
container.registerSingleton<RabbitMQManager>(
85+
DEPENDENCY_SYMBOLS.RabbitMQManager,
86+
RabbitMQManager,
87+
);
88+
8389
export { container };

feed-crawler/src/main.ts

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { ClaudeEventWorker } from './event_worker/workers/claude-event-worker';
99
import * as schedule from 'node-schedule';
1010
import { RedisConnection } from './common/redis-access';
1111
import { FullFeedCrawlEventWorker } from './event_worker/workers/full-feed-crawl-event-worker';
12+
import { RabbitMQManager } from './common/rabbitmq.manager';
1213

1314
function initializeDependencies() {
1415
return {
@@ -25,6 +26,9 @@ function initializeDependencies() {
2526
fullFeedCrawlEventWorker: container.resolve<FullFeedCrawlEventWorker>(
2627
DEPENDENCY_SYMBOLS.FullFeedCrawlEventWorker,
2728
),
29+
rabbitMQManager: container.resolve<RabbitMQManager>(
30+
DEPENDENCY_SYMBOLS.RabbitMQManager,
31+
),
2832
};
2933
}
3034

@@ -59,18 +63,38 @@ async function handleShutdown(
5963
logger.info(`${signal} 신호 수신, feed-crawler 종료 중...`);
6064
await dependencies.dbConnection.end();
6165
await dependencies.redisConnection.quit();
62-
logger.info('DB 및 Redis 연결 종료');
66+
await dependencies.rabbitMQManager.disconnect();
67+
logger.info('DB, Redis, RabbitMQ 연결 종료');
6368
process.exit(0);
6469
}
6570

66-
function startScheduler() {
71+
async function startScheduler() {
6772
logger.info('[Feed Crawler Scheduler Start]');
6873

6974
const dependencies = initializeDependencies();
75+
await initializeRabbitMQ(dependencies);
7076
registerSchedulers(dependencies);
7177

7278
process.on('SIGINT', () => handleShutdown(dependencies, 'SIGINT'));
7379
process.on('SIGTERM', () => handleShutdown(dependencies, 'SIGTERM'));
7480
}
7581

76-
startScheduler();
82+
async function initializeRabbitMQ(
83+
dependencies: ReturnType<typeof initializeDependencies>,
84+
) {
85+
try {
86+
logger.info(`RabbitMQ 초기화 시작...`);
87+
88+
await dependencies.rabbitMQManager.connect();
89+
90+
logger.info(`RabbitMQ 초기화 완료`);
91+
} catch (error) {
92+
logger.error(`RabbitMQ 초기화 실패:`, error);
93+
throw error;
94+
}
95+
}
96+
97+
startScheduler().catch((error) => {
98+
logger.error(`스케줄러 시작 실패: `, error);
99+
process.exit(1);
100+
});

0 commit comments

Comments
 (0)