@@ -4,24 +4,24 @@ package io.github.abaddon.kcqrs.eventstoredb.eventstore
44import io.github.abaddon.kcqrs.core.IAggregate
55import io.github.abaddon.kcqrs.core.IIdentity
66import io.github.abaddon.kcqrs.core.domain.messages.events.IDomainEvent
7- import io.github.abaddon.kcqrs.core.helpers.log
7+ import io.github.abaddon.kcqrs.core.helpers.LoggerFactory. log
88import io.github.abaddon.kcqrs.core.persistence.EventStoreRepository
99import io.github.abaddon.kcqrs.core.projections.IProjection
1010import io.github.abaddon.kcqrs.core.projections.IProjectionHandler
1111import io.github.abaddon.kcqrs.eventstoredb.projection.EventStoreProjectionHandler
1212import io.kurrent.dbclient.*
13- import kotlinx.coroutines.CoroutineDispatcher
1413import kotlinx.coroutines.withContext
1514import java.security.InvalidParameterException
1615import java.util.concurrent.CompletionException
16+ import kotlin.coroutines.CoroutineContext
1717
1818
1919class EventStoreDBRepository <TAggregate : IAggregate >(
2020 eventStoreRepositoryConfig : EventStoreDBRepositoryConfig ,
2121 private val funEmpty : (identity: IIdentity ) -> TAggregate ,
22- dispatcher : CoroutineDispatcher
22+ coroutineContext : CoroutineContext
2323) :
24- EventStoreRepository <TAggregate >(dispatcher ) {
24+ EventStoreRepository <TAggregate >(coroutineContext ) {
2525 private val client: KurrentDBClient =
2626 KurrentDBClient .create(eventStoreRepositoryConfig.eventStoreDBClientSettings())
2727 private val MAX_READ_PAGE_SIZE : Long = eventStoreRepositoryConfig.maxReadPageSize
@@ -38,61 +38,76 @@ class EventStoreDBRepository<TAggregate : IAggregate>(
3838 }
3939 }
4040
41- override suspend fun load (streamName : String , startFrom : Long ): List <IDomainEvent > = withContext(coroutineContext) {
42- val eventsFound = mutableListOf<IDomainEvent >()
43- var currentRevision: Long = startFrom
44- log.debug(" loading events from stream {} with startRevision {}" , streamName, startFrom)
45- try {
46- var hasMoreEvents = true
47- while (hasMoreEvents) {
48- val options = ReadStreamOptions .get()
49- .forwards()
50- .fromRevision(currentRevision)
51- .maxCount(MAX_READ_PAGE_SIZE )
52- val result = client.readStream(streamName, options).get()
53-
54- val events = result.events
41+ override suspend fun load (streamName : String , startFrom : Long ): Result <List <IDomainEvent >> =
42+ withContext(coroutineContext) {
43+ val eventsFound = mutableListOf<IDomainEvent >()
44+ var currentRevision: Long = startFrom
45+ log.debug(" loading events from stream {} with startRevision {}" , streamName, startFrom)
46+ val result = runCatching {
47+ var hasMoreEvents = true
48+ while (hasMoreEvents) {
49+ val options = ReadStreamOptions .get()
50+ .forwards()
51+ .fromRevision(currentRevision)
52+ .maxCount(MAX_READ_PAGE_SIZE )
53+ val result = client.readStream(streamName, options).get()
54+
55+ val events = result.events
56+ log.debug(
57+ " events received: {}, firstStreamPosition: {}, lastStreamPosition {}" ,
58+ events.size,
59+ result.firstStreamPosition,
60+ result.lastStreamPosition
61+ )
62+ val maxRevision = events.maxOfOrNull { event ->
63+ log.debug(" event.originalEvent.revision, {}" , event.originalEvent.revision)
64+ event.originalEvent.revision
65+ }
66+ log.debug(" maxRevision is {}" , maxRevision)
67+ if (events.isEmpty()) {
68+ hasMoreEvents = false
69+ log.debug(" stream is empty" )
70+ } else {
71+ eventsFound.addAll(events.toDomainEvents())
72+ currentRevision + = events.size
73+ if (currentRevision != maxRevision) {
74+ log.warn(
75+ " currentRevision and maxRevision are different! {} and {}" ,
76+ currentRevision,
77+ maxRevision
78+ )
79+ }
80+ }
81+ }
5582 log.debug(
56- " events received: {}, firstStreamPosition: {}, lastStreamPosition {}" ,
57- events.size ,
58- result.firstStreamPosition ,
59- result.lastStreamPosition
83+ " end loading events from stream {} with startRevision {} getting {} events " ,
84+ streamName ,
85+ startFrom ,
86+ eventsFound.size
6087 )
61- val maxRevision = events.maxOfOrNull { event ->
62- log.debug( " event.originalEvent.revision, {} " , event.originalEvent.revision)
63- event.originalEvent.revision
64- }
65- log.debug( " maxRevision is {} " , maxRevision)
66- if (events.isEmpty()) {
67- hasMoreEvents = false
68- log.debug( " stream is empty " )
69- } else {
70- eventsFound.addAll(events.toDomainEvents() )
71- currentRevision + = events.size
72- if (currentRevision != maxRevision) {
73- log.warn(
74- " currentRevision and maxRevision are different! {} and {} " ,
75- currentRevision,
76- maxRevision
77- )
88+ eventsFound
89+ }
90+
91+ when {
92+ result.isFailure -> {
93+ val ex = result.exceptionOrNull() !!
94+ log.debug( " Error reading stream: {} " , streamName, ex)
95+ when (ex.cause) {
96+ is StreamNotFoundException -> {
97+ log.debug( " Stream not found: {} " , streamName )
98+ Result .success(eventsFound)
99+ }
100+
101+ else -> {
102+ log.error( " Error reading stream: {} " , streamName, ex)
103+ result
104+ }
78105 }
79106 }
80- }
81- } catch (ex: CompletionException ) {
82- log.debug(" Error reading stream: {}" , streamName, ex)
83- when (ex.cause) {
84- is StreamNotFoundException -> log.debug(" Stream not found: {}" , streamName)
85- else -> log.error(" Error reading stream: {}" , streamName, ex)
107+
108+ else -> result
86109 }
87110 }
88- log.debug(
89- " end loading events from stream {} with startRevision {} getting {} events" ,
90- streamName,
91- startFrom,
92- eventsFound.size
93- )
94- eventsFound
95- }
96111
97112 override fun aggregateIdStreamName (aggregateId : IIdentity ): String {
98113 check(streamName.isNotEmpty()) { throw InvalidParameterException (" Cannot get streamName empty" ) }
@@ -149,7 +164,7 @@ class EventStoreDBRepository<TAggregate : IAggregate>(
149164
150165 }
151166
152- override suspend fun publish (persistResult : Result < Unit >, events : List <IDomainEvent >): Result <Unit > =
167+ override suspend fun publish (events : List <IDomainEvent >): Result <Unit > =
153168 Result .success(Unit )
154169
155170}
0 commit comments