Skip to content

Commit f00329b

Browse files
committed
refactor: (#962) race condition 문제를 rabbitMQ 단일 처리 지점 생성으로 해결
1 parent 0600d77 commit f00329b

File tree

5 files changed

+8
-68
lines changed

5 files changed

+8
-68
lines changed

dms-main/main-core/src/main/kotlin/team/aliens/dms/common/spi/OutboxPort.kt

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@ package team.aliens.dms.common.spi
22

33
import team.aliens.dms.common.dto.OutboxData
44
import team.aliens.dms.common.dto.OutboxStatus
5-
import java.util.UUID
65

76
interface OutboxPort {
87
fun save(outbox: OutboxData): OutboxData
9-
fun deleteById(id: UUID)
108
fun findByStatus(status: OutboxStatus): List<OutboxData>
119
fun delete(outbox: OutboxData)
1210
}

dms-main/main-infrastructure/src/main/kotlin/team/aliens/dms/event/handler/DeviceTokenEventHandler.kt

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package team.aliens.dms.event.handler
22

33
import com.fasterxml.jackson.databind.ObjectMapper
4-
import org.slf4j.LoggerFactory
54
import org.springframework.stereotype.Component
65
import org.springframework.transaction.event.TransactionPhase
76
import org.springframework.transaction.event.TransactionalEventListener
@@ -13,23 +12,18 @@ import team.aliens.dms.contract.remote.rabbitmq.SaveDeviceTokenMessage
1312
import team.aliens.dms.event.DeleteDeviceTokenEvent
1413
import team.aliens.dms.event.DeviceTokenEvent
1514
import team.aliens.dms.event.SaveDeviceTokenEvent
16-
import team.aliens.dms.thirdparty.messagebroker.NotificationProducer
17-
import java.util.UUID
1815

1916
@Component
2017
class DeviceTokenEventHandler(
2118
private val outboxPort: OutboxPort,
22-
private val notificationProducer: NotificationProducer,
2319
private val objectMapper: ObjectMapper
2420
) {
25-
private val log = LoggerFactory.getLogger(this::class.java)
26-
private val outboxIdsByEventIdentity = ThreadLocal<MutableMap<Int, UUID>>()
2721

2822
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
2923
fun saveOutbox(event: DeviceTokenEvent) {
3024
val (eventType, message) = createMessage(event)
3125

32-
val saved = outboxPort.save(
26+
outboxPort.save(
3327
OutboxData(
3428
id = null,
3529
aggregateType = "device_token",
@@ -38,29 +32,6 @@ class DeviceTokenEventHandler(
3832
status = OutboxStatus.PENDING
3933
)
4034
)
41-
val map = outboxIdsByEventIdentity.get() ?: mutableMapOf<Int, UUID>().also {
42-
outboxIdsByEventIdentity.set(it)
43-
}
44-
45-
map[System.identityHashCode(event)] = saved.id!!
46-
}
47-
48-
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
49-
fun publishMessage(event: DeviceTokenEvent) {
50-
val (_, message) = createMessage(event)
51-
val map = outboxIdsByEventIdentity.get()
52-
val outboxId = map.remove(System.identityHashCode(event))
53-
54-
try {
55-
notificationProducer.sendMessage(message)
56-
outboxId?.let { outboxPort.deleteById(it) }
57-
} catch (e: Exception) {
58-
log.warn("Failed to send device token message immediately, will be retried by scheduler", e)
59-
} finally {
60-
if (map.isNullOrEmpty()) {
61-
outboxIdsByEventIdentity.remove()
62-
}
63-
}
6435
}
6536

6637
private fun createMessage(event: DeviceTokenEvent): Pair<String, Any> {

dms-main/main-infrastructure/src/main/kotlin/team/aliens/dms/event/handler/NotificationEventHandler.kt

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package team.aliens.dms.event.handler
22

33
import com.fasterxml.jackson.databind.ObjectMapper
4-
import org.slf4j.LoggerFactory
54
import org.springframework.stereotype.Component
65
import org.springframework.transaction.event.TransactionPhase
76
import org.springframework.transaction.event.TransactionalEventListener
@@ -15,23 +14,18 @@ import team.aliens.dms.event.GroupNotificationEvent
1514
import team.aliens.dms.event.NotificationEvent
1615
import team.aliens.dms.event.SingleNotificationEvent
1716
import team.aliens.dms.event.TopicNotificationEvent
18-
import team.aliens.dms.thirdparty.messagebroker.NotificationProducer
19-
import java.util.UUID
2017

2118
@Component
2219
class NotificationEventHandler(
2320
private val outboxPort: OutboxPort,
24-
private val notificationProducer: NotificationProducer,
2521
private val objectMapper: ObjectMapper
2622
) {
27-
private val log = LoggerFactory.getLogger(this::class.java)
28-
private val outboxIdsByEventIdentity = ThreadLocal<MutableMap<Int, UUID>>()
2923

3024
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
3125
fun saveOutbox(event: NotificationEvent) {
3226
val (eventType, message) = createMessage(event)
3327

34-
val saved = outboxPort.save(
28+
outboxPort.save(
3529
OutboxData(
3630
id = null,
3731
aggregateType = "notification",
@@ -40,30 +34,6 @@ class NotificationEventHandler(
4034
status = OutboxStatus.PENDING
4135
)
4236
)
43-
44-
val map = outboxIdsByEventIdentity.get() ?: mutableMapOf<Int, UUID>().also {
45-
outboxIdsByEventIdentity.set(it)
46-
}
47-
48-
map[System.identityHashCode(event)] = saved.id!!
49-
}
50-
51-
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
52-
fun publishMessage(event: NotificationEvent) {
53-
val (_, message) = createMessage(event)
54-
val map = outboxIdsByEventIdentity.get()
55-
val outboxId = map.remove(System.identityHashCode(event))
56-
57-
try {
58-
notificationProducer.sendMessage(message)
59-
outboxId?.let { outboxPort.deleteById(it) }
60-
} catch (e: Exception) {
61-
log.warn("Failed to send notification immediately, will be retried by scheduler", e)
62-
} finally {
63-
if (map.isNullOrEmpty()) {
64-
outboxIdsByEventIdentity.remove()
65-
}
66-
}
6737
}
6838

6939
private fun createMessage(event: NotificationEvent): Pair<String, Any> {

dms-main/main-infrastructure/src/main/resources/application.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ spring:
5353
job-store-type: jdbc
5454
jdbc:
5555
initialize-schema: never
56+
overwrite-existing-jobs: true
57+
auto-startup: true
58+
wait-for-jobs-to-complete-on-shutdown: true
5659
properties:
5760
org:
5861
quartz:
@@ -65,6 +68,9 @@ spring:
6568
tablePrefix: QRTZ_
6669
isClustered: ${QUARTZ_CLUSTERED:false}
6770
useProperties: false
71+
misfireThreshold: 300000
72+
acquireTriggersWithinLock: true
73+
txIsolationLevelSerializable: false
6874
threadPool:
6975
class: org.quartz.simpl.SimpleThreadPool
7076
threadCount: 10

dms-main/main-persistence/src/main/kotlin/team/aliens/dms/persistence/outbox/OutboxPersistenceAdapter.kt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import team.aliens.dms.common.dto.OutboxStatus
66
import team.aliens.dms.common.spi.OutboxPort
77
import team.aliens.dms.persistence.outbox.mapper.OutboxMapper
88
import team.aliens.dms.persistence.outbox.repository.OutboxJpaRepository
9-
import java.util.UUID
109

1110
@Component
1211
class OutboxPersistenceAdapter(
@@ -20,10 +19,6 @@ class OutboxPersistenceAdapter(
2019
return outboxMapper.toDomain(saved)
2120
}
2221

23-
override fun deleteById(id: UUID) {
24-
outboxJpaRepository.deleteById(id)
25-
}
26-
2722
override fun findByStatus(status: OutboxStatus): List<OutboxData> {
2823
val jpaStatus = outboxMapper.toJpaStatus(status)
2924
return outboxJpaRepository.findByStatus(jpaStatus)

0 commit comments

Comments
 (0)