Skip to content

Commit 50bcdb8

Browse files
authored
Add StateT based runtime (#24)
1 parent 233fe63 commit 50bcdb8

File tree

6 files changed

+451
-4
lines changed

6 files changed

+451
-4
lines changed
Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
11
package aecor.aggregate
22

3+
import cats.{ Foldable, Monad }
4+
import cats.implicits._
5+
36
trait Folder[F[_], A, B] {
47
def zero: B
58
def fold(b: B, a: A): F[B]
9+
def consume[I[_]: Foldable](f: I[A])(implicit F: Monad[F]): F[B] = f.foldM(zero)(fold)
610
}
711

812
object Folder {
@@ -11,5 +15,4 @@ object Folder {
1115
override def zero: B = b
1216
override def fold(b: B, a: A): F[B] = f(b)(a)
1317
}
14-
1518
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package aecor.aggregate
2+
3+
import aecor.data.Handler
4+
import cats.data._
5+
import cats.implicits._
6+
import cats.{ Monad, ~> }
7+
8+
object StateRuntime {
9+
10+
/**
11+
* Creates an aggregate runtime that uses StateT as a target context
12+
*
13+
* This runtime doesn't account for correlation,
14+
* i.e. all operations are executed against common sequence of events
15+
*
16+
*/
17+
def shared[Op[_], S, E, F[_]: Monad](
18+
behavior: Op ~> Handler[S, E, ?]
19+
)(implicit folder: Folder[F, E, S]): Op ~> StateT[F, Vector[E], ?] =
20+
new (Op ~> StateT[F, Vector[E], ?]) {
21+
override def apply[A](fa: Op[A]): StateT[F, Vector[E], A] =
22+
for {
23+
events <- StateT.get[F, Vector[E]]
24+
state <- StateT.lift(folder.consume(events))
25+
result <- {
26+
val (es, r) = behavior(fa).run(state)
27+
StateT.modify[F, Vector[E]](_ ++ es).map(_ => r)
28+
}
29+
} yield result
30+
}
31+
32+
/**
33+
* Creates an aggregate runtime that uses StateT as a target context
34+
*
35+
* This runtime uses correlation function to get entity identifier
36+
* that is used to execute commands against corresponding
37+
* sequence of events
38+
*
39+
*/
40+
def correlated[O[_], S, E, F[_]: Monad](
41+
behavior: O ~> Handler[S, E, ?],
42+
correlation: Correlation[O]
43+
)(implicit folder: Folder[F, E, S]): O ~> StateT[F, Map[String, Vector[E]], ?] =
44+
new (O ~> StateT[F, Map[String, Vector[E]], ?]) {
45+
override def apply[A](fa: O[A]): StateT[F, Map[String, Vector[E]], A] = {
46+
val inner: O ~> StateT[F, Vector[E], ?] = shared(behavior)
47+
val entityId = correlation(fa)
48+
inner(fa).transformS(_.getOrElse(entityId, Vector.empty[E]), _.updated(entityId, _))
49+
}
50+
}
51+
}

core/src/main/scala/aecor/data/Folded.scala

Lines changed: 137 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,21 @@
11
package aecor.data
22

33
import aecor.data.Folded.{ Impossible, Next }
4+
import cats.kernel.Eq
5+
import cats.{
6+
Alternative,
7+
Applicative,
8+
CoflatMap,
9+
Eval,
10+
Monad,
11+
MonadCombine,
12+
MonadError,
13+
Now,
14+
Show,
15+
TraverseFilter
16+
}
17+
18+
import scala.annotation.tailrec
419

520
sealed abstract class Folded[+A] extends Product with Serializable {
621
def fold[B](impossible: => B, next: A => B): B = this match {
@@ -19,10 +34,25 @@ sealed abstract class Folded[+A] extends Product with Serializable {
1934
case Impossible => that
2035
case Next(a) => a
2136
}
37+
def orElse[AA >: A](that: Folded[AA]): Folded[AA] = this match {
38+
case Next(_) => this
39+
case Impossible => that
40+
}
41+
def isNext: Boolean = fold(false, _ => true)
42+
def isImpossible: Boolean = !isNext
43+
44+
def filter(f: A => Boolean): Folded[A] = this match {
45+
case Next(a) if f(a) => this
46+
case _ => Impossible
47+
}
48+
def exists(f: A => Boolean): Boolean = filter(f).isNext
49+
def forall(f: A => Boolean): Boolean = fold(true, f)
50+
51+
def toOption: Option[A] = fold(None, Some(_))
2252
}
23-
object Folded {
24-
private final case object Impossible extends Folded[Nothing]
25-
private final case class Next[+A](a: A) extends Folded[A]
53+
object Folded extends FoldedInstances {
54+
final case object Impossible extends Folded[Nothing]
55+
final case class Next[+A](a: A) extends Folded[A]
2656
def impossible[A]: Folded[A] = Impossible
2757
def next[A](a: A): Folded[A] = Next(a)
2858
object syntax {
@@ -32,3 +62,107 @@ object Folded {
3262
def impossible[A]: Folded[A] = Folded.impossible
3363
}
3464
}
65+
66+
trait FoldedInstances {
67+
implicit val aecorDataInstancesForFolded
68+
: TraverseFilter[Folded] with MonadError[Folded, Unit] with MonadCombine[Folded] with Monad[
69+
Folded
70+
] with CoflatMap[Folded] with Alternative[Folded] =
71+
new TraverseFilter[Folded] with MonadError[Folded, Unit] with MonadCombine[Folded]
72+
with Monad[Folded] with CoflatMap[Folded] with Alternative[Folded] {
73+
74+
def empty[A]: Folded[A] = Impossible
75+
76+
def combineK[A](x: Folded[A], y: Folded[A]): Folded[A] = x orElse y
77+
78+
def pure[A](x: A): Folded[A] = Next(x)
79+
80+
override def map[A, B](fa: Folded[A])(f: A => B): Folded[B] =
81+
fa.map(f)
82+
83+
def flatMap[A, B](fa: Folded[A])(f: A => Folded[B]): Folded[B] =
84+
fa.flatMap(f)
85+
86+
@tailrec
87+
def tailRecM[A, B](a: A)(f: A => Folded[Either[A, B]]): Folded[B] =
88+
f(a) match {
89+
case Impossible => Impossible
90+
case Next(Left(a1)) => tailRecM(a1)(f)
91+
case Next(Right(b)) => Next(b)
92+
}
93+
94+
override def map2[A, B, Z](fa: Folded[A], fb: Folded[B])(f: (A, B) => Z): Folded[Z] =
95+
fa.flatMap(a => fb.map(b => f(a, b)))
96+
97+
override def map2Eval[A, B, Z](fa: Folded[A],
98+
fb: Eval[Folded[B]])(f: (A, B) => Z): Eval[Folded[Z]] =
99+
fa match {
100+
case Impossible => Now(Impossible)
101+
case Next(a) => fb.map(_.map(f(a, _)))
102+
}
103+
104+
def coflatMap[A, B](fa: Folded[A])(f: Folded[A] => B): Folded[B] =
105+
if (fa.isNext) Next(f(fa)) else Impossible
106+
107+
def foldLeft[A, B](fa: Folded[A], b: B)(f: (B, A) => B): B =
108+
fa match {
109+
case Impossible => b
110+
case Next(a) => f(b, a)
111+
}
112+
113+
def foldRight[A, B](fa: Folded[A], lb: Eval[B])(f: (A, Eval[B]) => Eval[B]): Eval[B] =
114+
fa match {
115+
case Impossible => lb
116+
case Next(a) => f(a, lb)
117+
}
118+
119+
def raiseError[A](e: Unit): Folded[A] = Impossible
120+
121+
def handleErrorWith[A](fa: Folded[A])(f: (Unit) => Folded[A]): Folded[A] = fa orElse f(())
122+
123+
def traverseFilter[G[_], A, B](
124+
fa: Folded[A]
125+
)(f: A => G[Option[B]])(implicit G: Applicative[G]): G[Folded[B]] =
126+
fa match {
127+
case Impossible => G.pure(Impossible)
128+
case Next(a) =>
129+
G.map(f(a)) {
130+
case Some(aa) => Next(aa)
131+
case None => Impossible
132+
}
133+
}
134+
135+
override def traverse[G[_]: Applicative, A, B](fa: Folded[A])(f: A => G[B]): G[Folded[B]] =
136+
fa match {
137+
case Impossible => Applicative[G].pure(Impossible)
138+
case Next(a) => Applicative[G].map(f(a))(Next(_))
139+
}
140+
141+
override def filter[A](fa: Folded[A])(p: A => Boolean): Folded[A] =
142+
fa.filter(p)
143+
144+
override def exists[A](fa: Folded[A])(p: A => Boolean): Boolean =
145+
fa.exists(p)
146+
147+
override def forall[A](fa: Folded[A])(p: A => Boolean): Boolean =
148+
fa.forall(p)
149+
150+
override def isEmpty[A](fa: Folded[A]): Boolean =
151+
fa.isImpossible
152+
}
153+
154+
implicit def aecorDataShowForFolded[A](implicit A: Show[A]): Show[Folded[A]] =
155+
new Show[Folded[A]] {
156+
def show(fa: Folded[A]): String = fa match {
157+
case Next(a) => s"Next(${A.show(a)})"
158+
case Impossible => "Impossible"
159+
}
160+
}
161+
162+
implicit def aecorDataEqForFolded[A](implicit A: Eq[A]): Eq[Folded[A]] =
163+
Eq.instance {
164+
case (Next(l), Next(r)) => A.eqv(l, r)
165+
case (Impossible, Impossible) => true
166+
case _ => false
167+
}
168+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
package aecor.tests
2+
3+
import aecor.data.Folded
4+
import cats.{ Cartesian, CoflatMap, Eval, Later, Monad, MonadCombine, MonadError, TraverseFilter }
5+
import cats.laws.{ ApplicativeLaws, CoflatMapLaws, FlatMapLaws, MonadLaws }
6+
import cats.laws.discipline._
7+
import Folded.syntax._
8+
import org.scalacheck.{ Arbitrary, Cogen }
9+
10+
class FoldedTests extends LawSuite {
11+
12+
implicit def arbitraryFolded[A](implicit A: Arbitrary[Option[A]]): Arbitrary[Folded[A]] =
13+
Arbitrary(A.arbitrary.map(_.map(_.next).getOrElse(impossible)))
14+
15+
implicit def cogenFolded[A](implicit A: Cogen[Option[A]]): Cogen[Folded[A]] =
16+
A.contramap(_.toOption)
17+
18+
checkAll("Folded[Int]", CartesianTests[Folded].cartesian[Int, Int, Int])
19+
checkAll("Cartesian[Folded]", SerializableTests.serializable(Cartesian[Folded]))
20+
21+
checkAll("Folded[Int]", CoflatMapTests[Folded].coflatMap[Int, Int, Int])
22+
checkAll("CoflatMap[Folded]", SerializableTests.serializable(CoflatMap[Folded]))
23+
24+
checkAll("Folded[Int]", MonadCombineTests[Folded].monadCombine[Int, Int, Int])
25+
checkAll("MonadCombine[Folded]", SerializableTests.serializable(MonadCombine[Folded]))
26+
27+
checkAll("Folded[Int]", MonadTests[Folded].monad[Int, Int, Int])
28+
checkAll("Monad[Folded]", SerializableTests.serializable(Monad[Folded]))
29+
30+
checkAll(
31+
"Folded[Int] with Folded",
32+
TraverseFilterTests[Folded].traverseFilter[Int, Int, Int, Int, Folded, Folded]
33+
)
34+
checkAll("TraverseFilter[Folded]", SerializableTests.serializable(TraverseFilter[Folded]))
35+
36+
checkAll("Folded with Unit", MonadErrorTests[Folded, Unit].monadError[Int, Int, Int])
37+
checkAll("MonadError[Folded, Unit]", SerializableTests.serializable(MonadError[Folded, Unit]))
38+
39+
test("show") {
40+
impossible[Int].show should ===("Impossible")
41+
1.next.show should ===("Next(1)")
42+
43+
forAll { fs: Folded[String] =>
44+
fs.show should ===(fs.toString)
45+
}
46+
}
47+
48+
// The following tests check laws which are a different formulation of
49+
// laws that are checked. Since these laws are more or less duplicates of
50+
// existing laws, we don't check them for all types that have the relevant
51+
// instances.
52+
53+
test("Kleisli associativity") {
54+
forAll { (l: Long, f: Long => Folded[Int], g: Int => Folded[Char], h: Char => Folded[String]) =>
55+
val isEq = FlatMapLaws[Folded].kleisliAssociativity(f, g, h, l)
56+
isEq.lhs should ===(isEq.rhs)
57+
}
58+
}
59+
60+
test("Cokleisli associativity") {
61+
forAll { (l: Folded[Long], f: Folded[Long] => Int, g: Folded[Int] => Char, h: Folded[Char] => String) =>
62+
val isEq = CoflatMapLaws[Folded].cokleisliAssociativity(f, g, h, l)
63+
isEq.lhs should ===(isEq.rhs)
64+
}
65+
}
66+
67+
test("applicative composition") {
68+
forAll { (fa: Folded[Int], fab: Folded[Int => Long], fbc: Folded[Long => Char]) =>
69+
val isEq = ApplicativeLaws[Folded].applicativeComposition(fa, fab, fbc)
70+
isEq.lhs should ===(isEq.rhs)
71+
}
72+
}
73+
74+
val monadLaws = MonadLaws[Folded]
75+
76+
test("Kleisli left identity") {
77+
forAll { (a: Int, f: Int => Folded[Long]) =>
78+
val isEq = monadLaws.kleisliLeftIdentity(a, f)
79+
isEq.lhs should ===(isEq.rhs)
80+
}
81+
}
82+
83+
test("Kleisli right identity") {
84+
forAll { (a: Int, f: Int => Folded[Long]) =>
85+
val isEq = monadLaws.kleisliRightIdentity(a, f)
86+
isEq.lhs should ===(isEq.rhs)
87+
}
88+
}
89+
90+
test("map2Eval is lazy") {
91+
val bomb: Eval[Folded[Int]] = Later(sys.error("boom"))
92+
impossible[Int].map2Eval(bomb)(_ + _).value should ===(impossible[Int])
93+
}
94+
}

0 commit comments

Comments
 (0)