Skip to content

Commit 0920170

Browse files
authored
merge: (#962) Outbox 패턴 단일 처리 지점으로 리팩토링
2 parents f1330e6 + 46a9f5b commit 0920170

File tree

7 files changed

+10
-207
lines changed

7 files changed

+10
-207
lines changed

dms-main/main-core/src/main/kotlin/team/aliens/dms/common/spi/OutboxPort.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@ package team.aliens.dms.common.spi
22

33
import team.aliens.dms.common.dto.OutboxData
44
import team.aliens.dms.common.dto.OutboxStatus
5-
import java.util.UUID
65

76
interface OutboxPort {
87
fun save(outbox: OutboxData): OutboxData
9-
fun deleteById(id: UUID)
108
fun findByStatus(status: OutboxStatus): List<OutboxData>
119
fun delete(outbox: OutboxData)
1210
}

dms-main/main-infrastructure/src/main/kotlin/team/aliens/dms/event/handler/DeviceTokenEventHandler.kt

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package team.aliens.dms.event.handler
22

33
import com.fasterxml.jackson.databind.ObjectMapper
4-
import org.slf4j.LoggerFactory
54
import org.springframework.stereotype.Component
65
import org.springframework.transaction.event.TransactionPhase
76
import org.springframework.transaction.event.TransactionalEventListener
@@ -13,23 +12,18 @@ import team.aliens.dms.contract.remote.rabbitmq.SaveDeviceTokenMessage
1312
import team.aliens.dms.event.DeleteDeviceTokenEvent
1413
import team.aliens.dms.event.DeviceTokenEvent
1514
import team.aliens.dms.event.SaveDeviceTokenEvent
16-
import team.aliens.dms.thirdparty.messagebroker.NotificationProducer
17-
import java.util.UUID
1815

1916
@Component
2017
class DeviceTokenEventHandler(
2118
private val outboxPort: OutboxPort,
22-
private val notificationProducer: NotificationProducer,
2319
private val objectMapper: ObjectMapper
2420
) {
25-
private val log = LoggerFactory.getLogger(this::class.java)
26-
private val outboxIdsByEventIdentity = ThreadLocal<MutableMap<Int, UUID>>()
2721

2822
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
2923
fun saveOutbox(event: DeviceTokenEvent) {
3024
val (eventType, message) = createMessage(event)
3125

32-
val saved = outboxPort.save(
26+
outboxPort.save(
3327
OutboxData(
3428
id = null,
3529
aggregateType = "device_token",
@@ -38,29 +32,6 @@ class DeviceTokenEventHandler(
3832
status = OutboxStatus.PENDING
3933
)
4034
)
41-
val map = outboxIdsByEventIdentity.get() ?: mutableMapOf<Int, UUID>().also {
42-
outboxIdsByEventIdentity.set(it)
43-
}
44-
45-
map[System.identityHashCode(event)] = saved.id!!
46-
}
47-
48-
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
49-
fun publishMessage(event: DeviceTokenEvent) {
50-
val (_, message) = createMessage(event)
51-
val map = outboxIdsByEventIdentity.get()
52-
val outboxId = map.remove(System.identityHashCode(event))
53-
54-
try {
55-
notificationProducer.sendMessage(message)
56-
outboxId?.let { outboxPort.deleteById(it) }
57-
} catch (e: Exception) {
58-
log.warn("Failed to send device token message immediately, will be retried by scheduler", e)
59-
} finally {
60-
if (map.isNullOrEmpty()) {
61-
outboxIdsByEventIdentity.remove()
62-
}
63-
}
6435
}
6536

6637
private fun createMessage(event: DeviceTokenEvent): Pair<String, Any> {

dms-main/main-infrastructure/src/main/kotlin/team/aliens/dms/event/handler/NotificationEventHandler.kt

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package team.aliens.dms.event.handler
22

33
import com.fasterxml.jackson.databind.ObjectMapper
4-
import org.slf4j.LoggerFactory
54
import org.springframework.stereotype.Component
65
import org.springframework.transaction.event.TransactionPhase
76
import org.springframework.transaction.event.TransactionalEventListener
@@ -15,23 +14,18 @@ import team.aliens.dms.event.GroupNotificationEvent
1514
import team.aliens.dms.event.NotificationEvent
1615
import team.aliens.dms.event.SingleNotificationEvent
1716
import team.aliens.dms.event.TopicNotificationEvent
18-
import team.aliens.dms.thirdparty.messagebroker.NotificationProducer
19-
import java.util.UUID
2017

2118
@Component
2219
class NotificationEventHandler(
2320
private val outboxPort: OutboxPort,
24-
private val notificationProducer: NotificationProducer,
2521
private val objectMapper: ObjectMapper
2622
) {
27-
private val log = LoggerFactory.getLogger(this::class.java)
28-
private val outboxIdsByEventIdentity = ThreadLocal<MutableMap<Int, UUID>>()
2923

3024
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
3125
fun saveOutbox(event: NotificationEvent) {
3226
val (eventType, message) = createMessage(event)
3327

34-
val saved = outboxPort.save(
28+
outboxPort.save(
3529
OutboxData(
3630
id = null,
3731
aggregateType = "notification",
@@ -40,30 +34,6 @@ class NotificationEventHandler(
4034
status = OutboxStatus.PENDING
4135
)
4236
)
43-
44-
val map = outboxIdsByEventIdentity.get() ?: mutableMapOf<Int, UUID>().also {
45-
outboxIdsByEventIdentity.set(it)
46-
}
47-
48-
map[System.identityHashCode(event)] = saved.id!!
49-
}
50-
51-
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
52-
fun publishMessage(event: NotificationEvent) {
53-
val (_, message) = createMessage(event)
54-
val map = outboxIdsByEventIdentity.get()
55-
val outboxId = map.remove(System.identityHashCode(event))
56-
57-
try {
58-
notificationProducer.sendMessage(message)
59-
outboxId?.let { outboxPort.deleteById(it) }
60-
} catch (e: Exception) {
61-
log.warn("Failed to send notification immediately, will be retried by scheduler", e)
62-
} finally {
63-
if (map.isNullOrEmpty()) {
64-
outboxIdsByEventIdentity.remove()
65-
}
66-
}
6737
}
6838

6939
private fun createMessage(event: NotificationEvent): Pair<String, Any> {

dms-main/main-infrastructure/src/main/resources/application.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ spring:
5353
job-store-type: jdbc
5454
jdbc:
5555
initialize-schema: never
56+
overwrite-existing-jobs: true
57+
auto-startup: true
58+
wait-for-jobs-to-complete-on-shutdown: true
5659
properties:
5760
org:
5861
quartz:
@@ -65,6 +68,9 @@ spring:
6568
tablePrefix: QRTZ_
6669
isClustered: ${QUARTZ_CLUSTERED:false}
6770
useProperties: false
71+
misfireThreshold: 300000
72+
acquireTriggersWithinLock: true
73+
txIsolationLevelSerializable: false
6874
threadPool:
6975
class: org.quartz.simpl.SimpleThreadPool
7076
threadCount: 10

dms-main/main-infrastructure/src/test/kotlin/team/aliens/dms/event/handler/DeviceTokenEventHandlerTest.kt

Lines changed: 1 addition & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ package team.aliens.dms.event.handler
33
import com.fasterxml.jackson.databind.ObjectMapper
44
import io.kotest.core.spec.IsolationMode
55
import io.kotest.core.spec.style.DescribeSpec
6-
import io.mockk.Runs
76
import io.mockk.every
8-
import io.mockk.just
97
import io.mockk.mockk
108
import io.mockk.verify
119
import team.aliens.dms.common.dto.OutboxData
@@ -14,17 +12,15 @@ import team.aliens.dms.common.spi.OutboxPort
1412
import team.aliens.dms.contract.model.notification.DeviceTokenInfo
1513
import team.aliens.dms.contract.remote.rabbitmq.SaveDeviceTokenMessage
1614
import team.aliens.dms.event.SaveDeviceTokenEvent
17-
import team.aliens.dms.thirdparty.messagebroker.NotificationProducer
1815
import java.util.UUID
1916

2017
class DeviceTokenEventHandlerTest : DescribeSpec({
2118

2219
isolationMode = IsolationMode.InstancePerLeaf
2320

2421
val outboxPort = mockk<OutboxPort>()
25-
val notificationProducer = mockk<NotificationProducer>()
2622
val objectMapper = ObjectMapper()
27-
val deviceTokenEventHandler = DeviceTokenEventHandler(outboxPort, notificationProducer, objectMapper)
23+
val deviceTokenEventHandler = DeviceTokenEventHandler(outboxPort, objectMapper)
2824

2925
describe("saveOutbox") {
3026
context("SaveDeviceTokenEvent가 발생하면") {
@@ -61,68 +57,4 @@ class DeviceTokenEventHandlerTest : DescribeSpec({
6157
}
6258
}
6359
}
64-
65-
describe("publishMessage") {
66-
context("메시지 전송에 성공하면") {
67-
val userId = UUID.randomUUID()
68-
val deviceTokenInfo = DeviceTokenInfo(
69-
userId = userId,
70-
schoolId = UUID.randomUUID(),
71-
token = "test-fcm-token"
72-
)
73-
val event = SaveDeviceTokenEvent(deviceTokenInfo = deviceTokenInfo)
74-
val outboxId = UUID.randomUUID()
75-
val savedOutbox = OutboxData(
76-
id = outboxId,
77-
aggregateType = "device_token",
78-
79-
eventType = "SaveDeviceTokenMessage",
80-
payload = objectMapper.writeValueAsString(SaveDeviceTokenMessage(deviceTokenInfo)),
81-
status = OutboxStatus.PENDING,
82-
retryCount = 0
83-
)
84-
85-
every { outboxPort.save(any()) } returns savedOutbox
86-
every { notificationProducer.sendMessage(any()) } just Runs
87-
every { outboxPort.deleteById(outboxId) } just Runs
88-
89-
it("메시지를 전송하고 Outbox를 삭제한다") {
90-
deviceTokenEventHandler.saveOutbox(event)
91-
deviceTokenEventHandler.publishMessage(event)
92-
93-
verify { notificationProducer.sendMessage(any()) }
94-
verify { outboxPort.deleteById(outboxId) }
95-
}
96-
}
97-
98-
context("메시지 전송에 실패하면") {
99-
val userId = UUID.randomUUID()
100-
val deviceTokenInfo = DeviceTokenInfo(
101-
userId = userId,
102-
schoolId = UUID.randomUUID(),
103-
token = "test-fcm-token"
104-
)
105-
val event = SaveDeviceTokenEvent(deviceTokenInfo = deviceTokenInfo)
106-
val outboxId = UUID.randomUUID()
107-
val savedOutbox = OutboxData(
108-
id = outboxId,
109-
aggregateType = "device_token",
110-
eventType = "SaveDeviceTokenMessage",
111-
payload = objectMapper.writeValueAsString(SaveDeviceTokenMessage(deviceTokenInfo)),
112-
status = OutboxStatus.PENDING,
113-
retryCount = 0
114-
)
115-
116-
every { outboxPort.save(any()) } returns savedOutbox
117-
every { notificationProducer.sendMessage(any()) } throws RuntimeException("Send failed")
118-
119-
it("Outbox를 삭제하지 않고 스케줄러가 재시도하도록 남겨둔다") {
120-
deviceTokenEventHandler.saveOutbox(event)
121-
deviceTokenEventHandler.publishMessage(event)
122-
123-
verify { notificationProducer.sendMessage(any()) }
124-
verify(exactly = 0) { outboxPort.deleteById(any()) }
125-
}
126-
}
127-
}
12860
})

dms-main/main-infrastructure/src/test/kotlin/team/aliens/dms/event/handler/NotificationEventHandlerTest.kt

Lines changed: 1 addition & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
44
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule
55
import io.kotest.core.spec.IsolationMode
66
import io.kotest.core.spec.style.DescribeSpec
7-
import io.mockk.Runs
87
import io.mockk.every
9-
import io.mockk.just
108
import io.mockk.mockk
119
import io.mockk.verify
1210
import team.aliens.dms.common.dto.OutboxData
@@ -15,17 +13,15 @@ import team.aliens.dms.common.spi.OutboxPort
1513
import team.aliens.dms.contract.model.notification.NotificationInfo
1614
import team.aliens.dms.contract.remote.rabbitmq.SingleNotificationMessage
1715
import team.aliens.dms.event.SingleNotificationEvent
18-
import team.aliens.dms.thirdparty.messagebroker.NotificationProducer
1916
import java.util.UUID
2017

2118
class NotificationEventHandlerTest : DescribeSpec({
2219

2320
isolationMode = IsolationMode.InstancePerLeaf
2421

2522
val outboxPort = mockk<OutboxPort>()
26-
val notificationProducer = mockk<NotificationProducer>()
2723
val objectMapper = ObjectMapper().registerModule(JavaTimeModule())
28-
val notificationEventHandler = NotificationEventHandler(outboxPort, notificationProducer, objectMapper)
24+
val notificationEventHandler = NotificationEventHandler(outboxPort, objectMapper)
2925

3026
describe("saveOutbox") {
3127
context("SingleNotificationEvent가 발생하면") {
@@ -63,69 +59,4 @@ class NotificationEventHandlerTest : DescribeSpec({
6359
}
6460
}
6561
}
66-
67-
describe("publishMessage") {
68-
context("메시지 전송에 성공하면") {
69-
val userId = UUID.randomUUID()
70-
val notificationInfo = mockk<NotificationInfo>(relaxed = true)
71-
val event = SingleNotificationEvent(
72-
userId = userId,
73-
notificationInfo = notificationInfo
74-
)
75-
val outboxId = UUID.randomUUID()
76-
val savedOutbox = OutboxData(
77-
id = outboxId,
78-
aggregateType = "notification",
79-
eventType = SingleNotificationMessage.TYPE,
80-
payload = objectMapper.writeValueAsString(
81-
SingleNotificationMessage(userId, notificationInfo)
82-
),
83-
status = OutboxStatus.PENDING,
84-
retryCount = 0
85-
)
86-
87-
every { outboxPort.save(any()) } returns savedOutbox
88-
every { notificationProducer.sendMessage(any()) } just Runs
89-
every { outboxPort.deleteById(outboxId) } just Runs
90-
91-
it("메시지를 전송하고 Outbox를 삭제한다") {
92-
notificationEventHandler.saveOutbox(event)
93-
notificationEventHandler.publishMessage(event)
94-
95-
verify { notificationProducer.sendMessage(any()) }
96-
verify { outboxPort.deleteById(outboxId) }
97-
}
98-
}
99-
100-
context("메시지 전송에 실패하면") {
101-
val userId = UUID.randomUUID()
102-
val notificationInfo = mockk<NotificationInfo>(relaxed = true)
103-
val event = SingleNotificationEvent(
104-
userId = userId,
105-
notificationInfo = notificationInfo
106-
)
107-
val outboxId = UUID.randomUUID()
108-
val savedOutbox = OutboxData(
109-
id = outboxId,
110-
aggregateType = "notification",
111-
eventType = SingleNotificationMessage.TYPE,
112-
payload = objectMapper.writeValueAsString(
113-
SingleNotificationMessage(userId, notificationInfo)
114-
),
115-
status = OutboxStatus.PENDING,
116-
retryCount = 0
117-
)
118-
119-
every { outboxPort.save(any()) } returns savedOutbox
120-
every { notificationProducer.sendMessage(any()) } throws RuntimeException("Send failed")
121-
122-
it("Outbox를 삭제하지 않고 스케줄러가 재시도하도록 남겨둔다") {
123-
notificationEventHandler.saveOutbox(event)
124-
notificationEventHandler.publishMessage(event)
125-
126-
verify { notificationProducer.sendMessage(any()) }
127-
verify(exactly = 0) { outboxPort.deleteById(any()) }
128-
}
129-
}
130-
}
13162
})

dms-main/main-persistence/src/main/kotlin/team/aliens/dms/persistence/outbox/OutboxPersistenceAdapter.kt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import team.aliens.dms.common.dto.OutboxStatus
66
import team.aliens.dms.common.spi.OutboxPort
77
import team.aliens.dms.persistence.outbox.mapper.OutboxMapper
88
import team.aliens.dms.persistence.outbox.repository.OutboxJpaRepository
9-
import java.util.UUID
109

1110
@Component
1211
class OutboxPersistenceAdapter(
@@ -20,10 +19,6 @@ class OutboxPersistenceAdapter(
2019
return outboxMapper.toDomain(saved)
2120
}
2221

23-
override fun deleteById(id: UUID) {
24-
outboxJpaRepository.deleteById(id)
25-
}
26-
2722
override fun findByStatus(status: OutboxStatus): List<OutboxData> {
2823
val jpaStatus = outboxMapper.toJpaStatus(status)
2924
return outboxJpaRepository.findByStatus(jpaStatus)

0 commit comments

Comments
 (0)