Skip to content

Commit 32e93ed

Browse files
authored
Merge pull request #644 from DSM-PICK/643-sse-heartbeat-적용
643 sse heartbeat 적용
2 parents 05b73f7 + 398cf65 commit 32e93ed

File tree

3 files changed

+45
-5
lines changed

3 files changed

+45
-5
lines changed

src/main/kotlin/dsm/pick2024/infrastructure/schedule/ScheduleService.kt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import dsm.pick2024.domain.timetable.port.`in`.SaveTimetableUseCase
1111
import dsm.pick2024.domain.timetable.port.out.DeleteTimeTablePort
1212
import dsm.pick2024.domain.weekendmeal.port.`in`.NotificationWeekendMealUseCase
1313
import dsm.pick2024.domain.weekendmeal.port.`in`.UpdateWeekendMealUseCase
14+
import dsm.pick2024.infrastructure.sse.port.out.SseRegistryPort
1415
import org.springframework.scheduling.annotation.Scheduled
1516
import org.springframework.stereotype.Component
1617

@@ -26,7 +27,8 @@ class ScheduleService(
2627
private val saveScheduleUseCase: SaveScheduleUseCase,
2728
private val updateWeekendMealUseCase: UpdateWeekendMealUseCase,
2829
private val sendNotificationSelfStudyTeacherUseCase: SendNotificationSelfStudyTeacherUseCase,
29-
private val notificationWeekendMealUseCase: NotificationWeekendMealUseCase
30+
private val notificationWeekendMealUseCase: NotificationWeekendMealUseCase,
31+
private val sseRegistryPort: SseRegistryPort
3032
) {
3133
@Scheduled(cron = "0 30 20 * * ?", zone = "Asia/Seoul")
3234
fun deleteTable() {
@@ -76,4 +78,9 @@ class ScheduleService(
7678
fun notificationWeekendMeal() {
7779
notificationWeekendMealUseCase.execute()
7880
}
81+
82+
@Scheduled(cron = "0/30 * 8-21 * * 1-5", zone = "Asia/Seoul")
83+
fun sseHeartbeat() {
84+
sseRegistryPort.sendHeartbeat()
85+
}
7986
}

src/main/kotlin/dsm/pick2024/infrastructure/sse/SseRegistry.kt

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package dsm.pick2024.infrastructure.sse
22

33
import dsm.pick2024.infrastructure.sse.port.out.SseRegistryPort
4+
import org.apache.catalina.connector.ClientAbortException
45
import org.slf4j.LoggerFactory
56
import org.springframework.stereotype.Component
67
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
8+
import java.io.IOException
79
import java.util.UUID
810
import java.util.concurrent.ConcurrentHashMap
911
import java.util.concurrent.CopyOnWriteArrayList
@@ -17,10 +19,7 @@ class SseRegistry : SseRegistryPort {
1719
val emitter = SseEmitter(60 * 60 * 1000L * 2) // 2시간
1820
emitters.computeIfAbsent(userId) { CopyOnWriteArrayList() }.add(emitter)
1921

20-
val cleanup = Runnable { remove(userId, emitter) }
21-
emitter.onCompletion(cleanup)
22-
emitter.onTimeout(cleanup)
23-
emitter.onError { cleanup.run() }
22+
registerEmitterCallbacks(emitter, userId)
2423

2524
return emitter
2625
}
@@ -37,9 +36,42 @@ class SseRegistry : SseRegistryPort {
3736
try {
3837
emitter.send(SseEmitter.event().data(data ?: "."))
3938
} catch (e: Exception) {
39+
if (isClientAbort(e)) {
40+
emitter.completeWithError(e)
41+
remove(userId, emitter)
42+
continue
43+
}
4044
logger.error("User Event Send Failed ${e.message}", e)
45+
emitter.completeWithError(e)
4146
remove(userId, emitter)
4247
}
4348
}
4449
}
50+
51+
override fun sendHeartbeat() {
52+
for (userId in emitters.keys) {
53+
sendToUser(userId, "heartbeat")
54+
}
55+
}
56+
57+
// Broken pipe error인지 확인
58+
private fun isClientAbort(e: Exception): Boolean {
59+
if (e is ClientAbortException) return true
60+
val io = e as? IOException
61+
val message = (io?.message ?: e.message).orEmpty()
62+
return message.contains("Broken pipe", ignoreCase = true)
63+
}
64+
65+
private fun registerEmitterCallbacks(emitter: SseEmitter, userId: UUID) {
66+
val cleanup = Runnable { remove(userId, emitter) }
67+
68+
emitter.onCompletion(cleanup)
69+
emitter.onTimeout {
70+
emitter.complete()
71+
cleanup.run()
72+
}
73+
emitter.onError {
74+
cleanup.run()
75+
}
76+
}
4577
}

src/main/kotlin/dsm/pick2024/infrastructure/sse/port/out/SseRegistryPort.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,5 @@ interface SseRegistryPort {
77
fun sendToUser(userId: UUID, data: Any?)
88
fun remove(userId: UUID, emitter: SseEmitter)
99
fun add(userId: UUID): SseEmitter
10+
fun sendHeartbeat()
1011
}

0 commit comments

Comments
 (0)