Skip to content

Commit 4b07f92

Browse files
committed
feat: Kafka 이용한 Stat 비동기 업데이트 진행
1 parent 636ecc6 commit 4b07f92

File tree

12 files changed

+349
-6
lines changed

12 files changed

+349
-6
lines changed

backend/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,10 @@ dependencies {
5353
testImplementation 'com.github.database-rider:rider-spring:1.44.0'
5454
testImplementation 'org.testcontainers:junit-jupiter'
5555
testImplementation "com.redis:testcontainers-redis:2.2.4"
56+
testImplementation 'org.testcontainers:kafka:1.21.3'
57+
58+
/* KAFKA */
59+
implementation 'org.springframework.kafka:spring-kafka'
5660

5761
/* ETC */
5862
implementation 'org.apache.commons:commons-lang3:3.12.0'
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package io.f1.backend.domain.stat.app;

import io.f1.backend.domain.stat.dao.StatBatchRepository;
import io.f1.backend.domain.stat.dto.StatChangeEvent;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.ArrayList;
import java.util.List;

/**
 * Batch consumer for the {@code stat-changes} topic: deserializes each JSON payload into a
 * {@link StatChangeEvent} and applies the whole batch to MySQL via {@link StatBatchRepository}.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class StatKafkaConsumer {

    private final StatBatchRepository statBatchRepository;
    private final ObjectMapper objectMapper;

    /**
     * Handles one polled batch of raw JSON messages.
     *
     * <p>Deserialization is done per message: a malformed payload (poison pill) is logged and
     * skipped instead of failing the whole batch. Previously a single bad message threw a
     * RuntimeException for the entire batch, so the container kept re-polling the same messages
     * and the partition was blocked indefinitely. Database failures are still propagated so the
     * listener container can retry the batch — those are typically transient.
     *
     * @param messages raw JSON payloads polled from the {@code stat-changes} topic
     */
    @Transactional
    @KafkaListener(topics = "stat-changes")
    public void handleStatChanges(List<String> messages) {
        log.info("Received {} messages from Kafka", messages.size());

        List<StatChangeEvent> events = new ArrayList<>(messages.size());
        for (String message : messages) {
            try {
                events.add(objectMapper.readValue(message, StatChangeEvent.class));
            } catch (JsonProcessingException e) {
                // A broken payload will never deserialize no matter how often it is retried —
                // skip it so it cannot poison the rest of the batch.
                log.error("Skipping malformed stat change message: {}", message, e);
            }
        }

        if (events.isEmpty()) {
            return;
        }

        log.info("Processing {} stat change events", events.size());
        // Let persistence exceptions propagate: the @Transactional update rolls back and the
        // container redelivers the batch (offsets are committed per batch, BATCH ack mode).
        statBatchRepository.batchUpdateStats(events);
    }
}

backend/src/main/java/io/f1/backend/domain/stat/app/StatService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package io.f1.backend.domain.stat.app;
22

33
import io.f1.backend.domain.stat.dao.StatRepository;
4+
import io.f1.backend.domain.stat.dto.StatChangeEvent;
45
import io.f1.backend.domain.stat.dto.StatPageResponse;
56
import io.f1.backend.global.exception.CustomException;
67
import io.f1.backend.global.exception.errorcode.RoomErrorCode;
8+
import io.f1.backend.global.util.kafka.KafkaProducer;
79

810
import lombok.RequiredArgsConstructor;
911

