diff --git a/application-arrow/build.gradle.kts b/application-arrow/build.gradle.kts index 6a6a3700..e56fdc35 100644 --- a/application-arrow/build.gradle.kts +++ b/application-arrow/build.gradle.kts @@ -12,6 +12,7 @@ kotlin { compilations.all { kotlinOptions.jvmTarget = "1.8" kotlinOptions.verbose = true + kotlinOptions.freeCompilerArgs = kotlinOptions.freeCompilerArgs + "-Xcontext-receivers" } withJava() diff --git a/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregateArrowExtension.kt b/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregateArrowExtension.kt index 14c59e0d..1d9cbb2b 100644 --- a/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregateArrowExtension.kt +++ b/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregateArrowExtension.kt @@ -16,9 +16,10 @@ package com.fraktalio.fmodel.application -import arrow.core.continuations.Effect -import arrow.core.continuations.effect +import arrow.core.Either +import arrow.core.raise.either import com.fraktalio.fmodel.application.Error.CommandHandlingFailed +import com.fraktalio.fmodel.application.Error.CommandPublishingFailed import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.* @@ -26,54 +27,54 @@ import kotlinx.coroutines.flow.* * Extension function - Handles the command message of type [C] * * @param command Command message of type [C] - * @return [Flow] of [Effect] (either [Error] or Events of type [E]) + * @return [Flow] of [Either] (either [Error] or Events of type [E]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -fun EventSourcingAggregate.handleWithEffect(command: C): Flow> = +fun EventSourcingAggregate.handleWithEffect(command: C): Flow> = command .fetchEvents() .computeNewEvents(command) .save() - .map { effect { it } } - .catch { emit(effect { shift(CommandHandlingFailed(command)) }) } + .map { either { it } } + .catch { emit(either { raise(CommandHandlingFailed(command)) }) } /** * Extension function - Handles the command message of type [C] * * @param command Command message of type [C] - * @return [Flow] of [Effect] (either [Error] or Events of type [E]) + * @return [Flow] of [Either] (either [Error] or Events of type [E]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -fun EventSourcingOrchestratingAggregate.handleWithEffect(command: C): Flow> = +fun EventSourcingOrchestratingAggregate.handleWithEffect(command: C): Flow> = command .fetchEvents() .computeNewEventsByOrchestrating(command) { it.fetchEvents() } .save() - .map { effect { it } } - .catch { emit(effect { shift(CommandHandlingFailed(command)) }) } + .map { either { it } } + .catch { emit(either { raise(CommandHandlingFailed(command)) }) } /** * Extension function - Handles the command message of type [C] * * @param command Command message of type [C] - * @return [Flow] of [Effect] (either [Error] or Events of type [Pair]<[E], [V]>) + * @return [Flow] of [Either] (either [Error] or Events of type [Pair]<[E], [V]>) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -fun EventSourcingLockingAggregate.handleOptimisticallyWithEffect(command: C): Flow>> = +fun EventSourcingLockingAggregate.handleOptimisticallyWithEffect(command: C): Flow>> = flow { val events = command.fetchEvents() emitAll( events.map { it.first } .computeNewEvents(command) .save(events.lastOrNull()) - .map { effect> { it } } - .catch { emit(effect { shift(CommandHandlingFailed(command)) }) } + .map { either> { it } } + .catch { emit(either { raise(CommandHandlingFailed(command)) }) } ) } @@ -81,72 +82,72 @@ fun EventSourcingLockingAggregate.handleOptimisticallyW * Extension function - Handles the command message of type [C] * * @param command Command message of type [C] - * @return [Flow] of [Effect] (either [Error] or Events of type [Pair]<[E], [V]>) + * @return [Flow] of [Either] (either [Error] or Events of type [Pair]<[E], [V]>) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -fun EventSourcingLockingOrchestratingAggregate.handleOptimisticallyWithEffect(command: C): Flow>> = +fun EventSourcingLockingOrchestratingAggregate.handleOptimisticallyWithEffect(command: C): Flow>> = command .fetchEvents().map { it.first } .computeNewEventsByOrchestrating(command) { it.fetchEvents().map { pair -> pair.first } } .save(latestVersionProvider) - .map { effect> { it } } - .catch { emit(effect { shift(CommandHandlingFailed(command)) }) } + .map { either> { it } } + .catch { emit(either { raise(CommandHandlingFailed(command)) }) } @FlowPreview -fun EventSourcingAggregate.handleWithEffect(commands: Flow): Flow> = +fun EventSourcingAggregate.handleWithEffect(commands: Flow): Flow> = commands .flatMapConcat { handleWithEffect(it) } - .catch { emit(effect { shift(CommandHandlingFailed(it)) }) } + .catch { emit(either { raise(CommandPublishingFailed(it)) }) } @FlowPreview -fun EventSourcingOrchestratingAggregate.handleWithEffect(commands: Flow): Flow> = +fun EventSourcingOrchestratingAggregate.handleWithEffect(commands: Flow): Flow> = commands .flatMapConcat { handleWithEffect(it) } - .catch { emit(effect { shift(CommandHandlingFailed(it)) }) } + .catch { emit(either { raise(CommandPublishingFailed(it)) }) } @FlowPreview -fun EventSourcingLockingAggregate.handleOptimisticallyWithEffect(commands: Flow): Flow>> = +fun EventSourcingLockingAggregate.handleOptimisticallyWithEffect(commands: Flow): Flow>> = commands .flatMapConcat { handleOptimisticallyWithEffect(it) } - .catch { emit(effect { shift(CommandHandlingFailed(it)) }) } + .catch { emit(either { raise(CommandPublishingFailed(it)) }) } @FlowPreview -fun EventSourcingLockingOrchestratingAggregate.handleOptimisticallyWithEffect(commands: Flow): Flow>> = +fun EventSourcingLockingOrchestratingAggregate.handleOptimisticallyWithEffect(commands: Flow): Flow>> = commands .flatMapConcat { handleOptimisticallyWithEffect(it) } - .catch { emit(effect { shift(CommandHandlingFailed(it)) }) } + .catch { emit(either { raise(CommandPublishingFailed(it)) }) } @FlowPreview -fun C.publishWithEffect(aggregate: EventSourcingAggregate): Flow> = +fun C.publishWithEffect(aggregate: EventSourcingAggregate): Flow> = aggregate.handleWithEffect(this) @FlowPreview -fun C.publishWithEffect(aggregate: EventSourcingOrchestratingAggregate): Flow> = +fun C.publishWithEffect(aggregate: EventSourcingOrchestratingAggregate): Flow> = aggregate.handleWithEffect(this) @FlowPreview -fun C.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingAggregate): Flow>> = +fun C.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingAggregate): Flow>> = aggregate.handleOptimisticallyWithEffect(this) @FlowPreview -fun C.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingOrchestratingAggregate): Flow>> = +fun C.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingOrchestratingAggregate): Flow>> = aggregate.handleOptimisticallyWithEffect(this) @FlowPreview -fun Flow.publishWithEffect(aggregate: EventSourcingAggregate): Flow> = +fun Flow.publishWithEffect(aggregate: EventSourcingAggregate): Flow> = aggregate.handleWithEffect(this) @FlowPreview -fun Flow.publishWithEffect(aggregate: EventSourcingOrchestratingAggregate): Flow> = +fun Flow.publishWithEffect(aggregate: EventSourcingOrchestratingAggregate): Flow> = aggregate.handleWithEffect(this) @FlowPreview -fun Flow.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingAggregate): Flow>> = +fun Flow.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingAggregate): Flow>> = aggregate.handleOptimisticallyWithEffect(this) @FlowPreview -fun Flow.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingOrchestratingAggregate): Flow>> = +fun Flow.publishOptimisticallyWithEffect(aggregate: EventSourcingLockingOrchestratingAggregate): Flow>> = aggregate.handleOptimisticallyWithEffect(this) diff --git a/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowExtension.kt b/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowExtension.kt index b829e47f..b84c9cbb 100644 --- a/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowExtension.kt +++ b/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowExtension.kt @@ -16,10 +16,11 @@ package com.fraktalio.fmodel.application -import arrow.core.continuations.Effect -import arrow.core.continuations.effect -import arrow.core.nonFatalOrThrow -import com.fraktalio.fmodel.application.Error.* +import arrow.core.Either +import arrow.core.raise.catch +import arrow.core.raise.either +import com.fraktalio.fmodel.application.Error.EventHandlingFailed +import com.fraktalio.fmodel.application.Error.EventPublishingFailed import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.map @@ -28,89 +29,36 @@ import kotlinx.coroutines.flow.map * Extension function - Handles the event of type [E] * * @param event Event of type [E] to be handled - * @return [Effect] (either [Error] or State of type [S]) + * @return [Either] (either [Error] or State of type [S]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -suspend fun I.handleWithEffect(event: E): Effect where I : ViewStateComputation, I : ViewStateRepository { - - fun S?.computeNewStateWithEffect(event: E): Effect = - effect { - try { - computeNewState(event) - } catch (t: Throwable) { - shift(CalculatingNewViewStateFailed(this@computeNewStateWithEffect, event, t.nonFatalOrThrow())) - } - } - - suspend fun E.fetchStateWithEffect(): Effect = - effect { - try { - fetchState() - } catch (t: Throwable) { - shift(FetchingViewStateFailed(this@fetchStateWithEffect, t.nonFatalOrThrow())) - } - } - - suspend fun S.saveWithEffect(): Effect = - effect { - try { - save() - } catch (t: Throwable) { - shift(StoringStateFailed(this@saveWithEffect, t.nonFatalOrThrow())) - } +suspend fun I.handleWithEffect(event: E): Either where I : ViewStateComputation, I : ViewStateRepository = + either { + catch({ + event.fetchState().computeNewState(event).save() + }) { + raise(EventHandlingFailed(event, it)) } - - return effect { - event.fetchStateWithEffect().bind() - .computeNewStateWithEffect(event).bind() - .saveWithEffect().bind() } -} /** * Extension function - Handles the event of type [E] * * @param event Event of type [E] to be handled - * @return [Effect] (either [Error] or State of type [Pair]<[S], [V]>) + * @return [Either] (either [Error] or State of type [Pair]<[S], [V]>) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -suspend fun I.handleOptimisticallyWithEffect(event: E): Effect> where I : ViewStateComputation, I : ViewStateLockingRepository { - fun S?.computeNewStateWithEffect(event: E): Effect = - effect { - try { - computeNewState(event) - } catch (t: Throwable) { - shift(CalculatingNewViewStateFailed(this@computeNewStateWithEffect, event, t.nonFatalOrThrow())) - } - } - - suspend fun E.fetchStateWithEffect(): Effect> = - effect { - try { - fetchState() - } catch (t: Throwable) { - shift(FetchingViewStateFailed(this@fetchStateWithEffect, t.nonFatalOrThrow())) - } - } - - suspend fun S.saveWithEffect(currentVersion: V?): Effect> = - effect { - try { - save(currentVersion) - } catch (t: Throwable) { - shift(StoringStateFailed(this@saveWithEffect, t.nonFatalOrThrow())) - } +suspend fun I.handleOptimisticallyWithEffect(event: E): Either> where I : ViewStateComputation, I : ViewStateLockingRepository = + either { + catch({ + val (state, version) = event.fetchState() + state.computeNewState(event).save(version) + }) { + raise(EventHandlingFailed(event, it)) } - - return effect { - val (state, version) = event.fetchStateWithEffect().bind() - state - .computeNewStateWithEffect(event).bind() - .saveWithEffect(version).bind() } -} /** * Extension function - Handles the event of type [E] @@ -119,44 +67,19 @@ suspend fun I.handleOptimisticallyWithEffect(event: E): Effect to be handled - * @return [Effect] (either [Error] or State of type [Pair]<[S], [SV]>) + * @return [Either] (either [Error] or State of type [Pair]<[S], [SV]>) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -suspend fun I.handleOptimisticallyWithDeduplicationWithEffect(eventAndVersion: Pair): Effect> where I : ViewStateComputation, I : ViewStateLockingDeduplicationRepository { - fun S?.computeNewStateWithEffect(event: E): Effect = - effect { - try { - computeNewState(event) - } catch (t: Throwable) { - shift(CalculatingNewViewStateFailed(this@computeNewStateWithEffect, event, t.nonFatalOrThrow())) - } - } - - suspend fun E.fetchStateWithEffect(): Effect> = - effect { - try { - fetchState() - } catch (t: Throwable) { - shift(FetchingViewStateFailed(this@fetchStateWithEffect, t.nonFatalOrThrow())) - } - } - - suspend fun S.saveWithEffect(entityVersion: EV, currentStateVersion: SV?): Effect> = - effect { - try { - save(entityVersion, currentStateVersion) - } catch (t: Throwable) { - shift(StoringStateFailed(this@saveWithEffect, t.nonFatalOrThrow())) - } - } - - return effect { +suspend fun I.handleOptimisticallyWithDeduplicationWithEffect(eventAndVersion: Pair): Either> where I : ViewStateComputation, I : ViewStateLockingDeduplicationRepository { + return either { val (event, eventVersion) = eventAndVersion - val (state, currentStateVersion) = event.fetchStateWithEffect().bind() - state - .computeNewStateWithEffect(event).bind() - .saveWithEffect(eventVersion, currentStateVersion).bind() + catch({ + val (state, currentStateVersion) = event.fetchState() + state.computeNewState(event).save(eventVersion, currentStateVersion) + }) { + raise(EventHandlingFailed(event, it)) + } } } @@ -164,104 +87,104 @@ suspend fun I.handleOptimisticallyWithDeduplicationWithEffect( * Extension function - Handles the flow of events of type [E] * * @param events Flow of Events of type [E] to be handled - * @return [Flow] of [Effect] (either [Error] or State of type [S]) + * @return [Flow] of [Either] (either [Error] or State of type [S]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -fun I.handleWithEffect(events: Flow): Flow> where I : ViewStateComputation, I : ViewStateRepository = +fun I.handleWithEffect(events: Flow): Flow> where I : ViewStateComputation, I : ViewStateRepository = events .map { handleWithEffect(it) } - .catch { emit(effect { shift(EventPublishingFailed(it)) }) } + .catch { emit(either { raise(EventPublishingFailed(it)) }) } /** * Extension function - Handles the flow of events of type [E] * * @param events Flow of Events of type [E] to be handled - * @return [Flow] of [Effect] (either [Error] or State of type [Pair]<[S], [V]>) + * @return [Flow] of [Either] (either [Error] or State of type [Pair]<[S], [V]>) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -fun I.handleOptimisticallyWithEffect(events: Flow): Flow>> where I : ViewStateComputation, I : ViewStateLockingRepository = +fun I.handleOptimisticallyWithEffect(events: Flow): Flow>> where I : ViewStateComputation, I : ViewStateLockingRepository = events .map { handleOptimisticallyWithEffect(it) } - .catch { emit(effect { shift(EventPublishingFailed(it)) }) } + .catch { emit(either { raise(EventPublishingFailed(it)) }) } /** * Extension function - Handles the flow of events of type [Pair]<[E], [EV]> * * @param eventsAndVersions Flow of Events of type [E] to be handled - * @return [Flow] of [Effect] (either [Error] or State of type [Pair]<[S], [SV]>) + * @return [Flow] of [Either] (either [Error] or State of type [Pair]<[S], [SV]>) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -fun I.handleOptimisticallyWithDeduplicationWithEffect(eventsAndVersions: Flow>): Flow>> where I : ViewStateComputation, I : ViewStateLockingDeduplicationRepository = +fun I.handleOptimisticallyWithDeduplicationWithEffect(eventsAndVersions: Flow>): Flow>> where I : ViewStateComputation, I : ViewStateLockingDeduplicationRepository = eventsAndVersions .map { handleOptimisticallyWithDeduplicationWithEffect(it) } - .catch { emit(effect { shift(EventPublishingFailed(it)) }) } + .catch { emit(either { raise(EventPublishingFailed(it)) }) } /** * Extension function - Publishes the event of type [E] to the materialized view * @receiver event of type [E] * @param materializedView of type [ViewStateComputation]<[S], [E]>, [ViewStateRepository]<[E], [S]> - * @return [Effect] (either [Error] or the successfully stored State of type [S]) + * @return [Either] (either [Error] or the successfully stored State of type [S]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -suspend fun E.publishWithEffect(materializedView: M): Effect where M : ViewStateComputation, M : ViewStateRepository = +suspend fun E.publishWithEffect(materializedView: M): Either where M : ViewStateComputation, M : ViewStateRepository = materializedView.handleWithEffect(this) /** * Extension function - Publishes the event of type [E] to the materialized view * @receiver event of type [E] * @param materializedView of type [ViewStateComputation]<[S], [E]>, [ViewStateLockingRepository]<[E], [S], [V]> - * @return [Effect] (either [Error] or the successfully stored State of type [Pair]<[S], [V]>) + * @return [Either] (either [Error] or the successfully stored State of type [Pair]<[S], [V]>) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -suspend fun E.publishOptimisticallyWithEffect(materializedView: M): Effect> where M : ViewStateComputation, M : ViewStateLockingRepository = +suspend fun E.publishOptimisticallyWithEffect(materializedView: M): Either> where M : ViewStateComputation, M : ViewStateLockingRepository = materializedView.handleOptimisticallyWithEffect(this) /** * Extension function - Publishes the event of type [E] to the materialized view * @receiver event of type [E] * @param materializedView of type [ViewStateComputation]<[S], [E]>, [ViewStateLockingDeduplicationRepository]<[E], [S], [EV], [SV]> - * @return [Effect] (either [Error] or the successfully stored State of type [Pair]<[S], [SV]>) + * @return [Either] (either [Error] or the successfully stored State of type [Pair]<[S], [SV]>) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -suspend fun Pair.publishOptimisticallyWithDeduplicationWithEffect(materializedView: M): Effect> where M : ViewStateComputation, M : ViewStateLockingDeduplicationRepository = +suspend fun Pair.publishOptimisticallyWithDeduplicationWithEffect(materializedView: M): Either> where M : ViewStateComputation, M : ViewStateLockingDeduplicationRepository = materializedView.handleOptimisticallyWithDeduplicationWithEffect(this) /** * Extension function - Publishes the event of type [E] to the materialized view * @receiver [Flow] of events of type [E] * @param materializedView of type [ViewStateComputation]<[S], [E]>, [ViewStateRepository]<[E], [S]> - * @return [Flow] of [Effect] (either [Error] or the successfully stored State of type [S]) + * @return [Flow] of [Either] (either [Error] or the successfully stored State of type [S]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -fun Flow.publishWithEffect(materializedView: M): Flow> where M : ViewStateComputation, M : ViewStateRepository = +fun Flow.publishWithEffect(materializedView: M): Flow> where M : ViewStateComputation, M : ViewStateRepository = materializedView.handleWithEffect(this) /** * Extension function - Publishes the event of type [E] to the materialized view * @receiver [Flow] of events of type [E] * @param materializedView of type [ViewStateComputation]<[S], [E]>, [ViewStateLockingRepository]<[E], [S], [V]> - * @return [Flow] of [Effect] (either [Error] or the successfully stored State of type [Pair]<[S], [V]>) + * @return [Flow] of [Either] (either [Error] or the successfully stored State of type [Pair]<[S], [V]>) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -fun Flow.publishOptimisticallyWithEffect(materializedView: M): Flow>> where M : ViewStateComputation, M : ViewStateLockingRepository = +fun Flow.publishOptimisticallyWithEffect(materializedView: M): Flow>> where M : ViewStateComputation, M : ViewStateLockingRepository = materializedView.handleOptimisticallyWithEffect(this) /** * Extension function - Publishes the event of type [E] to the materialized view * @receiver [Flow] of events of type [E] * @param materializedView of type [ViewStateComputation]<[S], [E]>, [ViewStateLockingDeduplicationRepository]<[E], [S], [EV], [SV]> - * @return [Flow] of [Effect] (either [Error] or the successfully stored State of type [Pair]<[S], [SV]>) + * @return [Flow] of [Either] (either [Error] or the successfully stored State of type [Pair]<[S], [SV]>) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -fun Flow>.publishOptimisticallyWithDeduplicationWithEffect(materializedView: M): Flow>> where M : ViewStateComputation, M : ViewStateLockingDeduplicationRepository = +fun Flow>.publishOptimisticallyWithDeduplicationWithEffect(materializedView: M): Flow>> where M : ViewStateComputation, M : ViewStateLockingDeduplicationRepository = materializedView.handleOptimisticallyWithDeduplicationWithEffect(this) diff --git a/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/SagaManagerArrowExtension.kt b/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/SagaManagerArrowExtension.kt index 33243480..caacae3b 100644 --- a/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/SagaManagerArrowExtension.kt +++ b/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/SagaManagerArrowExtension.kt @@ -16,8 +16,8 @@ package com.fraktalio.fmodel.application -import arrow.core.continuations.Effect -import arrow.core.continuations.effect +import arrow.core.Either +import arrow.core.raise.either import com.fraktalio.fmodel.application.Error.ActionResultHandlingFailed import com.fraktalio.fmodel.application.Error.ActionResultPublishingFailed import kotlinx.coroutines.FlowPreview @@ -30,52 +30,52 @@ import kotlinx.coroutines.flow.map * Extension function - Handles the action result of type [AR]. * * @param actionResult Action Result represent the outcome of some action you want to handle in some way - * @return [Flow] of [Effect] (either [Error] or Actions of type [A]) + * @return [Flow] of [Either] (either [Error] or Actions of type [A]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -fun SagaManager.handleWithEffect(actionResult: AR): Flow> = +fun SagaManager.handleWithEffect(actionResult: AR): Flow> = actionResult .computeNewActions() .publish() - .map { effect { it } } - .catch { emit(effect { shift(ActionResultHandlingFailed(actionResult)) }) } + .map { either { it } } + .catch { emit(either { raise(ActionResultHandlingFailed(actionResult, it)) }) } /** * Extension function - Handles the [Flow] of action results of type [AR]. * * @param actionResults Action Results represent the outcome of some action you want to handle in some way - * @return [Flow] of [Effect] (either [Error] or Actions of type [A]) + * @return [Flow] of [Either] (either [Error] or Actions of type [A]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -fun SagaManager.handleWithEffect(actionResults: Flow): Flow> = +fun SagaManager.handleWithEffect(actionResults: Flow): Flow> = actionResults .flatMapConcat { handleWithEffect(it) } - .catch { emit(effect { shift(ActionResultPublishingFailed(it)) }) } + .catch { emit(either { raise(ActionResultPublishingFailed(it)) }) } /** * Extension function - Publishes the action result of type [AR] to the saga manager of type [SagaManager]<[AR], [A]> * @receiver action result of type [AR] * @param sagaManager of type [SagaManager]<[AR], [A]> - * @return the [Flow] of [Effect] (either [Error] or successfully published Actions of type [A]) + * @return the [Flow] of [Either] (either [Error] or successfully published Actions of type [A]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ -fun AR.publishWithEffect(sagaManager: SagaManager): Flow> = +fun AR.publishWithEffect(sagaManager: SagaManager): Flow> = sagaManager.handleWithEffect(this) /** * Extension function - Publishes the action result of type [AR] to the saga manager of type [SagaManager]<[AR], [A]> * @receiver [Flow] of action results of type [AR] * @param sagaManager of type [SagaManager]<[AR], [A]> - * @return the [Flow] of [Effect] (either [Error] or successfully published Actions of type [A]) + * @return the [Flow] of [Either] (either [Error] or successfully published Actions of type [A]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -fun Flow.publishWithEffect(sagaManager: SagaManager): Flow> = +fun Flow.publishWithEffect(sagaManager: SagaManager): Flow> = sagaManager.handleWithEffect(this) diff --git a/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowExtension.kt b/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowExtension.kt index efe6527b..c867756e 100644 --- a/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowExtension.kt +++ b/application-arrow/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowExtension.kt @@ -16,10 +16,11 @@ package com.fraktalio.fmodel.application -import arrow.core.continuations.Effect -import arrow.core.continuations.effect -import arrow.core.nonFatalOrThrow -import com.fraktalio.fmodel.application.Error.* +import arrow.core.Either +import arrow.core.raise.catch +import arrow.core.raise.either +import com.fraktalio.fmodel.application.Error.CommandHandlingFailed +import com.fraktalio.fmodel.application.Error.CommandPublishingFailed import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.catch @@ -29,171 +30,84 @@ import kotlinx.coroutines.flow.map * Extension function - Handles the command message of type [C] * * @param command Command message of type [C] - * @return [Effect] (either [Error] or State of type [S]) + * @return [Either] (either [Error] or State of type [S]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -suspend fun I.handleWithEffect(command: C): Effect where I : StateComputation, - I : StateRepository { - /** - * Inner function - Computes new State based on the previous State and the [command] or fails. - * - * @param command of type [C] - * @return [Effect] (either the newly computed state of type [S] or [Error]) - */ - suspend fun S?.computeNewStateWithEffect(command: C): Effect = - effect { - try { - computeNewState(command) - } catch (t: Throwable) { - shift(CalculatingNewStateFailed(this@computeNewStateWithEffect, command, t.nonFatalOrThrow())) - } - } - - /** - * Inner function - Fetch state - either version - * - * @receiver Command of type [C] - * @return [Effect] (either [Error] or the State of type [S]?) - */ - suspend fun C.fetchStateWithEffect(): Effect = - effect { - try { - fetchState() - } catch (t: Throwable) { - shift(FetchingStateFailed(this@fetchStateWithEffect, t.nonFatalOrThrow())) - } - } +suspend fun I.handleWithEffect(command: C): Either where I : StateComputation, + I : StateRepository = + either { + catch({ + command.fetchState().computeNewState(command).save() + }) { + raise(CommandHandlingFailed(this@handleWithEffect, it)) - /** - * Inner function - Save state - either version - * - * @receiver State of type [S] - * @return [Effect] (either [Error] or the newly saved State of type [S]) - */ - suspend fun S.saveWithEffect(): Effect = - effect { - try { - save() - } catch (t: Throwable) { - shift(StoringStateFailed(this@saveWithEffect, t.nonFatalOrThrow())) - } } - - return effect { - command - .fetchStateWithEffect().bind() - .computeNewStateWithEffect(command).bind() - .saveWithEffect().bind() } -} /** * Extension function - Handles the command message of type [C] to the locking state stored aggregate, optimistically * * @param command Command message of type [C] - * @return [Effect] (either [Error] or State of type [Pair]<[S], [V]>), in which [V] represents Version + * @return [Either] (either [Error] or State of type [Pair]<[S], [V]>), in which [V] represents Version * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -suspend fun I.handleOptimisticallyWithEffect(command: C): Effect> where I : StateComputation, - I : StateLockingRepository { - /** - * Inner function - Computes new State based on the previous State and the [command] or fails. - * - * @param command of type [C] - * @return [Effect] (either the newly computed state of type [S] or [Error]) - */ - suspend fun S?.computeNewStateWithEffect(command: C): Effect = - effect { - try { - computeNewState(command) - } catch (t: Throwable) { - shift(CalculatingNewStateFailed(this@computeNewStateWithEffect, command, t.nonFatalOrThrow())) - } - } - - /** - * Inner function - Fetch state - either version - * - * @receiver Command of type [C] - * @return [Effect] (either [Error] or the State of type [S]?) - */ - suspend fun C.fetchStateWithEffect(): Effect> = - effect { - try { - fetchState() - } catch (t: Throwable) { - shift(FetchingStateFailed(this@fetchStateWithEffect, t.nonFatalOrThrow())) - } +suspend fun I.handleOptimisticallyWithEffect(command: C): Either> where I : StateComputation, + I : StateLockingRepository = + either { + catch({ + val (state, version) = command.fetchState() + state + .computeNewState(command) + .save(version) + }) { + raise(CommandHandlingFailed(this@handleOptimisticallyWithEffect, it)) } - - /** - * Inner function - Save state - either version - * - * @receiver State of type [S] - * @return [Effect] (either [Error] or the newly saved State of type [S]) - */ - suspend fun S.saveWithEffect(currentStateVersion: V?): Effect> = - effect { - try { - save(currentStateVersion) - } catch (t: Throwable) { - shift(StoringStateFailed(this@saveWithEffect, t.nonFatalOrThrow())) - } - } - - return effect { - val (state, version) = command.fetchStateWithEffect().bind() - state - .computeNewStateWithEffect(command).bind() - .saveWithEffect(version).bind() } -} - /** * Extension function - Handles the [Flow] of command messages of type [C] * * @param commands [Flow] of Command messages of type [C] - * @return [Flow] of [Effect] (either [Error] or State of type [S]) + * @return [Flow] of [Either] (either [Error] or State of type [S]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -fun I.handleWithEffect(commands: Flow): Flow> where I : StateComputation, +fun I.handleWithEffect(commands: Flow): Flow> where I : StateComputation, I : StateRepository = commands .map { handleWithEffect(it) } - .catch { emit(effect { shift(CommandPublishingFailed(it)) }) } + .catch { emit(either { raise(CommandPublishingFailed(it)) }) } /** * Extension function - Handles the [Flow] of command messages of type [C] to the locking state stored aggregate, optimistically * * @param commands [Flow] of Command messages of type [C] - * @return [Flow] of [Effect] (either [Error] or State of type [Pair]<[S], [V]>) + * @return [Flow] of [Either] (either [Error] or State of type [Pair]<[S], [V]>) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -fun I.handleOptimisticallyWithEffect(commands: Flow): Flow>> where I : StateComputation, +fun I.handleOptimisticallyWithEffect(commands: Flow): Flow>> where I : StateComputation, I : StateLockingRepository = commands .map { handleOptimisticallyWithEffect(it) } - .catch { emit(effect { shift(CommandPublishingFailed(it)) }) } + .catch { emit(either { raise(CommandPublishingFailed(it)) }) } /** * Extension function - Publishes the command of type [C] to the state stored aggregate * @receiver command of type [C] * @param aggregate of type [StateComputation]<[C], [S], [E]>, [StateRepository]<[C], [S]> - * @return [Effect] (either [Error] or successfully stored State of type [S]) + * @return [Either] (either [Error] or successfully stored State of type [S]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -suspend fun C.publishWithEffect(aggregate: A): Effect where A : StateComputation, +suspend fun C.publishWithEffect(aggregate: A): Either where A : StateComputation, A : StateRepository = aggregate.handleWithEffect(this) @@ -201,12 +115,12 @@ suspend fun C.publishWithEffect(aggregate: A): Effect whe * Extension function - Publishes the command of type [C] to the locking state stored aggregate, optimistically * @receiver command of type [C] * @param aggregate of type [StateComputation]<[C], [S], [E]>, [StateLockingRepository]<[C], [S], [V]> - * @return [Effect] (either [Error] or successfully stored State of type [Pair]<[S], [V]>) + * @return [Either] (either [Error] or successfully stored State of type [Pair]<[S], [V]>) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -suspend fun C.publishOptimisticallyWithEffect(aggregate: A): Effect> where A : StateComputation, +suspend fun C.publishOptimisticallyWithEffect(aggregate: A): Either> where A : StateComputation, A : StateLockingRepository = aggregate.handleOptimisticallyWithEffect(this) @@ -214,12 +128,12 @@ suspend fun C.publishOptimisticallyWithEffect(aggregate: A): Eff * Extension function - Publishes the command of type [C] to the state stored aggregate * @receiver [Flow] of commands of type [C] * @param aggregate of type [StateComputation]<[C], [S], [E]>, [StateRepository]<[C], [S]> - * @return the [Flow] of [Effect] (either [Error] or successfully stored State of type [S]) + * @return the [Flow] of [Either] (either [Error] or successfully stored State of type [S]) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -fun Flow.publishWithEffect(aggregate: A): Flow> where A : StateComputation, +fun Flow.publishWithEffect(aggregate: A): Flow> where A : StateComputation, A : StateRepository = aggregate.handleWithEffect(this) @@ -227,13 +141,11 @@ fun Flow.publishWithEffect(aggregate: A): Flow> * Extension function - Publishes the command of type [C] to the locking state stored aggregate, optimistically * @receiver [Flow] of commands of type [C] * @param aggregate of type [StateComputation]<[C], [S], [E]>, [StateLockingRepository]<[C], [S], [V]> - * @return the [Flow] of [Effect] (either [Error] or successfully stored State of type [Pair]<[S], [V]>) + * @return the [Flow] of [Either] (either [Error] or successfully stored State of type [Pair]<[S], [V]>) * * @author Иван Дугалић / Ivan Dugalic / @idugalic */ @FlowPreview -fun Flow.publishOptimisticallyWithEffect(aggregate: A): Flow>> where A : StateComputation, +fun Flow.publishOptimisticallyWithEffect(aggregate: A): Flow>> where A : StateComputation, A : StateLockingRepository = aggregate.handleOptimisticallyWithEffect(this) - -private fun S.pairWith(version: V): Pair = Pair(this, version) \ No newline at end of file diff --git a/application-arrow/src/commonTest/kotlin/com/fraktalio/fmodel/application/EventSourcedAggregateTest.kt b/application-arrow/src/commonTest/kotlin/com/fraktalio/fmodel/application/EventSourcedAggregateTest.kt index 17cc2e0e..45d9c98f 100644 --- a/application-arrow/src/commonTest/kotlin/com/fraktalio/fmodel/application/EventSourcedAggregateTest.kt +++ b/application-arrow/src/commonTest/kotlin/com/fraktalio/fmodel/application/EventSourcedAggregateTest.kt @@ -3,7 +3,6 @@ package com.fraktalio.fmodel.application import arrow.core.Either import arrow.core.Either.Left import arrow.core.Either.Right -import arrow.core.continuations.Effect import com.fraktalio.fmodel.application.examples.numbers.NumberRepository import com.fraktalio.fmodel.application.examples.numbers.even.command.EvenNumberLockingRepository import com.fraktalio.fmodel.application.examples.numbers.even.command.EvenNumberRepository @@ -22,7 +21,6 @@ import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.collections.shouldContainExactly import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.toList /** @@ -32,7 +30,7 @@ import kotlinx.coroutines.flow.toList private fun IDecider.given( repository: EventRepository, command: () -> C -): Flow> = +): Flow> = eventSourcingAggregate( decider = this, eventRepository = repository @@ -42,7 +40,7 @@ private fun IDecider.given( private fun IDecider.given( repository: EventLockingRepository, command: () -> C -): Flow>> = +): Flow>> = eventSourcingLockingAggregate( decider = this, eventRepository = repository @@ -57,11 +55,11 @@ private fun IDecider.whenCommand(command: C): C = command /** * DSL - Then */ -private suspend infix fun Flow>.thenEvents(expected: Iterable>) = - map { it.toEither() }.toList() shouldContainExactly (expected) +private suspend infix fun Flow>.thenEvents(expected: Iterable>) = + toList() shouldContainExactly (expected) -private suspend infix fun Flow>>.thenEventPairs(expected: Iterable>>) = - map { it.toEither() }.toList() shouldContainExactly (expected) +private suspend infix fun Flow>>.thenEventPairs(expected: Iterable>>) = + toList() shouldContainExactly (expected) /** * Event sourced aggregate test diff --git a/application-arrow/src/commonTest/kotlin/com/fraktalio/fmodel/application/MaterializedViewTest.kt b/application-arrow/src/commonTest/kotlin/com/fraktalio/fmodel/application/MaterializedViewTest.kt index 6b90b6d5..022a38df 100644 --- a/application-arrow/src/commonTest/kotlin/com/fraktalio/fmodel/application/MaterializedViewTest.kt +++ b/application-arrow/src/commonTest/kotlin/com/fraktalio/fmodel/application/MaterializedViewTest.kt @@ -1,7 +1,6 @@ package com.fraktalio.fmodel.application import arrow.core.Either -import arrow.core.continuations.Effect import com.fraktalio.fmodel.application.examples.numbers.NumberViewRepository import com.fraktalio.fmodel.application.examples.numbers.even.query.EvenNumberLockingViewRepository import com.fraktalio.fmodel.application.examples.numbers.even.query.EvenNumberViewRepository @@ -20,12 +19,11 @@ import com.fraktalio.fmodel.domain.examples.numbers.even.query.evenNumberView import com.fraktalio.fmodel.domain.examples.numbers.odd.query.oddNumberView import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.shouldBe -import io.kotest.matchers.types.shouldBeInstanceOf /** * DSL - Given */ -private suspend fun IView.given(repository: ViewStateRepository, event: () -> E): Effect = +private suspend fun IView.given(repository: ViewStateRepository, event: () -> E): Either = materializedView( view = this, viewStateRepository = repository @@ -34,12 +32,13 @@ private suspend fun IView.given(repository: ViewStateRepository IView.given( repository: ViewStateLockingRepository, event: () -> E -): Effect> = +): Either> = materializedLockingView( view = this, viewStateRepository = repository ).handleOptimisticallyWithEffect(event()) + /** * DSL - When */ @@ -49,16 +48,16 @@ private fun IView.whenEvent(event: E): E = event /** * DSL - Then */ -private suspend infix fun Effect.thenState(expected: S) { - val state = when (val result = this.toEither()) { +private infix fun Either.thenState(expected: S) { + val state = when (val result = this) { is Either.Right -> result.value is Either.Left -> throw AssertionError("Expected Either.Right, but found Either.Left with value ${result.value}") } return state shouldBe expected } -private suspend infix fun Effect>.thenStateAndVersion(expected: Pair) { - val state = when (val result = this.toEither()) { +private infix fun Either>.thenStateAndVersion(expected: Pair) { + val state = when (val result = this) { is Either.Right -> result.value is Either.Left -> throw AssertionError("Expected Either.Right, but found Either.Left with value ${result.value}") } @@ -66,14 +65,6 @@ private suspend infix fun Effect>.thenStateAndVersion(e } -private suspend fun Effect.thenError() { - val error = when (val result = this.toEither()) { - is Either.Right -> throw AssertionError("Expected Either.Left, but found Either.Right with value ${result.value}") - is Either.Left -> result.value - } - error.shouldBeInstanceOf() -} - /** * Materialized View Test */ diff --git a/application-arrow/src/commonTest/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateTest.kt b/application-arrow/src/commonTest/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateTest.kt index c178d024..37e722b9 100644 --- a/application-arrow/src/commonTest/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateTest.kt +++ b/application-arrow/src/commonTest/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateTest.kt @@ -1,7 +1,6 @@ package com.fraktalio.fmodel.application import arrow.core.Either -import arrow.core.continuations.Effect import com.fraktalio.fmodel.application.examples.numbers.NumberStateRepository import com.fraktalio.fmodel.application.examples.numbers.even.command.EvenNumberLockingStateRepository import com.fraktalio.fmodel.application.examples.numbers.even.command.EvenNumberStateRepository @@ -29,7 +28,7 @@ import kotlinx.coroutines.FlowPreview private suspend fun IDecider.given( repository: StateRepository, command: () -> C -): Effect = +): Either = stateStoredAggregate( decider = this, stateRepository = repository @@ -39,7 +38,7 @@ private suspend fun IDecider.given( private suspend fun IDecider.given( repository: StateLockingRepository, command: () -> C -): Effect> = +): Either> = stateStoredLockingAggregate( decider = this, stateRepository = repository @@ -54,16 +53,16 @@ private fun IDecider.whenCommand(command: C): C = command /** * DSL - Then */ -private suspend infix fun Effect.thenState(expected: S) { - val state = when (val result = this.toEither()) { +private infix fun Either.thenState(expected: S) { + val state = when (val result = this) { is Either.Right -> result.value is Either.Left -> throw AssertionError("Expected Either.Right, but found Either.Left with value ${result.value}") } return state shouldBe expected } -private suspend infix fun Effect>.thenStateAndVersion(expected: Pair) { - val state = when (val result = this.toEither()) { +private infix fun Either>.thenStateAndVersion(expected: Pair) { + val state = when (val result = this) { is Either.Right -> result.value is Either.Left -> throw AssertionError("Expected Either.Right, but found Either.Left with value ${result.value}") } @@ -71,8 +70,8 @@ private suspend infix fun Effect>.thenStateAndVersion(e } -private suspend fun Effect.thenError() { - val error = when (val result = this.toEither()) { +private fun Either.thenError() { + val error = when (val result = this) { is Either.Right -> throw AssertionError("Expected Either.Left, but found Either.Right with value ${result.value}") is Either.Left -> result.value } @@ -119,9 +118,7 @@ class StateStoredAggregateTest : FunSpec({ given(evenNumberStateRepository) { whenCommand(command) }.thenError() - } - } test("Combined State-stored aggregate - add even number") { diff --git a/application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregateArrowContextualExtension.kt b/application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregateArrowContextualExtension.kt new file mode 100644 index 00000000..21ee3e7a --- /dev/null +++ b/application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregateArrowContextualExtension.kt @@ -0,0 +1,68 @@ +package com.fraktalio.fmodel.application + +import arrow.core.Either +import arrow.core.raise.either +import com.fraktalio.fmodel.application.Error.CommandHandlingFailed +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.* + + +context (EventComputation, EventRepository) +@FlowPreview +fun C.handleWithEffect(): Flow> = + fetchEvents() + .computeNewEvents(this) + .save() + .map { either { it } } + .catch { emit(either { raise(CommandHandlingFailed(this@handleWithEffect)) }) } + +context (EventComputation, EventLockingRepository) +@FlowPreview +fun C.handleOptimisticallyWithEffect(): Flow>> = flow { + val events = this@handleOptimisticallyWithEffect.fetchEvents() + emitAll( + events.map { it.first } + .computeNewEvents(this@handleOptimisticallyWithEffect) + .save(events.lastOrNull()) + .map { either> { it } } + .catch { emit(either { raise(CommandHandlingFailed(this@handleOptimisticallyWithEffect)) }) } + ) +} + +context (EventOrchestratingComputation, EventRepository) +@FlowPreview +fun C.handleWithEffect(): Flow> = + fetchEvents() + .computeNewEventsByOrchestrating(this) { it.fetchEvents() } + .save() + .map { either { it } } + .catch { emit(either { raise(CommandHandlingFailed(this@handleWithEffect)) }) } + +context (EventOrchestratingComputation, EventLockingRepository) +@FlowPreview +fun C.handleOptimisticallyWithEffect(): Flow>> = + fetchEvents().map { it.first } + .computeNewEventsByOrchestrating(this) { it.fetchEvents().map { pair -> pair.first } } + .save(latestVersionProvider) + .map { either> { it } } + .catch { emit(either { raise(CommandHandlingFailed(this@handleOptimisticallyWithEffect)) }) } + +context (EventComputation, EventRepository) +@FlowPreview +fun Flow.handleWithEffect(): Flow> = + flatMapConcat { it.handleWithEffect() }.catch { emit(either { raise(Error.CommandPublishingFailed(it)) }) } + +context (EventComputation, EventLockingRepository) +@FlowPreview +fun Flow.handleOptimisticallyWithEffect(): Flow>> = + flatMapConcat { it.handleOptimisticallyWithEffect() }.catch { emit(either { raise(Error.CommandPublishingFailed(it)) }) } + +context (EventOrchestratingComputation, EventRepository) +@FlowPreview +fun Flow.handleWithEffect(): Flow> = + flatMapConcat { it.handleWithEffect() }.catch { emit(either { raise(Error.CommandPublishingFailed(it)) }) } + +context (EventOrchestratingComputation, EventLockingRepository) +@FlowPreview +fun Flow.handleOptimisticallyWithEffect(): Flow>> = + flatMapConcat { it.handleOptimisticallyWithEffect() }.catch { emit(either { raise(Error.CommandPublishingFailed(it)) }) } diff --git a/application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowContextualExtension.kt b/application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowContextualExtension.kt new file mode 100644 index 00000000..654a67dd --- /dev/null +++ b/application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowContextualExtension.kt @@ -0,0 +1,50 @@ +package com.fraktalio.fmodel.application + +import arrow.core.Either +import arrow.core.raise.Raise +import arrow.core.raise.catch +import arrow.core.raise.either +import com.fraktalio.fmodel.application.Error.EventHandlingFailed +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.map + + +context (ViewStateComputation, ViewStateRepository, Raise) +suspend fun E.handleWithEffect(): S = catch({ + fetchState().computeNewState(this@handleWithEffect).save() +}) { + raise(EventHandlingFailed(this@handleWithEffect, it)) +} + +context (ViewStateComputation, ViewStateLockingRepository, Raise) +suspend fun E.handleOptimisticallyWithEffect(): Pair = catch({ + val (state, version) = fetchState() + state.computeNewState(this@handleOptimisticallyWithEffect).save(version) +}) { + raise(EventHandlingFailed(this@handleOptimisticallyWithEffect, it)) +} + +context (ViewStateComputation, ViewStateLockingDeduplicationRepository, Raise) +suspend fun E.handleOptimisticallyWithDeduplicationAndEffect(eventAndVersion: Pair): Pair { + val (event, eventVersion) = eventAndVersion + return catch({ + val (state, stateVersion) = event.fetchState() + state.computeNewState(event).save(eventVersion, stateVersion) + }) { + raise(EventHandlingFailed(this@handleOptimisticallyWithDeduplicationAndEffect, it)) + } +} + +context (ViewStateComputation, ViewStateRepository) +fun Flow.handleWithEffect(): Flow> = + map { either { it.handleWithEffect() } }.catch { emit(either { raise(Error.EventPublishingFailed(it)) }) } + +context (ViewStateComputation, ViewStateLockingRepository) +fun Flow.handleOptimisticallyWithEffect(): Flow>> = + map { either { it.handleOptimisticallyWithEffect() } }.catch { emit(either { raise(Error.EventPublishingFailed(it)) }) } + +context (ViewStateComputation, ViewStateLockingDeduplicationRepository) +fun Flow.handleOptimisticallyWithDeduplicationAndEffect(eventAndVersion: Pair): Flow>> = + map { either { it.handleOptimisticallyWithDeduplicationAndEffect(eventAndVersion) } } + .catch { emit(either { raise(Error.EventPublishingFailed(it)) }) } diff --git a/application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/SagaManagerArrowContextualExtension.kt.kt b/application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/SagaManagerArrowContextualExtension.kt.kt new file mode 100644 index 00000000..4c77af72 --- /dev/null +++ b/application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/SagaManagerArrowContextualExtension.kt.kt @@ -0,0 +1,23 @@ +package com.fraktalio.fmodel.application + +import arrow.core.Either +import arrow.core.raise.either +import com.fraktalio.fmodel.application.Error.ActionResultHandlingFailed +import com.fraktalio.fmodel.application.Error.ActionResultPublishingFailed +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.flatMapConcat +import kotlinx.coroutines.flow.map + +context (SagaManager) +fun AR.handleWithEffect(): Flow> = + computeNewActions() + .publish() + .map { either { it } } + .catch { emit(either { raise(ActionResultHandlingFailed(this@handleWithEffect, it)) }) } + +context (SagaManager) +@FlowPreview +fun Flow.handleWithEffect(): Flow> = + flatMapConcat { it.handleWithEffect() }.catch { emit(either { raise(ActionResultPublishingFailed(it)) }) } diff --git a/application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowContextualExtension.kt b/application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowContextualExtension.kt new file mode 100644 index 00000000..8ebcd94e --- /dev/null +++ b/application-arrow/src/jvmMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowContextualExtension.kt @@ -0,0 +1,72 @@ +package com.fraktalio.fmodel.application + +import arrow.core.Either +import arrow.core.raise.Raise +import arrow.core.raise.catch +import arrow.core.raise.either +import com.fraktalio.fmodel.application.Error.CommandHandlingFailed +import com.fraktalio.fmodel.application.Error.CommandPublishingFailed +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.map + +context (StateComputation, StateRepository, Raise) +suspend fun C.handleWithEffect(): S = + catch({ + fetchState().computeNewState(this@handleWithEffect).save() + }) { + raise(CommandHandlingFailed(this@handleWithEffect, it)) + } + +context (StateComputation, StateRepository) +fun Flow.handleWithEffect(): Flow> = + map { either { it.handleWithEffect() } }.catch { emit(either { raise(CommandPublishingFailed(it)) }) } + + +context (StateComputation, StateLockingRepository, Raise) +suspend fun C.handleOptimisticallyWithEffect(): Pair = + catch({ + val (state, version) = this@handleOptimisticallyWithEffect.fetchState() + state + .computeNewState(this@handleOptimisticallyWithEffect) + .save(version) + }) { + raise(CommandHandlingFailed(this@handleOptimisticallyWithEffect, it)) + } + +context (StateComputation, StateLockingRepository) +fun Flow.handleOptimisticallyWithEffect(): Flow>> = + map { either { it.handleOptimisticallyWithEffect() } }.catch { emit(either { raise(CommandPublishingFailed(it)) }) } + +context (StateOrchestratingComputation, StateRepository, Raise) +@FlowPreview +suspend fun C.handleWithEffect(): S = + catch({ + fetchState().computeNewState(this@handleWithEffect).save() + }) { + raise(CommandHandlingFailed(this@handleWithEffect, it)) + } + +context (StateOrchestratingComputation, StateRepository) +@FlowPreview +suspend fun Flow.handleWithEffect(): Flow> = + map { either { it.handleWithEffect() } }.catch { emit(either { raise(CommandPublishingFailed(it)) }) } + + +context (StateOrchestratingComputation, StateLockingRepository, Raise) +@FlowPreview +suspend fun C.handleOptimisticallyWithEffect(): Pair = + catch({ + val (state, version) = this@handleOptimisticallyWithEffect.fetchState() + state + .computeNewState(this@handleOptimisticallyWithEffect) + .save(version) + }) { + raise(CommandHandlingFailed(this@handleOptimisticallyWithEffect, it)) + } + +context (StateOrchestratingComputation, StateLockingRepository) +@FlowPreview +suspend fun Flow.handleOptimisticallyWithEffect(): Flow>> = + map { either { it.handleOptimisticallyWithEffect() } }.catch { emit(either { raise(CommandPublishingFailed(it)) }) } diff --git a/application-arrow/src/jvmTest/kotlin/com/fraktalio/fmodel/application/EventSourcedAggregateArrowContextualTest.kt b/application-arrow/src/jvmTest/kotlin/com/fraktalio/fmodel/application/EventSourcedAggregateArrowContextualTest.kt new file mode 100644 index 00000000..d2bf640c --- /dev/null +++ b/application-arrow/src/jvmTest/kotlin/com/fraktalio/fmodel/application/EventSourcedAggregateArrowContextualTest.kt @@ -0,0 +1,62 @@ +package com.fraktalio.fmodel.application + +import arrow.core.Either +import arrow.core.Either.Left +import arrow.core.Either.Right +import com.fraktalio.fmodel.application.Error.CommandPublishingFailed +import com.fraktalio.fmodel.application.examples.numbers.even.command.EvenNumberRepository +import com.fraktalio.fmodel.application.examples.numbers.even.command.evenNumberRepository +import com.fraktalio.fmodel.domain.examples.numbers.api.Description +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberCommand.EvenNumberCommand +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberCommand.EvenNumberCommand.AddEvenNumber +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberEvent.EvenNumberEvent.EvenNumberAdded +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberValue +import com.fraktalio.fmodel.domain.examples.numbers.even.command.evenNumberDecider +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.collections.shouldContainExactly +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.toList +import kotlin.contracts.ExperimentalContracts + +private suspend infix fun Flow>.thenEvents(expected: Iterable>) = + toList() shouldContainExactly (expected) + +/** + * Event sourced aggregate contextual (context receivers) test + */ +@ExperimentalContracts +@FlowPreview +class EventSourcedAggregateArrowContextualTest : FunSpec({ + val evenDecider = evenNumberDecider() + val evenNumberRepository = evenNumberRepository() as EvenNumberRepository + + test("Event-sourced aggregate arrow contextual - add even number") { + evenNumberRepository.deleteAll() + with(eventSourcingAggregate(evenDecider, evenNumberRepository)) { + flowOf( + AddEvenNumber(Description("desc"), NumberValue(4)) + ).handleWithEffect() thenEvents ( + listOf( + Right(EvenNumberAdded(Description("desc"), NumberValue(4))) + ) + ) + } + } + test("Event-sourced aggregate arrow contextual - add even number") { + evenNumberRepository.deleteAll() + with(eventSourcingAggregate(evenDecider, evenNumberRepository)) { + val exception = IllegalStateException("Error") + flow { + emit(AddEvenNumber(Description("desc"), NumberValue(6))) + throw exception + }.handleWithEffect() thenEvents listOf( + Right(EvenNumberAdded(Description("desc"), NumberValue(6))), + Left(CommandPublishingFailed(exception)) + ) + } + } + +}) diff --git a/application-arrow/src/jvmTest/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowContextualTest.kt b/application-arrow/src/jvmTest/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowContextualTest.kt new file mode 100644 index 00000000..5980c4c2 --- /dev/null +++ b/application-arrow/src/jvmTest/kotlin/com/fraktalio/fmodel/application/MaterializedViewArrowContextualTest.kt @@ -0,0 +1,61 @@ +package com.fraktalio.fmodel.application + +import arrow.core.Either +import com.fraktalio.fmodel.application.examples.numbers.even.query.EvenNumberViewRepository +import com.fraktalio.fmodel.application.examples.numbers.even.query.evenNumberViewRepository +import com.fraktalio.fmodel.domain.examples.numbers.api.Description +import com.fraktalio.fmodel.domain.examples.numbers.api.EvenNumberState +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberEvent.EvenNumberEvent.EvenNumberAdded +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberValue +import com.fraktalio.fmodel.domain.examples.numbers.even.query.evenNumberView +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.flowOf +import kotlin.contracts.ExperimentalContracts + + +private infix fun Either.thenState(expected: S) { + val state = when (val result = this) { + is Either.Right -> result.value + is Either.Left -> throw AssertionError("Expected Either.Right, but found Either.Left with value ${result.value}") + } + return state shouldBe expected +} + +/** + * Materialized View Contextual Test + */ +@FlowPreview +@ExperimentalContracts +class MaterializedViewArrowContextualTest : FunSpec({ + val evenView = evenNumberView() + val evenNumberViewRepository = evenNumberViewRepository() as EvenNumberViewRepository + + test("Materialized view arrow contextual - even number added") { + evenNumberViewRepository.deleteAll() + with(viewStateComputation(evenView)) { + with(evenNumberViewRepository) { + flowOf( + EvenNumberAdded(Description("EvenNumberAdded"), NumberValue(2)), + ).handleWithEffect().first() thenState EvenNumberState( + Description("Initial state, EvenNumberAdded"), + NumberValue(2) + ) + } + } + } + + test("Materialized view arrow contextual materialized view interface - even number added") { + evenNumberViewRepository.deleteAll() + with(materializedView(evenView, evenNumberViewRepository)) { + flowOf( + EvenNumberAdded(Description("EvenNumberAdded"), NumberValue(2)), + ).handleWithEffect().first() thenState EvenNumberState( + Description("Initial state, EvenNumberAdded"), + NumberValue(2) + ) + } + } +}) diff --git a/application-arrow/src/jvmTest/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowContextualTest.kt b/application-arrow/src/jvmTest/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowContextualTest.kt new file mode 100644 index 00000000..543f0994 --- /dev/null +++ b/application-arrow/src/jvmTest/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateArrowContextualTest.kt @@ -0,0 +1,89 @@ +package com.fraktalio.fmodel.application + +import arrow.core.Either +import com.fraktalio.fmodel.application.examples.numbers.even.command.EvenNumberStateRepository +import com.fraktalio.fmodel.application.examples.numbers.even.command.evenNumberStateRepository +import com.fraktalio.fmodel.domain.examples.numbers.api.Description +import com.fraktalio.fmodel.domain.examples.numbers.api.EvenNumberState +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberCommand.EvenNumberCommand.AddEvenNumber +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberValue +import com.fraktalio.fmodel.domain.examples.numbers.even.command.evenNumberDecider +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.shouldBe +import io.kotest.matchers.types.shouldBeInstanceOf +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.last +import kotlin.contracts.ExperimentalContracts + + +private fun Either.thenError() { + val error = when (val result = this) { + is Either.Right -> throw AssertionError("Expected Either.Left, but found Either.Right with value ${result.value}") + is Either.Left -> result.value + } + error.shouldBeInstanceOf() +} + +private infix fun Either.thenState(expected: S) { + val state = when (val result = this) { + is Either.Right -> result.value + is Either.Left -> throw AssertionError("Expected Either.Right, but found Either.Left with value ${result.value}") + } + return state shouldBe expected +} + +/** + * State-stored aggregate arrow, contextual test + */ +@ExperimentalContracts +@FlowPreview +class StateStoredAggregateArrowContextualTest : FunSpec({ + val evenDecider = evenNumberDecider() + val evenNumberStateRepository = evenNumberStateRepository() as EvenNumberStateRepository + + test("State-stored aggregatearrow contextual - add even number") { + evenNumberStateRepository.deleteAll() + with(stateComputation(evenDecider)) { + with(evenNumberStateRepository) { + flowOf( + AddEvenNumber(Description("desc"), NumberValue(6)) + ).handleWithEffect().first() thenState EvenNumberState(Description("desc"), NumberValue(6)) + } + } + } + + test("State-stored aggregate arrow contextual with aggregate interface - add even number") { + evenNumberStateRepository.deleteAll() + with(stateStoredAggregate(evenDecider, evenNumberStateRepository)) { + flowOf( + AddEvenNumber(Description("desc"), NumberValue(6)) + ).handleWithEffect().first() thenState EvenNumberState(Description("desc"), NumberValue(6)) + } + } + + test("State-stored aggregate arrow contextual - add even number - exception (large number > 1000)") { + evenNumberStateRepository.deleteAll() + with(stateComputation(evenDecider)) { + with(evenNumberStateRepository) { + flowOf( + AddEvenNumber(Description("desc"), NumberValue(6000)), + AddEvenNumber(Description("desc"), NumberValue(6)) + ).handleWithEffect().first().thenError() + } + } + } + + test("State-stored aggregate arrow contextual - add even number - exception (large number > 1000) - 2") { + evenNumberStateRepository.deleteAll() + with(stateComputation(evenDecider)) { + with(evenNumberStateRepository) { + flowOf( + AddEvenNumber(Description("desc"), NumberValue(6000)), + AddEvenNumber(Description("desc"), NumberValue(6)) + ).handleWithEffect().last() thenState EvenNumberState(Description("desc"), NumberValue(6)) + } + } + } +}) diff --git a/application-vanilla/build.gradle.kts b/application-vanilla/build.gradle.kts index 2f5e1383..c2f8d8db 100644 --- a/application-vanilla/build.gradle.kts +++ b/application-vanilla/build.gradle.kts @@ -12,6 +12,7 @@ kotlin { compilations.all { kotlinOptions.jvmTarget = "1.8" kotlinOptions.verbose = true + kotlinOptions.freeCompilerArgs = kotlinOptions.freeCompilerArgs + "-Xcontext-receivers" } withJava() diff --git a/application-vanilla/src/jvmMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregateContextualExtension.kt b/application-vanilla/src/jvmMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregateContextualExtension.kt new file mode 100644 index 00000000..93b8edfb --- /dev/null +++ b/application-vanilla/src/jvmMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregateContextualExtension.kt @@ -0,0 +1,96 @@ +package com.fraktalio.fmodel.application + +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.* + + +/** + * Handle command - Event-sourced aggregate/decider + * @receiver [EventComputation] - context receiver + * @receiver [EventRepository] - context receiver + * @receiver command of type C to be handled + */ +context (EventComputation, EventRepository) +fun C.handle(): Flow = fetchEvents().computeNewEvents(this).save() + + +/** + * Handle command optimistically - Event-sourced locking aggregate/decider + * @receiver [EventComputation] - context receiver + * @receiver [EventLockingRepository] - context receiver + * @receiver command of type C to be handled + */ +context (EventComputation, EventLockingRepository) +@FlowPreview +fun C.handleOptimistically(): Flow> = flow { + val events = this@handleOptimistically.fetchEvents() + emitAll( + events.map { it.first } + .computeNewEvents(this@handleOptimistically) + .save(events.lastOrNull()) + ) +} + +/** + * Handle command - Event-sourced orchestrating aggregate/decider + * @receiver [EventOrchestratingComputation] - context receiver + * @receiver [EventRepository] - context receiver + * @receiver command of type C to be handled + */ +context (EventOrchestratingComputation, EventRepository) +@FlowPreview +fun C.handle(): Flow = fetchEvents().computeNewEventsByOrchestrating(this) { it.fetchEvents() }.save() + +/** + * Handle command optimistically - Event-sourced orchestrating locking aggregate/decider + * @receiver [EventOrchestratingComputation] - context receiver + * @receiver [EventLockingRepository] - context receiver + * @receiver command of type C to be handled + */ +context (EventOrchestratingComputation, EventLockingRepository) +@FlowPreview +fun C.handleOptimistically(): Flow> = + this + .fetchEvents().map { it.first } + .computeNewEventsByOrchestrating(this) { it.fetchEvents().map { pair -> pair.first } } + .save(latestVersionProvider) + +/** + * Handle command(s) - Event-sourced aggregate/decider + * @receiver [EventComputation] - context receiver + * @receiver [EventRepository] - context receiver + * @receiver commands of type Flow to be handled + */ +context (EventComputation, EventRepository) +@FlowPreview +fun Flow.handle(): Flow = flatMapConcat { it.handle() } + +/** + * Handle command(s) optimistically - Event-sourced locking aggregate/decider + * @receiver [EventComputation] - context receiver + * @receiver [EventLockingRepository] - context receiver + * @receiver commands of type Flow to be handled + */ +context (EventComputation, EventLockingRepository) +@FlowPreview +fun Flow.handleOptimistically(): Flow> = flatMapConcat { it.handleOptimistically() } + +/** + * Handle command(s) - Event-sourced orchestrating aggregate/decider + * @receiver [EventOrchestratingComputation] - context receiver + * @receiver [EventRepository] - context receiver + * @receiver commands of type Flow to be handled + */ +context (EventOrchestratingComputation, EventRepository) +@FlowPreview +fun Flow.handle(): Flow = flatMapConcat { it.handle() } + +/** + * Handle command(s) optimistically - Event-sourced orchestrating locking aggregate/decider + * @receiver [EventOrchestratingComputation] - context receiver + * @receiver [EventLockingRepository] - context receiver + * @receiver commands of type Flow to be handled + */ +context (EventOrchestratingComputation, EventLockingRepository) +@FlowPreview +fun Flow.handleOptimistically(): Flow> = flatMapConcat { it.handleOptimistically() } diff --git a/application-vanilla/src/jvmMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewContextualExtension.kt b/application-vanilla/src/jvmMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewContextualExtension.kt new file mode 100644 index 00000000..fe6e0748 --- /dev/null +++ b/application-vanilla/src/jvmMain/kotlin/com/fraktalio/fmodel/application/MaterializedViewContextualExtension.kt @@ -0,0 +1,72 @@ +package com.fraktalio.fmodel.application + +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map + + +/** + * Handle event - Materialized View + * @receiver [ViewStateComputation] - context receiver + * @receiver [ViewStateRepository] - context receiver + * @receiver event of type E to be handled + */ +context (ViewStateComputation, ViewStateRepository) +suspend fun E.handle(): S = fetchState().computeNewState(this).save() + + +/** + * Handle event - Materialized View + * @receiver [ViewStateComputation] - context receiver + * @receiver [ViewStateLockingRepository] - context receiver + * @receiver event of type E to be handled + */ +context (ViewStateComputation, ViewStateLockingRepository) +suspend fun E.handleOptimistically(): Pair { + val (state, version) = this@handleOptimistically.fetchState() + return state + .computeNewState(this@handleOptimistically) + .save(version) +} + +/** + * Handle event - Materialized View + * @receiver [ViewStateComputation] - context receiver + * @receiver [ViewStateLockingDeduplicationRepository] - context receiver + * @receiver event of type E to be handled + */ +context (ViewStateComputation, ViewStateLockingDeduplicationRepository) +suspend fun E.handleOptimisticallyWithDeduplication(eventAndVersion: Pair): Pair { + val (event, eventVersion) = eventAndVersion + val (state, currentStateVersion) = event.fetchState() + return state + .computeNewState(event) + .save(eventVersion, currentStateVersion) +} + +/** + * Handle event(s) - Materialized View + * @receiver [ViewStateComputation] - context receiver + * @receiver [ViewStateRepository] - context receiver + * @receiver events of type Flow to be handled + */ +context (ViewStateComputation, ViewStateRepository) +fun Flow.handle(): Flow = map { it.handle() } + +/** + * Handle event(s) - Materialized View + * @receiver [ViewStateComputation] - context receiver + * @receiver [ViewStateLockingRepository] - context receiver + * @receiver events of type Flow to be handled + */ +context (ViewStateComputation, ViewStateLockingRepository) +fun Flow.handleOptimistically(): Flow> = map { it.handleOptimistically() } + +/** + * Handle event(s) - Materialized View + * @receiver [ViewStateComputation] - context receiver + * @receiver [ViewStateLockingDeduplicationRepository] - context receiver + * @receiver events of type Flow to be handled + */ +context (ViewStateComputation, ViewStateLockingDeduplicationRepository) +fun Flow.handleOptimisticallyWithDeduplication(eventAndVersion: Pair): Flow> = + map { it.handleOptimisticallyWithDeduplication(eventAndVersion) } diff --git a/application-vanilla/src/jvmMain/kotlin/com/fraktalio/fmodel/application/SagaManagerContextualExtension.kt b/application-vanilla/src/jvmMain/kotlin/com/fraktalio/fmodel/application/SagaManagerContextualExtension.kt new file mode 100644 index 00000000..f3fa3418 --- /dev/null +++ b/application-vanilla/src/jvmMain/kotlin/com/fraktalio/fmodel/application/SagaManagerContextualExtension.kt @@ -0,0 +1,108 @@ +package com.fraktalio.fmodel.application + +import com.fraktalio.fmodel.domain.ISaga +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flatMapConcat +import kotlin.contracts.ExperimentalContracts +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + + +/** + * Materialized View algorithm + * Computes new State based on the previous State and the Event. + */ +context (ISaga, ActionPublisher) +internal fun AR.computeNewActions(): Flow = react(this) + +/** + * Handle event / action result - Saga Manager + * @receiver [ISaga] - context receiver + * @receiver [ActionPublisher] - context receiver + * @receiver event/action result of type AR to be handled + */ +context (ISaga, ActionPublisher) +fun AR.handle(): Flow = computeNewActions().publish() + +/** + * Handle event / action result - Saga Manager + * @receiver [SagaManager] - context receiver + * @receiver event/action result of type AR to be handled + * + * Alternative function to `context (ISaga, ActionPublisher) AR.handle()`, which combines multiple contexts ([ISaga], [ActionPublisher]) into a single meaningful interface/context [SagaManager] + */ +context (SagaManager) +fun AR.handleIt(): Flow = computeNewActions().publish() + +/** + * Handle event / action result - Saga Manager + * @receiver [ISaga] - context receiver + * @receiver [ActionPublisher] - context receiver + * @receiver events/actions result of type Flow to be handled + */ +context (ISaga, ActionPublisher) +@FlowPreview +fun Flow.handle(): Flow = flatMapConcat { it.handle() } + +/** + * Handle event / action result concurrently by the finite number of actors - Saga Manager + * @receiver [ISaga] - context receiver + * @receiver [ActionPublisher] - context receiver + * @receiver events/actions result of type Flow to be handled + */ +context (ISaga, ActionPublisher) +@ExperimentalContracts +@FlowPreview +fun Flow.handleConcurrently( + numberOfActors: Int = 100, + actorsCapacity: Int = Channel.BUFFERED, + actorsStart: CoroutineStart = CoroutineStart.LAZY, + actorsContext: CoroutineContext = EmptyCoroutineContext, + partitionKey: (AR) -> Int +): Flow = publishConcurrentlyTo( + sagaManager(this@ISaga, this@ActionPublisher), + numberOfActors, + actorsCapacity, + actorsStart, + actorsContext, + partitionKey +) + +/** + * Handle event / action result - Saga Manager + * @receiver [SagaManager] - context receiver + * @receiver events/actions result of type Flow to be handled + * + * Alternative function to `context (ISaga, ActionPublisher) Flow.handle()`, which combines multiple contexts ([ISaga], [ActionPublisher]) into a single meaningful interface/context [SagaManager] + */ +context (SagaManager) +@FlowPreview +fun Flow.handleIt(): Flow = flatMapConcat { it.handleIt() } + +/** + * Handle event / action result concurrently by the finite number of actors - Saga Manager + * @receiver [SagaManager] - context receiver + * @receiver events/actions result of type Flow to be handled + * + * Alternative function to `context (ISaga, ActionPublisher) Flow.handleConcurrently(...)`, which combines multiple contexts ([ISaga], [ActionPublisher]) into a single meaningful interface/context [SagaManager] + */ +context (SagaManager) +@ExperimentalContracts +@FlowPreview +fun Flow.handleItConcurrently( + numberOfActors: Int = 100, + actorsCapacity: Int = Channel.BUFFERED, + actorsStart: CoroutineStart = CoroutineStart.LAZY, + actorsContext: CoroutineContext = EmptyCoroutineContext, + partitionKey: (AR) -> Int +): Flow = publishConcurrentlyTo( + this@SagaManager, + numberOfActors, + actorsCapacity, + actorsStart, + actorsContext, + partitionKey +) diff --git a/application-vanilla/src/jvmMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateContextualExtension.kt b/application-vanilla/src/jvmMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateContextualExtension.kt new file mode 100644 index 00000000..8c4cbba4 --- /dev/null +++ b/application-vanilla/src/jvmMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateContextualExtension.kt @@ -0,0 +1,93 @@ +package com.fraktalio.fmodel.application + +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.map + +/** + * Handle command - State-stored aggregate/decider + * @receiver [StateComputation] - context receiver + * @receiver [StateRepository] - context receiver + * @receiver command of type C to be handled + */ +context (StateComputation, StateRepository) +suspend fun C.handle(): S = fetchState().computeNewState(this).save() + +/** + * Handle command - State-stored aggregate/decider + * @receiver [StateComputation] - context receiver + * @receiver [StateLockingRepository] - context receiver + * @receiver command of type C to be handled + */ +context (StateComputation, StateLockingRepository) +suspend fun C.handleOptimistically(): Pair { + val (state, version) = this@handleOptimistically.fetchState() + return state + .computeNewState(this@handleOptimistically) + .save(version) +} + +/** + * Handle command - State-stored orchestrating aggregate/decider + * @receiver [StateOrchestratingComputation] - context receiver + * @receiver [StateRepository] - context receiver + * @receiver command of type C to be handled + */ +context (StateOrchestratingComputation, StateRepository) +@FlowPreview +suspend fun C.handle(): S = fetchState().computeNewState(this).save() + +/** + * Handle command - State-stored orchestrating aggregate/decider + * @receiver [StateOrchestratingComputation] - context receiver + * @receiver [StateLockingRepository] - context receiver + * @receiver command of type C to be handled + */ +context (StateOrchestratingComputation, StateLockingRepository) +@FlowPreview +suspend fun C.handleOptimistically(): Pair { + val (state, version) = this@handleOptimistically.fetchState() + return state + .computeNewState(this@handleOptimistically) + .save(version) +} + +/** + * Handle command(s) - State-stored aggregate/decider + * @receiver [StateComputation] - context receiver + * @receiver [StateRepository] - context receiver + * @receiver commands of type `Flow` to be handled + */ +context (StateComputation, StateRepository) +fun Flow.handle(): Flow = map { it.handle() } + +/** + * Handle command(s) - State-stored aggregate/decider + * @receiver [StateComputation] - context receiver + * @receiver [StateLockingRepository] - context receiver + * @receiver commands of type `Flow` to be handled + */ +context (StateComputation, StateLockingRepository) +fun Flow.handleOptimistically(): Flow> = map { it.handleOptimistically() } + + +/** + * Handle command(s) - State-stored orchestrating aggregate/decider + * @receiver [StateOrchestratingComputation] - context receiver + * @receiver [StateRepository] - context receiver + * @receiver commands of type `Flow` to be handled + */ +context (StateOrchestratingComputation, StateRepository) +@FlowPreview +suspend fun Flow.handle(): Flow = map { it.handle() } + +/** + * Handle command(s) - State-stored orchestrating aggregate/decider + * @receiver [StateOrchestratingComputation] - context receiver + * @receiver [StateLockingRepository] - context receiver + * @receiver commands of type `Flow` to be handled + */ +context (StateOrchestratingComputation, StateLockingRepository) +@FlowPreview +suspend fun Flow.handleOptimistically(): Flow> = + map { it.handleOptimistically() } diff --git a/application-vanilla/src/jvmTest/kotlin/com/fraktalio/fmodel/application/EventSourcedAggregateContextualTest.kt b/application-vanilla/src/jvmTest/kotlin/com/fraktalio/fmodel/application/EventSourcedAggregateContextualTest.kt new file mode 100644 index 00000000..d6244957 --- /dev/null +++ b/application-vanilla/src/jvmTest/kotlin/com/fraktalio/fmodel/application/EventSourcedAggregateContextualTest.kt @@ -0,0 +1,52 @@ +package com.fraktalio.fmodel.application + +import com.fraktalio.fmodel.application.examples.numbers.even.command.EvenNumberRepository +import com.fraktalio.fmodel.application.examples.numbers.even.command.evenNumberRepository +import com.fraktalio.fmodel.domain.examples.numbers.api.Description +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberCommand.EvenNumberCommand.AddEvenNumber +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberEvent.EvenNumberEvent.EvenNumberAdded +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberValue +import com.fraktalio.fmodel.domain.examples.numbers.even.command.evenNumberDecider +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.collections.shouldContainExactly +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.toList +import kotlin.contracts.ExperimentalContracts + +/** + * Event sourced aggregate contextual (context receivers) test + */ +@ExperimentalContracts +@FlowPreview +class EventSourcedAggregateContextualTest : FunSpec({ + val evenDecider = evenNumberDecider() + val evenNumberRepository = evenNumberRepository() as EvenNumberRepository + + // version 1 + test("Event-sourced aggregate contextual - add even number") { + evenNumberRepository.deleteAll() + with(eventSourcingAggregate(evenDecider, evenNumberRepository)) { + flowOf( + AddEvenNumber(Description("desc"), NumberValue(4)) + ).handle().toList() shouldContainExactly listOf( + EvenNumberAdded(Description("desc"), NumberValue(4)) + ) + } + + } + // version 2 + test("Event-sourced decider and repository - contextual - add even number") { + evenNumberRepository.deleteAll() + with(eventComputation(evenDecider)) { + with(evenNumberRepository) { + flowOf( + AddEvenNumber(Description("desc"), NumberValue(4)) + ).handle().toList() shouldContainExactly listOf( + EvenNumberAdded(Description("desc"), NumberValue(4)) + ) + } + } + + } +}) diff --git a/application-vanilla/src/jvmTest/kotlin/com/fraktalio/fmodel/application/MaterializedViewContextualTest.kt b/application-vanilla/src/jvmTest/kotlin/com/fraktalio/fmodel/application/MaterializedViewContextualTest.kt new file mode 100644 index 00000000..c9cbfe65 --- /dev/null +++ b/application-vanilla/src/jvmTest/kotlin/com/fraktalio/fmodel/application/MaterializedViewContextualTest.kt @@ -0,0 +1,53 @@ +package com.fraktalio.fmodel.application + +import com.fraktalio.fmodel.application.examples.numbers.even.query.EvenNumberViewRepository +import com.fraktalio.fmodel.application.examples.numbers.even.query.evenNumberViewRepository +import com.fraktalio.fmodel.domain.examples.numbers.api.Description +import com.fraktalio.fmodel.domain.examples.numbers.api.EvenNumberState +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberEvent.EvenNumberEvent.EvenNumberAdded +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberValue +import com.fraktalio.fmodel.domain.examples.numbers.even.query.evenNumberView +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.collections.shouldContainExactly +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.toList +import kotlin.contracts.ExperimentalContracts + +/** + * Materialized View Contextual Test + */ +@FlowPreview +@ExperimentalContracts +class MaterializedViewContextualTest : FunSpec({ + val evenView = evenNumberView() + val evenNumberViewRepository = evenNumberViewRepository() as EvenNumberViewRepository + + test("Materialized view contextual - even number added") { + evenNumberViewRepository.deleteAll() + with(viewStateComputation(evenView)) { + with(evenNumberViewRepository) { + flowOf( + EvenNumberAdded(Description("EvenNumberAdded"), NumberValue(2)), + EvenNumberAdded(Description("EvenNumberAdded"), NumberValue(4)) + ).handle().toList() shouldContainExactly listOf( + EvenNumberState(Description("Initial state, EvenNumberAdded"), NumberValue(2)), + EvenNumberState(Description("Initial state, EvenNumberAdded, EvenNumberAdded"), NumberValue(6)) + ) + } + } + } + + test("Materialized view contextual materialized view interface - even number added") { + evenNumberViewRepository.deleteAll() + with(materializedView(evenView, evenNumberViewRepository)) { + flowOf( + EvenNumberAdded(Description("EvenNumberAdded"), NumberValue(2)), + EvenNumberAdded(Description("EvenNumberAdded"), NumberValue(4)) + ).handle().toList() shouldContainExactly listOf( + EvenNumberState(Description("Initial state, EvenNumberAdded"), NumberValue(2)), + EvenNumberState(Description("Initial state, EvenNumberAdded, EvenNumberAdded"), NumberValue(6)) + ) + } + } +}) diff --git a/application-vanilla/src/jvmTest/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateContextualTest.kt b/application-vanilla/src/jvmTest/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateContextualTest.kt new file mode 100644 index 00000000..3bd78257 --- /dev/null +++ b/application-vanilla/src/jvmTest/kotlin/com/fraktalio/fmodel/application/StateStoredAggregateContextualTest.kt @@ -0,0 +1,68 @@ +package com.fraktalio.fmodel.application + +import com.fraktalio.fmodel.application.examples.numbers.even.command.EvenNumberStateRepository +import com.fraktalio.fmodel.application.examples.numbers.even.command.evenNumberStateRepository +import com.fraktalio.fmodel.domain.examples.numbers.api.Description +import com.fraktalio.fmodel.domain.examples.numbers.api.EvenNumberState +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberCommand.EvenNumberCommand.AddEvenNumber +import com.fraktalio.fmodel.domain.examples.numbers.api.NumberValue +import com.fraktalio.fmodel.domain.examples.numbers.even.command.evenNumberDecider +import io.kotest.assertions.throwables.shouldThrow +import io.kotest.core.spec.style.FunSpec +import io.kotest.matchers.collections.shouldContainExactly +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.toList +import kotlin.contracts.ExperimentalContracts + +/** + * State-stored aggregate contextual test + */ +@ExperimentalContracts +@FlowPreview +class StateStoredAggregateContextualTest : FunSpec({ + val evenDecider = evenNumberDecider() + val evenNumberStateRepository = evenNumberStateRepository() as EvenNumberStateRepository + + test("State-stored aggregate contextual - add even number") { + evenNumberStateRepository.deleteAll() + with(stateComputation(evenDecider)) { + with(evenNumberStateRepository) { + flowOf( + AddEvenNumber(Description("desc"), NumberValue(6)), + AddEvenNumber(Description("desc"), NumberValue(4)) + ).handle().toList() shouldContainExactly listOf( + EvenNumberState(Description("desc"), NumberValue(6)), + EvenNumberState(Description("desc"), NumberValue(10)) + ) + } + } + } + + test("State-stored aggregate contextual with aggregate interface - add even number") { + evenNumberStateRepository.deleteAll() + with(stateStoredAggregate(evenDecider, evenNumberStateRepository)) { + flowOf( + AddEvenNumber(Description("desc"), NumberValue(6)), + AddEvenNumber(Description("desc"), NumberValue(4)) + ).handle().toList() shouldContainExactly listOf( + EvenNumberState(Description("desc"), NumberValue(6)), + EvenNumberState(Description("desc"), NumberValue(10)) + ) + } + } + + test("State-stored aggregate contextual - add even number - exception (large number > 1000)") { + shouldThrow { + with(stateComputation(evenDecider)) { + with(evenNumberStateRepository) { + flowOf( + AddEvenNumber(Description("desc"), NumberValue(6000)) + ).handle().toList() shouldContainExactly listOf( + EvenNumberState(Description("desc"), NumberValue(6000)) + ) + } + } + } + } +}) diff --git a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregate.kt b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregate.kt index 43104ebc..24ab96ab 100644 --- a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregate.kt +++ b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/EventSourcingAggregate.kt @@ -243,5 +243,20 @@ fun eventSourcingLockingOrchestratingAggregate( ): EventSourcingLockingOrchestratingAggregate = object : EventSourcingLockingOrchestratingAggregate, EventLockingRepository by eventRepository, + IDecider by decider, + ISaga by saga {} + + +fun eventComputation( + decider: IDecider +): EventComputation = + object : EventComputation, + IDecider by decider {} + +fun eventOrchestratingComputation( + decider: IDecider, + saga: ISaga +): EventOrchestratingComputation = + object : EventOrchestratingComputation, IDecider by decider, ISaga by saga {} \ No newline at end of file diff --git a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/MaterializedView.kt b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/MaterializedView.kt index 3744feca..6c626c5f 100644 --- a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/MaterializedView.kt +++ b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/MaterializedView.kt @@ -138,4 +138,10 @@ fun materializedLockingDeduplicationView( ): MaterializedLockingDeduplicationView = object : MaterializedLockingDeduplicationView, ViewStateLockingDeduplicationRepository by viewStateRepository, + IView by view {} + +fun viewStateComputation( + view: IView, +): ViewStateComputation = + object : ViewStateComputation, IView by view {} \ No newline at end of file diff --git a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/Result.kt b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/Result.kt index f1d51e58..7dc47c66 100644 --- a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/Result.kt +++ b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/Result.kt @@ -39,22 +39,8 @@ sealed class Error : Result() { data class EventPublishingFailed(override val throwable: Throwable? = null) : Error() data class ActionResultPublishingFailed(override val throwable: Throwable? = null) : Error() data class CommandHandlingFailed(val command: C, override val throwable: Throwable? = null) : Error() + data class EventHandlingFailed(val event: E, override val throwable: Throwable? = null) : Error() data class ActionResultHandlingFailed(val actionResult: AR, override val throwable: Throwable? = null) : Error() - data class FetchingStateFailed(val command: C, override val throwable: Throwable? = null) : Error() - data class FetchingViewStateFailed(val event: E, override val throwable: Throwable? = null) : Error() - data class CalculatingNewStateFailed( - val state: S, - val command: C, - override val throwable: Throwable? = null - ) : Error() - - data class CalculatingNewViewStateFailed( - val state: S, - val event: E, - override val throwable: Throwable? = null - ) : Error() - - data class StoringStateFailed(val state: S, override val throwable: Throwable? = null) : Error() } diff --git a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregate.kt b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregate.kt index 1af4c0bf..1a53efed 100644 --- a/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregate.kt +++ b/application/src/commonMain/kotlin/com/fraktalio/fmodel/application/StateStoredAggregate.kt @@ -34,7 +34,6 @@ interface StateComputation : IDecider { * @param command of type [C] * @return The newly computed state of type [S] */ - @FlowPreview suspend fun S?.computeNewState(command: C): S { val currentState = this ?: initialState val events = decide(command, currentState) @@ -257,5 +256,19 @@ fun stateStoredLockingOrchestratingAggregate( ): StateStoredLockingOrchestratingAggregate = object : StateStoredLockingOrchestratingAggregate, StateLockingRepository by stateRepository, + IDecider by decider, + ISaga by saga {} + +fun stateComputation( + decider: IDecider +): StateComputation = + object : StateComputation, + IDecider by decider {} + +fun stateOrchestratingComputation( + decider: IDecider, + saga: ISaga +): StateOrchestratingComputation = + object : StateOrchestratingComputation, IDecider by decider, ISaga by saga {} \ No newline at end of file diff --git a/build.gradle.kts b/build.gradle.kts index 97ef194c..44a68181 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -14,6 +14,7 @@ allprojects { repositories { mavenCentral() mavenLocal() + maven("https://oss.sonatype.org/content/repositories/snapshots") } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 030290ce..536a76cf 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,10 +1,11 @@ [versions] coroutines = "1.6.4" dokka = "1.7.20" + kotest = "5.5.4" kotest-plugin = "5.5.4" kotlin = "1.8.0" -arrow = "1.1.3" +arrow = "2.0.0-SNAPSHOT" [libraries] coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "coroutines" }