Skip to content

Commit b2b989d

Browse files
dfakhritdinovDenys Fakhritdinov
andauthored
Stop actor after persisting events failed (#319)
Co-authored-by: Denys Fakhritdinov <dfakhritdinov@evolution.com>
1 parent a3d64de commit b2b989d

File tree

8 files changed

+211
-38
lines changed

8 files changed

+211
-38
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,6 @@ metals.sbt
2929
.metals
3030
.bloop
3131

32+
.vscode/
33+
3234
ignored

.scalafix.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ OrganizeImports {
44
preset = INTELLIJ_2020_3
55
#removeUnused = false # `true` is not supported in Scala 3.3.0
66
targetDialect = Auto
7-
}
7+
}

build.sbt

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,16 @@ lazy val commonSettings = Seq(
1717
licenses := Seq(("MIT", url("https://opensource.org/licenses/MIT"))),
1818
)
1919

20+
// TODO remove after 4.1.4 is released
21+
// https://github.com/lightbend-labs/mima/issues/738
22+
// should be fixed by https://github.com/lightbend-labs/mima/issues/778
23+
import com.typesafe.tools.mima.core._
24+
val mimaExclude = Seq(
25+
"com.evolutiongaming.akkaeffect.persistence.EventSourcedActorOf#EventStoreOps.asJournaller",
26+
"com.evolutiongaming.akkaeffect.persistence.EventSourcedActorOf#EventStoreOps.asJournaller$extension",
27+
).map(ProblemFilters.exclude[DirectMissingMethodProblem])
28+
ThisBuild / mimaBinaryIssueFilters ++= mimaExclude
29+
2030
val alias: Seq[sbt.Def.Setting[_]] =
2131
addCommandAlias("fmt", "scalafixEnable; scalafixAll; all scalafmtAll scalafmtSbt") ++
2232
addCommandAlias(

persistence/src/main/scala/akka/persistence/EventStoreInterop.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ object EventStoreInterop {
180180
persistentActor = actor.ref,
181181
)
182182
_ <- journaller.tell(request)
183-
_ <- log.debug("recovery: events from Akka percictence requested")
183+
_ <- log.debug("recovery: events from Akka persistence requested")
184184
} yield new sstream.Stream[F, EventStore.Persisted[Any]] {
185185

186186
override def foldWhileM[L, R](l: L)(f: (L, EventStore.Persisted[Any]) => F[Either[L, R]]): F[Either[L, R]] = {

persistence/src/main/scala/com/evolutiongaming/akkaeffect/persistence/EventSourcedActorOf.scala

Lines changed: 45 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,12 @@ package com.evolutiongaming.akkaeffect.persistence
22

33
import akka.actor.Actor
44
import akka.persistence.SnapshotSelectionCriteria
5-
import cats.Monad
65
import cats.effect.implicits.effectResourceOps
7-
import cats.effect.{Async, Ref, Resource}
6+
import cats.effect.{Async, Concurrent, Ref, Resource}
87
import cats.syntax.all.*
98
import com.evolutiongaming.akkaeffect.*
109
import com.evolutiongaming.akkaeffect.persistence.SeqNr
11-
import com.evolutiongaming.catshelper.{LogOf, ToFuture}
10+
import com.evolutiongaming.catshelper.{Log, LogOf, ToFuture}
1211

1312
import java.time.Instant
1413

@@ -70,12 +69,11 @@ object EventSourcedActorOf {
7069
LogOf
7170
.log[F, EventSourcedActorOf.type]
7271
.toResource
73-
.map { log =>
74-
log
75-
.prefixed(actorCtx.self.path.name)
76-
.mapK(Resource.liftK[F])
77-
}
78-
.flatMap { implicit log =>
72+
.flatMap { implicit log0 =>
73+
implicit val log =
74+
log0
75+
.prefixed(actorCtx.self.path.name)
76+
.mapK(Resource.liftK[F])
7977
val receive = for {
8078
eventSourced <- eventSourcedOf(actorCtx).toResource
8179
recoveryStarted <- eventSourced.value
@@ -116,9 +114,9 @@ object EventSourcedActorOf {
116114
} yield seqNr
117115
}.toResource
118116

119-
_ <- log.debug(s"recovery completed with seqNr $seqNr")
120-
seqNrRef <- Ref[F].of(seqNr).toResource
121-
receive <- recovering.completed(seqNr, eventStore.asJournaller(seqNrRef), snapshotStore.asSnapshotter)
117+
_ <- log.debug(s"recovery completed with seqNr $seqNr")
118+
journaller <- eventStore.asJournaller(actorCtx, seqNr).toResource
119+
receive <- recovering.completed(seqNr, journaller, snapshotStore.asSnapshotter)
122120
} yield receive
123121

124122
receive.onError {
@@ -158,30 +156,46 @@ object EventSourcedActorOf {
158156

159157
implicit final private class EventStoreOps[F[_], E](val store: EventStore[F, E]) extends AnyVal {
160158

161-
def asJournaller(seqNrRef: Ref[F, SeqNr])(implicit F: Monad[F]): Journaller[F, E] = new Journaller[F, E] {
162-
163-
val append = new Append[F, E] {
164-
165-
def apply(events: Events[E]): F[F[SeqNr]] =
166-
seqNrRef
167-
.modify { seqNr0 =>
168-
events.mapAccumulate(seqNr0) {
169-
case (seqNr0, event) =>
170-
val seqNr1 = seqNr0 + 1
171-
seqNr1 -> EventStore.Event(event, seqNr1)
159+
def asJournaller(actorCtx: ActorCtx[F], seqNr: SeqNr)(implicit F: Concurrent[F], log: Log[F]): F[Journaller[F, E]] =
160+
for {
161+
seqNrRef <- Ref[F].of(seqNr)
162+
} yield new Journaller[F, E] {
163+
val append = new Append[F, E] {
164+
165+
def apply(events: Events[E]): F[F[SeqNr]] =
166+
seqNrRef
167+
.modify { seqNr =>
168+
events.mapAccumulate(seqNr) {
169+
case (seqNr0, event) =>
170+
val seqNr1 = seqNr0 + 1
171+
seqNr1 -> EventStore.Event(event, seqNr1)
172+
}
173+
}
174+
.flatMap { events =>
175+
def handleError(err: Throwable) = {
176+
val from = events.values.head.head.seqNr
177+
val to = events.values.last.last.seqNr
178+
stopActor(from, to, err)
179+
}
180+
store
181+
.save(events)
182+
.onError(handleError)
183+
.flatTap(_.onError(handleError))
172184
}
173-
}
174-
.flatMap { events =>
175-
store.save(events)
176-
}
177185

178-
}
186+
private def stopActor(from: SeqNr, to: SeqNr, error: Throwable): F[Unit] =
187+
for {
188+
_ <- log.error(s"failed to append events with seqNr range [$from .. $to], stopping actor", error)
189+
_ <- actorCtx.stop
190+
} yield {}
191+
192+
}
179193

180-
val deleteTo = new DeleteEventsTo[F] {
194+
val deleteTo = new DeleteEventsTo[F] {
181195

182-
def apply(seqNr: SeqNr): F[F[Unit]] = store.deleteTo(seqNr)
196+
def apply(seqNr: SeqNr): F[F[Unit]] = store.deleteTo(seqNr)
183197

198+
}
184199
}
185-
}
186200
}
187201
}

persistence/src/test/resources/test.conf

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,22 +9,27 @@ akka {
99
}
1010
}
1111

12+
fail-on-event-journal {
13+
class = "com.evolutiongaming.akkaeffect.persistence.FailOnEventJournal"
14+
plugin-dispatcher = "akka.actor.default-dispatcher"
15+
}
16+
1217
failing-journal {
13-
class = "akka.persistence.FailingJournal"
18+
class = "akka.persistence.FailingJournal"
1419
plugin-dispatcher = "akka.actor.default-dispatcher"
1520
}
1621

1722
infinite-journal {
18-
class = "akka.persistence.InfiniteJournal"
23+
class = "akka.persistence.InfiniteJournal"
1924
plugin-dispatcher = "akka.actor.default-dispatcher"
2025
}
2126

2227
failing-snapshot {
23-
class = "akka.persistence.FailingSnapshotter"
28+
class = "akka.persistence.FailingSnapshotter"
2429
plugin-dispatcher = "akka.actor.default-dispatcher"
2530
}
2631

2732
infinite-snapshot {
28-
class = "akka.persistence.InfiniteSnapshotter"
33+
class = "akka.persistence.InfiniteSnapshotter"
2934
plugin-dispatcher = "akka.actor.default-dispatcher"
3035
}
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package com.evolutiongaming.akkaeffect.persistence
2+
3+
import akka.actor.Props
4+
import akka.persistence.journal.AsyncWriteJournal
5+
import akka.persistence.{AtomicWrite, PersistentRepr}
6+
import cats.effect.unsafe.implicits.global
7+
import cats.effect.{IO, Resource}
8+
import cats.syntax.all.*
9+
import com.evolutiongaming.akkaeffect.testkit.TestActorSystem
10+
import com.evolutiongaming.akkaeffect.{Envelope, Receive}
11+
import com.evolutiongaming.catshelper.LogOf
12+
import org.scalatest.funsuite.AnyFunSuite
13+
import org.scalatest.matchers.should.Matchers
14+
15+
import scala.concurrent.Future
16+
import scala.concurrent.duration.*
17+
import scala.util.Try
18+
19+
class PersistenceFailureTest extends AnyFunSuite with Matchers {
20+
21+
sealed trait Event extends FailOnEventJournal.Event
22+
object GoodEvent extends Event { val fail = false }
23+
object FailEvent extends Event { val fail = true }
24+
25+
val eventSourced = EventSourcedOf.const {
26+
IO {
27+
28+
val journal = "fail-on-event-journal"
29+
val snapshot = "inmemory-snapshot-store"
30+
31+
EventSourced(
32+
eventSourcedId = EventSourcedId("test"),
33+
pluginIds = PluginIds(journal, snapshot),
34+
value = RecoveryStarted[Unit] { (_, _) =>
35+
Recovering[Unit] {
36+
Replay.empty[IO, Event].pure[Resource[IO, *]]
37+
} { (_, journaller, _) =>
38+
Receive[Envelope[Event]] { envelope =>
39+
// persist event in forked thread thus don't fail actor directly
40+
journaller.append(Events.of(envelope.msg)).flatten.start.as(false)
41+
} {
42+
IO(true)
43+
}.pure[Resource[IO, *]]
44+
}.pure[Resource[IO, *]]
45+
}.typeless(
46+
sf = _ => IO.unit,
47+
ef = e => IO(e.asInstanceOf[Event]),
48+
cf = c => IO(c.asInstanceOf[Event]),
49+
).pure[Resource[IO, *]],
50+
)
51+
}
52+
}
53+
54+
test("EventSourcedActorOf based actor must fail if journal fails to persist message") {
55+
56+
implicit val log = LogOf.empty[IO]
57+
58+
TestActorSystem[IO]("testing", none)
59+
.use { system =>
60+
val persistence = EventSourcedPersistence.fromAkkaPlugins[IO](system, 1.second, 100)
61+
def actor = EventSourcedActorOf.actor[IO, Any, Any](eventSourced, persistence)
62+
val ref = system.actorOf(Props(actor))
63+
def tell = IO(ref ! GoodEvent)
64+
def fail = IO(ref ! FailEvent)
65+
def look = IO.fromFuture(IO(system.actorSelection(ref.path).resolveOne(2.seconds)))
66+
67+
for {
68+
_ <- tell
69+
_ <- tell
70+
_ <- IO.sleep(1.second)
71+
r <- look
72+
_ = r shouldBe ref
73+
_ <- fail
74+
_ <- IO.sleep(1.second)
75+
r <- look.attempt
76+
_ = r shouldBe a[Left[_, _]]
77+
} yield {}
78+
}
79+
.unsafeRunSync()
80+
81+
}
82+
83+
test("PersistentActorOf based actor must fail if journal fails to persist message") {
84+
85+
TestActorSystem[IO]("testing", none)
86+
.use { system =>
87+
def actor = PersistentActorOf[IO](eventSourced)
88+
val ref = system.actorOf(Props(actor))
89+
def tell = IO(ref ! GoodEvent)
90+
def fail = IO(ref ! FailEvent)
91+
def look = IO.fromFuture(IO(system.actorSelection(ref.path).resolveOne(2.seconds)))
92+
93+
for {
94+
_ <- tell
95+
_ <- tell
96+
_ <- IO.sleep(1.second)
97+
r <- look
98+
_ = r shouldBe ref
99+
_ <- fail
100+
_ <- IO.sleep(1.second)
101+
r <- look.attempt
102+
_ = r shouldBe a[Left[_, _]]
103+
} yield {}
104+
}
105+
.unsafeRunSync()
106+
107+
}
108+
109+
}
110+
111+
object FailOnEventJournal {
112+
val exception = new RuntimeException("test exception")
113+
114+
trait Event {
115+
def fail: Boolean
116+
}
117+
}
118+
119+
class FailOnEventJournal extends AsyncWriteJournal {
120+
121+
import scala.concurrent.ExecutionContext.Implicits.global
122+
import FailOnEventJournal._
123+
124+
override def asyncReplayMessages(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long, max: Long)(
125+
recoveryCallback: PersistentRepr => Unit,
126+
): Future[Unit] = Future.successful {}
127+
128+
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
129+
Future.successful(42L)
130+
131+
override def asyncWriteMessages(messages: Seq[AtomicWrite]): Future[Seq[Try[Unit]]] =
132+
Future.traverse(messages.flatMap(_.payload)) { repr =>
133+
repr.payload match {
134+
case event: Event if event.fail => Future.failed(exception)
135+
case _ => Future.successful(Try(()))
136+
}
137+
}
138+
139+
override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] =
140+
Future.successful {}
141+
142+
}

version.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
ThisBuild / version := "4.1.4-SNAPSHOT"
1+
ThisBuild / version := "4.1.4"

0 commit comments

Comments
 (0)