Skip to content

Commit eb2893b

Browse files
[feat] Redis Stream 메트릭 모니터링 구현 (E2E Latency + Lag Gauge) (#1100)
* feat: Redis Stream 메트릭 수집 기능과 End-to-End 지연 시간 측정 로직 추가 (#1098) - RedisStreamLagMetricService 구현: - Redis Stream 길이 및 스레드풀 상태를 수집하는 메트릭 추가(redis_stream_length, redis_stream_threadpool_queue_size, redis_stream_threadpool_active_count) - Prometheus Gauge를 활용한 메트릭 등록 - 스트림 길이가 임계치를 초과할 경우 경고 로그 출력 - RedisStreamLatencyMetricService 구현: - Redis 메시지 발행~소비 간 End-to-End 지연 시간 기록 - Timer 메트릭(redis_stream_e2e_latency_seconds) 등록 및 퍼센타일 데이터 수집 - EventDispatcher 개선: - RedisStreamLatencyMetricService를 통한 레이턴시 측정 로직 추가 - 이벤트 처리 시 레이턴시 로깅 및 경고 조건 추가 * test: Redis Stream 메트릭 서비스에 대한 단위 테스트 추가 - RedisStreamLagMetricServiceTest: - 스트림별 길이 게이지 등록 테스트 추가 - 여러 스트림의 게이지 등록 및 XLEN 실패 시 음수 반환 케이스 추가 - 스트림 설정이 없는 경우 게이지 미등록 확인 - RedisStreamLatencyMetricServiceTest: - 이벤트 처리 지연 시간 기록 및 누적 테스트 추가 - timestamp가 null이거나 음수 지연 시간 처리 검증 * feat: WebSocket 메시지 Rate Limiter Drop 메트릭 추가 - Micrometer Counter를 활용한 드롭된 메시지 수 메트릭(rateLimitDropCounter) 추가 - WebSocketRateLimiter의 생성자에 MeterRegistry 의존성 추가 - WebSocket 메시지가 Rate Limit에 의해 드롭될 경우 Counter 증가 처리 - 드롭된 메시지 수에 대한 지표 제공으로 모니터링 강화 * feat: Room 상태별 활성 수 및 전체 수 메트릭 수집 기능 추가 - RoomActiveMetricService 구현: - MemoryRoomRepository 기반의 상태별 활성 수 및 전체 Room 수 수집 - Prometheus에 메트릭 등록(room_active_count, room_total_count) - MemoryRoomRepository: - RoomState별 Room 수(countByState) 및 전체 Room 수(totalCount) 조회 메서드 추가 - RoomState별 활성 Room 모니터링 및 메트릭 제공 기능 강화 * test: Room 및 WebSocket 메트릭 서비스에 대한 단위 테스트 추가 - RoomActiveMetricServiceTest: - Room 상태별 활성 수 및 전체 수에 대한 게이지 등록/변경 테스트 추가 - Room 상태별 정확한 메트릭 집계 검증 - Room 생성/삭제 시 메트릭 변경 반영 테스트 추가 - WebSocketRateLimiterMetricTest: - 드롭된 WebSocket 메시지 수에 대한 카운터 증가 테스트 추가 - Rate Limit 초과 및 누적 드롭 카운터 테스트 추가 - Rate Limit 윈도우 리셋 후 카운터 누적 확인 테스트 추가 - 테스트를 통해 메트릭 서비스의 올바른 동작 검증 및 안정성 강화 * test: RoomActiveMetricServiceTest JoinCode 값 변경 테스트 케이스 수정 - 테스트 데이터의 JoinCode 값을 더 간단하게 수정 (예: "ABC12" -> "ABC1") - 여러 Room의 상태별 게이지 집계 및 생성/삭제 시 게이지 변경 반영 테스트 케이스 업데이트 - JoinCode 값을 통해 테스트의 가독성과 일관성 강화 * test: RoomActiveMetricServiceTest JoinCode 테스트 데이터 수정 - 테스트 데이터의 JoinCode 값 일부를 간단한 형식으로 변경 (예: "ABC1" -> "ABC4") - 여러 Room의 상태별 게이지 집계 및 생성/삭제 시 게이지 값 반영 테스트 데이터 업데이트 - JoinCode 값을 통해 테스트 환경의 일관성 및 가독성 강화 * refactor: Redis EventDispatcher 메서드 분리 및 지연 메트릭 기록 실패 처리 추가 - latencyMetricService.recordLatency 호출 로직 recordLatency 메서드로 분리 - Redis 지연 메트릭 기록 실패 시 경고 로그 출력 로직 추가 - 코드 가독성 및 에러 처리 강화 * refactor: MemoryRoomRepository totalCount 반환 타입 변경 - totalCount 메서드의 반환 타입을 int에서 long으로 변경 - 대규모 데이터를 다룰 때의 정확성 보장 * fix: Redis Stream 메트릭 기록 중 null 및 실패 케이스 처리 추가 - RedisStreamLatencyMetricService: streamLatencyTimer가 null인 경우 로직 early return 추가 - RedisStreamLagMetricService: XLEN 실패 시 음수 반환 시 비교 로직 스킵 추가 - 메트릭 기록 실패로 인한 오류 방지 및 안정성 강화 * test: RoomActiveMetricServiceTest 상태별 Gauge 등록 및 검증 로직 수정 - RoomState 열거형을 기반으로 모든 상태의 Gauge 등록 및 초기 값 검증 테스트 추가 - 기존 하드코딩된 상태 값("READY", "PLAYING" 등)을 RoomState 열거형으로 대체 - Gauge 값 검증 로직 및 메시지 형식 개선 (as 메서드 활용) - JoinCode 테스트 데이터 일부 수정 및 일관성 강화 - 테스트 코드 가독성 및 유지보수성 증대 * test: RoomActiveMetricServiceTest JoinCode 테스트 데이터 수정 - JoinCode 값을 간단한 형식으로 변경하여 테스트 데이터 일관성 강화 (예: "ABC12" -> "ABC4") - Room 생성/삭제 및 상태별 게이지 집계 로직에 사용되는 JoinCode 데이터 업데이트 - 테스트 가독성 및 유지보수성 개선 * fix: RedisStreamLagMetricService 메서드 반환값 NaN으로 통일 - XLEN 조회 실패 및 ThreadPool 관련 메서드 실패 시 반환값을 -1.0에서 Double.NaN으로 변경 - 실패 시 일관된 반환값 제공으로 비교 로직의 명확성 및 코드 가독성 강화 - 로그 메시지와 반환값의 역할 분리로 디버깅 용이성 향상 * refactor: RedisStreamLagMetricService 메서드 분리 및 가독성 개선 - resolveBeanName 메서드 분리로 ThreadPoolTaskExecutor 로직 단순화 - config 및 streamKey 기반 BeanName 결정 로직 개선 - 코드의 명확성 및 유지보수성 강화 * fix: RedisStreamLagMetricService NaN 비교 로직 수정 - XLEN 조회 실패 시 Double.isNaN(length)를 사용하여 비교 로직 명확성 강화 - 실패 케이스 처리의 일관성 및 코드 가독성 개선 * refactor: RedisStreamLagMetricService 로그 메서드 제거 - @scheduled 어노테이션을 사용한 logStreamStatus 메서드 제거 - 스트림 길이 경고 로그 기록 로직 삭제로 코드 단순화 및 유지보수성 개선 * test: RedisStreamLagMetricServiceTest NaN 반환값 관련 테스트 수정 - 빈을 못 찾거나 XLEN 조회 실패 시 반환값을 NaN으로 변경한 로직에 대한 테스트 케이스 수정 - 기존 -1.0 반환 검증을 Double.NaN 및 isNaN() 검증으로 업데이트 - 테스트 코드를 최신 로직에 맞게 수정하여 일관성 및 가독성 강화 * fix: RedisStreamLagMetricService NaN 반환값 정정 - 활성 스레드 수 조회 실패 시 -Double.NaN 반환을 Double.NaN 반환으로 수정 - 잘못된 반환값 처리로 인한 오해 방지 및 코드 명확성 강화 * test: RedisStreamLagMetricServiceTest NaN 검증 방식 수정 - Double.NaN 검증을 isNaN() 검증으로 변경 - 테스트 코드 가독성 및 명확성 강화
1 parent dea889d commit eb2893b

File tree

10 files changed

+800
-3
lines changed

10 files changed

+800
-3
lines changed
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package coffeeshout.global.metric;
2+
3+
import coffeeshout.global.redis.config.RedisStreamProperties;
4+
import coffeeshout.global.redis.config.RedisStreamProperties.StreamConfig;
5+
import coffeeshout.global.redis.config.RedisStreamThreadPoolConfig;
6+
import io.micrometer.core.instrument.Gauge;
7+
import io.micrometer.core.instrument.MeterRegistry;
8+
import jakarta.annotation.PostConstruct;
9+
import java.util.Map;
10+
import java.util.concurrent.ThreadPoolExecutor;
11+
import lombok.extern.slf4j.Slf4j;
12+
import org.springframework.context.ApplicationContext;
13+
import org.springframework.data.redis.core.StringRedisTemplate;
14+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
15+
import org.springframework.stereotype.Component;
16+
17+
/**
18+
* Redis Stream의 공간적 백로그(lag)를 주기적으로 수집한다.
19+
*
20+
* <p>현재 ZZOL은 Consumer Group 없이 StreamOffset.fromStart()로 소비하므로,
21+
* XINFO GROUPS 대신 XLEN(스트림 길이)과 스레드풀 큐 깊이를 조합하여 측정한다.</p>
22+
*
23+
* <p>Redis 부하 최소화:
24+
* <ul>
25+
* <li>XLEN은 O(1) 연산이므로 Redis에 거의 부하를 주지 않음</li>
26+
* <li>스레드풀 큐 깊이는 JVM 내부 조회이므로 Redis 호출 없음</li>
27+
* <li>Gauge는 Prometheus scrape 시점에만 평가되므로 불필요한 polling 없음</li>
28+
* </ul>
29+
* </p>
30+
*
31+
* <p>Prometheus 메트릭명:
32+
* <ul>
33+
* <li>redis_stream_length (tag: stream)</li>
34+
* <li>redis_stream_threadpool_queue_size (tag: stream)</li>
35+
* <li>redis_stream_threadpool_active_count (tag: stream)</li>
36+
* </ul>
37+
* </p>
38+
*/
39+
@Slf4j
40+
@Component
41+
public class RedisStreamLagMetricService {
42+
43+
private final StringRedisTemplate stringRedisTemplate;
44+
private final RedisStreamProperties redisStreamProperties;
45+
private final MeterRegistry meterRegistry;
46+
private final ApplicationContext applicationContext;
47+
48+
public RedisStreamLagMetricService(
49+
StringRedisTemplate stringRedisTemplate,
50+
RedisStreamProperties redisStreamProperties,
51+
MeterRegistry meterRegistry,
52+
ApplicationContext applicationContext
53+
) {
54+
this.stringRedisTemplate = stringRedisTemplate;
55+
this.redisStreamProperties = redisStreamProperties;
56+
this.meterRegistry = meterRegistry;
57+
this.applicationContext = applicationContext;
58+
}
59+
60+
@PostConstruct
61+
public void initializeMetrics() {
62+
if (redisStreamProperties.keys() == null) {
63+
log.warn("Redis Stream 설정이 없습니다. Lag 메트릭을 등록하지 않습니다.");
64+
return;
65+
}
66+
67+
for (Map.Entry<String, StreamConfig> entry : redisStreamProperties.keys().entrySet()) {
68+
final String streamKey = entry.getKey();
69+
final StreamConfig streamConfig = entry.getValue();
70+
71+
// 1) 스트림 길이 Gauge (XLEN - O(1))
72+
Gauge.builder("redis.stream.length", () -> getStreamLength(streamKey))
73+
.description("Redis Stream의 현재 메시지 수 (XLEN)")
74+
.tag("stream", streamKey)
75+
.register(meterRegistry);
76+
77+
// 2) 스레드풀 큐 깊이 Gauge
78+
Gauge.builder("redis.stream.threadpool.queue.size",
79+
() -> getThreadPoolQueueSize(streamKey, streamConfig))
80+
.description("Redis Stream 컨슈머 스레드풀의 대기 큐 크기")
81+
.tag("stream", streamKey)
82+
.register(meterRegistry);
83+
84+
// 3) 스레드풀 활성 스레드 수
85+
Gauge.builder("redis.stream.threadpool.active.count",
86+
() -> getThreadPoolActiveCount(streamKey, streamConfig))
87+
.description("Redis Stream 컨슈머 스레드풀의 활성 스레드 수")
88+
.tag("stream", streamKey)
89+
.register(meterRegistry);
90+
}
91+
92+
log.info("Redis Stream Lag 메트릭 등록 완료: streams={}",
93+
redisStreamProperties.keys().keySet());
94+
}
95+
96+
private double getStreamLength(String streamKey) {
97+
try {
98+
Long length = stringRedisTemplate.opsForStream().size(streamKey);
99+
return length != null ? length.doubleValue() : 0.0;
100+
} catch (Exception e) {
101+
log.warn("XLEN 조회 실패: stream={}", streamKey, e);
102+
return Double.NaN;
103+
}
104+
}
105+
106+
private double getThreadPoolQueueSize(String streamKey, StreamConfig config) {
107+
try {
108+
ThreadPoolTaskExecutor executor = resolveExecutor(streamKey, config);
109+
if (executor == null) {
110+
return Double.NaN;
111+
}
112+
113+
ThreadPoolExecutor threadPoolExecutor = executor.getThreadPoolExecutor();
114+
return threadPoolExecutor.getQueue().size();
115+
} catch (Exception e) {
116+
log.debug("스레드풀 큐 크기 조회 실패: stream={}", streamKey, e);
117+
return Double.NaN;
118+
}
119+
}
120+
121+
private double getThreadPoolActiveCount(String streamKey, StreamConfig config) {
122+
try {
123+
ThreadPoolTaskExecutor executor = resolveExecutor(streamKey, config);
124+
if (executor == null) {
125+
return Double.NaN;
126+
}
127+
128+
return executor.getThreadPoolExecutor().getActiveCount();
129+
} catch (Exception e) {
130+
log.debug("활성 스레드 수 조회 실패: stream={}", streamKey, e);
131+
return Double.NaN;
132+
}
133+
}
134+
135+
private ThreadPoolTaskExecutor resolveExecutor(String streamKey, StreamConfig config) {
136+
try {
137+
String beanName = resolveBeanName(streamKey, config);
138+
Object bean = applicationContext.getBean(beanName);
139+
140+
if (!(bean instanceof ThreadPoolTaskExecutor executor)) {
141+
return null;
142+
}
143+
144+
return executor;
145+
} catch (Exception e) {
146+
return null;
147+
}
148+
}
149+
150+
private String resolveBeanName(String streamKey, StreamConfig config) {
151+
if (config != null && config.threadPoolName() != null) {
152+
return RedisStreamThreadPoolConfig.convertBeanName(config.threadPoolName());
153+
}
154+
155+
return RedisStreamThreadPoolConfig.convertBeanName(streamKey);
156+
}
157+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package coffeeshout.global.metric;
2+
3+
import coffeeshout.global.redis.BaseEvent;
4+
import io.micrometer.core.instrument.MeterRegistry;
5+
import io.micrometer.core.instrument.Timer;
6+
import jakarta.annotation.PostConstruct;
7+
import java.time.Duration;
8+
import java.time.Instant;
9+
import lombok.extern.slf4j.Slf4j;
10+
import org.springframework.stereotype.Component;
11+
12+
/**
13+
* Redis Stream 메시지의 End-to-End 지연 시간을 측정한다.
14+
*
15+
* <p>BaseEvent.timestamp() (발행 시점) ~ 소비 시점 간의 시간차를 Micrometer Timer로 기록한다.
16+
* EventDispatcher에서 이벤트 처리 직전에 호출한다.</p>
17+
*
18+
* <p>Prometheus 메트릭명: redis_stream_e2e_latency_seconds</p>
19+
*/
20+
@Slf4j
21+
@Component
22+
public class RedisStreamLatencyMetricService {
23+
24+
private final MeterRegistry meterRegistry;
25+
private Timer streamLatencyTimer;
26+
27+
public RedisStreamLatencyMetricService(MeterRegistry meterRegistry) {
28+
this.meterRegistry = meterRegistry;
29+
}
30+
31+
@PostConstruct
32+
public void initializeMetrics() {
33+
this.streamLatencyTimer = Timer.builder("redis.stream.e2e.latency")
34+
.description("Redis Stream 메시지 발행~소비 간 End-to-End 지연 시간")
35+
.publishPercentiles(0.5, 0.95, 0.99)
36+
.publishPercentileHistogram()
37+
.register(meterRegistry);
38+
}
39+
40+
/**
41+
* 이벤트의 발행 타임스탬프와 현재 시각의 차이를 기록한다.
42+
*
43+
* @param event 소비된 Redis Stream 이벤트
44+
*/
45+
public void recordLatency(BaseEvent event) {
46+
if (streamLatencyTimer == null) {
47+
return;
48+
}
49+
50+
if (event.timestamp() == null) {
51+
log.warn("이벤트에 timestamp가 없습니다: eventId={}", event.eventId());
52+
return;
53+
}
54+
55+
final Duration latency = Duration.between(event.timestamp(), Instant.now());
56+
57+
if (latency.isNegative()) {
58+
log.warn("음수 지연 감지 (clock skew 의심): eventId={}, latency={}ms",
59+
event.eventId(), latency.toMillis());
60+
return;
61+
}
62+
63+
streamLatencyTimer.record(latency);
64+
65+
if (latency.toMillis() > 50) {
66+
log.warn("Redis Stream 지연 50ms 초과: eventId={}, latency={}ms, eventType={}",
67+
event.eventId(), latency.toMillis(), event.getClass().getSimpleName());
68+
}
69+
}
70+
}
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package coffeeshout.global.metric;
2+
3+
import coffeeshout.room.domain.RoomState;
4+
import coffeeshout.room.domain.repository.MemoryRoomRepository;
5+
import io.micrometer.core.instrument.Gauge;
6+
import io.micrometer.core.instrument.MeterRegistry;
7+
import jakarta.annotation.PostConstruct;
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.springframework.stereotype.Component;
10+
11+
/**
12+
* Room 상태별 활성 수를 Gauge로 수집한다.
13+
*
14+
* <p>MemoryRoomRepository의 인메모리 Room 데이터를 기반으로
15+
* RoomState별(READY, PLAYING, SCORE_BOARD, ROULETTE, DONE) 활성 Room 수와
16+
* 전체 Room 수를 Prometheus에 노출한다.</p>
17+
*
18+
* <p>Prometheus 메트릭명:
19+
* <ul>
20+
* <li>room_active_count (tag: state)</li>
21+
* <li>room_total_count</li>
22+
* </ul>
23+
* </p>
24+
*/
25+
@Slf4j
26+
@Component
27+
public class RoomActiveMetricService {
28+
29+
private final MemoryRoomRepository memoryRoomRepository;
30+
private final MeterRegistry meterRegistry;
31+
32+
public RoomActiveMetricService(MemoryRoomRepository memoryRoomRepository, MeterRegistry meterRegistry) {
33+
this.memoryRoomRepository = memoryRoomRepository;
34+
this.meterRegistry = meterRegistry;
35+
}
36+
37+
@PostConstruct
38+
public void initializeMetrics() {
39+
for (RoomState state : RoomState.values()) {
40+
Gauge.builder("room.active.count", () -> memoryRoomRepository.countByState(state))
41+
.description("상태별 활성 Room 수")
42+
.tag("state", state.name())
43+
.register(meterRegistry);
44+
}
45+
46+
Gauge.builder("room.total.count", memoryRoomRepository::totalCount)
47+
.description("전체 활성 Room 수")
48+
.register(meterRegistry);
49+
50+
log.info("Room 활성 수 메트릭 등록 완료: states={}", RoomState.values().length);
51+
}
52+
}

backend/src/main/java/coffeeshout/global/redis/EventDispatcher.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package coffeeshout.global.redis;
22

3+
import coffeeshout.global.metric.RedisStreamLatencyMetricService;
34
import coffeeshout.global.trace.Traceable;
45
import coffeeshout.global.trace.TracerProvider;
56
import java.util.function.Consumer;
@@ -17,10 +18,13 @@ public class EventDispatcher {
1718

1819
private final TracerProvider tracerProvider;
1920
private final ApplicationContext applicationContext;
21+
private final RedisStreamLatencyMetricService latencyMetricService;
2022

2123
@SuppressWarnings("unchecked")
2224
public void handle(BaseEvent event) {
2325
try {
26+
recordLatency(event);
27+
2428
final Consumer<BaseEvent> consumer = (Consumer<BaseEvent>) getConsumer(event.getClass());
2529
final Runnable handling = () -> consumer.accept(event);
2630

@@ -35,6 +39,14 @@ public void handle(BaseEvent event) {
3539
}
3640
}
3741

42+
private void recordLatency(BaseEvent event) {
43+
try {
44+
latencyMetricService.recordLatency(event);
45+
} catch (Exception e) {
46+
log.warn("Redis Stream 지연 메트릭 기록 실패: eventId={}", event.eventId(), e);
47+
}
48+
}
49+
3850
private <T extends BaseEvent> Consumer<T> getConsumer(Class<T> eventType) {
3951
final ResolvableType type = ResolvableType.forClassWithGenerics(Consumer.class, eventType);
4052
final ObjectProvider<Consumer<T>> provider = applicationContext.getBeanProvider(type);

backend/src/main/java/coffeeshout/global/websocket/ratelimit/WebSocketRateLimiter.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package coffeeshout.global.websocket.ratelimit;
22

3+
import io.micrometer.core.instrument.Counter;
4+
import io.micrometer.core.instrument.MeterRegistry;
35
import java.time.Clock;
46
import java.util.Map;
57
import java.util.concurrent.ConcurrentHashMap;
@@ -27,18 +29,31 @@ public class WebSocketRateLimiter {
2729

2830
private final int maxMessagesPerSecond;
2931
private final Clock clock;
32+
private final Counter rateLimitDropCounter;
3033
private final Map<String, SessionCounter> sessionCounters = new ConcurrentHashMap<>();
3134

3235
@Autowired
3336
public WebSocketRateLimiter(
34-
@Value("${websocket.rate-limit.max-messages-per-second:20}") int maxMessagesPerSecond
37+
@Value("${websocket.rate-limit.max-messages-per-second:20}") int maxMessagesPerSecond,
38+
MeterRegistry meterRegistry
3539
) {
36-
this(maxMessagesPerSecond, Clock.systemUTC());
40+
this(maxMessagesPerSecond, Clock.systemUTC(), meterRegistry);
3741
}
3842

3943
WebSocketRateLimiter(int maxMessagesPerSecond, Clock clock) {
44+
this(maxMessagesPerSecond, clock, null);
45+
}
46+
47+
WebSocketRateLimiter(int maxMessagesPerSecond, Clock clock, MeterRegistry meterRegistry) {
4048
this.maxMessagesPerSecond = maxMessagesPerSecond;
4149
this.clock = clock;
50+
if (meterRegistry != null) {
51+
this.rateLimitDropCounter = Counter.builder("websocket.ratelimit.dropped.total")
52+
.description("WebSocket Rate Limit에 의해 드롭된 메시지 수")
53+
.register(meterRegistry);
54+
} else {
55+
this.rateLimitDropCounter = null;
56+
}
4257
}
4358

4459
/**
@@ -51,7 +66,11 @@ public boolean tryAcquire(String sessionId) {
5166
final SessionCounter counter = sessionCounters.computeIfAbsent(
5267
sessionId, k -> new SessionCounter(clock.millis())
5368
);
54-
return counter.tryAcquire(maxMessagesPerSecond, clock.millis());
69+
boolean allowed = counter.tryAcquire(maxMessagesPerSecond, clock.millis());
70+
if (!allowed && rateLimitDropCounter != null) {
71+
rateLimitDropCounter.increment();
72+
}
73+
return allowed;
5574
}
5675

5776
/**

0 commit comments

Comments
 (0)