Skip to content

Commit 8f23432

Browse files
authored
Behavior overhaul (#11)
1 parent 683372b commit 8f23432

File tree

66 files changed

+1568
-1453
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+1568
-1453
lines changed

.scalafmt.conf

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
danglingParentheses = true
2+
maxColumn = 100
3+
rewrite.rules = [PreferCurlyFors, RedundantBraces]
4+
style = IntelliJ
5+
align.arrowEnumeratorGenerator = true
6+
align.openParenCallSite = false
7+
spaces {
8+
inImportCurlyBraces = true
9+
}

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11

22
[![Build Status](https://img.shields.io/travis/notxcain/aecor/master.svg)](https://travis-ci.org/notxcain/aecor)
33
[![Maven Central](https://img.shields.io/maven-central/v/io.aecor/aecor-core_2.11.svg)](https://github.com/notxcain/aecor)
4-
[![Join the chat at https://gitter.im/notxcain/akka-ddd](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/notxcain/aecor)
4+
[![Join the chat at https://gitter.im/notxcain/aecor](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/notxcain/aecor)
55

66

77
# Aecor

build.sbt

Lines changed: 32 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,9 @@ lazy val buildSettings = Seq(
77
crossScalaVersions := Seq("2.11.8", "2.12.0")
88
)
99

10-
lazy val circeVersion = "0.6.1"
11-
lazy val akkaVersion = "2.4.14"
12-
lazy val akkaHttpVersion = "10.0.0"
13-
lazy val reactiveKafkaVersion = "0.13"
14-
lazy val akkaPersistenceCassandra = "0.21"
15-
lazy val catsVersion = "0.8.1"
16-
lazy val akkaHttpJsonVersion = "1.11.0"
17-
lazy val freekVersion = "0.6.5"
18-
lazy val kryoSerializationVersion = "0.5.1"
10+
lazy val akkaVersion = "2.4.16"
11+
lazy val akkaPersistenceCassandra = "0.22"
12+
lazy val catsVersion = "0.9.0"
1913
lazy val logbackVersion = "1.1.7"
2014

2115
lazy val scalaCheckVersion = "1.13.4"
@@ -27,14 +21,9 @@ lazy val paradiseVersion = "2.1.0"
2721

2822
lazy val commonSettings = Seq(
2923
scalacOptions ++= commonScalacOptions,
30-
resolvers ++= Seq(
31-
Resolver.bintrayRepo("projectseptemberinc", "maven")
32-
),
3324
libraryDependencies ++= Seq(
34-
compilerPlugin(
35-
"org.spire-math" %% "kind-projector" % kindProjectorVersion),
36-
compilerPlugin(
37-
"org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full)
25+
compilerPlugin("org.spire-math" %% "kind-projector" % kindProjectorVersion),
26+
compilerPlugin("org.scalamacros" % "paradise" % paradiseVersion cross CrossVersion.full)
3827
),
3928
parallelExecution in Test := false,
4029
scalacOptions in (Compile, doc) := (scalacOptions in (Compile, doc)).value
@@ -49,14 +38,10 @@ lazy val aecor = project
4938
.settings(aecorSettings)
5039
.settings(noPublishSettings)
5140
.aggregate(core, example, schedule, tests)
52-
.dependsOn(core,
53-
example % "compile-internal",
54-
tests % "test-internal -> test")
41+
.dependsOn(core, example % "compile-internal", tests % "test-internal -> test")
5542

56-
lazy val core = project
57-
.settings(moduleName := "aecor-core")
58-
.settings(aecorSettings)
59-
.settings(coreSettings)
43+
lazy val core =
44+
project.settings(moduleName := "aecor-core").settings(aecorSettings).settings(coreSettings)
6045

6146
lazy val schedule = project
6247
.dependsOn(core)
@@ -85,26 +70,33 @@ lazy val coreSettings = Seq(
8570
"com.typesafe.akka" %% "akka-persistence-query-experimental" % akkaVersion,
8671
"com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
8772
"com.typesafe.akka" %% "akka-persistence-cassandra" % akkaPersistenceCassandra,
88-
"com.typesafe.akka" %% "akka-stream-kafka" % reactiveKafkaVersion,
89-
"ch.qos.logback" % "logback-classic" % logbackVersion,
9073
"com.chuusai" %% "shapeless" % shapelessVersion,
9174
"org.typelevel" %% "cats" % catsVersion
9275
)
9376
)
9477

9578
lazy val scheduleSettings = commonProtobufSettings
9679

97-
lazy val exampleSettings = Seq(
98-
libraryDependencies ++= Seq(
99-
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
100-
"de.heikoseeberger" %% "akka-http-circe" % akkaHttpJsonVersion,
101-
("com.projectseptember" %% "freek" % freekVersion)
102-
.exclude("org.typelevel", "cats-free_2.12.0-RC2"),
103-
"io.circe" %% "circe-core" % circeVersion,
104-
"io.circe" %% "circe-generic" % circeVersion,
105-
"io.circe" %% "circe-parser" % circeVersion
80+
lazy val exampleSettings = {
81+
val circeVersion = "0.6.1"
82+
val akkaHttpVersion = "10.0.3"
83+
val akkaHttpJsonVersion = "1.11.0"
84+
val freekVersion = "0.6.5"
85+
Seq(
86+
resolvers ++= Seq(Resolver.bintrayRepo("projectseptemberinc", "maven")),
87+
libraryDependencies ++=
88+
Seq(
89+
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
90+
"de.heikoseeberger" %% "akka-http-circe" % akkaHttpJsonVersion,
91+
("com.projectseptember" %% "freek" % freekVersion)
92+
.exclude("org.typelevel", "cats-free_2.12.0-RC2"),
93+
"io.circe" %% "circe-core" % circeVersion,
94+
"io.circe" %% "circe-generic" % circeVersion,
95+
"io.circe" %% "circe-parser" % circeVersion,
96+
"ch.qos.logback" % "logback-classic" % logbackVersion
97+
)
10698
)
107-
)
99+
}
108100

109101
lazy val testingSettings = Seq(
110102
libraryDependencies ++= Seq(
@@ -142,18 +134,11 @@ lazy val commonScalacOptions = Seq(
142134
"-Ypartial-unification"
143135
)
144136

145-
lazy val warnUnusedImport = Seq(
146-
scalacOptions in (Compile, console) ~= {
147-
_.filterNot(Set("-Ywarn-unused-import", "-Ywarn-value-discard"))
148-
},
149-
scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value
150-
)
137+
lazy val warnUnusedImport = Seq(scalacOptions in (Compile, console) ~= {
138+
_.filterNot(Set("-Ywarn-unused-import", "-Ywarn-value-discard"))
139+
}, scalacOptions in (Test, console) := (scalacOptions in (Compile, console)).value)
151140

152-
lazy val noPublishSettings = Seq(
153-
publish := (),
154-
publishLocal := (),
155-
publishArtifact := false
156-
)
141+
lazy val noPublishSettings = Seq(publish := (), publishLocal := (), publishArtifact := false)
157142

158143
lazy val publishSettings = Seq(
159144
releaseCommitMessage := s"Set version to ${if (releaseUseGlobalVersion.value) (version in ThisBuild).value
@@ -176,10 +161,7 @@ lazy val publishSettings = Seq(
176161
},
177162
autoAPIMappings := true,
178163
scmInfo := Some(
179-
ScmInfo(
180-
url("https://github.com/notxcain/aecor"),
181-
"scm:git:git@github.com:notxcain/aecor.git"
182-
)
164+
ScmInfo(url("https://github.com/notxcain/aecor"), "scm:git:git@github.com:notxcain/aecor.git")
183165
),
184166
pomExtra :=
185167
<developers>

core/src/main/resources/reference.conf

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,10 @@ cassandra-query-journal {
88
}
99

1010
aecor {
11-
aggregate {
11+
akka-runtime {
1212
number-of-shards = 30
1313
ask-timeout = 60s
14-
15-
default-idle-timeout = 120s
16-
17-
idle-timeout {
18-
# per entity name idle timeout here
19-
}
20-
21-
default-snapshot-after = off
22-
23-
snapshot-after {
24-
25-
}
14+
idle-timeout = 60s
2615
}
2716
}
2817

@@ -38,6 +27,15 @@ akka {
3827
"java.lang.String" = java
3928
"akka.dispatch.sysmsg.DeathWatchNotification" = java
4029
}
30+
serialization-identifiers {
31+
"aecor.aggregate.serialization.PersistentReprSerializer" = 100
32+
}
33+
serializers {
34+
persistent-repr = "aecor.aggregate.serialization.PersistentReprSerializer"
35+
}
36+
serialization-bindings {
37+
"aecor.aggregate.serialization.PersistentRepr" = persistent-repr
38+
}
4139
}
4240
persistence {
4341
journal.plugin = "cassandra-journal"
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
package aecor.aggregate
2+
3+
import java.net.URLDecoder
4+
import java.nio.charset.StandardCharsets
5+
import java.time.{ Duration, Instant }
6+
7+
import aecor.aggregate.SnapshotPolicy.{ EachNumberOfEvents, Never }
8+
import aecor.aggregate.serialization.PersistentDecoder.Result
9+
import aecor.aggregate.serialization.{ PersistentDecoder, PersistentEncoder, PersistentRepr }
10+
import aecor.data.{ Folded, Handler }
11+
import akka.NotUsed
12+
import akka.actor.{ ActorLogging, Props, ReceiveTimeout, Stash }
13+
import akka.cluster.sharding.ShardRegion
14+
import akka.persistence.journal.Tagged
15+
import akka.persistence.{ PersistentActor, RecoveryCompleted, SnapshotOffer }
16+
import cats.~>
17+
18+
import scala.concurrent.duration.FiniteDuration
19+
import scala.util.{ Left, Right }
20+
21+
sealed trait SnapshotPolicy[+E]
22+
23+
object SnapshotPolicy {
24+
def never[E]: SnapshotPolicy[E] = Never.asInstanceOf[SnapshotPolicy[E]]
25+
26+
def eachNumberOfEvents[E: PersistentEncoder: PersistentDecoder](
27+
numberOfEvents: Int
28+
): SnapshotPolicy[E] = EachNumberOfEvents(numberOfEvents)
29+
30+
private[aggregate] case object Never extends SnapshotPolicy[Nothing]
31+
32+
private[aggregate] case class EachNumberOfEvents[State: PersistentEncoder: PersistentDecoder](
33+
numberOfEvents: Int
34+
) extends SnapshotPolicy[State] {
35+
def encode(state: State): PersistentRepr = PersistentEncoder[State].encode(state)
36+
def decode(repr: PersistentRepr): Result[State] = PersistentDecoder[State].decode(repr)
37+
}
38+
39+
}
40+
41+
sealed trait Identity
42+
object Identity {
43+
case class Provided(value: String) extends Identity
44+
case object FromPathName extends Identity
45+
}
46+
47+
object AggregateActor {
48+
49+
def props[Command[_], State, Event: PersistentEncoder: PersistentDecoder](
50+
entityName: String,
51+
behavior: Command ~> Handler[State, Event, ?],
52+
identity: Identity,
53+
snapshotPolicy: SnapshotPolicy[State],
54+
tagging: Tagging[Event],
55+
idleTimeout: FiniteDuration
56+
)(implicit folder: Folder[Folded, Event, State]): Props =
57+
Props(new AggregateActor(entityName, behavior, identity, snapshotPolicy, tagging, idleTimeout))
58+
59+
case object Stop
60+
}
61+
62+
/**
63+
*
64+
* Actor encapsulating state of event sourced entity behavior [Behavior]
65+
*
66+
* @param entityName entity name used as persistence prefix and as a tag for all events
67+
* @param behavior entity behavior
68+
* @param identity describes how to extract entity identifier
69+
* @param snapshotPolicy snapshot policy to use
70+
* @param idleTimeout - time with no commands after which graceful actor shutdown is initiated
71+
*/
72+
class AggregateActor[Command[_], State, Event: PersistentEncoder: PersistentDecoder] private[aecor] (
73+
entityName: String,
74+
behavior: Command ~> Handler[State, Event, ?],
75+
identity: Identity,
76+
snapshotPolicy: SnapshotPolicy[State],
77+
tagger: Tagging[Event],
78+
idleTimeout: FiniteDuration
79+
)(implicit folder: Folder[Folded, Event, State])
80+
extends PersistentActor
81+
with Stash
82+
with ActorLogging {
83+
84+
final private val entityId: String = identity match {
85+
case Identity.Provided(value) => value
86+
case Identity.FromPathName =>
87+
URLDecoder.decode(self.path.name, StandardCharsets.UTF_8.name())
88+
}
89+
90+
final override val persistenceId: String = s"$entityName-$entityId"
91+
92+
private val recoveryStartTimestamp: Instant = Instant.now()
93+
94+
log.info("[{}] Starting...", persistenceId)
95+
96+
protected var state: State = folder.zero
97+
98+
private var eventCount = 0L
99+
100+
final override def receiveRecover: Receive = {
101+
case repr: PersistentRepr =>
102+
PersistentDecoder[Event].decode(repr) match {
103+
case Left(cause) =>
104+
onRecoveryFailure(cause, Some(repr))
105+
case Right(event) =>
106+
applyEvent(event)
107+
}
108+
109+
case SnapshotOffer(_, snapshotRepr: PersistentRepr) =>
110+
snapshotPolicy match {
111+
case Never => ()
112+
case e @ EachNumberOfEvents(_) =>
113+
e.decode(snapshotRepr) match {
114+
case Left(cause) =>
115+
onRecoveryFailure(cause, Some(snapshotRepr))
116+
case Right(snapshot) =>
117+
state = snapshot
118+
}
119+
}
120+
121+
case RecoveryCompleted =>
122+
log.info(
123+
"[{}] Recovery to version [{}] completed in [{} ms]",
124+
persistenceId,
125+
lastSequenceNr,
126+
Duration.between(recoveryStartTimestamp, Instant.now()).toMillis
127+
)
128+
setIdleTimeout()
129+
}
130+
131+
final override def receiveCommand: Receive =
132+
receivePassivationMessages.orElse(receiveCommandMessage)
133+
134+
private def receiveCommandMessage: Receive = {
135+
case command =>
136+
handleCommand(command.asInstanceOf[Command[_]])
137+
}
138+
139+
private def handleCommand(command: Command[_]): Unit = {
140+
val (events, reply) = behavior(command).run(state)
141+
log.debug(
142+
"[{}] Command [{}] produced reply [{}] and events [{}]",
143+
persistenceId,
144+
command,
145+
reply,
146+
events
147+
)
148+
val envelopes =
149+
events.map(e => Tagged(PersistentEncoder[Event].encode(e), tagger(e)))
150+
151+
persistAll(envelopes)(_ => ())
152+
153+
deferAsync(NotUsed) { _ =>
154+
events.foreach { event =>
155+
applyEvent(event)
156+
snapshotIfNeeded()
157+
}
158+
sender() ! reply
159+
}
160+
}
161+
162+
private def snapshotIfNeeded(): Unit =
163+
snapshotPolicy match {
164+
case e @ EachNumberOfEvents(numberOfEvents) if eventCount % numberOfEvents == 0 =>
165+
saveSnapshot(e.encode(state))
166+
case _ => ()
167+
}
168+
169+
private def applyEvent(event: Event): Unit = {
170+
state = folder
171+
.fold(state, event)
172+
.getOrElse {
173+
val error = new IllegalStateException(s"Illegal state while applying [$event] to [$state]")
174+
log.error(error, error.getMessage)
175+
throw error
176+
}
177+
eventCount += 1
178+
if (recoveryFinished)
179+
log.debug("[{}] State [{}]", persistenceId, state)
180+
}
181+
182+
private def receivePassivationMessages: Receive = {
183+
case ReceiveTimeout =>
184+
if (shouldPassivate) {
185+
passivate()
186+
} else {
187+
setIdleTimeout()
188+
}
189+
case AggregateActor.Stop =>
190+
context.stop(self)
191+
}
192+
193+
protected def shouldPassivate: Boolean = true
194+
195+
private def passivate(): Unit = {
196+
log.debug("[{}] Passivating...", persistenceId)
197+
context.parent ! ShardRegion.Passivate(AggregateActor.Stop)
198+
}
199+
200+
private def setIdleTimeout(): Unit = {
201+
log.debug("[{}] Setting idle timeout to [{}]", persistenceId, idleTimeout)
202+
context.setReceiveTimeout(idleTimeout)
203+
}
204+
}

0 commit comments

Comments
 (0)