Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package team.aliens.dms.common.spi

import team.aliens.dms.common.dto.OutboxData
import team.aliens.dms.common.dto.OutboxStatus
import java.util.UUID

interface OutboxPort {
fun save(outbox: OutboxData): OutboxData
fun deleteById(id: UUID)
fun findByStatus(status: OutboxStatus): List<OutboxData>
fun delete(outbox: OutboxData)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package team.aliens.dms.event.handler

import com.fasterxml.jackson.databind.ObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import org.springframework.transaction.event.TransactionPhase
import org.springframework.transaction.event.TransactionalEventListener
Expand All @@ -13,23 +12,18 @@ import team.aliens.dms.contract.remote.rabbitmq.SaveDeviceTokenMessage
import team.aliens.dms.event.DeleteDeviceTokenEvent
import team.aliens.dms.event.DeviceTokenEvent
import team.aliens.dms.event.SaveDeviceTokenEvent
import team.aliens.dms.thirdparty.messagebroker.NotificationProducer
import java.util.UUID

@Component
class DeviceTokenEventHandler(
private val outboxPort: OutboxPort,
private val notificationProducer: NotificationProducer,
private val objectMapper: ObjectMapper
) {
private val log = LoggerFactory.getLogger(this::class.java)
private val outboxIdsByEventIdentity = ThreadLocal<MutableMap<Int, UUID>>()

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
fun saveOutbox(event: DeviceTokenEvent) {
val (eventType, message) = createMessage(event)

val saved = outboxPort.save(
outboxPort.save(
OutboxData(
id = null,
aggregateType = "device_token",
Expand All @@ -38,29 +32,6 @@ class DeviceTokenEventHandler(
status = OutboxStatus.PENDING
)
)
val map = outboxIdsByEventIdentity.get() ?: mutableMapOf<Int, UUID>().also {
outboxIdsByEventIdentity.set(it)
}

map[System.identityHashCode(event)] = saved.id!!
}

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
fun publishMessage(event: DeviceTokenEvent) {
val (_, message) = createMessage(event)
val map = outboxIdsByEventIdentity.get()
val outboxId = map.remove(System.identityHashCode(event))

try {
notificationProducer.sendMessage(message)
outboxId?.let { outboxPort.deleteById(it) }
} catch (e: Exception) {
log.warn("Failed to send device token message immediately, will be retried by scheduler", e)
} finally {
if (map.isNullOrEmpty()) {
outboxIdsByEventIdentity.remove()
}
}
}

private fun createMessage(event: DeviceTokenEvent): Pair<String, Any> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package team.aliens.dms.event.handler

import com.fasterxml.jackson.databind.ObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import org.springframework.transaction.event.TransactionPhase
import org.springframework.transaction.event.TransactionalEventListener
Expand All @@ -15,23 +14,18 @@ import team.aliens.dms.event.GroupNotificationEvent
import team.aliens.dms.event.NotificationEvent
import team.aliens.dms.event.SingleNotificationEvent
import team.aliens.dms.event.TopicNotificationEvent
import team.aliens.dms.thirdparty.messagebroker.NotificationProducer
import java.util.UUID

@Component
class NotificationEventHandler(
private val outboxPort: OutboxPort,
private val notificationProducer: NotificationProducer,
private val objectMapper: ObjectMapper
) {
private val log = LoggerFactory.getLogger(this::class.java)
private val outboxIdsByEventIdentity = ThreadLocal<MutableMap<Int, UUID>>()

@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
fun saveOutbox(event: NotificationEvent) {
val (eventType, message) = createMessage(event)

val saved = outboxPort.save(
outboxPort.save(
OutboxData(
id = null,
aggregateType = "notification",
Expand All @@ -40,30 +34,6 @@ class NotificationEventHandler(
status = OutboxStatus.PENDING
)
)

val map = outboxIdsByEventIdentity.get() ?: mutableMapOf<Int, UUID>().also {
outboxIdsByEventIdentity.set(it)
}

map[System.identityHashCode(event)] = saved.id!!
}

@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
fun publishMessage(event: NotificationEvent) {
val (_, message) = createMessage(event)
val map = outboxIdsByEventIdentity.get()
val outboxId = map.remove(System.identityHashCode(event))

try {
notificationProducer.sendMessage(message)
outboxId?.let { outboxPort.deleteById(it) }
} catch (e: Exception) {
log.warn("Failed to send notification immediately, will be retried by scheduler", e)
} finally {
if (map.isNullOrEmpty()) {
outboxIdsByEventIdentity.remove()
}
}
}

private fun createMessage(event: NotificationEvent): Pair<String, Any> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ spring:
job-store-type: jdbc
jdbc:
initialize-schema: never
overwrite-existing-jobs: true
auto-startup: true
wait-for-jobs-to-complete-on-shutdown: true
properties:
org:
quartz:
Expand All @@ -65,6 +68,9 @@ spring:
tablePrefix: QRTZ_
isClustered: ${QUARTZ_CLUSTERED:false}
useProperties: false
misfireThreshold: 300000
acquireTriggersWithinLock: true
txIsolationLevelSerializable: false
threadPool:
class: org.quartz.simpl.SimpleThreadPool
threadCount: 10
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package team.aliens.dms.event.handler
import com.fasterxml.jackson.databind.ObjectMapper
import io.kotest.core.spec.IsolationMode
import io.kotest.core.spec.style.DescribeSpec
import io.mockk.Runs
import io.mockk.every
import io.mockk.just
import io.mockk.mockk
import io.mockk.verify
import team.aliens.dms.common.dto.OutboxData
Expand All @@ -14,17 +12,15 @@ import team.aliens.dms.common.spi.OutboxPort
import team.aliens.dms.contract.model.notification.DeviceTokenInfo
import team.aliens.dms.contract.remote.rabbitmq.SaveDeviceTokenMessage
import team.aliens.dms.event.SaveDeviceTokenEvent
import team.aliens.dms.thirdparty.messagebroker.NotificationProducer
import java.util.UUID

class DeviceTokenEventHandlerTest : DescribeSpec({

isolationMode = IsolationMode.InstancePerLeaf

val outboxPort = mockk<OutboxPort>()
val notificationProducer = mockk<NotificationProducer>()
val objectMapper = ObjectMapper()
val deviceTokenEventHandler = DeviceTokenEventHandler(outboxPort, notificationProducer, objectMapper)
val deviceTokenEventHandler = DeviceTokenEventHandler(outboxPort, objectMapper)

describe("saveOutbox") {
context("SaveDeviceTokenEvent가 발생하면") {
Expand Down Expand Up @@ -61,68 +57,4 @@ class DeviceTokenEventHandlerTest : DescribeSpec({
}
}
}

describe("publishMessage") {
context("메시지 전송에 성공하면") {
val userId = UUID.randomUUID()
val deviceTokenInfo = DeviceTokenInfo(
userId = userId,
schoolId = UUID.randomUUID(),
token = "test-fcm-token"
)
val event = SaveDeviceTokenEvent(deviceTokenInfo = deviceTokenInfo)
val outboxId = UUID.randomUUID()
val savedOutbox = OutboxData(
id = outboxId,
aggregateType = "device_token",

eventType = "SaveDeviceTokenMessage",
payload = objectMapper.writeValueAsString(SaveDeviceTokenMessage(deviceTokenInfo)),
status = OutboxStatus.PENDING,
retryCount = 0
)

every { outboxPort.save(any()) } returns savedOutbox
every { notificationProducer.sendMessage(any()) } just Runs
every { outboxPort.deleteById(outboxId) } just Runs

it("메시지를 전송하고 Outbox를 삭제한다") {
deviceTokenEventHandler.saveOutbox(event)
deviceTokenEventHandler.publishMessage(event)

verify { notificationProducer.sendMessage(any()) }
verify { outboxPort.deleteById(outboxId) }
}
}

context("메시지 전송에 실패하면") {
val userId = UUID.randomUUID()
val deviceTokenInfo = DeviceTokenInfo(
userId = userId,
schoolId = UUID.randomUUID(),
token = "test-fcm-token"
)
val event = SaveDeviceTokenEvent(deviceTokenInfo = deviceTokenInfo)
val outboxId = UUID.randomUUID()
val savedOutbox = OutboxData(
id = outboxId,
aggregateType = "device_token",
eventType = "SaveDeviceTokenMessage",
payload = objectMapper.writeValueAsString(SaveDeviceTokenMessage(deviceTokenInfo)),
status = OutboxStatus.PENDING,
retryCount = 0
)

every { outboxPort.save(any()) } returns savedOutbox
every { notificationProducer.sendMessage(any()) } throws RuntimeException("Send failed")

it("Outbox를 삭제하지 않고 스케줄러가 재시도하도록 남겨둔다") {
deviceTokenEventHandler.saveOutbox(event)
deviceTokenEventHandler.publishMessage(event)

verify { notificationProducer.sendMessage(any()) }
verify(exactly = 0) { outboxPort.deleteById(any()) }
}
}
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
import io.kotest.core.spec.IsolationMode
import io.kotest.core.spec.style.DescribeSpec
import io.mockk.Runs
import io.mockk.every
import io.mockk.just
import io.mockk.mockk
import io.mockk.verify
import team.aliens.dms.common.dto.OutboxData
Expand All @@ -15,17 +13,15 @@ import team.aliens.dms.common.spi.OutboxPort
import team.aliens.dms.contract.model.notification.NotificationInfo
import team.aliens.dms.contract.remote.rabbitmq.SingleNotificationMessage
import team.aliens.dms.event.SingleNotificationEvent
import team.aliens.dms.thirdparty.messagebroker.NotificationProducer
import java.util.UUID

class NotificationEventHandlerTest : DescribeSpec({

isolationMode = IsolationMode.InstancePerLeaf

val outboxPort = mockk<OutboxPort>()
val notificationProducer = mockk<NotificationProducer>()
val objectMapper = ObjectMapper().registerModule(JavaTimeModule())
val notificationEventHandler = NotificationEventHandler(outboxPort, notificationProducer, objectMapper)
val notificationEventHandler = NotificationEventHandler(outboxPort, objectMapper)

describe("saveOutbox") {
context("SingleNotificationEvent가 발생하면") {
Expand Down Expand Up @@ -63,69 +59,4 @@ class NotificationEventHandlerTest : DescribeSpec({
}
}
}

describe("publishMessage") {
context("메시지 전송에 성공하면") {
val userId = UUID.randomUUID()
val notificationInfo = mockk<NotificationInfo>(relaxed = true)
val event = SingleNotificationEvent(
userId = userId,
notificationInfo = notificationInfo
)
val outboxId = UUID.randomUUID()
val savedOutbox = OutboxData(
id = outboxId,
aggregateType = "notification",
eventType = SingleNotificationMessage.TYPE,
payload = objectMapper.writeValueAsString(
SingleNotificationMessage(userId, notificationInfo)
),
status = OutboxStatus.PENDING,
retryCount = 0
)

every { outboxPort.save(any()) } returns savedOutbox
every { notificationProducer.sendMessage(any()) } just Runs
every { outboxPort.deleteById(outboxId) } just Runs

it("메시지를 전송하고 Outbox를 삭제한다") {
notificationEventHandler.saveOutbox(event)
notificationEventHandler.publishMessage(event)

verify { notificationProducer.sendMessage(any()) }
verify { outboxPort.deleteById(outboxId) }
}
}

context("메시지 전송에 실패하면") {
val userId = UUID.randomUUID()
val notificationInfo = mockk<NotificationInfo>(relaxed = true)
val event = SingleNotificationEvent(
userId = userId,
notificationInfo = notificationInfo
)
val outboxId = UUID.randomUUID()
val savedOutbox = OutboxData(
id = outboxId,
aggregateType = "notification",
eventType = SingleNotificationMessage.TYPE,
payload = objectMapper.writeValueAsString(
SingleNotificationMessage(userId, notificationInfo)
),
status = OutboxStatus.PENDING,
retryCount = 0
)

every { outboxPort.save(any()) } returns savedOutbox
every { notificationProducer.sendMessage(any()) } throws RuntimeException("Send failed")

it("Outbox를 삭제하지 않고 스케줄러가 재시도하도록 남겨둔다") {
notificationEventHandler.saveOutbox(event)
notificationEventHandler.publishMessage(event)

verify { notificationProducer.sendMessage(any()) }
verify(exactly = 0) { outboxPort.deleteById(any()) }
}
}
}
})
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import team.aliens.dms.common.dto.OutboxStatus
import team.aliens.dms.common.spi.OutboxPort
import team.aliens.dms.persistence.outbox.mapper.OutboxMapper
import team.aliens.dms.persistence.outbox.repository.OutboxJpaRepository
import java.util.UUID

@Component
class OutboxPersistenceAdapter(
Expand All @@ -20,10 +19,6 @@ class OutboxPersistenceAdapter(
return outboxMapper.toDomain(saved)
}

override fun deleteById(id: UUID) {
outboxJpaRepository.deleteById(id)
}

override fun findByStatus(status: OutboxStatus): List<OutboxData> {
val jpaStatus = outboxMapper.toJpaStatus(status)
return outboxJpaRepository.findByStatus(jpaStatus)
Expand Down
Loading