1111import com .memesphere .global .apipayload .ApiResponse ;
1212import com .memesphere .global .apipayload .code .status .ErrorStatus ;
1313import com .memesphere .global .apipayload .exception .GeneralException ;
14+ import com .memesphere .global .jwt .CustomUserDetails ;
1415import lombok .RequiredArgsConstructor ;
1516import lombok .extern .log4j .Log4j2 ;
1617import org .springframework .data .domain .PageRequest ;
2425import java .math .RoundingMode ;
2526import java .time .LocalDateTime ;
2627import java .util .*;
28+ import java .util .concurrent .ExecutorService ;
29+ import java .util .concurrent .Executors ;
2730import java .util .stream .Collectors ;
2831
2932@ Log4j2
@@ -34,6 +37,7 @@ public class PushNotificationServiceImpl implements PushNotificationService {
3437 private final EmitterRepository emitterRepository ;
3538 private final NotificationRepository notificationRepository ;
3639 private final ChartDataRepository chartDataRepository ;
40+ private final ExecutorService executorService = Executors .newSingleThreadExecutor ();
3741
3842 // 연결 지속 시간 설정 : 한시간
3943 private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60 ;
@@ -61,18 +65,6 @@ public SseEmitter subscribe(Long userId, String lastEventId) {
6165 .forEach (entry -> sendToClient (emitter , entry .getKey (), entry .getValue ()));
6266 }
6367
64- List <Notification > notifications = notificationRepository .findAllByUserId (userId ); // 사용자가 등록한 알림 전부 가져오기
65-
66- // 변동성을 초과하는 알림 필터링
67- List <Notification > filteredNotifications = notifications .stream ()
68- .filter (notification -> isVolatilityExceeded (notification ))
69- .collect (Collectors .toList ());
70-
71- if (!filteredNotifications .isEmpty ()) {
72- notifications .forEach (notification -> {
73- send (notification , userId );
74- });
75- }
7668 return emitter ;
7769 }
7870
@@ -91,15 +83,35 @@ private void sendToClient(SseEmitter emitter, String emitterId, Object data) {
9183 }
9284
9385 @ Override
94- public void send (Notification notification , Long userId ) {
86+ public void send (Long userId ) {
9587
96- // 실시간 알림 전송 - 로그인 한 유저의 SseEmitter 모두 가져오기
97- Map <String , SseEmitter > sseEmitters = emitterRepository .findAllEmitterStartWithByUserId (String .valueOf (userId ));
88+ List <Notification > notifications = notificationRepository .findAllByUserId (userId ); // 사용자가 등록한 알림 전부 가져오기
9889
99- sseEmitters .forEach ((key , emitter ) -> {
100- emitterRepository .saveEventCache (key , notification );
101- sendToClient (emitter , key , NotificationConverter .toNotificationCreateResponse (notification , notification .getMemeCoin ()));
102- });
90+ // 변동성을 초과하는 알림 필터링
91+ List <Notification > filteredNotifications = notifications .stream ()
92+ .filter (notification -> isVolatilityExceeded (notification ))
93+ .collect (Collectors .toList ());
94+
95+ if (!filteredNotifications .isEmpty ()) {
96+ // 실시간 알림 전송 - 로그인 한 유저의 SseEmitter 모두 가져오기
97+ Map <String , SseEmitter > sseEmitters = emitterRepository .findAllEmitterStartWithByUserId (String .valueOf (userId ));
98+
99+ sseEmitters .forEach ((key , emitter ) -> {
100+ executorService .submit (() -> {
101+ filteredNotifications .forEach (notification -> {
102+
103+ emitterRepository .saveEventCache (key , notification );
104+ sendToClient (emitter , key , NotificationConverter .toNotificationCreateResponse (notification , notification .getMemeCoin ()));
105+
106+ try {
107+ Thread .sleep (500 ); // 0.5초 간격으로 전송
108+ } catch (InterruptedException e ) {
109+ Thread .currentThread ().interrupt ();
110+ }
111+ });
112+ });
113+ });
114+ }
103115 }
104116
105117 private boolean isVolatilityExceeded (Notification notification ) {
@@ -113,18 +125,20 @@ private boolean isVolatilityExceeded(Notification notification) {
113125 throw new GeneralException (ErrorStatus .CANNOT_LOAD_CHARTDATA );
114126 }
115127
128+ LocalDateTime notificationTime = notification .getCreatedAt ();
129+
116130 Integer count = notification .getStTime () / 10 ; //몇 번 가져올 것인지 결정
117131 Pageable pageable = (Pageable ) PageRequest .of (0 , count , Sort .by (Sort .Direction .DESC , "createdAt" ));
118- List <ChartData > lastNData = chartDataRepository .findByMemeCoinOrderByRecordedTimeDesc (memeCoin , pageable );
132+ List <ChartData > lastNData = chartDataRepository .findByMemeCoinAndRecordedTimeAfterOrderByRecordedTimeDesc (memeCoin , notificationTime , pageable );
119133
120134 if (lastNData .size () < count ) {
121135 return false ; // 비교할 데이터가 부족하면 알림을 보내지 않음
122136 }
123137
124138 BigDecimal sum = lastNData .stream ()
125- .map (ChartData ::getPrice )
139+ .map (ChartData ::getPriceChangeRate )
126140 .reduce (BigDecimal .ZERO , BigDecimal ::add );
127- BigDecimal average = sum .divide (BigDecimal .valueOf (lastNData . size () ), 4 , RoundingMode .HALF_UP );
141+ BigDecimal average = sum .divide (BigDecimal .valueOf (count ), 4 , RoundingMode .HALF_UP );
128142 BigDecimal definedVolatility = new BigDecimal (notification .getVolatility ());
129143
130144 if (notification .getIsRising ()) { // 상승인 경우
@@ -133,5 +147,4 @@ private boolean isVolatilityExceeded(Notification notification) {
133147 return average .compareTo (definedVolatility ) < 0 ;
134148 }
135149 }
136-
137150}
0 commit comments