Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions core/src/main/scala/AggregateSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,11 @@ import Cqrs.Aggregate.{ AggregateId, DatabaseWithAggregateFailure }
import Cqrs.Database.FoldableDatabase._
import Cqrs.Database._
import Cqrs.DbAdapters.InMemoryDb._
import Cqrs.{ Aggregate, Database, Projection, ProjectionRunner }
import Cqrs.{ Aggregate, Database, ProjectionRunner }
import cats.implicits._

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.reflect.ClassTag

trait AggregateSpec {

Expand Down Expand Up @@ -48,8 +47,6 @@ trait AggregateSpec {
def failedCommandError[E, C, D, S](aggregate: Aggregate[E, C, D, S], id: AggregateId, cmd: C): Aggregate.Error =
act(db, aggregate.loadAndHandleCommand(id, cmd))
.fold(identity, _ => failStop("Command did not fail, although was expected to"))

def projectionData[D: ClassTag](projection: Projection[D]) = db.getProjectionData[D](projection)
}

def newDb(projections: ProjectionRunner*): GivenSteps = GivenSteps(newInMemoryDb(projections: _*))
Expand Down
3 changes: 0 additions & 3 deletions core/src/main/scala/Cqrs/Database.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import cats.free.Free.liftF
import cats.{ Monad, MonadError }

import scala.concurrent.Future
import scala.reflect.ClassTag
import scala.util.Try
import cats.implicits._

Expand Down Expand Up @@ -107,8 +106,6 @@ object Database {
case Right(Right(ret)) => Right(ret)
}
}

def getProjectionData[D: ClassTag](projection: Projection[D]): Option[D]
}