@@ -18,6 +20,7 @@
1820
public class StatService {
1921

2022
private final StatRepository statRepository;
23+
private final KafkaProducer kafkaProducer;
2124

2225
@Transactional(readOnly = true)
2326
public StatPageResponse getRanks(Pageable pageable, String nickname) {
@@ -36,9 +39,11 @@ public StatPageResponse getRanks(Pageable pageable, String nickname) {
3639
return response;
3740
}
3841

39-
// TODO: 게임 종료 후 호출 필요
4042
public void updateRank(long userId, boolean win, int deltaScore) {
4143
statRepository.updateRank(userId, win, deltaScore);
44+
45+
StatChangeEvent event = StatChangeEvent.of(userId, win, deltaScore);
46+
kafkaProducer.sendWithKey("stat-changes", String.valueOf(userId), event);
4247
}
4348

4449
public void addUser(long userId, String nickname) {
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package io.f1.backend.domain.stat.dao;

import io.f1.backend.domain.stat.dto.StatChangeEvent;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.util.Arrays;
import java.util.List;

/**
 * Applies accumulated {@link StatChangeEvent}s to the {@code stat} table in bulk.
 */
@Slf4j
@Repository
@RequiredArgsConstructor
public class StatBatchRepository {

    /** Increments games/wins and adds the score delta for a winner. */
    private static final String WIN_UPDATE_SQL =
            """
            UPDATE stat SET
                total_games = total_games + 1,
                winning_games = winning_games + 1,
                score = score + ?
            WHERE user_id = ?
            """;

    /** Increments games and adds the score delta for a loser. */
    private static final String LOSE_UPDATE_SQL =
            """
            UPDATE stat SET
                total_games = total_games + 1,
                score = score + ?
            WHERE user_id = ?
            """;

    private final JdbcTemplate jdbcTemplate;

    /**
     * Persists a batch of stat changes, splitting wins and losses so each group shares one
     * prepared statement.
     *
     * <p>Uses {@code JdbcTemplate.batchUpdate} with bind parameters instead of string-built
     * {@code CASE user_id WHEN ... END ... WHERE user_id IN (...)} SQL. Besides avoiding SQL
     * assembled by string concatenation, this fixes a correctness bug: when the same user appeared
     * several times in one batch (several games finished before the consumer polled), the single
     * UPDATE matched the row once and only the first {@code WHEN} applied, silently dropping the
     * remaining events. With one batched statement per event, every change is applied.
     *
     * @param events stat change events to apply; may be empty
     */
    public void batchUpdateStats(List<StatChangeEvent> events) {
        if (events.isEmpty()) {
            return;
        }

        List<StatChangeEvent> winEvents =
                events.stream().filter(StatChangeEvent::isWin).toList();
        List<StatChangeEvent> loseEvents =
                events.stream().filter(e -> !e.isWin()).toList();

        executeBatch(WIN_UPDATE_SQL, winEvents);
        executeBatch(LOSE_UPDATE_SQL, loseEvents);
    }

    /** Runs {@code sql} once per event with ({@code deltaScore}, {@code userId}) parameters. */
    private void executeBatch(String sql, List<StatChangeEvent> events) {
        if (events.isEmpty()) {
            return;
        }

        List<Object[]> batchArgs =
                events.stream()
                        .map(e -> new Object[] {e.getDeltaScore(), e.getUserId()})
                        .toList();

        int[] counts = jdbcTemplate.batchUpdate(sql, batchArgs);
        log.debug("Batch updated {} stat rows", Arrays.stream(counts).sum());
    }
}
84+

backend/src/main/java/io/f1/backend/domain/stat/dao/StatRepositoryAdapter.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,6 @@ public void addUser(long userId, String nickname) {
5959
@Override
6060
public void updateRank(long userId, boolean win, int deltaScore) {
6161
redisRepository.updateRank(userId, win, deltaScore);
62-
if (win) {
63-
jpaRepository.updateStatByUserIdCaseWin(deltaScore, userId);
64-
} else {
65-
jpaRepository.updateStatByUserIdCaseLose(deltaScore, userId);
66-
}
6762
}
6863

6964
@Override
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.f1.backend.domain.stat.dto;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
6+
@Getter
7+
@Builder
8+
public class StatChangeEvent {
9+
10+
private Long userId;
11+
private boolean win;
12+
private int deltaScore;
13+
14+
public static StatChangeEvent of(Long userId, boolean win, int deltaScore) {
15+
return StatChangeEvent.builder()
16+
.userId(userId)
17+
.win(win)
18+
.deltaScore(deltaScore)
19+
.build();
20+
}
21+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.f1.backend.global.config;
2+
3+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
4+
import org.springframework.context.annotation.Bean;
5+
import org.springframework.context.annotation.Configuration;
6+
import org.springframework.kafka.annotation.EnableKafka;
7+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
8+
import org.springframework.kafka.core.ConsumerFactory;
9+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
10+
import org.springframework.kafka.listener.ContainerProperties;
11+
12+
import lombok.RequiredArgsConstructor;
13+
14+
@EnableKafka
15+
@Configuration
16+
@RequiredArgsConstructor
17+
public class KafkaConfig {
18+
19+
private final KafkaProperties kafkaProperties;
20+
21+
@Bean
22+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
23+
ConcurrentKafkaListenerContainerFactory<String, String> factory =
24+
new ConcurrentKafkaListenerContainerFactory<>();
25+
26+
ConsumerFactory<String, String> consumerFactory =
27+
new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
28+
29+
factory.setConsumerFactory(consumerFactory);
30+
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
31+
factory.setBatchListener(true);
32+
33+
return factory;
34+
}
35+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.f1.backend.global.util.kafka;
2+
3+
import org.springframework.kafka.core.KafkaTemplate;
4+
import org.springframework.stereotype.Component;
5+
import com.fasterxml.jackson.core.JsonProcessingException;
6+
import com.fasterxml.jackson.databind.ObjectMapper;
7+
8+
import lombok.RequiredArgsConstructor;
9+
import lombok.extern.slf4j.Slf4j;
10+
11+
@Slf4j
12+
@Component
13+
@RequiredArgsConstructor
14+
public class KafkaProducer {
15+
16+
private final KafkaTemplate<String, String> kafkaTemplate;
17+
private final ObjectMapper objectMapper = new ObjectMapper();
18+
19+
public void sendWithKey(String topic, String key, Object object) {
20+
String jsonMessage = convertToJson(object);
21+
if (jsonMessage != null) {
22+
kafkaTemplate.send(topic, key, jsonMessage);
23+
}
24+
}
25+
26+
private String convertToJson(Object object) {
27+
try {
28+
return objectMapper.writeValueAsString(object);
29+
} catch (JsonProcessingException e) {
30+
log.error("KafkaProducer Json Processing error", e);
31+
return null;
32+
}
33+
}
34+
}

backend/src/main/resources/application.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,22 @@ spring:
3838
format_sql: true
3939
dialect: org.hibernate.dialect.MySQLDialect
4040

41+
kafka:
42+
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
43+
consumer:
44+
group-id: consumer-group
45+
auto-offset-reset: earliest
46+
enable-auto-commit: false
47+
max-poll-records: ${KAFKA_MAX_POLL_RECORDS}
48+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
49+
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
50+
producer:
51+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
52+
value-serializer: org.apache.kafka.common.serialization.StringSerializer
53+
acks: all
54+
retries: 3
55+
batch-size: 16384
56+
4157
security:
4258
oauth2:
4359
client:
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.f1.backend.domain.stat;

import static org.assertj.core.api.Assertions.assertThat;

import com.github.database.rider.core.api.dataset.DataSet;
import com.github.database.rider.spring.api.DBRider;

import io.f1.backend.domain.stat.dao.StatJpaRepository;
import io.f1.backend.domain.stat.dto.StatChangeEvent;
import io.f1.backend.domain.stat.dto.StatWithUserSummary;
import io.f1.backend.domain.user.dao.UserRepository;
import io.f1.backend.global.config.KafkaTestContainerConfig;
import io.f1.backend.global.util.kafka.KafkaProducer;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Import;
import java.time.Duration;

/**
 * End-to-end test of the asynchronous stat pipeline:
 * producer -> Kafka (Testcontainers) -> batch consumer -> MySQL.
 */
@DBRider
@SpringBootTest
@Import({KafkaTestContainerConfig.class})
class KafkaStatTest {

    @Autowired UserRepository userRepository;
    @Autowired KafkaProducer kafkaProducer;
    @Autowired StatJpaRepository statJpaRepository;

    @Test
    @DataSet("datasets/stat/one-user-stat.yml")
    @DisplayName("Kafka를 통해 게임 결과가 전송되면 Consumer가 비동기로 처리하여 MySQL에 반영된다")
    void kafkaConsumerProcessesGameResultAsynchronously() throws Exception {
        // given: snapshot the user's stat row, then publish a win worth 100 points
        long userId = 1L;
        StatWithUserSummary before =
                statJpaRepository.findStatWithUserSummary(userId).orElseThrow(AssertionError::new);

        int deltaScore = 100;
        kafkaProducer.sendWithKey(
                "stat-changes",
                String.valueOf(userId),
                StatChangeEvent.of(userId, true, deltaScore));

        // when: poll until the consumer has applied the event to MySQL (processing is async)
        long expectedScore = before.score() + deltaScore;
        Awaitility.await()
                .atMost(Duration.ofSeconds(10))
                .pollInterval(Duration.ofMillis(200))
                .until(() -> scoreReached(userId, expectedScore));

        // then: score, total games and winning games all reflect exactly one win
        StatWithUserSummary after =
                statJpaRepository.findStatWithUserSummary(userId).orElseThrow(AssertionError::new);
        assertThat(after.score()).isEqualTo(before.score() + deltaScore);
        assertThat(after.totalGames()).isEqualTo(before.totalGames() + 1);
        assertThat(after.winningGames()).isEqualTo(before.winningGames() + 1);
    }

    /** True once the user's persisted score equals {@code expectedScore}; false on any error. */
    private boolean scoreReached(long userId, long expectedScore) {
        try {
            return statJpaRepository
                            .findStatWithUserSummary(userId)
                            .orElseThrow(AssertionError::new)
                            .score()
                    == expectedScore;
        } catch (Exception e) {
            return false;
        }
    }
}

0 commit comments

Comments
 (0)