
Commit c196140
commit message: temp
1 parent 636ecc6

File tree: 9 files changed, +295 -6 lines

backend/build.gradle

Lines changed: 3 additions & 0 deletions
@@ -54,6 +54,9 @@ dependencies {
     testImplementation 'org.testcontainers:junit-jupiter'
     testImplementation "com.redis:testcontainers-redis:2.2.4"

+    /* KAFKA */
+    implementation 'org.springframework.kafka:spring-kafka'
+
     /* ETC */
     implementation 'org.apache.commons:commons-lang3:3.12.0'
     annotationProcessor 'org.projectlombok:lombok'

backend/src/main/java/io/f1/backend/domain/stat/app/StatKafkaConsumer.java

Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
+package io.f1.backend.domain.stat.app;
+
+import io.f1.backend.domain.stat.dao.StatBatchRepository;
+import io.f1.backend.domain.stat.dto.StatChangeEvent;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.List;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class StatKafkaConsumer {
+
+    private final StatBatchRepository statBatchRepository;
+
+    @KafkaListener(
+            topics = "stat-changes",
+            groupId = "stat-sync-group"
+    )
+    @Transactional
+    public void handleStatChanges(List<StatChangeEvent> events) {
+
+        log.info("Processing {} stat change events", events.size());
+
+        try {
+            // Process 500 events with just two queries (split into win/lose)
+            statBatchRepository.batchUpdateStats(events);
+
+            log.info("Successfully processed {} events in batch", events.size());
+
+        } catch (Exception e) {
+            log.error("Failed to process stat change events", e);
+            throw e; // rethrow so the transaction rolls back
+        }
+    }
+}
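
One caveat worth flagging: the kafkaListenerContainerFactory committed below delivers single String records and uses MANUAL_IMMEDIATE acks, so this listener's List<StatChangeEvent> parameter likely will not bind as typed events, and with no Acknowledgment parameter the offsets are never committed. A minimal sketch of a listener shape that does match the committed config (single String records, hand-rolled JSON mapping, manual ack); the class name and mapper wiring are illustrative, not from this commit:

    package io.f1.backend.domain.stat.app;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import io.f1.backend.domain.stat.dao.StatBatchRepository;
    import io.f1.backend.domain.stat.dto.StatChangeEvent;

    import lombok.RequiredArgsConstructor;

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;

    import java.util.List;

    // Illustrative alternative, not part of this commit.
    @Component
    @RequiredArgsConstructor
    public class StatChangeStringListener {

        private final StatBatchRepository statBatchRepository;
        private final ObjectMapper objectMapper; // assumes a mapper with Java time support

        @KafkaListener(topics = "stat-changes", groupId = "stat-sync-group")
        public void handle(String payload, Acknowledgment ack) throws JsonProcessingException {
            // StringDeserializer hands over the raw JSON written by KafkaProducer,
            // so it has to be mapped back to the event type by hand.
            StatChangeEvent event = objectMapper.readValue(payload, StatChangeEvent.class);
            statBatchRepository.batchUpdateStats(List.of(event));
            ack.acknowledge(); // MANUAL_IMMEDIATE never commits offsets on its own
        }
    }

The batch-of-500 design the commit's comment describes would instead need a batch-enabled, JSON-deserializing container factory; see the sketch after KafkaConfig below.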

backend/src/main/java/io/f1/backend/domain/stat/app/StatService.java

Lines changed: 9 additions & 1 deletion
@@ -1,9 +1,11 @@
 package io.f1.backend.domain.stat.app;

 import io.f1.backend.domain.stat.dao.StatRepository;
+import io.f1.backend.domain.stat.dto.StatChangeEvent;
 import io.f1.backend.domain.stat.dto.StatPageResponse;
 import io.f1.backend.global.exception.CustomException;
 import io.f1.backend.global.exception.errorcode.RoomErrorCode;
+import io.f1.backend.global.util.kafka.KafkaProducer;

 import lombok.RequiredArgsConstructor;

@@ -18,6 +20,7 @@
 public class StatService {

     private final StatRepository statRepository;
+    private final KafkaProducer kafkaProducer;

     @Transactional(readOnly = true)
     public StatPageResponse getRanks(Pageable pageable, String nickname) {
@@ -36,9 +39,14 @@ public StatPageResponse getRanks(Pageable pageable, String nickname) {
         return response;
     }

-    // TODO: must be called when a game ends
+    // Called when a game ends: immediate Redis update plus Kafka event publish
     public void updateRank(long userId, boolean win, int deltaScore) {
+        // 1. Update Redis immediately (real-time performance)
         statRepository.updateRank(userId, win, deltaScore);
+
+        // 2. Send a change event to Kafka (for batch synchronization)
+        StatChangeEvent event = StatChangeEvent.of(userId, win, deltaScore);
+        kafkaProducer.sendWithKey("stat-changes", String.valueOf(userId), event);
     }

     public void addUser(long userId, String nickname) {
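
The comment on updateRank describes the intended flow; as a usage illustration, here is a hedged sketch of a game-end hook that would drive it. GameEndHandler and PlayerResult are hypothetical names, not types from this commit:

    package io.f1.backend.domain.game.app; // hypothetical location

    import io.f1.backend.domain.stat.app.StatService;

    import java.util.List;

    // Hypothetical game-end hook (illustration only, not part of the commit).
    class GameEndHandler {

        // Per-player outcome as a simple record; illustrative shape.
        record PlayerResult(long userId, boolean winner, int deltaScore) {}

        private final StatService statService;

        GameEndHandler(StatService statService) {
            this.statService = statService;
        }

        void onGameEnd(List<PlayerResult> results) {
            for (PlayerResult p : results) {
                // Redis is updated synchronously for live rankings; MySQL
                // catches up asynchronously through the stat-changes topic.
                statService.updateRank(p.userId(), p.winner(), p.deltaScore());
            }
        }
    }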

backend/src/main/java/io/f1/backend/domain/stat/dao/StatBatchRepository.java

Lines changed: 89 additions & 0 deletions
@@ -0,0 +1,89 @@
+package io.f1.backend.domain.stat.dao;
+
+import io.f1.backend.domain.stat.dto.StatChangeEvent;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+@Slf4j
+@Repository
+@RequiredArgsConstructor
+public class StatBatchRepository {
+
+    private final JdbcTemplate jdbcTemplate;
+
+    public void batchUpdateStats(List<StatChangeEvent> events) {
+        if (events.isEmpty()) {
+            return;
+        }
+
+        // Separate win events from lose events
+        List<StatChangeEvent> winEvents = events.stream()
+                .filter(StatChangeEvent::isWin)
+                .toList();
+
+        List<StatChangeEvent> loseEvents = events.stream()
+                .filter(e -> !e.isWin())
+                .toList();
+
+        // Win update (one query)
+        if (!winEvents.isEmpty()) {
+            batchUpdateWinStats(winEvents);
+        }
+
+        // Lose update (one query)
+        if (!loseEvents.isEmpty()) {
+            batchUpdateLoseStats(loseEvents);
+        }
+    }
+
+    private void batchUpdateWinStats(List<StatChangeEvent> events) {
+        StringBuilder sql = new StringBuilder("""
+                UPDATE stat SET
+                    total_games = total_games + 1,
+                    winning_games = winning_games + 1,
+                    score = score + CASE user_id
+                """);
+
+        // Build the CASE WHEN clauses
+        for (StatChangeEvent event : events) {
+            sql.append(String.format("WHEN %d THEN %d ", event.getUserId(), event.getDeltaScore()));
+        }
+
+        sql.append("END WHERE user_id IN (");
+        sql.append(events.stream()
+                .map(e -> String.valueOf(e.getUserId()))
+                .collect(Collectors.joining(",")));
+        sql.append(")");
+
+        int updatedRows = jdbcTemplate.update(sql.toString());
+        log.debug("Batch updated {} win stats", updatedRows);
+    }
+
+    private void batchUpdateLoseStats(List<StatChangeEvent> events) {
+        StringBuilder sql = new StringBuilder("""
+                UPDATE stat SET
+                    total_games = total_games + 1,
+                    score = score + CASE user_id
+                """);
+
+        // Build the CASE WHEN clauses
+        for (StatChangeEvent event : events) {
+            sql.append(String.format("WHEN %d THEN %d ", event.getUserId(), event.getDeltaScore()));
+        }
+
+        sql.append("END WHERE user_id IN (");
+        sql.append(events.stream()
+                .map(e -> String.valueOf(e.getUserId()))
+                .collect(Collectors.joining(",")));
+        sql.append(")");
+
+        int updatedRows = jdbcTemplate.update(sql.toString());
+        log.debug("Batch updated {} lose stats", updatedRows);
+    }
+}
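
Two things to watch in the committed SQL: values are concatenated into the statement via String.format (contained here only because userId and deltaScore are numeric), and CASE user_id honors just the first matching WHEN, so if the same user appears twice in one batch only one of their deltas lands while total_games still increments once. A hedged alternative sketch that aggregates per user and binds parameters instead; it is meant to slot into StatBatchRepository and additionally needs java.util.Map imported:

    private void batchUpdateWinStatsAggregated(List<StatChangeEvent> events) {
        // Collapse duplicate users first: sum score deltas and count games.
        Map<Long, Integer> deltaByUser = events.stream()
                .collect(Collectors.groupingBy(
                        StatChangeEvent::getUserId,
                        Collectors.summingInt(StatChangeEvent::getDeltaScore)));
        Map<Long, Long> gamesByUser = events.stream()
                .collect(Collectors.groupingBy(
                        StatChangeEvent::getUserId, Collectors.counting()));

        // One parameterized statement executed as a JDBC batch; MySQL's
        // rewriteBatchedStatements can collapse it into few round trips.
        jdbcTemplate.batchUpdate(
                "UPDATE stat SET total_games = total_games + ?, "
                        + "winning_games = winning_games + ?, "
                        + "score = score + ? WHERE user_id = ?",
                deltaByUser.keySet().stream()
                        .map(userId -> new Object[] {
                                gamesByUser.get(userId),
                                gamesByUser.get(userId),
                                deltaByUser.get(userId),
                                userId
                        })
                        .toList());
    }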

backend/src/main/java/io/f1/backend/domain/stat/dao/StatRepositoryAdapter.java

Lines changed: 0 additions & 5 deletions
@@ -59,11 +59,6 @@ public void addUser(long userId, String nickname) {
     @Override
     public void updateRank(long userId, boolean win, int deltaScore) {
         redisRepository.updateRank(userId, win, deltaScore);
-        if (win) {
-            jpaRepository.updateStatByUserIdCaseWin(deltaScore, userId);
-        } else {
-            jpaRepository.updateStatByUserIdCaseLose(deltaScore, userId);
-        }
     }

     @Override

backend/src/main/java/io/f1/backend/domain/stat/dto/StatChangeEvent.java

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+package io.f1.backend.domain.stat.dto;
+
+import java.time.LocalDateTime;
+
+import lombok.Builder;
+import lombok.Getter;
+
+@Getter
+@Builder
+public class StatChangeEvent {
+
+    private Long userId;
+    private boolean win;
+    private int deltaScore;
+    private LocalDateTime timestamp;
+    private String eventType; // "GAME_RESULT", "SCORE_UPDATE", etc.
+
+    public static StatChangeEvent of(Long userId, boolean win, int deltaScore) {
+        return StatChangeEvent.builder()
+                .userId(userId)
+                .win(win)
+                .deltaScore(deltaScore)
+                .timestamp(LocalDateTime.now())
+                .eventType("GAME_RESULT")
+                .build();
+    }
+}
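
As committed, the class exposes only getters and the builder's all-args constructor, so Jackson can serialize it but has no constructor to deserialize it with on the consumer side. A minimal sketch of a Jackson-friendly variant, assuming Lombok 1.18.14+ for @Jacksonized; the of(...) factory from the commit would stay as is and is omitted here for brevity:

    package io.f1.backend.domain.stat.dto;

    import java.time.LocalDateTime;

    import lombok.Builder;
    import lombok.Getter;
    import lombok.extern.jackson.Jacksonized;

    @Getter
    @Builder
    @Jacksonized // routes Jackson deserialization through the Lombok builder
    public class StatChangeEvent {

        private Long userId;
        private boolean win;
        private int deltaScore;
        private LocalDateTime timestamp; // serializing this needs jackson-datatype-jsr310
        private String eventType;
    }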

backend/src/main/java/io/f1/backend/global/config/KafkaConfig.java

Lines changed: 72 additions & 0 deletions
@@ -0,0 +1,72 @@
+package io.f1.backend.global.config;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
+
+@EnableKafka
+@Configuration
+public class KafkaConfig {
+
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String bootstrapServers;
+
+    @Bean
+    public ProducerFactory<String, String> producerFactory() {
+        Map<String, Object> properties = new HashMap<>();
+
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.ACKS_CONFIG, "all");
+        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
+        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
+        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
+
+        return new DefaultKafkaProducerFactory<>(properties);
+    }
+
+    @Bean
+    public KafkaTemplate<String, String> kafkaTemplate() {
+        return new KafkaTemplate<>(producerFactory());
+    }
+
+    @Bean
+    public ConsumerFactory<String, String> consumerFactory() {
+        Map<String, Object> properties = new HashMap<>();
+
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "stat-sync-group");
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        return new DefaultKafkaConsumerFactory<>(properties);
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String> factory =
+                new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory());
+        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+        return factory;
+    }
+}
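
Since these factories are hand-built, the consumer and producer blocks added to application.yml below largely duplicate settings hard-coded here; only spring.kafka.bootstrap-servers is actually read, via @Value. Separately, for StatKafkaConsumer's List<StatChangeEvent> signature to bind, the container factory has to be batch-enabled with a JSON value deserializer, which the committed String factory is not. A sketch of such a factory, assuming StatChangeEvent is Jackson-deserializable; statEventListenerFactory is an illustrative name, and the listener would opt in with @KafkaListener(containerFactory = "statEventListenerFactory"):

    // Extra import needed: org.springframework.kafka.support.serializer.JsonDeserializer
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, StatChangeEvent> statEventListenerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "stat-sync-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // cap one delivered batch at 500 events

        JsonDeserializer<StatChangeEvent> valueDeserializer =
                new JsonDeserializer<>(StatChangeEvent.class);
        valueDeserializer.addTrustedPackages("io.f1.backend.domain.stat.dto");

        ConcurrentKafkaListenerContainerFactory<String, StatChangeEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(
                props, new StringDeserializer(), valueDeserializer));
        factory.setBatchListener(true); // deliver List<StatChangeEvent> per poll
        return factory;
    }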

backend/src/main/java/io/f1/backend/global/util/kafka/KafkaProducer.java

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
+package io.f1.backend.global.util.kafka;
+
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+@Component
+@RequiredArgsConstructor
+public class KafkaProducer {
+
+    private final KafkaTemplate<String, String> kafkaTemplate;
+    private final ObjectMapper objectMapper = new ObjectMapper();
+
+    public void sendWithKey(String topic, String key, Object object) {
+        String jsonMessage = convertToJson(object);
+        if (jsonMessage != null) {
+            kafkaTemplate.send(topic, key, jsonMessage);
+        }
+    }
+
+    private String convertToJson(Object object) {
+        try {
+            return objectMapper.writeValueAsString(object);
+        } catch (JsonProcessingException e) {
+            log.error("KafkaProducer Json Processing error", e);
+            return null;
+        }
+    }
+}
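
One runtime pitfall: a bare new ObjectMapper() has no Java time support, so writeValueAsString will throw an InvalidDefinitionException on StatChangeEvent's LocalDateTime field. A minimal fix sketch for the field above, assuming jackson-datatype-jsr310 is on the classpath (Spring Boot's JSON starter ships it); injecting Spring's pre-configured ObjectMapper through the constructor would work just as well:

    // Extra imports needed:
    //   com.fasterxml.jackson.databind.SerializationFeature
    //   com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
    private final ObjectMapper objectMapper = new ObjectMapper()
            .registerModule(new JavaTimeModule())
            // write ISO-8601 strings rather than numeric timestamp arrays
            .disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);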

backend/src/main/resources/application.yml

Lines changed: 19 additions & 0 deletions
@@ -38,6 +38,25 @@ spring:
         format_sql: true
         dialect: org.hibernate.dialect.MySQLDialect

+  kafka:
+    bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
+    consumer:
+      group-id: stat-sync-group
+      auto-offset-reset: earliest
+      enable-auto-commit: false
+      max-poll-records: 100
+      fetch-min-size: 1
+      fetch-max-wait: 500
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+    producer:
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      acks: all
+      retries: 3
+      batch-size: 16384
+      linger-ms: 5
+
   security:
     oauth2:
       client:
