11package com .memesphere .domain .notification .service ;
22
33import com .memesphere .domain .chartdata .entity .ChartData ;
4+ import com .memesphere .domain .chartdata .repository .ChartDataRepository ;
45import com .memesphere .domain .memecoin .entity .MemeCoin ;
56import com .memesphere .domain .notification .converter .NotificationConverter ;
67import com .memesphere .domain .notification .entity .Notification ;
1112import com .memesphere .global .apipayload .code .status .ErrorStatus ;
1213import com .memesphere .global .apipayload .exception .GeneralException ;
1314import lombok .RequiredArgsConstructor ;
15+ import lombok .extern .log4j .Log4j2 ;
16+ import org .springframework .data .domain .PageRequest ;
17+ import org .springframework .data .domain .Sort ;
1418import org .springframework .stereotype .Service ;
1519import org .springframework .web .servlet .mvc .method .annotation .SseEmitter ;
1620
21+ import java .awt .print .Pageable ;
1722import java .io .IOException ;
1823import java .math .BigDecimal ;
1924import java .time .LocalDateTime ;
2328import java .util .Optional ;
2429import java .util .stream .Collectors ;
2530
31+ @ Log4j2
2632@ Service
2733@ RequiredArgsConstructor
2834public class PushNotificationServiceImpl implements PushNotificationService {
2935
3036 private final EmitterRepository emitterRepository ;
3137 private final NotificationRepository notificationRepository ;
38+ private final ChartDataRepository chartDataRepository ;
3239
3340 // 연결 지속 시간 설정 : 한시간
3441 private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60 ;
3542
3643 @ Override
37- public SseEmitter subscribe (Long UserId , String lastEventId ) {
44+ public SseEmitter subscribe (Long userId , String lastEventId ) {
3845
3946 // 고유한 아이디 생성
40- String emitterId = UserId + "_" + System .currentTimeMillis (); // 사용자 id + 현재 시간을 밀리초 단위의 long값
47+ String emitterId = userId + "_" + System .currentTimeMillis (); // 사용자 id + 현재 시간을 밀리초 단위의 long값
4148 SseEmitter emitter = emitterRepository .save (emitterId , new SseEmitter (DEFAULT_TIMEOUT ));
4249
4350 // 클라이언트가 SSE 연결을 종료하면 실행됨
4451 emitter .onCompletion (() -> emitterRepository .deleteById (emitterId ));
45-
4652 // 지정된 시간이 지나거나 클라이언트가 요청을 안하면 실행됨
4753 emitter .onTimeout (() -> emitterRepository .deleteById (emitterId ));
4854
4955 // 최초 연결 더미데이터가 없으면 503 에러가 나므로 더미 데이터 생성
50- sendToClient (emitter , emitterId , "EventStream Created. [userId=" + UserId + "]" );
56+ sendToClient (emitter , emitterId , "EventStream Created. [userId=" + userId + "]" );
5157
5258 if (!lastEventId .isEmpty ()) {
53- Map <String , Object > events = emitterRepository .findAllEventCacheStartWithByUserId (emitterId );
59+ Map <String , Object > events = emitterRepository .findAllEventCacheStartWithByUserId (String . valueOf ( userId ) );
5460 events .entrySet ().stream ()
5561 .filter (entry -> lastEventId .compareTo (entry .getKey ()) < 0 )
5662 .forEach (entry -> sendToClient (emitter , entry .getKey (), entry .getValue ()));
@@ -71,6 +77,7 @@ public void send(Long userId) {
7177 List <Notification > filteredNotifications = notifications .stream ()
7278 .filter (notification -> isVolatilityExceeded (notification ))
7379 .collect (Collectors .toList ());
80+ System .out .println ("알림:" +filteredNotifications .size ());
7481
7582 if (filteredNotifications .isEmpty ()) {
7683 return ; // 기준을 충족하는 변동성이 없으면 전송하지 않음
@@ -88,10 +95,14 @@ public void send(Long userId) {
8895
8996 private void sendToClient (SseEmitter emitter , String emitterId , Object data ) {
9097 try {
91- emitter .send (SseEmitter .event ()
92- .id (emitterId )
93- .data (data ));
98+ if (emitter != null ) {
99+ System .out .println ("-------" );
100+ emitter .send (SseEmitter .event ()
101+ .id (emitterId )
102+ .data (data ));
103+ }
94104 } catch (IOException exception ) {
105+ log .error ("SSE 연결 오류: {}" , exception .getMessage ());
95106 emitterRepository .deleteById (emitterId );
96107 throw new GeneralException (ErrorStatus .CANNOT_PUSH_NOTIFICATION );
97108 }
@@ -108,46 +119,7 @@ private boolean isVolatilityExceeded(Notification notification) {
108119 throw new GeneralException (ErrorStatus .CANNOT_LOAD_CHARTDATA );
109120 }
110121
111- // 최신 가격 가져오기
112- Optional <ChartData > latestDataOpt = chartDataList .stream ()
113- .max (Comparator .comparing (ChartData ::getPrice ));
114-
115- if (latestDataOpt .isEmpty ()) {
116- throw new GeneralException (ErrorStatus .CANNOT_LOAD_CHARTDATA ); // 최신 데이터가 존재하지 않을 경우
117- }
118-
119- BigDecimal latestPrice = latestDataOpt .get ().getPrice ();
120-
121- // 기준 시간 내 가장 오래된 가격 가져오기
122- Optional <ChartData > oldestDataOpt = chartDataList .stream ()
123- .filter (data -> data .getCreatedAt ().isAfter (LocalDateTime .now ().minusMinutes (notification .getStTime ())))
124- .min (Comparator .comparing (ChartData ::getPrice ));
125-
126- if (oldestDataOpt .isEmpty ()) {
127- throw new GeneralException (ErrorStatus .CANNOT_LOAD_CHARTDATA );
128- }
129-
130- BigDecimal oldestPrice = oldestDataOpt .get ().getPrice ();
131-
132- if (oldestPrice == null || latestPrice == null ) {
133- throw new GeneralException (ErrorStatus .CANNOT_CHECK_VOLATILITY );
134- }
135-
136- // 변동성 계산
137- BigDecimal priceDiff = latestPrice .subtract (oldestPrice );
138- BigDecimal volatility = priceDiff
139- .divide (oldestPrice , 4 , BigDecimal .ROUND_HALF_UP ) // 나눗셈 수행(소수점 4자리 반올림)
140- .multiply (new BigDecimal ("100" )); // 백분율 변환
141- boolean isIncrease = volatility .compareTo (BigDecimal .ZERO ) > 0 ; // 상승 여부 확인 (True: 상승, False: 하락)
142-
143- return volatility .abs ().intValue () > notification .getVolatility ();
144- // if (volatility.abs() > notification.getVolatility()) {
145- // if (notification.getIsRising() & isIncrease) {
146- //
147- // } else if (!(notification.getIsRising()) & isIncrease)) {
148- //
149- // }
150- // }
122+ return true ;
151123 }
152124
153125}
0 commit comments