Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions core/src/main/scala/AggregateSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ import Cqrs.Aggregate.{ AggregateId, DatabaseWithAggregateFailure }
import Cqrs.Database.FoldableDatabase._
import Cqrs.Database._
import Cqrs.DbAdapters.InMemoryDb._
import Cqrs.{ Aggregate, Database, Projection, ProjectionRunner }
import Cqrs.{ Aggregate, Database, EventConsumer }
import cats.implicits._

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.reflect.ClassTag

trait AggregateSpec {

Expand All @@ -32,27 +31,25 @@ trait AggregateSpec {
}
}

case class WhenSteps(val db: DB, startingDbOpNr: Long) {
case class WhenSteps(db: DB, startingDbOpNr: Long) {

def command[E, C, D, S](aggregate: Aggregate[E, C, D, S], id: AggregateId, cmd: C) = {
act(db, aggregate.loadAndHandleCommand(id, cmd)).swap.foreach { err => failStop(err.toString) }
this
}
}

case class ThenSteps(val db: DB, startingDbOpNr: Long) {
case class ThenSteps(db: DB, startingDbOpNr: Long) {

def newEvents[E](tag: Aggregate.EventTagAux[E], aggregateId: AggregateId): List[E] =
readEvents(db, startingDbOpNr, tag, aggregateId)

def failedCommandError[E, C, D, S](aggregate: Aggregate[E, C, D, S], id: AggregateId, cmd: C): Aggregate.Error =
act(db, aggregate.loadAndHandleCommand(id, cmd))
.fold(identity, _ => failStop("Command did not fail, although was expected to"))

def projectionData[D: ClassTag](projection: Projection[D]) = db.getProjectionData[D](projection)
}

def newDb(projections: ProjectionRunner*): GivenSteps = GivenSteps(newInMemoryDb(projections: _*))
def newDb(projections: EventConsumer*): GivenSteps = GivenSteps(newInMemoryDb(projections: _*))

def newDb: GivenSteps = GivenSteps(newInMemoryDb())

Expand All @@ -71,7 +68,7 @@ trait AggregateSpec {

class ThenStepFlow(whenSteps: WhenSteps) {
def thenCheck[R](steps: ThenSteps => R): R =
steps(ThenSteps(whenSteps.db /* .runProjections */ , whenSteps.startingDbOpNr))
steps(ThenSteps(whenSteps.db, whenSteps.startingDbOpNr))
}

private def readEvents[E](db: DB, fromOperation: Long, tag: Aggregate.EventTagAux[E], aggregateId: AggregateId) = {
Expand Down
3 changes: 0 additions & 3 deletions core/src/main/scala/Cqrs/Database.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import cats.free.Free.liftF
import cats.{ Monad, MonadError }

import scala.concurrent.Future
import scala.reflect.ClassTag
import scala.util.Try
import cats.implicits._

Expand Down Expand Up @@ -107,8 +106,6 @@ object Database {
case Right(Right(ret)) => Right(ret)
}
}

def getProjectionData[D: ClassTag](projection: Projection[D]): Option[D]
}

trait FoldableDatabase {
Expand Down
20 changes: 7 additions & 13 deletions core/src/main/scala/Cqrs/DbAdapters/InMemoryDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,25 @@ package Cqrs.DbAdapters
import Cqrs.Aggregate._
import Cqrs.Database.FoldableDatabase.{ EventDataConsumer, RawEventData }
import Cqrs.Database.{ Error, _ }
import Cqrs.{ Projection, ProjectionRunner }
import Cqrs.EventConsumer
import cats._
import cats.data.State
//import cats.std.all._
import cats.implicits._
import lib.foldM

import scala.collection.immutable.TreeMap
import scala.concurrent.Future
import scala.reflect.ClassTag

object InMemoryDb {

final case class StoredSnapshot(version: Int, data: String)

final case class DbBackend(
data: Map[String, Map[String, TreeMap[Int, String]]], // tag -> aggregate id -> version -> event data
log: TreeMap[Long, (String, String, Int)], // operation nr -> tag, aggregate id, aggregate version
lastOperationNr: Long,
projections: List[ProjectionRunner],
snapshots: Map[String, Map[String, StoredSnapshot]] // tag -> id -> data
data: Map[String, Map[String, TreeMap[Int, String]]], // tag -> aggregate id -> version -> event data
log: TreeMap[Long, (String, String, Int)], // operation nr -> tag, aggregate id, aggregate version
lastOperationNr: Long,
projections: List[EventConsumer],
snapshots: Map[String, Map[String, StoredSnapshot]] // tag -> id -> data
)

private type Db[A] = State[DbBackend, A]
Expand Down Expand Up @@ -132,7 +130,7 @@ object InMemoryDb {
)
}

def newInMemoryDb(projections: ProjectionRunner*) = new Backend with FoldableDatabase {
def newInMemoryDb(projections: EventConsumer*) = new Backend with FoldableDatabase {
var db = DbBackend(TreeMap.empty, TreeMap.empty, 0, projections.toList, Map.empty);

def runDb[E, A](actions: EventDatabaseWithFailure[E, A]): Future[Error Either A] = synchronized {
Expand All @@ -141,10 +139,6 @@ object InMemoryDb {
Future.successful(r)
}

def getProjectionData[D: ClassTag](projection: Projection[D]): Option[D] = synchronized {
db.projections.foldLeft(None: Option[D])((ret, p) => ret.orElse(p.getProjectionData[D](projection)))
}

def consumeDbEvents[D](fromOperation: Long, initData: D, queries: List[EventDataConsumer[D]]): Error Either (Long, D) = synchronized {

def findData(tag: String, id: String, version: Int): Error Either String = {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/Cqrs/EventFlow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ object EventFlow {
import scala.meta._
import scala.collection.immutable.Seq

@scala.annotation.compileTimeOnly("scalaplayground.macros not expanded")
@scala.annotation.compileTimeOnly("EventFlow.state not expanded")
class state extends scala.annotation.StaticAnnotation {
inline def apply(defn: Any): Any = meta {
defn match {
Expand Down
60 changes: 32 additions & 28 deletions core/src/main/scala/Cqrs/Projection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,53 @@ package Cqrs
import Cqrs.Aggregate._
import Cqrs.Database.EventData

import scala.reflect.ClassTag

trait Projection[D] {
type Data = D
object Projection {
def named(name: String) = new NamedProjection {
def listeningFor[D](aggregates: AggregateBase*)(handler: D => PartialFunction[EventData[_], D]) = new StartsWithKeyword[D] {
def startsWith(initialData: D) = new EventConsumerWithData[D](aggregates.toList.map(_.tag), handler, initialData)
}
}

def initialData: Data
trait NamedProjection {
def listeningFor[D](aggregates: AggregateBase*)(handler: D => PartialFunction[EventData[_], D]): StartsWithKeyword[D]
}

def listeningFor: List[EventTag]
trait StartsWithKeyword[D] {
def startsWith(initialData: D): EventConsumer
}
}

def accept[E](data: Data): PartialFunction[EventData[E], Data]
trait ProjectionSubscriber[D] {
def update(data: D)
}

trait ProjectionRunner {
trait EventConsumer {
def listeningFor: List[EventTag]

def accept[E](eventData: EventData[E]): ProjectionRunner

def getProjectionData[D: ClassTag](projection: Projection[D]): Option[D]
def accept[E](eventData: EventData[E]): EventConsumer
}

import scala.language.implicitConversions

object ProjectionRunner {
implicit def createProjectionRunner[D](p: Projection[D]): ProjectionRunner = ConcreteProjectionRunner[D](p, p.initialData)
}
final class EventConsumerWithData[Data](val listeningFor: List[EventTag], handler: Data => PartialFunction[EventData[_], Data], initData: Data) extends EventConsumer {

case class ConcreteProjectionRunner[Data](proj: Projection[Data], data: Data) extends ProjectionRunner {
def listeningFor = proj.listeningFor
private var data: Data = initData
private var subscribers: List[ProjectionSubscriber[Data]] = List.empty

def accept[E](eventData: EventData[E]) =
proj.accept(data).lift(eventData) match {
case Some(newData) => copy(data = newData)
def accept[E](eventData: EventData[E]) = this.synchronized {
handler(data).lift(eventData) match {
case Some(newData) =>
subscribers.foreach(_.update(newData))
data = newData
this
case None => this
}
}

def getProjectionData[D: ClassTag](projection: Projection[D]): Option[D] = {
if (proj.getClass.getName == projection.getClass.getName) {
data match {
case asD: D => Some(asD)
case _ => None
}
}
else None
def subscribe(subscriber: ProjectionSubscriber[Data]) = this.synchronized {
subscribers = subscriber :: subscribers
this
}

def getData = this.synchronized(data)
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,21 @@ import java.io.Closeable

import Cqrs.Aggregate._
import Cqrs.Database.{ Error, _ }
import Cqrs.{ Projection, ProjectionRunner }
import Cqrs.EventConsumer
import akka.actor.ActorSystem
import cats._
import cats.implicits._
import eventstore._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.reflect.ClassTag

object EventStore {

class DbBackend(
system: ActorSystem,
connection: EsConnection,
private var projections: List[ProjectionRunner]
private var projections: List[EventConsumer]
) extends Backend {

val allEventsSubscription = connection.subscribeToAllFrom(new SubscriptionObserver[IndexedEvent] {
Expand Down Expand Up @@ -161,12 +160,9 @@ object EventStore {
}
}

def getProjectionData[D: ClassTag](projection: Projection[D]): Option[D] = {
projections.foldLeft(None: Option[D])((ret, p) => ret.orElse(p.getProjectionData[D](projection)))
}
}

def newEventStoreConn(projections: ProjectionRunner*): DbBackend = {
def newEventStoreConn(projections: EventConsumer*): DbBackend = {
val system = ActorSystem()
val connection = EsConnection(system)
new DbBackend(system, connection, projections.toList)
Expand Down
20 changes: 7 additions & 13 deletions example/src/main/scala/Domain/Counter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import Domain.Counter._
import scala.collection.immutable.TreeMap



object Counter {

sealed trait Event
Expand All @@ -26,6 +25,13 @@ object Counter {
case object Increment extends Command

case object Decrement extends Command

def newCurrentValueProjection = Projection.named("counters").listeningFor(CounterAggregate) { d: TreeMap[AggregateId, Int] => {
case EventData(_, id, _, Created(_, start)) => d + (id -> start)
case EventData(_, id, _, Incremented) => d + (id -> d.get(id).fold(1)(_ + 1))
case EventData(_, id, _, Decremented) => d + (id -> d.get(id).fold(-1)(_ - 1))
}}.
startsWith(TreeMap.empty)
}

object CounterAggregate extends EventFlow[Event, Command] {
Expand All @@ -42,15 +48,3 @@ object CounterAggregate extends EventFlow[Event, Command] {
}
}

object CounterProjection extends Projection[TreeMap[AggregateId, Int]] {
def initialData = TreeMap.empty

val listeningFor = List(CounterAggregate.tag)

def accept[E](d: Data) = {
case EventData(_, id, _, Created(_, start)) => d + (id -> start)
case EventData(_, id, _, Incremented) => d + (id -> d.get(id).fold(1)(_ + 1))
case EventData(_, id, _, Decremented) => d + (id -> d.get(id).fold(-1)(_ - 1))
}
}

21 changes: 7 additions & 14 deletions example/src/main/scala/Domain/Door.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,13 @@ object DoorState {

case object Locked extends DoorState

def newCurrentStateProjection = Projection.named("doorStates").listeningFor(DoorAggregate) { d: TreeMap[AggregateId, DoorState] => {
case EventData(_, id, _, Door.Registered(_)) => d + (id -> Open)
case EventData(_, id, _, Door.Closed) => d + (id -> Closed)
case EventData(_, id, _, Door.Opened) => d + (id -> Open)
case EventData(_, id, _, Door.Locked(_)) => d + (id -> Locked)
case EventData(_, id, _, Door.Unlocked(_)) => d + (id -> Closed)
}}.startsWith(TreeMap.empty)
}

object DoorProjection extends Projection[TreeMap[AggregateId, DoorState]] {

def initialData = TreeMap.empty

val listeningFor = List(DoorAggregate.tag)

def accept[E](d: Data) = {
case EventData(_, id, _, Registered(_)) => d + (id -> DoorState.Open)
case EventData(_, id, _, Closed) => d + (id -> DoorState.Closed)
case EventData(_, id, _, Opened) => d + (id -> DoorState.Open)
case EventData(_, id, _, Locked(_)) => d + (id -> DoorState.Locked)
case EventData(_, id, _, Unlocked(_)) => d + (id -> DoorState.Closed)
}
}

57 changes: 57 additions & 0 deletions example/src/main/scala/Domain/OpenDoors.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package Domain

import Cqrs.Aggregate._
import Cqrs.Database.EventData
import Cqrs.Projection

import scala.collection.immutable.TreeMap

object OpenDoors {

final case class DoorState(counters: TreeMap[AggregateId, Int])

final case class Data(nextDoor: Option[AggregateId], doorCounters: TreeMap[AggregateId, DoorState])

def newCountersProjection = Projection.named("openDoorCounters").listeningFor(CounterAggregate, DoorAggregate) { d: Data => {
case EventData(_, id, _, Door.Registered(_)) => Data(Some(id), d.doorCounters.updated(id, DoorState(TreeMap.empty)))
case EventData(_, id, _, Door.Closed) => d.copy(nextDoor = None)
case EventData(_, id, _, Door.Opened) => Data(Some(id), init(d.doorCounters, id, DoorState(TreeMap.empty)))
case EventData(_, id, _, Counter.Incremented) =>
d.nextDoor match {
case Some(doorId) => updateDoorCounter(d, doorId, id, 1, (c: Int) => c + 1)
case _ => d
}
case EventData(_, id, _, Counter.Decremented) =>
d.nextDoor match {
case Some(doorId) => updateDoorCounter(d, doorId, id, -1, (c: Int) => c - 1)
case _ => d
}
}}.startsWith(Data(None, TreeMap.empty))

private def modify[K, V](kv: TreeMap[K, V], k: K, f: Option[V] => V): TreeMap[K, V] = kv.updated(k, (f compose kv.get) (k))

private def modify[K, V](kv: TreeMap[K, V], k: K, init: => V, f: V => V): TreeMap[K, V] = modify(kv, k, (x: Option[V]) => x match {
case Some(v) => f(v);
case None => init
})

private def init[K, V](kv: TreeMap[K, V], k: K, init: => V): TreeMap[K, V] = modify(kv, k, init, identity[V])

private def updateDoorCounter(d: Data, doorId: AggregateId, counterId: AggregateId, init: => Int, update: Int => Int): Data =
d.copy(
doorCounters = modify(
d.doorCounters,
doorId,
DoorState(TreeMap.empty),
(ds: DoorState) => DoorState(
modify(
ds.counters,
counterId,
init,
update
)
)
)
)
}

Loading