Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class ScheduleService(
private val notificationWeekendMealUseCase: NotificationWeekendMealUseCase,
private val sseRegistryPort: SseRegistryPort
) {
/**
* Removes all classroom and application entries from their data stores.
*
* Runs on the scheduled cron (20:30 Asia/Seoul) to clear classroom and application data.
*/
@Scheduled(cron = "0 30 20 * * ?", zone = "Asia/Seoul")
fun deleteTable() {
deleteClassRoomPort.deleteAll()
Expand Down Expand Up @@ -74,13 +79,21 @@ class ScheduleService(
sendNotificationSelfStudyTeacherUseCase.execute()
}

/**
* Sends notifications about weekend meals to recipients.
*
* Scheduled to run daily at 08:00 Asia/Seoul to trigger weekend-meal notification delivery.
*/
@Scheduled(cron = "0 0 8 * * *", zone = "Asia/Seoul")
fun notificationWeekendMeal() {
notificationWeekendMealUseCase.execute()
}

/**
* Sends a heartbeat signal to the server-sent events registry to keep client connections active.
*/
@Scheduled(cron = "0/30 * 8-21 * * 1-5", zone = "Asia/Seoul")
fun sseHeartbeat() {
sseRegistryPort.sendHeartbeat()
}
}
}
36 changes: 34 additions & 2 deletions src/main/kotlin/dsm/pick2024/infrastructure/sse/SseRegistry.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ class SseRegistry : SseRegistryPort {
private val emitters: MutableMap<UUID, MutableList<SseEmitter>> = ConcurrentHashMap()
private val logger = LoggerFactory.getLogger(SseRegistry::class.java)

/**
* Creates and registers a Server-Sent Events emitter associated with the given user.
*
* @param userId The UUID of the user to associate the emitter with.
* @return The registered `SseEmitter` instance for the user.
*/
override fun add(userId: UUID): SseEmitter {
val emitter = SseEmitter(60 * 60 * 1000L * 2) // 2시간
emitters.computeIfAbsent(userId) { CopyOnWriteArrayList() }.add(emitter)
Expand All @@ -30,6 +36,15 @@ class SseRegistry : SseRegistryPort {
if (list.isEmpty()) emitters.remove(userId)
}

/**
* Sends an SSE event with the given data to all active emitters for the specified user.
*
* If `data` is `null`, a single dot ("." ) is sent as the payload. Emitters that fail
* to send (including client aborts) are completed with the error and removed from the registry.
*
* @param userId The UUID of the target user whose emitters should receive the event.
* @param data The event payload to send; if `null`, a "." placeholder is sent.
*/
override fun sendToUser(userId: UUID, data: Any?) {
val list = emitters[userId] ?: return
for (emitter in list) {
Expand All @@ -48,20 +63,37 @@ class SseRegistry : SseRegistryPort {
}
}

/**
* Broadcasts a heartbeat event to all registered users.
*
* Iterates over the current registry of user IDs and sends a `"heartbeat"` event to each user's emitters.
* Emitters that fail during send (for example due to client disconnects) may be completed and removed from the registry as part of send handling.
*/
override fun sendHeartbeat() {
for (userId in emitters.keys) {
sendToUser(userId, "heartbeat")
}
}

// Broken pipe error인지 확인
/**
* Determines whether an exception represents a client-side abort (for example, a broken pipe).
*
* @param e The exception to inspect.
* @return `true` if the exception is a `ClientAbortException` or its message contains "Broken pipe" (case-insensitive), `false` otherwise.
*/
private fun isClientAbort(e: Exception): Boolean {
if (e is ClientAbortException) return true
val io = e as? IOException
val message = (io?.message ?: e.message).orEmpty()
return message.contains("Broken pipe", ignoreCase = true)
}

/**
* Registers lifecycle callbacks on the given SSE emitter to remove it from the registry when it completes, times out, or errors.
*
* @param emitter The SseEmitter to attach callbacks to.
* @param userId The owner user's UUID whose emitter list will be cleaned up when the emitter ends.
*/
private fun registerEmitterCallbacks(emitter: SseEmitter, userId: UUID) {
val cleanup = Runnable { remove(userId, emitter) }

Expand All @@ -75,4 +107,4 @@ class SseRegistry : SseRegistryPort {
cleanup.run()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,32 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter
import java.util.*

interface SseRegistryPort {
fun sendToUser(userId: UUID, data: Any?)
fun remove(userId: UUID, emitter: SseEmitter)
fun add(userId: UUID): SseEmitter
fun sendHeartbeat()
}
/**
* Sends the given payload to the Server-Sent Events connection for the specified user.
*
* @param userId The UUID of the target user.
* @param data The payload to send to the user's SSE emitter; may be `null`.
*/
fun sendToUser(userId: UUID, data: Any?)
/**
* Removes the specified SSE emitter associated with the given user.
*
* @param userId The UUID of the user whose emitter should be removed.
* @param emitter The SseEmitter instance to remove.
*/
fun remove(userId: UUID, emitter: SseEmitter)
/**
* Registers and returns an SSE emitter associated with the given user ID.
*
* @param userId The UUID of the user to associate with the emitter.
* @return The registered SseEmitter for the specified user.
*/
fun add(userId: UUID): SseEmitter
/**
* Sends a lightweight heartbeat to all registered SSE connections to keep them active.
*
* Implementations should emit a minimal heartbeat event to each active emitter so intermediate
* proxies and clients do not close idle connections due to inactivity.
*/
fun sendHeartbeat()
}