trait FoldableDatabase {
Expand Down
8 changes: 1 addition & 7 deletions core/src/main/scala/Cqrs/DbAdapters/InMemoryDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,14 @@ package Cqrs.DbAdapters
import Cqrs.Aggregate._
import Cqrs.Database.FoldableDatabase.{ EventDataConsumer, RawEventData }
import Cqrs.Database.{ Error, _ }
import Cqrs.{ Projection, ProjectionRunner }
import Cqrs.ProjectionRunner
import cats._
import cats.data.State
//import cats.std.all._
import cats.implicits._
import lib.foldM

import scala.collection.immutable.TreeMap
import scala.concurrent.Future
import scala.reflect.ClassTag

object InMemoryDb {

Expand Down Expand Up @@ -141,10 +139,6 @@ object InMemoryDb {
Future.successful(r)
}

def getProjectionData[D: ClassTag](projection: Projection[D]): Option[D] = synchronized {
db.projections.foldLeft(None: Option[D])((ret, p) => ret.orElse(p.getProjectionData[D](projection)))
}

def consumeDbEvents[D](fromOperation: Long, initData: D, queries: List[EventDataConsumer[D]]): Error Either (Long, D) = synchronized {

def findData(tag: String, id: String, version: Int): Error Either String = {
Expand Down
57 changes: 30 additions & 27 deletions core/src/main/scala/Cqrs/Projection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,52 @@ package Cqrs
import Cqrs.Aggregate._
import Cqrs.Database.EventData

import scala.reflect.ClassTag

trait Projection[D] {
type Data = D

def initialData: Data
object Projection {
def listeningFor(events: EventTag*) = new OnEventKeyword {
def onEvent[D](handler: D => PartialFunction[EventData[_], D]) = new StartsWithKeyword[D] {
def startsWith(initialData: D) = new ConcreteProjectionRunner[D](events.toList, handler, initialData)
}
}

def listeningFor: List[EventTag]
trait OnEventKeyword {
def onEvent[D](handler: D => PartialFunction[EventData[_], D]): StartsWithKeyword[D]
}
trait StartsWithKeyword[D] {
def startsWith(initialData: D): ProjectionRunner
}
}

def accept[E](data: Data): PartialFunction[EventData[E], Data]
trait ProjectionSubscriber[D] {
def update(data: D)
}

trait ProjectionRunner {
def listeningFor: List[EventTag]

def accept[E](eventData: EventData[E]): ProjectionRunner

def getProjectionData[D: ClassTag](projection: Projection[D]): Option[D]
}

import scala.language.implicitConversions

object ProjectionRunner {
implicit def createProjectionRunner[D](p: Projection[D]): ProjectionRunner = ConcreteProjectionRunner[D](p, p.initialData)
}
final class ConcreteProjectionRunner[Data](val listeningFor: List[EventTag], handler: Data => PartialFunction[EventData[_], Data], initData: Data) extends ProjectionRunner {

case class ConcreteProjectionRunner[Data](proj: Projection[Data], data: Data) extends ProjectionRunner {
def listeningFor = proj.listeningFor
private var data: Data = initData
private var subscribers: List[ProjectionSubscriber[Data]] = List.empty

def accept[E](eventData: EventData[E]) =
proj.accept(data).lift(eventData) match {
case Some(newData) => copy(data = newData)
def accept[E](eventData: EventData[E]) = this.synchronized {
handler(data).lift(eventData) match {
case Some(newData) =>
subscribers.foreach(_.update(newData))
data = newData
this
case None => this
}
}

def getProjectionData[D: ClassTag](projection: Projection[D]): Option[D] = {
if (proj.getClass.getName == projection.getClass.getName) {
data match {
case asD: D => Some(asD)
case _ => None
}
}
else None
def subscribe(subscriber: ProjectionSubscriber[Data]) = this.synchronized {
subscribers = subscriber :: subscribers
this
}

def getData = this.synchronized(data)
}

Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,14 @@ import java.io.Closeable

import Cqrs.Aggregate._
import Cqrs.Database.{ Error, _ }
import Cqrs.{ Projection, ProjectionRunner }
import Cqrs.ProjectionRunner
import akka.actor.ActorSystem
import cats._
import cats.implicits._
import eventstore._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.reflect.ClassTag

object EventStore {

Expand Down Expand Up @@ -161,9 +160,6 @@ object EventStore {
}
}

def getProjectionData[D: ClassTag](projection: Projection[D]): Option[D] = {
projections.foldLeft(None: Option[D])((ret, p) => ret.orElse(p.getProjectionData[D](projection)))
}
}

def newEventStoreConn(projections: ProjectionRunner*): DbBackend = {
Expand Down
22 changes: 9 additions & 13 deletions example/src/main/scala/Domain/Counter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import Domain.Counter._
import scala.collection.immutable.TreeMap



object Counter {

sealed trait Event
Expand All @@ -26,6 +25,15 @@ object Counter {
case object Increment extends Command

case object Decrement extends Command

val currentValueProjection = Projection.
listeningFor(CounterAggregate.tag).
onEvent((d: TreeMap[AggregateId, Int]) => {
case EventData(_, id, _, Created(_, start)) => d + (id -> start)
case EventData(_, id, _, Incremented) => d + (id -> d.get(id).fold(1)(_ + 1))
case EventData(_, id, _, Decremented) => d + (id -> d.get(id).fold(-1)(_ - 1))
}).
startsWith(TreeMap.empty)
}

object CounterAggregate extends EventFlow[Event, Command] {
Expand All @@ -42,15 +50,3 @@ object CounterAggregate extends EventFlow[Event, Command] {
}
}

object CounterProjection extends Projection[TreeMap[AggregateId, Int]] {
def initialData = TreeMap.empty

val listeningFor = List(CounterAggregate.tag)

def accept[E](d: Data) = {
case EventData(_, id, _, Created(_, start)) => d + (id -> start)
case EventData(_, id, _, Incremented) => d + (id -> d.get(id).fold(1)(_ + 1))
case EventData(_, id, _, Decremented) => d + (id -> d.get(id).fold(-1)(_ - 1))
}
}

21 changes: 7 additions & 14 deletions example/src/main/scala/Domain/Door.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,20 +80,13 @@ object DoorState {

case object Locked extends DoorState

val currentStateProjection = Projection.listeningFor(DoorAggregate.tag).onEvent { d: TreeMap[AggregateId, DoorState] => {
case EventData(_, id, _, Door.Registered(_)) => d + (id -> Open)
case EventData(_, id, _, Door.Closed) => d + (id -> Closed)
case EventData(_, id, _, Door.Opened) => d + (id -> Open)
case EventData(_, id, _, Door.Locked(_)) => d + (id -> Locked)
case EventData(_, id, _, Door.Unlocked(_)) => d + (id -> Closed)
}}.startsWith(TreeMap.empty)
}

object DoorProjection extends Projection[TreeMap[AggregateId, DoorState]] {

def initialData = TreeMap.empty

val listeningFor = List(DoorAggregate.tag)

def accept[E](d: Data) = {
case EventData(_, id, _, Registered(_)) => d + (id -> DoorState.Open)
case EventData(_, id, _, Closed) => d + (id -> DoorState.Closed)
case EventData(_, id, _, Opened) => d + (id -> DoorState.Open)
case EventData(_, id, _, Locked(_)) => d + (id -> DoorState.Locked)
case EventData(_, id, _, Unlocked(_)) => d + (id -> DoorState.Closed)
}
}

57 changes: 57 additions & 0 deletions example/src/main/scala/Domain/OpenDoors.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package Domain

import Cqrs.Aggregate._
import Cqrs.Database.EventData
import Cqrs.Projection

import scala.collection.immutable.TreeMap

object OpenDoors {

final case class DoorState(counters: TreeMap[AggregateId, Int])

final case class Data(nextDoor: Option[AggregateId], doorCounters: TreeMap[AggregateId, DoorState])

val countersProjection = Projection.listeningFor(CounterAggregate.tag, DoorAggregate.tag).onEvent{ d: Data => {
case EventData(_, id, _, Door.Registered(_)) => Data(Some(id), d.doorCounters.updated(id, DoorState(TreeMap.empty)))
case EventData(_, id, _, Door.Closed) => d.copy(nextDoor = None)
case EventData(_, id, _, Door.Opened) => Data(Some(id), init(d.doorCounters, id, DoorState(TreeMap.empty)))
case EventData(_, id, _, Counter.Incremented) =>
d.nextDoor match {
case Some(doorId) => updateDoorCounter(d, doorId, id, 1, (c: Int) => c + 1)
case _ => d
}
case EventData(_, id, _, Counter.Decremented) =>
d.nextDoor match {
case Some(doorId) => updateDoorCounter(d, doorId, id, -1, (c: Int) => c - 1)
case _ => d
}
}}.startsWith(Data(None, TreeMap.empty))

private def modify[K, V](kv: TreeMap[K, V], k: K, f: Option[V] => V): TreeMap[K, V] = kv.updated(k, (f compose kv.get) (k))

private def modify[K, V](kv: TreeMap[K, V], k: K, init: => V, f: V => V): TreeMap[K, V] = modify(kv, k, (x: Option[V]) => x match {
case Some(v) => f(v);
case None => init
})

private def init[K, V](kv: TreeMap[K, V], k: K, init: => V): TreeMap[K, V] = modify(kv, k, init, identity[V])

private def updateDoorCounter(d: Data, doorId: AggregateId, counterId: AggregateId, init: => Int, update: Int => Int): Data =
d.copy(
doorCounters = modify(
d.doorCounters,
doorId,
DoorState(TreeMap.empty),
(ds: DoorState) => DoorState(
modify(
ds.counters,
counterId,
init,
update
)
)
)
)
}

65 changes: 0 additions & 65 deletions example/src/main/scala/Domain/OpenDoorsCounters.scala

This file was deleted.

10 changes: 5 additions & 5 deletions example/src/main/scala/EventflowExample.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,26 +50,26 @@ object EventflowExample {

val futurePool = FuturePool(Executors.newCachedThreadPool())

val db = newEventStoreConn(CounterProjection, DoorProjection, OpenDoorsCountersProjection)
val db = newEventStoreConn(Counter.currentValueProjection, DoorState.currentStateProjection, OpenDoors.countersProjection)

val counter = commandEndpoint("counter", CounterAggregate)

val counterRead = projectionEndpoint("counter" :: string) {
id => db.getProjectionData(CounterProjection).flatMap(_.get(id))
id => Counter.currentValueProjection.getData.get(id)
}

val countersRead = projectionEndpoint("counters") {
_ => db.getProjectionData(CounterProjection).map(_.keySet.toList)
_ => Some(Counter.currentValueProjection.getData.keySet.toList)
}

val door = commandEndpoint("door", DoorAggregate)

val doorRead = projectionEndpoint("door" :: string) {
id => db.getProjectionData(DoorProjection).flatMap(_.get(id))
id => DoorState.currentStateProjection.getData.get(id)
}

val doorsRead = projectionEndpoint("doors") {
_ => db.getProjectionData(DoorProjection).map(_.keySet.toList)
_ => Some(DoorState.currentStateProjection.getData.keySet.toList)
}

def main(args: Array[String]) = {
Expand Down
Loading