Skip to content

Commit ea73250

Browse files
authored
merge: pull request #87 from feat/kafka/1
feat: Sse & Kafka & Redis #81
2 parents a019b95 + fee49af commit ea73250

File tree

11 files changed

+247
-104
lines changed

11 files changed

+247
-104
lines changed

build.gradle.kts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,9 +84,9 @@ dependencies {
8484
implementation("org.springframework.boot:spring-boot-starter-mail")
8585

8686
// Kafka
87-
// implementation ("org.springframework.kafka:spring-kafka") // Spring Boot에서 Kafka를 편하게 사용하도록 도와주는 라이브러리
88-
// implementation ("org.apache.kafka:kafka-streams") // Kafka의 스트림 API를 사용할 때 필요
89-
// implementation ("org.apache.kafka:kafka-clients") // Kafka 브로커와 직접 통신하는 기본 클라이언트 라이브러리
87+
implementation ("org.springframework.kafka:spring-kafka") // Spring Boot에서 Kafka를 편하게 사용하도록 도와주는 라이브러리
88+
implementation ("org.apache.kafka:kafka-streams") // Kafka의 스트림 API를 사용할 때 필요
89+
implementation ("org.apache.kafka:kafka-clients") // Kafka 브로커와 직접 통신하는 기본 클라이언트 라이브러리
9090

9191
//QueryDSL 추가
9292
implementation ("com.querydsl:querydsl-apt:5.0.0")

src/main/java/org/dfbf/soundlink/domain/alert/controller/AlertController.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import io.swagger.v3.oas.annotations.tags.Tag;
55
import lombok.RequiredArgsConstructor;
66
import org.dfbf.soundlink.domain.alert.service.AlertService;
7-
import org.dfbf.soundlink.global.exception.ResponseResult;
87
import org.springframework.security.core.annotation.AuthenticationPrincipal;
98
import org.springframework.web.bind.annotation.*;
109
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -32,10 +31,4 @@ public SseEmitter subscribeTest(
3231
@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
3332
return alertService.connectAlarm(id, lastEventId);
3433
}
35-
36-
@PostMapping("")
37-
@Operation(summary = "알림 전송 API", description = "알림을 전송하는 기능 (CHAT 서버에서 사용하는 기능입니다.)")
38-
public ResponseResult send(@RequestParam("id") Long id, @RequestParam("msg") String msg) {
39-
return alertService.send(id,"test" ,msg);
40-
}
4134
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.dfbf.soundlink.domain.alert.dto;
2+
3+
public record AlertReq(
4+
String type,
5+
String timestamp,
6+
Object data) {
7+
}

src/main/java/org/dfbf/soundlink/domain/alert/entity/Alert.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,30 @@
11
package org.dfbf.soundlink.domain.alert.entity;
22

33
import com.fasterxml.jackson.annotation.JsonProperty;
4-
import lombok.Builder;
4+
import lombok.*;
55

66
import java.time.Instant;
77
import java.time.ZoneId;
88
import java.time.format.DateTimeFormatter;
99

10+
@Data
11+
@NoArgsConstructor(access = AccessLevel.PROTECTED, force = true)
12+
@AllArgsConstructor
13+
@Builder
1014
public class Alert {
1115

16+
@JsonProperty("eventId")
17+
private String eventId;
18+
1219
@JsonProperty("type")
1320
private String type;
1421

1522
@JsonProperty("timestamp")
1623
private final String timestamp;
1724

25+
@JsonProperty("userId")
26+
private Long userId;
27+
1828
@JsonProperty("data")
1929
private Object data;
2030

@@ -29,6 +39,6 @@ private String format(Instant now) {
2939
public Alert(String type, Object data) {
3040
this.type = type;
3141
this.timestamp = this.format(Instant.now());
32-
this.data = data;
42+
this.data = data != null ? data.toString() : null;
3343
}
3444
}

src/main/java/org/dfbf/soundlink/domain/alert/service/AlertService.java

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
import com.fasterxml.jackson.databind.ObjectMapper;
44
import lombok.RequiredArgsConstructor;
55
import lombok.extern.slf4j.Slf4j;
6+
import org.dfbf.soundlink.domain.alert.entity.Alert;
67
import org.dfbf.soundlink.domain.alert.repository.AlertRepository;
78
import org.dfbf.soundlink.global.exception.ErrorCode;
89
import org.dfbf.soundlink.global.exception.ResponseResult;
10+
import org.springframework.data.redis.core.RedisTemplate;
911
import org.springframework.http.MediaType;
1012
import org.springframework.stereotype.Service;
1113
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -21,15 +23,45 @@
2123
public class AlertService {
2224

2325
private final AlertRepository alertRepository;
26+
private final RedisTemplate<String, Object> redisTemplate;
2427

2528
// 60 * 1000 * 60 = 3,600,000{ms} = 1시간
2629
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
27-
private static final String USER_PREFIX = "user:";
2830

2931
private String createEmitterId(Long userId) {
3032
return String.valueOf(userId) + "_" + System.currentTimeMillis();
3133
}
3234

35+
// 사용자에게 전송되지 않은 알림을 Redis에서 꺼내서 전송하고 삭제
36+
private void sendPendingAlerts(Long userId, SseEmitter sseEmitter) {
37+
String keyPattern = "alert:" + userId + "_*"; // 사용자 알림에 대한 키 패턴
38+
39+
// 해당 키 패턴을 가진 모든 알림을 가져오기
40+
redisTemplate.keys(keyPattern).forEach(key -> {
41+
Alert alert = (Alert) redisTemplate.opsForValue().get(key); // Redis에서 알림 가져오기
42+
if (alert != null) {
43+
try {
44+
log.info("Sending pending alert to user {}", userId);
45+
ObjectMapper objectMapper = new ObjectMapper();
46+
String jsonMsg = objectMapper.writeValueAsString(alert.getData());
47+
48+
// 알림 전송
49+
sseEmitter.send(SseEmitter.event()
50+
.id(alert.getEventId())
51+
.name(alert.getType())
52+
.data(jsonMsg)
53+
);
54+
55+
// 알림을 Redis에서 삭제
56+
redisTemplate.delete(key);
57+
log.info("Pending alert sent and removed from Redis for user {}", userId);
58+
} catch (IOException e) {
59+
log.error("Error sending pending alert", e);
60+
}
61+
}
62+
});
63+
}
64+
3365
// SSE 서버 연결
3466
public SseEmitter connectAlarm(Long id, String lastEventId) {
3567

@@ -57,21 +89,22 @@ public SseEmitter connectAlarm(Long id, String lastEventId) {
5789
log.error("Error sending ping", e);
5890
}
5991

60-
// ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
61-
// scheduler.scheduleAtFixedRate(() -> {
62-
// try {
63-
// this.send(id, "ping", "connection keep-alive");
64-
// } catch (Exception e) {
65-
// log.error("Error sending ping", e);
66-
// }
67-
// }, 0, 45, TimeUnit.SECONDS); // 45초마다 빈 메시지 전송
92+
// 사용자에게 전송되지 않은 알림 전송
93+
this.sendPendingAlerts(id, sseEmitter);
94+
95+
Executors.newSingleThreadExecutor().submit(() -> {
96+
while (true) {
97+
Thread.sleep(40000); // 45초마다 빈 메시지를 전송
98+
sseEmitter.send(SseEmitter.event().name("ping").data("connection keep-alive"));
99+
}
100+
});
68101

69102
return sseEmitter;
70103
}
71104

72105
// SSE를 통해 메시지 전송
73-
public ResponseResult send(Long userId, String alertName, Object data) {
74-
String eventId = alertName.equals("ping") ? "-1" : this.createEmitterId(userId);
106+
public void send(Long userId, String alertName, Object data) {
107+
String eventId = this.createEmitterId(userId);
75108

76109
String emitterId = alertRepository.getEmitterId(userId)
77110
.orElseThrow(() -> new IllegalArgumentException("Not Found EmitterId: " + userId));
@@ -89,11 +122,8 @@ public ResponseResult send(Long userId, String alertName, Object data) {
89122
.name(alertName)
90123
.data(jsonMsg, MediaType.APPLICATION_JSON) // 변환된 JSON 문자열 전송
91124
);
92-
93-
return new ResponseResult(ErrorCode.SUCCESS);
94125
} catch (IOException e) {
95126
alertRepository.delete(userId, emitterId);
96-
return new ResponseResult(ErrorCode.BAD_REQUEST_STATUS, e.getMessage());
97127
}
98128
}
99129

@@ -108,4 +138,15 @@ public void disconnectAlarm(Long userId) {
108138
alertRepository.delete(userId, emitterId);
109139
sseEmitter.complete();
110140
}
141+
142+
// Alert 객체 생성
143+
public Alert createAlert(Long userId, String type, Object data) {
144+
return Alert.builder()
145+
.eventId(this.createEmitterId(userId))
146+
.type(type)
147+
.userId(userId)
148+
.data(data)
149+
.build();
150+
151+
}
111152
}

src/main/java/org/dfbf/soundlink/domain/chat/service/ChatRoomService.java

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
import org.dfbf.soundlink.domain.alert.service.AlertService;
88
import org.dfbf.soundlink.domain.blocklist.repository.BlockListRepository;
99
import org.dfbf.soundlink.domain.chat.dto.ChatRejectDto;
10+
import org.dfbf.soundlink.domain.chat.dto.ChatReqDto;
1011
import org.dfbf.soundlink.domain.chat.dto.ChatRoomInfoDto;
1112
import org.dfbf.soundlink.domain.chat.dto.ChatRoomListDto;
12-
import org.dfbf.soundlink.domain.chat.entity.redis.ChatRequest;
13-
import org.dfbf.soundlink.domain.chat.dto.ChatReqDto;
1413
import org.dfbf.soundlink.domain.chat.entity.ChatRoom;
14+
import org.dfbf.soundlink.domain.chat.entity.redis.ChatRequest;
1515
import org.dfbf.soundlink.domain.chat.exception.ChatRoomNotFoundException;
1616
import org.dfbf.soundlink.domain.chat.exception.UnauthorizedAccessException;
1717
import org.dfbf.soundlink.domain.chat.repository.ChatRoomRepository;
@@ -27,21 +27,16 @@
2727
import org.dfbf.soundlink.global.exception.ErrorCode;
2828
import org.dfbf.soundlink.global.exception.ResponseResult;
2929
import org.dfbf.soundlink.global.feign.chat.DevChatClient;
30+
import org.dfbf.soundlink.global.kafka.KafkaProducer;
3031
import org.springframework.data.redis.core.RedisTemplate;
3132
import org.springframework.security.core.annotation.AuthenticationPrincipal;
3233
import org.springframework.stereotype.Service;
3334
import org.springframework.transaction.annotation.Transactional;
3435

35-
import java.time.Duration;
3636
import java.sql.Timestamp;
37-
38-
import java.util.HashMap;
39-
import java.util.List;
40-
import java.util.Map;
41-
import java.util.Optional;
37+
import java.time.Duration;
4238

4339
import java.util.*;
44-
import java.util.stream.Collectors;
4540

4641

4742
@Service
@@ -56,9 +51,11 @@ public class ChatRoomService {
5651
private final BlockListRepository blockListRepository;
5752
private final AlertService alertService;
5853
private final DevChatClient devChatClient;
54+
private final KafkaProducer kafkaProducer;
5955
private final UserStatusService userStatusService;
6056

6157
private static final String CHAT_REQUEST_KEY = "chatRequest";
58+
private static final String TOPIC = "alert-topic";
6259

6360
// 요청을 Redis에 저장 (TTL: 60초)
6461
public ResponseResult saveRequestToRedis(Long requestUserId, Long emotionRecordId) {
@@ -74,7 +71,14 @@ public ResponseResult saveRequestToRedis(Long requestUserId, Long emotionRecordI
7471
return new ResponseResult(400, "You can't chat with yourself.");
7572
}
7673

77-
//이미 요청이 있는지 확인(Redis에 emotionRecordId에 대한 요청이 있는지 확인)
74+
// Redis에 이미 requestUserId가 포함되어 있는 경우
75+
if (!redisTemplate.keys(CHAT_REQUEST_KEY + requestUserId + "to*").isEmpty()) {
76+
String firstKey = redisTemplate.keys(CHAT_REQUEST_KEY + requestUserId + "to*").iterator().next(); // 첫 번째 키 가져오기
77+
Long ttl = redisTemplate.getExpire(firstKey);
78+
return new ResponseResult(400, ttl + "초 후에 다시 시도해주세요.");
79+
}
80+
81+
// 이미 요청이 있는지 확인(Redis에 emotionRecordId에 대한 요청이 있는지 확인)
7882
Set<String> existIngKeys = redisTemplate.keys(CHAT_REQUEST_KEY + "*to" + emotionRecordId + "*");
7983
if(!existIngKeys.isEmpty()){
8084
//이미 요청이 있을 경우, 예외처리
@@ -87,13 +91,6 @@ public ResponseResult saveRequestToRedis(Long requestUserId, Long emotionRecordI
8791
return new ResponseResult(400, "Blocked user.");
8892
}
8993

90-
// Redis에 이미 requestUserId가 포함되어 있는 경우
91-
if (!redisTemplate.keys(CHAT_REQUEST_KEY + requestUserId + "to*").isEmpty()) {
92-
String firstKey = redisTemplate.keys(CHAT_REQUEST_KEY + requestUserId + "to*").iterator().next(); // 첫 번째 키 가져오기
93-
Long ttl = redisTemplate.getExpire(firstKey);
94-
return new ResponseResult(400, ttl + "초 후에 다시 시도해주세요.");
95-
}
96-
9794
// Key & Request 객체 생성
9895
String key = CHAT_REQUEST_KEY + requestUserId + "to" + emotionRecordId;
9996
ChatRequest chatRequest = new ChatRequest(requestUserId, responseUserId, emotionRecordId);
@@ -105,8 +102,8 @@ public ResponseResult saveRequestToRedis(Long requestUserId, Long emotionRecordI
105102
User requestUser = userRepository.findByUserIdWithCache(requestUserId)
106103
.orElseThrow(UserNotFoundException::new);
107104
AlertChatRequest alertChatRequest = new AlertChatRequest(emotionRecordId, requestUser.getNickname());
108-
Alert alert = new Alert("chatRequest", alertChatRequest);
109-
alertService.send(responseUserId, "alarm", alert);
105+
Alert alert = alertService.createAlert(responseUserId, "alarm", alertChatRequest);
106+
kafkaProducer.send(TOPIC, alert);
110107

111108
return new ResponseResult(ErrorCode.SUCCESS);
112109
} catch (EmotionRecordNotFoundException e) {
@@ -131,7 +128,9 @@ public ResponseResult deleteRequestFromRedis(Long userId, Long emotionRecordId)
131128
// Redis에 Key가 존재하는 경우 삭제 (KEY가 없는 경우 400)
132129
if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
133130
redisTemplate.delete(key);
134-
alertService.send(recordIdInUserId, "cancel", "Chat request has been canceled.");
131+
Alert alert = alertService.createAlert(recordIdInUserId, "cancel", "Chat request has been canceled.");
132+
kafkaProducer.send(TOPIC, alert);
133+
log.info("tset");
135134
return new ResponseResult(ErrorCode.SUCCESS);
136135
} else {
137136
return new ResponseResult(400, "ChatRequest not found or expired.");
@@ -165,7 +164,8 @@ public ResponseResult requestRejected(Long responseUserId, ChatRejectDto chatRej
165164
// Redis에 Key가 존재하는 경우 삭제 (KEY가 없는 경우 400)
166165
if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
167166
redisTemplate.delete(key);
168-
alertService.send(requestUserId, "fail", "채팅 요청을 거부했습니다");
167+
Alert alert = alertService.createAlert(requestUserId, "fail", "채팅 요청을 거부했습니다");
168+
kafkaProducer.send(TOPIC, alert);
169169
return new ResponseResult(ErrorCode.SUCCESS);
170170
} else {
171171
return new ResponseResult(400, "ChatRequest not found or expired.");
@@ -214,7 +214,10 @@ public ResponseResult createChatRoom(Long userId, Long recordId, String requestN
214214
if (chatRoomId.isPresent()) {
215215
Map<String, Object> map = new HashMap<>();
216216
map.put("chatRoomId", chatRoomId.get());
217-
alertService.send(requestUserId, "accept", map);
217+
218+
Alert alert = alertService.createAlert(requestUserId, "accept", map);
219+
kafkaProducer.send(TOPIC, alert);
220+
218221
return new ResponseResult(map);
219222
}
220223

@@ -241,7 +244,8 @@ public ResponseResult createChatRoom(Long userId, Long recordId, String requestN
241244
map.put("chatRoomId", chatRoom.getChatRoomId());
242245

243246
// 요청자에게 방번호를 보냄
244-
alertService.send(requestUserId, "accept", map);
247+
Alert alert = alertService.createAlert(requestUserId, "accept", map);
248+
kafkaProducer.send(TOPIC, alert);
245249

246250
userStatusService.setChatting(userId, true);
247251

0 commit comments

Comments
 (0)