Skip to content

Commit 5d9e0cb

Browse files
committed
temporary
1 parent 636ecc6 commit 5d9e0cb

File tree

9 files changed

+239
-6
lines changed

9 files changed

+239
-6
lines changed

backend/build.gradle

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ dependencies {
5454
testImplementation 'org.testcontainers:junit-jupiter'
5555
testImplementation "com.redis:testcontainers-redis:2.2.4"
5656

57+
/* KAFKA */
58+
implementation 'org.springframework.kafka:spring-kafka'
59+
5760
/* ETC */
5861
implementation 'org.apache.commons:commons-lang3:3.12.0'
5962
annotationProcessor 'org.projectlombok:lombok'
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.f1.backend.domain.stat.app;

import io.f1.backend.domain.stat.dao.StatBatchRepository;
import io.f1.backend.domain.stat.dto.StatChangeEvent;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.List;

/**
 * Batch consumer for the {@code stat-changes} topic.
 *
 * <p>The listener container is a batch listener configured with {@code StringDeserializer}
 * (see KafkaConfig / application.yml) and no batch message converter, so each record value
 * arrives as the raw JSON string written by {@code KafkaProducer}. The payload is therefore
 * received as {@code List<String>} and deserialized here explicitly; declaring
 * {@code List<StatChangeEvent>} directly would fail payload conversion at runtime.
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class StatKafkaConsumer {

    // ObjectMapper is thread-safe and expensive to create; share one instance.
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final StatBatchRepository statBatchRepository;

    /**
     * Applies a batch of stat-change events to the database in one transaction.
     *
     * <p>Malformed records are logged and skipped so a single poison message cannot block
     * the partition forever. If the database update fails, the exception is rethrown so the
     * batch offsets are not committed (AckMode.BATCH) and the records are redelivered.
     *
     * @param messages raw JSON record values from the {@code stat-changes} topic
     */
    @KafkaListener(topics = "stat-changes")
    @Transactional
    public void handleStatChanges(List<String> messages) {
        log.info("Processing {} stat change events", messages.size());

        List<StatChangeEvent> events = new ArrayList<>(messages.size());
        for (String message : messages) {
            try {
                // NOTE(review): requires StatChangeEvent to be Jackson-deserializable
                // (a no-args constructor) — confirm the DTO declares one.
                events.add(OBJECT_MAPPER.readValue(message, StatChangeEvent.class));
            } catch (JsonProcessingException e) {
                log.error("Skipping malformed stat change event: {}", message, e);
            }
        }

        try {
            statBatchRepository.batchUpdateStats(events);
        } catch (Exception e) {
            log.error("Failed to process stat change events", e);
            throw e; // propagate so the batch is retried (offsets stay uncommitted)
        }
    }
}

backend/src/main/java/io/f1/backend/domain/stat/app/StatService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package io.f1.backend.domain.stat.app;
22

33
import io.f1.backend.domain.stat.dao.StatRepository;
4+
import io.f1.backend.domain.stat.dto.StatChangeEvent;
45
import io.f1.backend.domain.stat.dto.StatPageResponse;
56
import io.f1.backend.global.exception.CustomException;
67
import io.f1.backend.global.exception.errorcode.RoomErrorCode;
8+
import io.f1.backend.global.util.kafka.KafkaProducer;
79

810
import lombok.RequiredArgsConstructor;
911

