Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import dsm.pick2024.domain.timetable.port.`in`.SaveTimetableUseCase
import dsm.pick2024.domain.timetable.port.out.DeleteTimeTablePort
import dsm.pick2024.domain.weekendmeal.port.`in`.NotificationWeekendMealUseCase
import dsm.pick2024.domain.weekendmeal.port.`in`.UpdateWeekendMealUseCase
import dsm.pick2024.infrastructure.sse.port.out.SseRegistryPort
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component

Expand All @@ -26,7 +27,8 @@ class ScheduleService(
private val saveScheduleUseCase: SaveScheduleUseCase,
private val updateWeekendMealUseCase: UpdateWeekendMealUseCase,
private val sendNotificationSelfStudyTeacherUseCase: SendNotificationSelfStudyTeacherUseCase,
private val notificationWeekendMealUseCase: NotificationWeekendMealUseCase
private val notificationWeekendMealUseCase: NotificationWeekendMealUseCase,
private val sseRegistryPort: SseRegistryPort
) {
@Scheduled(cron = "0 30 20 * * ?", zone = "Asia/Seoul")
fun deleteTable() {
Expand Down Expand Up @@ -76,4 +78,9 @@ class ScheduleService(
fun notificationWeekendMeal() {
notificationWeekendMealUseCase.execute()
}

@Scheduled(cron = "0/30 * 8-21 * * 1-5", zone = "Asia/Seoul")
fun sseHeartbeat() {
sseRegistryPort.sendHeartbeat()
}
}
40 changes: 36 additions & 4 deletions src/main/kotlin/dsm/pick2024/infrastructure/sse/SseRegistry.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package dsm.pick2024.infrastructure.sse

import dsm.pick2024.infrastructure.sse.port.out.SseRegistryPort
import org.apache.catalina.connector.ClientAbortException
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
import java.io.IOException
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
Expand All @@ -17,10 +19,7 @@ class SseRegistry : SseRegistryPort {
val emitter = SseEmitter(60 * 60 * 1000L * 2) // 2시간
emitters.computeIfAbsent(userId) { CopyOnWriteArrayList() }.add(emitter)

val cleanup = Runnable { remove(userId, emitter) }
emitter.onCompletion(cleanup)
emitter.onTimeout(cleanup)
emitter.onError { cleanup.run() }
registerEmitterCallbacks(emitter, userId)

return emitter
}
Expand All @@ -37,9 +36,42 @@ class SseRegistry : SseRegistryPort {
try {
emitter.send(SseEmitter.event().data(data ?: "."))
} catch (e: Exception) {
if (isClientAbort(e)) {
emitter.completeWithError(e)
remove(userId, emitter)
continue
}
logger.error("User Event Send Failed ${e.message}", e)
emitter.completeWithError(e)
remove(userId, emitter)
}
}
}

override fun sendHeartbeat() {
for (userId in emitters.keys) {
sendToUser(userId, "heartbeat")
}
}

// Broken pipe error인지 확인
private fun isClientAbort(e: Exception): Boolean {
if (e is ClientAbortException) return true
val io = e as? IOException
val message = (io?.message ?: e.message).orEmpty()
return message.contains("Broken pipe", ignoreCase = true)
}

private fun registerEmitterCallbacks(emitter: SseEmitter, userId: UUID) {
val cleanup = Runnable { remove(userId, emitter) }

emitter.onCompletion(cleanup)
emitter.onTimeout {
emitter.complete()
cleanup.run()
}
emitter.onError {
cleanup.run()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ interface SseRegistryPort {
fun sendToUser(userId: UUID, data: Any?)
fun remove(userId: UUID, emitter: SseEmitter)
fun add(userId: UUID): SseEmitter
fun sendHeartbeat()
}