diff --git a/src/main/kotlin/dsm/pick2024/infrastructure/schedule/ScheduleService.kt b/src/main/kotlin/dsm/pick2024/infrastructure/schedule/ScheduleService.kt index 3c623247..4c791936 100644 --- a/src/main/kotlin/dsm/pick2024/infrastructure/schedule/ScheduleService.kt +++ b/src/main/kotlin/dsm/pick2024/infrastructure/schedule/ScheduleService.kt @@ -11,6 +11,7 @@ import dsm.pick2024.domain.timetable.port.`in`.SaveTimetableUseCase import dsm.pick2024.domain.timetable.port.out.DeleteTimeTablePort import dsm.pick2024.domain.weekendmeal.port.`in`.NotificationWeekendMealUseCase import dsm.pick2024.domain.weekendmeal.port.`in`.UpdateWeekendMealUseCase +import dsm.pick2024.infrastructure.sse.port.out.SseRegistryPort import org.springframework.scheduling.annotation.Scheduled import org.springframework.stereotype.Component @@ -26,7 +27,8 @@ class ScheduleService( private val saveScheduleUseCase: SaveScheduleUseCase, private val updateWeekendMealUseCase: UpdateWeekendMealUseCase, private val sendNotificationSelfStudyTeacherUseCase: SendNotificationSelfStudyTeacherUseCase, - private val notificationWeekendMealUseCase: NotificationWeekendMealUseCase + private val notificationWeekendMealUseCase: NotificationWeekendMealUseCase, + private val sseRegistryPort: SseRegistryPort ) { @Scheduled(cron = "0 30 20 * * ?", zone = "Asia/Seoul") fun deleteTable() { @@ -76,4 +78,9 @@ class ScheduleService( fun notificationWeekendMeal() { notificationWeekendMealUseCase.execute() } + + @Scheduled(cron = "0/30 * 8-21 * * 1-5", zone = "Asia/Seoul") + fun sseHeartbeat() { + sseRegistryPort.sendHeartbeat() + } } diff --git a/src/main/kotlin/dsm/pick2024/infrastructure/sse/SseRegistry.kt b/src/main/kotlin/dsm/pick2024/infrastructure/sse/SseRegistry.kt index 01ce0584..4d85ddb3 100644 --- a/src/main/kotlin/dsm/pick2024/infrastructure/sse/SseRegistry.kt +++ b/src/main/kotlin/dsm/pick2024/infrastructure/sse/SseRegistry.kt @@ -1,9 +1,11 @@ package dsm.pick2024.infrastructure.sse import dsm.pick2024.infrastructure.sse.port.out.SseRegistryPort +import org.apache.catalina.connector.ClientAbortException import org.slf4j.LoggerFactory import org.springframework.stereotype.Component import org.springframework.web.servlet.mvc.method.annotation.SseEmitter +import java.io.IOException import java.util.UUID import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.CopyOnWriteArrayList @@ -17,10 +19,7 @@ class SseRegistry : SseRegistryPort { val emitter = SseEmitter(60 * 60 * 1000L * 2) // 2시간 emitters.computeIfAbsent(userId) { CopyOnWriteArrayList() }.add(emitter) - val cleanup = Runnable { remove(userId, emitter) } - emitter.onCompletion(cleanup) - emitter.onTimeout(cleanup) - emitter.onError { cleanup.run() } + registerEmitterCallbacks(emitter, userId) return emitter } @@ -37,9 +36,42 @@ class SseRegistry : SseRegistryPort { try { emitter.send(SseEmitter.event().data(data ?: ".")) } catch (e: Exception) { + if (isClientAbort(e)) { + emitter.completeWithError(e) + remove(userId, emitter) + continue + } logger.error("User Event Send Failed ${e.message}", e) + emitter.completeWithError(e) remove(userId, emitter) } } } + + override fun sendHeartbeat() { + for (userId in emitters.keys) { + sendToUser(userId, "heartbeat") + } + } + + // Broken pipe error인지 확인 + private fun isClientAbort(e: Exception): Boolean { + if (e is ClientAbortException) return true + val io = e as? IOException + val message = (io?.message ?: e.message).orEmpty() + return message.contains("Broken pipe", ignoreCase = true) + } + + private fun registerEmitterCallbacks(emitter: SseEmitter, userId: UUID) { + val cleanup = Runnable { remove(userId, emitter) } + + emitter.onCompletion(cleanup) + emitter.onTimeout { + emitter.complete() + cleanup.run() + } + emitter.onError { + cleanup.run() + } + } } diff --git a/src/main/kotlin/dsm/pick2024/infrastructure/sse/port/out/SseRegistryPort.kt b/src/main/kotlin/dsm/pick2024/infrastructure/sse/port/out/SseRegistryPort.kt index d4424af6..56d121cd 100644 --- a/src/main/kotlin/dsm/pick2024/infrastructure/sse/port/out/SseRegistryPort.kt +++ b/src/main/kotlin/dsm/pick2024/infrastructure/sse/port/out/SseRegistryPort.kt @@ -7,4 +7,5 @@ interface SseRegistryPort { fun sendToUser(userId: UUID, data: Any?) fun remove(userId: UUID, emitter: SseEmitter) fun add(userId: UUID): SseEmitter + fun sendHeartbeat() }