Skip to content

Commit 02aaeae

Browse files
committed
refactor: sse 리펙토링 #81
- 1. 코드 리펙토링 - 2. SseEmitter 객체의 Id값을 시간값으로 - 3. userId의 emitterId값을 Redis에 저장
1 parent 9187581 commit 02aaeae

File tree

8 files changed

+161
-105
lines changed

8 files changed

+161
-105
lines changed

build.gradle.kts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ dependencies {
8484
implementation("org.springframework.boot:spring-boot-starter-mail")
8585

8686
// Kafka
87-
// implementation ("org.springframework.kafka:spring-kafka")
88-
// implementation ("org.apache.kafka:kafka-streams")
89-
// implementation ("org.apache.kafka:kafka-clients")
87+
// implementation ("org.springframework.kafka:spring-kafka") // Spring Boot에서 Kafka를 편하게 사용하도록 도와주는 라이브러리
88+
// implementation ("org.apache.kafka:kafka-streams") // Kafka의 스트림 API를 사용할 때 필요
89+
// implementation ("org.apache.kafka:kafka-clients") // Kafka 브로커와 직접 통신하는 기본 클라이언트 라이브러리
9090

9191
//QueryDSL 추가
9292
implementation ("com.querydsl:querydsl-apt:5.0.0")

src/main/java/org/dfbf/soundlink/domain/alert/controller/AlertController.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,18 @@ public class AlertController {
1919

2020
@GetMapping(value = "/connect", produces = "text/event-stream")
2121
@Operation(summary = "SSE 연결 API", description = "SSE 연결")
22-
public SseEmitter subscribe(@AuthenticationPrincipal Long id) {
23-
return alertService.connectAlarm(id);
22+
public SseEmitter subscribe(
23+
@AuthenticationPrincipal Long id,
24+
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
25+
return alertService.connectAlarm(id, lastEventId);
2426
}
2527

2628
@GetMapping(value = "/connect/test", produces = "text/event-stream")
2729
@Operation(summary = "SSE 연결 API", description = "SSE 연결")
28-
public SseEmitter subscribeTest(@RequestParam("id") Long id) {
29-
return alertService.connectAlarm(id);
30+
public SseEmitter subscribeTest(
31+
@RequestParam("id") Long id,
32+
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
33+
return alertService.connectAlarm(id, lastEventId);
3034
}
3135

3236
@PostMapping("")
Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,52 @@
11
package org.dfbf.soundlink.domain.alert.repository;
22

3+
import lombok.RequiredArgsConstructor;
34
import lombok.extern.slf4j.Slf4j;
5+
import org.springframework.data.redis.core.RedisTemplate;
46
import org.springframework.stereotype.Repository;
57
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
68

7-
import java.util.ArrayList;
8-
import java.util.List;
9-
import java.util.Map;
10-
import java.util.Optional;
11-
import java.util.concurrent.ConcurrentHashMap;
9+
import java.util.*;
1210

1311
@Slf4j
1412
@Repository
13+
@RequiredArgsConstructor
1514
public class AlertRepository {
16-
private Map<String, SseEmitter> emitterMap = new ConcurrentHashMap<>();
15+
private final RedisTemplate<String, Object> redisTemplate;
16+
private final Map<String, SseEmitter> sseEmitterMap = new HashMap<>();
1717

18-
private String createKey(Long userId) {
19-
return "userAlarm:" + userId;
18+
public String createEmitterId(Long userId) {
19+
return String.valueOf(userId) + "_" + System.currentTimeMillis();
2020
}
2121

22-
public SseEmitter save(Long userId, SseEmitter sseEmitter) {
23-
String key = this.createKey(userId);
24-
if (emitterMap.containsKey(key)) {
25-
emitterMap.remove(key);
26-
}
27-
emitterMap.put(key, sseEmitter);
22+
// Redis에 emitterId 저장
23+
public String saveEmitterId(Long userId, String emitterId) {
24+
redisTemplate.opsForValue().set("alert::" + userId, emitterId);
2825

29-
log.info("[SseEmitter] Set {}", key);
30-
return sseEmitter;
26+
return emitterId;
27+
}
28+
29+
// Redis에서 emitterId 조회
30+
public Optional<String> getEmitterId(Long userId) {
31+
return Optional.ofNullable((String) redisTemplate.opsForValue().get("alert::" + userId));
3132
}
3233

33-
public Optional<SseEmitter> get(Long userId) {
34-
String key = this.createKey(userId);
35-
SseEmitter sseEmitter = emitterMap.get(key);
34+
// Redis에 SseEmitter 저장
35+
public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
36+
sseEmitterMap.remove(emitterId);
37+
sseEmitterMap.put(emitterId, sseEmitter);
3638

37-
log.info("[SseEmitter] Get {}", key);
38-
return Optional.ofNullable(sseEmitter);
39+
return sseEmitter;
3940
}
4041

41-
public void delete(Long userId) {
42-
emitterMap.remove(this.createKey(userId));
42+
// Redis에서 SseEmitter 조회
43+
public Optional<SseEmitter> get(String emitterId) {
44+
return Optional.ofNullable(sseEmitterMap.get(emitterId));
4345
}
4446

45-
// 저장된 알람을 가져오는 메서드
46-
// public List<String> getSavedAlerts(Long userId) {
47-
// // 저장된 알람 목록을 리턴하는 로직을 추가
48-
// // 예시로 간단히 List<String> 타입으로, 필요에 따라 알람 객체를 리턴할 수도 있음
49-
// return new ArrayList<>(); // 이곳을 실제 알람 저장 로직으로 수정
50-
// }
47+
// Redis에서 emitterId, SseEmitter 삭제
48+
public void delete(Long userId, String emitterId) {
49+
sseEmitterMap.remove(emitterId);
50+
redisTemplate.delete("alert::" + userId);
51+
}
5152
}

src/main/java/org/dfbf/soundlink/domain/alert/service/AlertService.java

Lines changed: 56 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -12,112 +12,100 @@
1212

1313
import java.io.IOException;
1414
import java.util.concurrent.Executors;
15+
import java.util.concurrent.ScheduledExecutorService;
16+
import java.util.concurrent.TimeUnit;
1517

1618
@Service
1719
@Slf4j
1820
@RequiredArgsConstructor
1921
public class AlertService {
2022

21-
private final AlertRepository emitterRepository;
23+
private final AlertRepository alertRepository;
2224

2325
// 60 * 1000 * 60 = 3,600,000{ms} = 1시간
2426
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
25-
private static final String ALARM_NAME = "alarm";
2627
private static final String USER_PREFIX = "user:";
2728

28-
private String createAlarmId(Long userId, Long alarmId) {
29-
String username = String.valueOf(userId);
30-
if (alarmId == null) {
31-
return username + "_" + System.currentTimeMillis();
32-
}
33-
log.info("[SseEmitter] Create alarmId {}", alarmId);
34-
35-
return username + "_" + alarmId;
29+
private String createEmitterId(Long userId) {
30+
return String.valueOf(userId) + "_" + System.currentTimeMillis();
3631
}
3732

3833
// SSE 서버 연결
39-
public SseEmitter connectAlarm(Long id) {
40-
41-
SseEmitter sseEmitter = new SseEmitter(DEFAULT_TIMEOUT);
42-
emitterRepository.save(id, sseEmitter);
43-
44-
// 종료 되었을 때 처리
45-
sseEmitter.onCompletion(() -> {
46-
log.info("[SseEmitter] {} SseEmitter Completed", USER_PREFIX + id);
47-
emitterRepository.delete(id);
48-
});
49-
50-
// timeOut 시 처리
51-
sseEmitter.onTimeout(() -> {
52-
log.info("[SseEmitter] {} SseEmitter Timeout", USER_PREFIX + id);
53-
emitterRepository.delete(id);
54-
});
55-
56-
57-
// // 연결 시 기존에 저장된 알람을 전송
58-
// sendSavedAlerts(id, sseEmitter);
59-
60-
Executors.newSingleThreadExecutor().submit(() -> {
61-
try {
62-
sseEmitter.send(SseEmitter.event()
63-
.id(createAlarmId(id, null))
64-
.name("open")
65-
.data("connect completed!!")
66-
);
67-
68-
// 45초마다 빈 데이터를 보내어 연결을 유지
69-
while (true) {
70-
Thread.sleep(40000); // 45초마다 빈 메시지를 전송
71-
sseEmitter.send(SseEmitter.event().name("ping").data("connection keep-alive"));
72-
}
73-
} catch (IOException e) {
74-
log.error(e.getMessage());
75-
} catch (InterruptedException e) {
76-
Thread.currentThread().interrupt();
77-
log.error(e.getMessage());
78-
} finally {
79-
sseEmitter.complete();
80-
}
81-
});
34+
public SseEmitter connectAlarm(Long id, String lastEventId) {
35+
36+
if (lastEventId != null && !lastEventId.isEmpty()) {
37+
log.info("[lastEventId] {}", lastEventId);
38+
}
39+
40+
// emitterId 생성 & 저장
41+
String emitterId = this.createEmitterId(id);
42+
alertRepository.saveEmitterId(id, emitterId);
43+
44+
// SseEmitter 생성
45+
SseEmitter sseEmitter = alertRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));
46+
47+
sseEmitter.onCompletion(() -> alertRepository.delete(id, emitterId)); // 연결 종료 시 처리
48+
sseEmitter.onTimeout(() -> alertRepository.delete(id, emitterId)); // 타임아웃 시 처리
49+
50+
try {
51+
sseEmitter.send(SseEmitter.event()
52+
.id(this.createEmitterId(id))
53+
.name("open")
54+
.data("connect completed!!")
55+
);
56+
} catch (IOException e) {
57+
log.error("Error sending ping", e);
58+
}
59+
60+
// ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
61+
// scheduler.scheduleAtFixedRate(() -> {
62+
// try {
63+
// this.send(id, "ping", "connection keep-alive");
64+
// } catch (Exception e) {
65+
// log.error("Error sending ping", e);
66+
// }
67+
// }, 0, 45, TimeUnit.SECONDS); // 45초마다 빈 메시지 전송
8268

8369
return sseEmitter;
8470
}
8571

8672
// SSE를 통해 메시지 전송
87-
public ResponseResult send(/*Long alarmId,*/ Long userId, String alertName, Object msg) {
88-
SseEmitter sseEmitter = emitterRepository.get(userId)
89-
.orElseGet(() -> {
90-
log.info("[SseEmitter] {} SseEmitter Not Founded", USER_PREFIX + userId);
91-
return new SseEmitter(); // 기본 객체 반환 (예시)
92-
});
73+
public ResponseResult send(Long userId, String alertName, Object data) {
74+
String eventId = alertName.equals("ping") ? "-1" : this.createEmitterId(userId);
75+
76+
String emitterId = alertRepository.getEmitterId(userId)
77+
.orElseThrow(() -> new IllegalArgumentException("Not Found EmitterId: " + userId));
78+
79+
SseEmitter sseEmitter = alertRepository.get(emitterId)
80+
.orElseThrow(() -> new IllegalArgumentException("Not Found SseEmitter: " + userId));
9381

9482
try {
9583
ObjectMapper objectMapper = new ObjectMapper();
96-
String jsonMsg = objectMapper.writeValueAsString(msg); // msg를 JSON 문자열로 변환
84+
String jsonMsg = objectMapper.writeValueAsString(data); // msg를 JSON 문자열로 변환
9785

9886
sseEmitter.send(
9987
SseEmitter.event()
100-
.id(createAlarmId(userId, null))
88+
.id(eventId)
10189
.name(alertName)
10290
.data(jsonMsg, MediaType.APPLICATION_JSON) // 변환된 JSON 문자열 전송
10391
);
10492

10593
return new ResponseResult(ErrorCode.SUCCESS);
10694
} catch (IOException e) {
107-
emitterRepository.delete(userId);
108-
log.error(e.getMessage(), e);
95+
alertRepository.delete(userId, emitterId);
10996
return new ResponseResult(ErrorCode.BAD_REQUEST_STATUS, e.getMessage());
11097
}
11198
}
11299

113100
// 사용자 SSE 연결 해제
114101
public void disconnectAlarm(Long userId) {
115-
SseEmitter sseEmitter = emitterRepository.get(userId)
116-
.orElseGet(() -> {
117-
log.info("[SseEmitter] {} SseEmitter Not Founded", USER_PREFIX + userId);
118-
return new SseEmitter(); // 기본 객체 반환 (예시)
119-
});
102+
String emitterId = alertRepository.getEmitterId(userId)
103+
.orElseThrow(() -> new IllegalArgumentException("Not Found EmitterId: " + userId));
104+
105+
SseEmitter sseEmitter = alertRepository.get(emitterId)
106+
.orElseThrow(() -> new IllegalArgumentException("Not Found SseEmitter: " + userId));
120107

108+
alertRepository.delete(userId, emitterId);
121109
sseEmitter.complete();
122110
}
123111
}

src/main/java/org/dfbf/soundlink/domain/chat/service/ChatRoomService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public ResponseResult saveRequestToRedis(Long requestUserId, Long emotionRecordI
7272
Set<String> existIngKeys = redisTemplate.keys(CHAT_REQUEST_KEY + "*to" + emotionRecordId + "*");
7373
if(!existIngKeys.isEmpty()){
7474
//이미 요청이 있을 경우, 예외처리
75+
log.info(existIngKeys.toString());
7576
return new ResponseResult(400, "Another user has already sent a request for this record.");
7677
}
7778

@@ -92,7 +93,7 @@ public ResponseResult saveRequestToRedis(Long requestUserId, Long emotionRecordI
9293
ChatRequest chatRequest = new ChatRequest(requestUserId, responseUserId, emotionRecordId);
9394

9495
// Redis 저장
95-
redisTemplate.opsForValue().set(key, chatRequest, Duration.ofSeconds(10));
96+
redisTemplate.opsForValue().set(key, chatRequest, Duration.ofSeconds(60));
9697

9798
// 알림 전송
9899
User requestUser = userRepository.findByUserIdWithCache(requestUserId)
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package org.dfbf.soundlink.global.config;
2+
3+
import org.springframework.context.annotation.Configuration;
4+
5+
@Configuration
6+
public class KafkaConfig {
7+
8+
// @Value("${spring.kafka.bootstrap-servers}")
9+
// private String bootstrapServers;
10+
//
11+
// @Value("${spring.kafka.streams.application-id}")
12+
// private String applicationId;
13+
//
14+
// @Bean
15+
// public KafkaStreamsDefaultConfiguration kStreamsConfigs() {
16+
// Map<String, Object> props = new HashMap<>();
17+
// props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
18+
// props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
19+
// props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
20+
// props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
21+
//
22+
// return new KafkaStreamsDefaultConfiguration();
23+
// }
24+
//
25+
// @Bean
26+
// public KStream<String, String> simpleStream(StreamsBuilder builder) {
27+
// // "chat-topic" 토픽 구독 (테스트용)
28+
// KStream<String, String> stream = builder.stream("alert-topic");
29+
// stream.foreach((key, value) -> System.out.println("Received message: " + key + " -> " + value));
30+
// return stream;
31+
// }
32+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package org.dfbf.soundlink.global.service;
2+
3+
import org.springframework.stereotype.Service;
4+
5+
@Service
6+
public class KafkaConsumerService {
7+
8+
// @KafkaListener(topics = "your_topic_name", groupId = "my-consumer-group")
9+
// public void listen(String message) {
10+
// System.out.println("Received Message: " + message);
11+
// // 메시지를 처리하는 로직 추가
12+
// }
13+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package org.dfbf.soundlink.global.service;
2+
3+
import org.springframework.stereotype.Service;
4+
5+
@Service
6+
public class KafkaProducerService {
7+
8+
// private final KafkaTemplate<String, String> kafkaTemplate;
9+
//
10+
// public KafkaProducerService(KafkaTemplate<String, String> kafkaTemplate) {
11+
// this.kafkaTemplate = kafkaTemplate;
12+
// }
13+
//
14+
// public void sendMessage(String topic, String message) {
15+
// kafkaTemplate.send(topic, message);
16+
// }
17+
}

0 commit comments

Comments
 (0)