Skip to content

Commit e874d76

Browse files
committed
refactoring: logging simplified, sync ACK/NACK, cleanup
1 parent e644071 commit e874d76

File tree

1 file changed

+30
-20
lines changed

1 file changed

+30
-20
lines changed

src/main/kotlin/com/fraktalio/example/fmodelspringdemo/adapter/eventstream/EventStreamProcessor.kt

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -88,16 +88,21 @@ class EventStreamProcessor(
8888
.buffer(buffer)
8989
.collect {
9090
// The event handler and lock/token management are executed in one transaction
91-
transactionalOperator.executeAndAwait { reactiveTransaction ->
92-
try {
93-
materializedView.handle(it.first)
94-
lockRepository.executeAction(view, Ack(it.second, it.first.deciderId()))
95-
logger.debug { "handled event successfully: $it" }
96-
} catch (e: Exception) {
97-
reactiveTransaction.setRollbackOnly()
98-
lockRepository.executeAction(view, ScheduleNack(nackInMilliseconds, it.first.deciderId()))
99-
logger.debug { "handled event unsuccessfully. setting ScheduleNack/RETRY in 10 seconds, for: $it" }
91+
try {
92+
transactionalOperator.executeAndAwait { reactiveTransaction ->
93+
try {
94+
materializedView.handle(it.first)
95+
lockRepository.executeAction(view, Ack(it.second, it.first.deciderId()))
96+
logger.debug { "handled event successfully: $it" }
97+
} catch (e: Exception) {
98+
logger.error { "handled event exceptionally: $e" }
99+
reactiveTransaction.setRollbackOnly()
100+
throw e
101+
}
100102
}
103+
} catch (e: Exception) {
104+
lockRepository.executeAction(view, ScheduleNack(nackInMilliseconds, it.first.deciderId()))
105+
logger.debug { "handled event unsuccessfully. setting ScheduleNack/RETRY in 10 seconds, for: $it" }
101106
}
102107
}
103108
}
@@ -134,17 +139,22 @@ class EventStreamProcessor(
134139
}
135140
.buffer(buffer)
136141
.collect {
137-
// The event handler and lock/token management are executed in one transaction
138-
transactionalOperator.executeAndAwait { reactiveTransaction ->
139-
try {
140-
sagaManager.handle(it.first).collect()
141-
lockRepository.executeAction(view, Ack(it.second, it.first.deciderId()))
142-
logger.debug { "saga: handled event successfully: $it" }
143-
} catch (e: Exception) {
144-
reactiveTransaction.setRollbackOnly()
145-
lockRepository.executeAction(view, ScheduleNack(nackInMilliseconds, it.first.deciderId()))
146-
logger.debug { "saga: handled event unsuccessfully. setting ScheduleNack/RETRY in 10 seconds, for: $it" }
142+
// The saga manager and lock/token management are executed in one transaction
143+
try {
144+
transactionalOperator.executeAndAwait { reactiveTransaction ->
145+
try {
146+
sagaManager.handle(it.first).collect()
147+
lockRepository.executeAction(view, Ack(it.second, it.first.deciderId()))
148+
logger.debug { "saga: handled event successfully: $it" }
149+
} catch (e: Exception) {
150+
logger.error { "saga: handled event exceptionally: $e" }
151+
reactiveTransaction.setRollbackOnly()
152+
throw e
153+
}
147154
}
155+
} catch (e: Exception) {
156+
lockRepository.executeAction(view, ScheduleNack(nackInMilliseconds, it.first.deciderId()))
157+
logger.debug { "saga: handled event unsuccessfully. setting ScheduleNack/RETRY in 10 seconds, for: $it" }
148158
}
149159
}
150160
}
@@ -156,8 +166,8 @@ class EventStreamProcessor(
156166
@EventListener
157167
fun onApplicationEvent(event: ContextRefreshedEvent?) {
158168
logger.info { "spring context refreshed: $event" }
159-
materializedView?.let { registerEventHandlerAndStartPooling("view", it, nackInMilliseconds = 10000) }
160169
sagaManager?.let { registerSagaManagerAndStartPooling("saga", it) }
170+
materializedView?.let { registerEventHandlerAndStartPooling("view", it) }
161171
}
162172
}
163173

0 commit comments

Comments
 (0)