@@ -18,6 +20,7 @@
1820
public class StatService {
1921

2022
private final StatRepository statRepository;
23+
private final KafkaProducer kafkaProducer;
2124

2225
@Transactional(readOnly = true)
2326
public StatPageResponse getRanks(Pageable pageable, String nickname) {
@@ -36,9 +39,11 @@ public StatPageResponse getRanks(Pageable pageable, String nickname) {
3639
return response;
3740
}
3841

39-
// TODO: 게임 종료 후 호출 필요
4042
public void updateRank(long userId, boolean win, int deltaScore) {
4143
statRepository.updateRank(userId, win, deltaScore);
44+
45+
StatChangeEvent event = StatChangeEvent.of(userId, win, deltaScore);
46+
kafkaProducer.sendWithKey("stat-changes", String.valueOf(userId), event);
4247
}
4348

4449
public void addUser(long userId, String nickname) {
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package io.f1.backend.domain.stat.dao;

import io.f1.backend.domain.stat.dto.StatChangeEvent;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;

import java.util.List;

/**
 * Applies stat-change events to the {@code stat} table using parameterized JDBC batch updates.
 *
 * <p>Replaces the previous string-concatenated {@code CASE user_id WHEN … END WHERE user_id
 * IN (…)} statement, which had two defects: it built SQL by concatenation instead of using
 * prepared-statement parameters, and — because {@code CASE}/{@code IN} match each row only
 * once — it silently dropped all but the first event when one batch contained several events
 * for the same user (likely, since the producer keys records by userId). A JDBC batch sends
 * all statements in one round trip while applying every event individually, so duplicate
 * users accumulate correctly.
 */
@Slf4j
@Repository
@RequiredArgsConstructor
public class StatBatchRepository {

    private static final String WIN_SQL =
            """
            UPDATE stat SET
                total_games = total_games + 1,
                winning_games = winning_games + 1,
                score = score + ?
            WHERE user_id = ?
            """;

    private static final String LOSE_SQL =
            """
            UPDATE stat SET
                total_games = total_games + 1,
                score = score + ?
            WHERE user_id = ?
            """;

    private final JdbcTemplate jdbcTemplate;

    /**
     * Applies the given events, splitting them into win and lose groups
     * (one JDBC batch per non-empty group).
     *
     * @param events stat-change events; null/empty input is a no-op
     */
    public void batchUpdateStats(List<StatChangeEvent> events) {
        if (events == null || events.isEmpty()) {
            return;
        }

        List<StatChangeEvent> winEvents =
                events.stream().filter(StatChangeEvent::isWin).toList();
        List<StatChangeEvent> loseEvents =
                events.stream().filter(e -> !e.isWin()).toList();

        executeBatch(WIN_SQL, winEvents);
        executeBatch(LOSE_SQL, loseEvents);
    }

    // Runs one parameterized JDBC batch: (deltaScore, userId) per event.
    private void executeBatch(String sql, List<StatChangeEvent> events) {
        if (events.isEmpty()) {
            return;
        }

        List<Object[]> args =
                events.stream()
                        .map(e -> new Object[] {e.getDeltaScore(), e.getUserId()})
                        .toList();

        int[] updatedRows = jdbcTemplate.batchUpdate(sql, args);
        log.debug("Batch updated {} stat rows", updatedRows.length);
    }
}
89+

backend/src/main/java/io/f1/backend/domain/stat/dao/StatRepositoryAdapter.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,6 @@ public void addUser(long userId, String nickname) {
5959
@Override
6060
public void updateRank(long userId, boolean win, int deltaScore) {
6161
redisRepository.updateRank(userId, win, deltaScore);
62-
if (win) {
63-
jpaRepository.updateStatByUserIdCaseWin(deltaScore, userId);
64-
} else {
65-
jpaRepository.updateStatByUserIdCaseLose(deltaScore, userId);
66-
}
6762
}
6863

6964
@Override
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.f1.backend.domain.stat.dto;
2+
3+
import lombok.Builder;
4+
import lombok.Getter;
5+
6+
@Getter
7+
@Builder
8+
public class StatChangeEvent {
9+
10+
private Long userId;
11+
private boolean win;
12+
private int deltaScore;
13+
14+
public static StatChangeEvent of(Long userId, boolean win, int deltaScore) {
15+
return StatChangeEvent.builder()
16+
.userId(userId)
17+
.win(win)
18+
.deltaScore(deltaScore)
19+
.build();
20+
}
21+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package io.f1.backend.global.config;
2+
3+
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
4+
import org.springframework.context.annotation.Bean;
5+
import org.springframework.context.annotation.Configuration;
6+
import org.springframework.kafka.annotation.EnableKafka;
7+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
8+
import org.springframework.kafka.core.ConsumerFactory;
9+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
10+
import org.springframework.kafka.listener.ContainerProperties;
11+
12+
import lombok.RequiredArgsConstructor;
13+
14+
@EnableKafka
15+
@Configuration
16+
@RequiredArgsConstructor
17+
public class KafkaConfig {
18+
19+
private final KafkaProperties kafkaProperties;
20+
21+
@Bean
22+
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
23+
ConcurrentKafkaListenerContainerFactory<String, String> factory =
24+
new ConcurrentKafkaListenerContainerFactory<>();
25+
26+
ConsumerFactory<String, String> consumerFactory =
27+
new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
28+
29+
factory.setConsumerFactory(consumerFactory);
30+
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
31+
factory.setBatchListener(true);
32+
33+
return factory;
34+
}
35+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package io.f1.backend.global.util.kafka;
2+
3+
import org.springframework.kafka.core.KafkaTemplate;
4+
import org.springframework.stereotype.Component;
5+
import com.fasterxml.jackson.core.JsonProcessingException;
6+
import com.fasterxml.jackson.databind.ObjectMapper;
7+
8+
import lombok.RequiredArgsConstructor;
9+
import lombok.extern.slf4j.Slf4j;
10+
11+
@Slf4j
12+
@Component
13+
@RequiredArgsConstructor
14+
public class KafkaProducer {
15+
16+
private final KafkaTemplate<String, String> kafkaTemplate;
17+
private final ObjectMapper objectMapper = new ObjectMapper();
18+
19+
public void sendWithKey(String topic, String key, Object object) {
20+
String jsonMessage = convertToJson(object);
21+
if (jsonMessage != null) {
22+
kafkaTemplate.send(topic, key, jsonMessage);
23+
}
24+
}
25+
26+
private String convertToJson(Object object) {
27+
try {
28+
return objectMapper.writeValueAsString(object);
29+
} catch (JsonProcessingException e) {
30+
log.error("KafkaProducer Json Processing error", e);
31+
return null;
32+
}
33+
}
34+
}

backend/src/main/resources/application.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,22 @@ spring:
3838
format_sql: true
3939
dialect: org.hibernate.dialect.MySQLDialect
4040

41+
kafka:
42+
bootstrap-servers: ${KAFKA_BOOTSTRAP_SERVERS}
43+
consumer:
44+
group-id: consumer-group
45+
auto-offset-reset: earliest
46+
enable-auto-commit: false
47+
max-poll-records: 100
48+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
49+
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
50+
producer:
51+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
52+
value-serializer: org.apache.kafka.common.serialization.StringSerializer
53+
acks: all
54+
retries: 3
55+
batch-size: 16384
56+
4157
security:
4258
oauth2:
4359
client:

0 commit comments

Comments
 (0)