Skip to content

Commit 10c0859

Browse files
refactor(be): webFlux -> 가상 스레드 적용 (#297)
* refactor: webflux -> virtual thread 적용 * refactor: 주석 변경 * refactor: etaService 수정 * chore: application yml 설정 * refactor(be): 필요없어진 타입 변환 삭제, 상점 오픈 확인 코드 반복문 바깥으로 이동 * test(be): 유틸 테스트 코드 추가 * test(be): 검색 도메인 서비스 테스트 코드 추가 * feat(be): 검색 도메인 스웨거 설명 추가 --------- Co-authored-by: yeongbin1999 <[email protected]>
1 parent 92101f8 commit 10c0859

File tree

17 files changed

+1030
-137
lines changed

17 files changed

+1030
-137
lines changed
Lines changed: 41 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,63 @@
11
package com.deliveranything.domain.delivery.handler.redis;
22

3+
import com.deliveranything.domain.delivery.event.dto.RiderNotificationDto;
34
import com.deliveranything.domain.delivery.service.OrderNotificationService;
45
import com.deliveranything.domain.notification.subscriber.delivery.OrderAcceptedNotifier;
56
import com.deliveranything.domain.order.event.OrderAcceptedEvent;
7+
import com.deliveranything.global.exception.CustomException;
68
import com.fasterxml.jackson.core.JsonProcessingException;
79
import com.fasterxml.jackson.databind.ObjectMapper;
810
import jakarta.annotation.PostConstruct;
9-
import java.util.Objects;
11+
import java.util.List;
1012
import lombok.RequiredArgsConstructor;
1113
import lombok.extern.slf4j.Slf4j;
12-
import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
14+
import org.springframework.data.redis.connection.Message;
15+
import org.springframework.data.redis.connection.MessageListener;
16+
import org.springframework.data.redis.listener.ChannelTopic;
17+
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
1318
import org.springframework.stereotype.Component;
14-
import reactor.core.publisher.Mono;
1519

20+
/**
21+
* Virtual Thread 기반 Redis Pub/Sub Subscriber - 주문 접수 이벤트를 수신하여 반경 내 라이더에게 알림 전송 - MessageListener를
22+
* 구현하여 블로킹 방식으로 처리 - RedisMessageListenerContainer의 TaskExecutor가 Virtual Thread를 사용
23+
*/
1624
@Slf4j
1725
@Component
1826
@RequiredArgsConstructor
19-
// 로드 밸런싱 / 다중 인스턴스 환경 대응하여 Redis Pub/Sub 구축
20-
public class OrderAcceptedRedisSubscriber {
27+
public class OrderAcceptedRedisSubscriber implements MessageListener {
28+
29+
public static final String CHANNEL = "order-accepted-event";
2130

22-
private static final String CHANNEL = "order-accepted-event";
2331
private final ObjectMapper objectMapper;
24-
private final ReactiveStringRedisTemplate redisTemplate;
2532
private final OrderAcceptedNotifier orderAcceptedNotifier;
2633
private final OrderNotificationService orderNotificationService;
34+
private final RedisMessageListenerContainer container;
35+
36+
@PostConstruct
37+
public void subscribe() {
38+
container.addMessageListener(this, new ChannelTopic(CHANNEL));
39+
}
40+
41+
@Override
42+
public void onMessage(Message message, byte[] pattern) {
43+
try {
44+
String payload = new String(message.getBody());
45+
OrderAcceptedEvent event = objectMapper.readValue(payload, OrderAcceptedEvent.class);
46+
47+
List<RiderNotificationDto> notifications = orderNotificationService.processOrderEvent(event);
48+
if (!notifications.isEmpty()) {
49+
orderAcceptedNotifier.publish(notifications);
50+
} else {
51+
log.warn("No available riders for orderId: {} (This is not an error)", event.orderId());
52+
}
2753

28-
@PostConstruct // 애플리케이션 구동 시 자동 실행
29-
public void subscribeToOrderChannel() {
30-
redisTemplate.listenToChannel(CHANNEL) // Redis 채널 구독 시작
31-
.map(message -> {
32-
try {
33-
return objectMapper.readValue(message.getMessage(), OrderAcceptedEvent.class);
34-
} catch (JsonProcessingException e) {
35-
log.error("Failed to parse Redis message", e);
36-
return null;
37-
}
38-
}) // 수신된 메시지 내용 추출
39-
.filter(Objects::nonNull) // null 제거
40-
.flatMap(orderNotificationService::processOrderEvent) // 비즈니스 처리
41-
.doOnNext(orderAcceptedNotifier::publish) // SSE로 전송
42-
.onErrorResume(e -> {
43-
log.error("RedisSubscriber error: {}", e.getMessage());
44-
return Mono.empty(); // 끊김 방지
45-
})
46-
.subscribe(); // 실제 구독 실행
54+
} catch (JsonProcessingException e) {
55+
log.error("Failed to parse order event: {}", e.getMessage());
56+
} catch (CustomException e) {
57+
log.error("Business error processing order event: {} - {}", e.getCode(), e.getMessage());
58+
} catch (Exception e) {
59+
log.error("Unexpected error processing order event: {}", e.getMessage(), e);
60+
}
61+
// 예외를 throw하지 않음 → Redis 연결 유지
4762
}
4863
}

backend/src/main/java/com/deliveranything/domain/delivery/service/EtaService.java

Lines changed: 112 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,24 @@
33
import java.util.HashMap;
44
import java.util.List;
55
import java.util.Map;
6+
import java.util.concurrent.CompletableFuture;
7+
import java.util.concurrent.ExecutionException;
8+
import java.util.concurrent.Executors;
9+
import java.util.concurrent.Future;
10+
import java.util.stream.Collectors;
611
import lombok.RequiredArgsConstructor;
12+
import lombok.extern.slf4j.Slf4j;
713
import org.springframework.beans.factory.annotation.Value;
814
import org.springframework.data.geo.Point;
15+
import org.springframework.scheduling.annotation.Async;
916
import org.springframework.stereotype.Service;
1017
import org.springframework.web.reactive.function.client.WebClient;
11-
import reactor.core.publisher.Flux;
12-
import reactor.core.publisher.Mono;
1318

19+
/**
20+
* Virtual Thread 기반 Kakao Map API 서비스 - WebClient.block()은 Virtual Thread에서 안전하게 사용 가능 - 블로킹 호출이지만
21+
* Virtual Thread 덕분에 높은 동시성 유지
22+
*/
23+
@Slf4j
1424
@Service
1525
@RequiredArgsConstructor
1626
public class EtaService {
@@ -22,69 +32,117 @@ public class EtaService {
2232

2333
private static final String KAKAO_BASE_URL = "https://apis-navi.kakaomobility.com/v1";
2434

25-
// Kakao Map API 호출 (Reactive)
26-
public Mono<Map<String, Double>> getEtaForMultipleReactive(
27-
// double storeLat, double storeLon,
35+
/**
36+
* 여러 라이더의 ETA 계산 (동기식 + 병렬 처리) - Virtual Thread에서 병렬로 실행 - @Async로 각 API 호출을 독립적인 Virtual
37+
* Thread에서 처리
38+
*/
39+
public Map<String, Double> getEtaForMultiple(
2840
double userLat, double userLon,
2941
List<Point> riderPoints,
3042
List<String> riderIds
3143
) {
32-
WebClient webClient = webClientBuilder.baseUrl(KAKAO_BASE_URL).build();
44+
// 각 라이더의 ETA를 병렬로 계산 (Virtual Thread)
45+
List<CompletableFuture<Map.Entry<String, Double>>> futures = new java.util.ArrayList<>();
46+
47+
for (int i = 0; i < riderIds.size(); i++) {
48+
final int idx = i;
49+
final String riderId = riderIds.get(idx);
50+
final Point riderPoint = riderPoints.get(idx);
51+
52+
CompletableFuture<Map.Entry<String, Double>> future = calculateSingleEta(
53+
riderId, riderPoint, userLat, userLon
54+
);
55+
futures.add(future);
56+
}
57+
58+
// 모든 결과 대기 및 맵으로 변환
59+
Map<String, Double> result = futures.stream()
60+
.map(CompletableFuture::join)
61+
.filter(entry -> entry != null && entry.getValue() != null)
62+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
3363

34-
return Flux.fromIterable(riderIds)
35-
.index() // (index, riderId)
36-
.flatMap(tuple -> {
37-
long idx = tuple.getT1();
38-
String riderId = tuple.getT2();
39-
Point riderPoint = riderPoints.get((int) idx);
40-
41-
return webClient.get()
42-
.uri(uriBuilder -> uriBuilder
43-
.path("/directions")
44-
.queryParam("origin", riderPoint.getX() + "," + riderPoint.getY()) // lon,lat
45-
.queryParam("destination", userLon + "," + userLat)
46-
.build())
47-
.header("Authorization", "KakaoAK " + kakaoApiKey)
48-
.retrieve()
49-
.bodyToMono(Map.class)
50-
.map(response -> {
51-
Map<String, Object> routes
52-
= (Map<String, Object>) ((List<?>) response.get("routes")).get(0);
53-
Map<String, Object> summary = (Map<String, Object>) routes.get("summary");
54-
Double duration = ((Number) summary.get("duration")).doubleValue(); // 초 단위
55-
return Map.entry(riderId, duration / 60.0); // 분 단위 변환
56-
});
57-
})
58-
.collectMap(Map.Entry::getKey, Map.Entry::getValue); // Map<String, Double>
64+
log.info("Calculated ETA for {} out of {} riders", result.size(), riderIds.size());
65+
66+
return result;
5967
}
6068

61-
// 상점 <-> 주문자 사이 거리 (eta 기준)
62-
public Mono<Map<String, Double>> getDistance(
69+
/**
70+
* 단일 라이더의 ETA 계산 (비동기) - @Async로 Virtual Thread에서 실행
71+
*/
72+
@Async("deliveryVirtualThreadExecutor")
73+
public CompletableFuture<Map.Entry<String, Double>> calculateSingleEta(
74+
String riderId, Point riderPoint, double userLat, double userLon
75+
) {
76+
try {
77+
WebClient webClient = webClientBuilder.baseUrl(KAKAO_BASE_URL).build();
78+
79+
Map<String, Object> response = webClient.get()
80+
.uri(uriBuilder -> uriBuilder
81+
.path("/directions")
82+
.queryParam("origin", riderPoint.getX() + "," + riderPoint.getY()) // lon,lat
83+
.queryParam("destination", userLon + "," + userLat)
84+
.build())
85+
.header("Authorization", "KakaoAK " + kakaoApiKey)
86+
.retrieve()
87+
.bodyToMono(Map.class)
88+
.block(); // Virtual Thread에서는 block() 안전!
89+
90+
if (response != null) {
91+
Map<String, Object> routes = (Map<String, Object>) ((List<?>) response.get("routes"))
92+
.get(0);
93+
Map<String, Object> summary = (Map<String, Object>) routes.get("summary");
94+
Double duration = ((Number) summary.get("duration")).doubleValue(); // 초 단위
95+
double etaMinutes = duration / 60.0; // 분 단위 변환
96+
97+
return CompletableFuture.completedFuture(Map.entry(riderId, etaMinutes));
98+
}
99+
} catch (Exception e) {
100+
log.warn("Failed to calculate ETA for rider {}: {}", riderId, e.getMessage());
101+
}
102+
return CompletableFuture.completedFuture(null);
103+
}
104+
105+
/**
106+
* 상점 <-> 주문자 사이 거리 계산 (동기식) - Virtual Thread에서 블로킹 호출해도 효율적
107+
*/
108+
public Map<String, Double> getDistance(
63109
double storeLat, double storeLon,
64110
double userLat, double userLon
65111
) {
66112
WebClient webClient = webClientBuilder.baseUrl(KAKAO_BASE_URL).build();
67113

68-
return webClient.get()
69-
.uri(uriBuilder -> uriBuilder
70-
.path("/directions")
71-
.queryParam("origin", storeLon + "," + storeLat)
72-
.queryParam("destination", userLon + "," + userLat)
73-
.build())
74-
.header("Authorization", "KakaoAK " + kakaoApiKey)
75-
.retrieve()
76-
.bodyToMono(Map.class)
77-
.map(response -> {
78-
Map<String, Object> routes = (Map<String, Object>) ((List<?>) response.get("routes")).get(
79-
0);
80-
Map<String, Object> summary = (Map<String, Object>) routes.get("summary");
81-
Double distanceM = ((Number) summary.get("distance")).doubleValue(); // m 단위
82-
double distanceKm =
83-
Math.round((distanceM / 1000.0) * 100.0) / 100.0; // km 단위, 소수 둘째 자리 반올림
84-
85-
Map<String, Double> result = new HashMap<>();
86-
result.put("distance", distanceKm);
87-
return result;
88-
});
114+
// 가상 스레드 풀 사용
115+
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
116+
Future<Map<String, Double>> future = executor.submit(() -> {
117+
Map<String, Object> response = webClient.get()
118+
.uri(uriBuilder -> uriBuilder
119+
.path("/directions")
120+
.queryParam("origin", storeLon + "," + storeLat)
121+
.queryParam("destination", userLon + "," + userLat)
122+
.build())
123+
.header("Authorization", "KakaoAK " + kakaoApiKey)
124+
.retrieve()
125+
.bodyToMono(Map.class)
126+
.block();
127+
128+
if (response == null || !response.containsKey("routes")) {
129+
throw new IllegalStateException("Invalid API response");
130+
}
131+
132+
Map<String, Object> routes = (Map<String, Object>) ((List<?>) response.get("routes")).get(
133+
0);
134+
Map<String, Object> summary = (Map<String, Object>) routes.get("summary");
135+
Double distanceM = ((Number) summary.get("distance")).doubleValue(); // m 단위
136+
double distanceKm = Math.round((distanceM / 1000.0) * 100.0) / 100.0;
137+
138+
Map<String, Double> result = new HashMap<>();
139+
result.put("distance", distanceKm);
140+
return result;
141+
});
142+
return future.get();
143+
144+
} catch (ExecutionException | InterruptedException e) {
145+
throw new RuntimeException("Distance calculation failed", e);
146+
}
89147
}
90148
}

0 commit comments

Comments
 (0)