Skip to content

Commit 9daa947

Browse files
EpicFnEpicFn
andauthored
[Feat/OPS-344] message queue 구현 (#107)
* chore : RabbitMQ 환경 설정 * chore : CI 파이프라인에서 RabbitMQ 컨테이너를 띄워서 사용하도록 설정 * new : RabbitMQ 설정 클래스, dto 생성 * feat : producer method 생성 * feat : 메세지 큐 구현 완료 * feat : 데이터 저장 요청 테스트 케이스 수정 * feat : ConsumerTest 코드 추가 * fix : 저장 테스트 케이스 성공 * fix : 테스트 코드 수정 중 * fix : 테스트 케이스 성공 * feat : dlq 도입 * feat : 데이터 순서 보장을 위해 version 추가 * refactor : MQConfig 파일 위치 변경 * chore : RabbitMQ 환경 설정 # Conflicts: # docker-compose.yml # src/main/resources/application.yml * chore : CI 파이프라인에서 RabbitMQ 컨테이너를 띄워서 사용하도록 설정 * new : RabbitMQ 설정 클래스, dto 생성 * feat : producer method 생성 * feat : 메세지 큐 구현 완료 * feat : 데이터 저장 요청 테스트 케이스 수정 * feat : ConsumerTest 코드 추가 * fix : 저장 테스트 케이스 성공 * fix : 테스트 코드 수정 중 * fix : 테스트 케이스 성공 * feat : dlq 도입 * feat : 데이터 순서 보장을 위해 version 추가 * refactor : MQConfig 파일 위치 변경 * fix : 최신 사항 반영 * fix : copilot review 반영 * feat : Dashboard에서 graph 참조 방식 EAGER -> LAZY 로 변경 * CI 실패해서 다시 EAGER로 변경 * fix : 다시 LAZY로 변경 --------- Co-authored-by: EpicFn <[email protected]>
1 parent 6939648 commit 9daa947

File tree

15 files changed

+359
-43
lines changed

15 files changed

+359
-43
lines changed

.github/workflows/test-server-ci.yml

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,20 @@ jobs:
2323
env:
2424
SENTRY_AUTH_TOKEN: ${{ secrets.SENTRY_AUTH_TOKEN }}
2525

26+
27+
# CI 작업이 실행되는 동안 RabbitMQ 서비스 컨테이너를 함께 실행
28+
services:
29+
rabbitmq:
30+
image: rabbitmq:3-management
31+
ports:
32+
- 5672:5672
33+
# RabbitMQ가 완전히 준비될 때까지 기다리는 상태 확인 옵션
34+
options: >-
35+
--health-cmd "rabbitmq-diagnostics check_running"
36+
--health-interval 10s
37+
--health-timeout 5s
38+
--health-retries 5
39+
2640
steps:
2741
# 1. 소스 코드 체크아웃
2842
- name: Checkout source code
@@ -74,6 +88,12 @@ jobs:
7488
7589
# 7. Gradle 테스트 실행
7690
- name: Test with Gradle
91+
# 테스트 단계에서 RabbitMQ 연결을 위한 환경 변수 설정
92+
env:
93+
SPRING_RABBITMQ_HOST: localhost
94+
SPRING_RABBITMQ_PORT: 5672
95+
SPRING_RABBITMQ_USERNAME: guest
96+
SPRING_RABBITMQ_PASSWORD: guest
7797
run: ./gradlew test
7898

7999
# 8. 테스트 결과 요약 출력

build.gradle

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,8 +116,12 @@ dependencies {
116116
// Redis (Spring starter)
117117
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
118118

119-
// RabbitMQ
119+
// RabbitMQ (Spring starter)
120120
implementation 'org.springframework.boot:spring-boot-starter-amqp'
121+
testImplementation 'org.springframework.amqp:spring-rabbit-test'
122+
123+
// Awaitility (비동기 테스트 지원)
124+
testImplementation 'org.awaitility:awaitility:4.2.0'
121125
}
122126

123127
dependencyManagement {

src/main/java/org/tuna/zoopzoop/backend/domain/dashboard/controller/ApiV1DashboardController.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,18 @@ public class ApiV1DashboardController {
3333
*/
3434
@PutMapping("/{dashboardId}/graph")
3535
@Operation(summary = "React-flow 데이터 저장(갱신)")
36-
public ResponseEntity<RsData<Void>> updateGraph(
36+
public ResponseEntity<RsData<Void>> queueGraphUpdate(
3737
@PathVariable Integer dashboardId,
3838
@RequestBody String requestBody,
3939
@RequestHeader("Liveblocks-Signature") String signature
4040
) {
41-
dashboardService.verifyAndUpdateGraph(dashboardId, requestBody, signature);
41+
dashboardService.queueGraphUpdate(dashboardId, requestBody, signature);
4242

4343
return ResponseEntity
44-
.status(HttpStatus.OK)
44+
.status(HttpStatus.ACCEPTED)
4545
.body(new RsData<>(
46-
"200",
47-
"React-flow 데이터를 저장했습니다.",
46+
"202",
47+
"데이터 업데이트 요청이 성공적으로 접수되었습니다.",
4848
null
4949
));
5050
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.tuna.zoopzoop.backend.domain.dashboard.dto;
2+
3+
public record GraphUpdateMessage(
4+
Integer dashboardId,
5+
String requestBody
6+
){
7+
}

src/main/java/org/tuna/zoopzoop/backend/domain/dashboard/entity/Graph.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import jakarta.persistence.CascadeType;
44
import jakarta.persistence.Entity;
55
import jakarta.persistence.OneToMany;
6+
import jakarta.persistence.Version;
67
import lombok.Getter;
78
import lombok.Setter;
89
import org.tuna.zoopzoop.backend.global.jpa.entity.BaseEntity;
@@ -14,6 +15,10 @@
1415
@Setter
1516
@Entity
1617
public class Graph extends BaseEntity {
18+
19+
@Version
20+
private Long version;
21+
1722
@OneToMany(mappedBy = "graph", cascade = CascadeType.ALL, orphanRemoval = true)
1823
private List<Node> nodes = new ArrayList<>();
1924

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package org.tuna.zoopzoop.backend.domain.dashboard.extraComponent;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import lombok.RequiredArgsConstructor;
5+
import lombok.extern.slf4j.Slf4j;
6+
import org.springframework.amqp.rabbit.annotation.RabbitListener;
7+
import org.springframework.orm.ObjectOptimisticLockingFailureException;
8+
import org.springframework.stereotype.Component;
9+
import org.tuna.zoopzoop.backend.domain.dashboard.dto.BodyForReactFlow;
10+
import org.tuna.zoopzoop.backend.domain.dashboard.dto.GraphUpdateMessage;
11+
import org.tuna.zoopzoop.backend.domain.dashboard.service.DashboardService;
12+
13+
@Slf4j
14+
@Component
15+
@RequiredArgsConstructor
16+
public class GraphUpdateConsumer {
17+
private final DashboardService dashboardService;
18+
private final ObjectMapper objectMapper;
19+
20+
@RabbitListener(queues = "graph.update.queue")
21+
public void handleGraphUpdate(GraphUpdateMessage message) {
22+
log.info("Received graph update message for dashboardId: {}", message.dashboardId());
23+
try {
24+
BodyForReactFlow dto = objectMapper.readValue(message.requestBody(), BodyForReactFlow.class);
25+
dashboardService.updateGraph(message.dashboardId(), dto);
26+
log.info("Successfully updated graph for dashboardId: {}", message.dashboardId());
27+
} catch (ObjectOptimisticLockingFailureException e) {
28+
// Optimistic Lock 충돌 발생!
29+
// 내가 처리하려던 메시지는 이미 구버전 데이터에 대한 요청이었음.
30+
// 따라서 이 메시지는 무시하고 정상 처리된 것으로 간주.
31+
log.warn("Stale update attempt for dashboardId: {}. A newer version already exists. Discarding message.", message.dashboardId());
32+
// 예외를 다시 던지지 않으므로, 메시지는 큐에서 정상적으로 제거(ACK)됩니다.
33+
} catch (Exception e) {
34+
// 실제 운영에서는 메시지를 재시도하거나, 실패 큐(Dead Letter Queue)로 보내는 등의
35+
// 정교한 에러 처리 로직이 필요합니다.
36+
log.error("Failed to process graph update for dashboardId: {}", message.dashboardId(), e);
37+
}
38+
}
39+
}

src/main/java/org/tuna/zoopzoop/backend/domain/dashboard/service/DashboardService.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33
import com.fasterxml.jackson.databind.ObjectMapper;
44
import jakarta.persistence.NoResultException;
55
import lombok.RequiredArgsConstructor;
6+
import org.apache.commons.codec.binary.Hex;
7+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
8+
import org.springframework.beans.factory.annotation.Value;
69
import org.springframework.stereotype.Service;
710
import org.springframework.transaction.annotation.Transactional;
811
import org.tuna.zoopzoop.backend.domain.dashboard.dto.BodyForReactFlow;
12+
import org.tuna.zoopzoop.backend.domain.dashboard.dto.GraphUpdateMessage;
913
import org.tuna.zoopzoop.backend.domain.dashboard.entity.Dashboard;
1014
import org.tuna.zoopzoop.backend.domain.dashboard.entity.Edge;
1115
import org.tuna.zoopzoop.backend.domain.dashboard.entity.Graph;
@@ -25,6 +29,7 @@ public class DashboardService {
2529
private final MembershipService membershipService;
2630
private final ObjectMapper objectMapper;
2731
private final SignatureService signatureService;
32+
private final RabbitTemplate rabbitTemplate;
2833

2934

3035

@@ -103,6 +108,33 @@ public void verifyAccessPermission(Member member, Integer dashboardId) throws Ac
103108
}
104109
}
105110

111+
// =========================== message 관리 메서드 ===========================
112+
113+
/**
114+
* Graph 업데이트 요청을 RabbitMQ 큐에 비동기적으로 발행하는 메서드
115+
* @param dashboardId 대시보드 ID
116+
* @param requestBody 요청 바디
117+
* @param signatureHeader 서명 헤더
118+
*/
119+
public void queueGraphUpdate(Integer dashboardId, String requestBody, String signatureHeader){
120+
// 서명 검증은 동기적으로 즉시 처리
121+
if (!signatureService.isValidSignature(requestBody, signatureHeader)) {
122+
throw new SecurityException("Invalid webhook signature.");
123+
}
124+
125+
// 대시보드 존재 여부 확인
126+
if (!dashboardRepository.existsById(dashboardId)) {
127+
throw new NoResultException(dashboardId + " ID를 가진 대시보드를 찾을 수 없습니다.");
128+
}
129+
130+
// 큐에 보낼 메시지 생성
131+
GraphUpdateMessage message = new GraphUpdateMessage(dashboardId, requestBody);
132+
133+
// RabbitMQ에 메시지 발행
134+
rabbitTemplate.convertAndSend("zoopzoop.exchange", "graph.update.rk", message);
135+
}
136+
137+
106138

107139

108140

src/main/java/org/tuna/zoopzoop/backend/domain/dashboard/service/SignatureService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,11 @@ public class SignatureService {
2626
* @return 서명이 유효하면 true, 그렇지 않으면 false
2727
*/
2828
public boolean isValidSignature(String requestBody, String signatureHeader) {
29+
// [임시 코드] 로컬 테스트를 위해 무조건 true 반환
30+
// if ("true".equals(System.getProperty("local.test.skip.signature"))) {
31+
// return true;
32+
// }
33+
2934
try {
3035
// 1. 헤더 파싱
3136
String[] parts = signatureHeader.split(",");
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package org.tuna.zoopzoop.backend.global.config.mq;
2+
3+
import org.springframework.amqp.core.*;
4+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
5+
import org.springframework.amqp.rabbit.core.RabbitTemplate;
6+
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
7+
import org.springframework.amqp.support.converter.MessageConverter;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
11+
@Configuration
12+
public class RabbitMQConfig {
13+
private static final String EXCHANGE_NAME = "zoopzoop.exchange";
14+
private static final String QUEUE_NAME = "graph.update.queue";
15+
private static final String ROUTING_KEY = "graph.update.#";
16+
17+
private static final String DLQ_EXCHANGE_NAME = EXCHANGE_NAME + ".dlx";
18+
private static final String DLQ_QUEUE_NAME = QUEUE_NAME + ".dlq";
19+
private static final String DLQ_ROUTING_KEY = "graph.update.dlq";
20+
21+
@Bean
22+
public TopicExchange exchange() {
23+
return new TopicExchange(EXCHANGE_NAME);
24+
}
25+
26+
@Bean
27+
public Queue queue() {
28+
return QueueBuilder.durable(QUEUE_NAME)
29+
.withArgument("x-dead-letter-exchange", DLQ_EXCHANGE_NAME) // 실패 시 메시지를 보낼 Exchange
30+
.withArgument("x-dead-letter-routing-key", DLQ_ROUTING_KEY) // 실패 시 사용할 라우팅 키
31+
.build();
32+
}
33+
34+
@Bean
35+
public Binding binding(Queue queue, TopicExchange exchange) {
36+
return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
37+
}
38+
39+
// ================= DLQ 인프라 구성 추가 ================= //
40+
41+
@Bean
42+
public TopicExchange dlqExchange() {
43+
return new TopicExchange(DLQ_EXCHANGE_NAME);
44+
}
45+
46+
@Bean
47+
public Queue dlqQueue() {
48+
return new Queue(DLQ_QUEUE_NAME);
49+
}
50+
51+
@Bean
52+
public Binding dlqBinding(Queue dlqQueue, TopicExchange dlqExchange) {
53+
return BindingBuilder.bind(dlqQueue).to(dlqExchange).with(DLQ_ROUTING_KEY);
54+
}
55+
56+
// ================= DLQ 인프라 구성 추가 ================= //
57+
@Bean
58+
public MessageConverter messageConverter() {
59+
// 메시지를 JSON 형식으로 직렬화/역직렬화하는 컨버터
60+
return new Jackson2JsonMessageConverter();
61+
}
62+
63+
@Bean
64+
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter messageConverter) {
65+
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
66+
rabbitTemplate.setMessageConverter(messageConverter);
67+
return rabbitTemplate;
68+
}
69+
}

src/main/resources/application.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,23 @@ spring:
4040
port: 5672
4141
username: ${SPRING_RABBITMQ_USERNAME:guest}
4242
password: ${SPRING_RABBITMQ_PASSWORD:guest}
43+
listener:
44+
simple:
45+
retry:
46+
enabled: true
47+
initial-interval: 2000
48+
max-attempts: 3
49+
data: #RedisTemplate 등을 사용하기 위한 직접 연결용
50+
redis:
51+
host: localhost
52+
port: 6379
53+
timeout: 6000
54+
cache: #Spring Cache를 사용하기 위한 Redis
55+
type: redis
56+
redis:
57+
time-to-live: 300000
58+
cache-null-values: false
59+
key-prefix:
4360

4461
springdoc:
4562
default-produces-media-type: application/json;charset=UTF-8
@@ -49,6 +66,7 @@ logging:
4966
org.hibernate.orm.jdbc.extract: TRACE
5067
org.springframework.transaction.interceptor: TRACE
5168
com.back: DEBUG
69+
org.springframework.retry: DEBUG
5270

5371
server:
5472
port: 8080

0 commit comments

Comments
 (0)