@@ -12,14 +12,15 @@ import io.github.abaddon.kcqrs.eventstoredb.projection.EventStoreProjectionHandl
1212import io.kurrent.dbclient.AppendToStreamOptions
1313import io.kurrent.dbclient.KurrentDBClient
1414import io.kurrent.dbclient.ReadStreamOptions
15+ import io.kurrent.dbclient.ResolvedEvent
1516import io.kurrent.dbclient.StreamNotFoundException
1617import io.kurrent.dbclient.StreamState
1718import io.kurrent.dbclient.SubscribeToAllOptions
1819import kotlinx.coroutines.flow.Flow
19- import kotlinx.coroutines.flow.flow
20- import kotlinx.coroutines.withContext
20+ import kotlinx.coroutines.flow.asFlow
2121import java.security.InvalidParameterException
2222import java.util.concurrent.CompletionException
23+ import java.util.concurrent.ExecutionException
2324
2425
2526class EventStoreDBRepository <TAggregate : IAggregate >(
@@ -35,96 +36,80 @@ class EventStoreDBRepository<TAggregate : IAggregate>(
3536
3637 override fun emptyAggregate (aggregateId : IIdentity ): TAggregate = funEmpty(aggregateId)
3738
38- override suspend fun <TProjection : IProjection > subscribe (projectionHandler : IProjectionHandler <TProjection >) =
39- withContext(coroutineContext) {
40- when (projectionHandler) {
41- is EventStoreProjectionHandler -> subscribeEventStoreProjectionHandler(projectionHandler)
42- else -> log.warn(" EventStoreProjectionHandler required, subscription failed" )
43- }
39+ override suspend fun <TProjection : IProjection > subscribe (projectionHandler : IProjectionHandler <TProjection >) {
40+ when (projectionHandler) {
41+ is EventStoreProjectionHandler -> subscribeEventStoreProjectionHandler(projectionHandler)
42+ else -> log.warn(" EventStoreProjectionHandler required, subscription failed" )
4443 }
44+ }
4545
46- override suspend fun loadEvents (streamName : String , startFrom : Long ): Result <Flow <IDomainEvent >> =
47- withContext(coroutineContext) {
48- var currentRevision: Long = startFrom
49- var totalEventsLoaded = 0
50- log.debug(" loading events from stream {} with startRevision {}" , streamName, startFrom)
51-
52- val result = runCatching {
53- flow<IDomainEvent > {
54- var hasMoreEvents = true
55- while (hasMoreEvents) {
56- val options = ReadStreamOptions .get()
57- .forwards()
58- .fromRevision(currentRevision)
59- .maxCount(MAX_READ_PAGE_SIZE )
60- val result = client.readStream(streamName, options).get()
61-
62- val events = result.events
63- log.debug(
64- " events received: {}, firstStreamPosition: {}, lastStreamPosition {}" ,
65- events.size,
66- result.firstStreamPosition,
67- result.lastStreamPosition
68- )
69-
70- if (events.isEmpty()) {
71- hasMoreEvents = false
72- log.debug(" stream is empty" )
73- } else {
74- val domainEvents = events.toDomainEvents()
75-
76- // Emit each domain event individually
77- domainEvents.forEach { domainEvent ->
78- totalEventsLoaded + = 1
79- emit(domainEvent)
80- }
81-
82- val maxRevision = events.maxOfOrNull { event ->
83- log.debug(" event.originalEvent.revision, {}" , event.originalEvent.revision)
84- event.originalEvent.revision
85- }
86- log.debug(" maxRevision is {}" , maxRevision)
87-
88- currentRevision + = events.size
89- if (currentRevision != maxRevision) {
90- log.warn(
91- " currentRevision and maxRevision are different! {} and {}" ,
92- currentRevision,
93- maxRevision
94- )
95- }
96- }
97- }
46+ override suspend fun loadEvents (streamName : String , startFrom : Long ): Result <Flow <IDomainEvent >> = runCatching {
47+ var currentRevision: Long = startFrom
48+ var totalEventsLoaded = 0
49+ log.debug(" loading events from stream {} with startRevision {}" , streamName, startFrom)
50+ var hasMoreEvents = true
51+ val domainEventFounds: MutableList <IDomainEvent > = mutableListOf ()
52+ while (hasMoreEvents) {
53+ val options = ReadStreamOptions .get()
54+ .forwards()
55+ .fromRevision(currentRevision)
56+ .maxCount(MAX_READ_PAGE_SIZE )
57+
58+ val resolvedEvent = readEventStore(options, streamName);
59+ val domainEvents = resolvedEvent.toDomainEvents()
60+
61+ if (domainEvents.isEmpty()) {
62+ hasMoreEvents = false
63+ log.debug(" stream is empty" )
64+ } else {
65+ // Emit each domain event individually
66+ totalEventsLoaded + = domainEvents.size
67+
68+ val maxRevision = resolvedEvent.maxOfOrNull { event ->
69+ log.debug(" event.originalEvent.revision, {}" , event.originalEvent.revision)
70+ event.originalEvent.revision
71+ }
72+ log.debug(" maxRevision is {}" , maxRevision)
73+
74+ currentRevision + = resolvedEvent.size
75+ if (currentRevision != maxRevision) {
76+ log.warn(
77+ " currentRevision and maxRevision are different! {} and {}" ,
78+ currentRevision,
79+ maxRevision
80+ )
9881 }
9982 }
83+ domainEventFounds.addAll(domainEvents)
84+ }
85+ domainEventFounds.asFlow()
86+ }
87+
10088
89+ private fun readEventStore (
90+ options : ReadStreamOptions ,
91+ streamName : String
92+ ): List <ResolvedEvent > {
93+ try {
94+ val result = client.readStream(streamName, options).get()
10195 log.debug(
102- " end loading events from stream {} with startRevision {} getting {} events " ,
103- streamName ,
104- startFrom ,
105- totalEventsLoaded
96+ " events received: {}, firstStreamPosition: {}, lastStreamPosition {}" ,
97+ result.events.size ,
98+ result.firstStreamPosition ,
99+ result.lastStreamPosition
106100 )
107-
108- when {
109- result.isFailure -> {
110- val ex = result.exceptionOrNull()!!
111- log.debug(" Error reading stream: {}" , streamName, ex)
112- when (ex.cause) {
113- is StreamNotFoundException -> {
114- log.debug(" Stream not found: {}" , streamName)
115- Result .success(flow<IDomainEvent > {})
116- }
117-
118- else -> {
119- log.error(" Error reading stream: {}" , streamName, ex)
120- result
121- }
122- }
123- }
124-
125- else -> result
126- }
101+ log.debug(" stream contains {} events" , result.events.size)
102+ return result.events
103+ // return result.events.toDomainEvents()
104+
105+ } catch (ex: ExecutionException ) {
106+ if (ex.cause is StreamNotFoundException ) {
107+ log.debug(" Stream not found: {}" , streamName)
108+ return listOf<ResolvedEvent >()
109+ } else
110+ throw ex
127111 }
112+ }
128113
129114 override fun aggregateIdStreamName (aggregateId : IIdentity ): String {
130115 check(streamName.isNotEmpty()) { throw InvalidParameterException (" Cannot get streamName empty" ) }
@@ -136,7 +121,7 @@ class EventStoreDBRepository<TAggregate : IAggregate>(
136121 uncommittedEvents : List <IDomainEvent >,
137122 header : Map <String , String >,
138123 currentVersion : Long
139- ): Result <Unit > = withContext(coroutineContext) {
124+ ): Result <Unit > = runCatching {
140125 log.debug(
141126 " persisting uncommittedEvents {} with currentVersion {} on stream {}" ,
142127 uncommittedEvents,
@@ -159,16 +144,6 @@ class EventStoreDBRepository<TAggregate : IAggregate>(
159144 log.error(" Events not published on stream $streamName " )
160145 Result .failure(ex)
161146 }
162-
163- // writeResultFuture.whenComplete { writeResult, error ->
164- // if (error == null) {
165- // log.info("Events published on stream $streamName, nextExpectedRevision: ${writeResult.nextExpectedRevision}")
166- // Result.success(Unit)
167- // } else {
168- // log.error("Events not published on stream $streamName", error)
169- // Result.failure(error)
170- // }
171- // }.get()
172147 }
173148
174149 private fun <TProjection : IProjection > subscribeEventStoreProjectionHandler (projectionHandler : EventStoreProjectionHandler <TProjection >) {
0 commit comments