Skip to content

Commit 2b5cbd1

Browse files
committed
Configuring persistence plugins at runtime for EventSourcedBehavior
1 parent 138e419 commit 2b5cbd1

File tree

13 files changed

+246
-53
lines changed

13 files changed

+246
-53
lines changed

akka-persistence-testkit/src/main/scala/akka/persistence/testkit/PersistenceTestKitPlugin.scala

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,18 @@ object PersistenceTestKitPlugin {
104104
* Persistence testkit plugin for snapshots.
105105
*/
106106
@InternalApi
107-
class PersistenceTestKitSnapshotPlugin extends SnapshotStore {
107+
class PersistenceTestKitSnapshotPlugin(
108+
// providing this parameter in first position as unused
109+
// because Persistence extension that instantiates the plugins
110+
// does not support constructors without it
111+
@nowarn("msg=never used") cfg: Config,
112+
cfgPath: String)
113+
extends SnapshotStore {
108114

109-
private final val storage = SnapshotStorageEmulatorExtension(context.system)
115+
private final val storage = {
116+
log.debug("Using snapshot storage emulator extension [{}] for test kit snapshot storage", cfgPath)
117+
SnapshotStorageEmulatorExtension(context.system).storageFor(cfgPath)
118+
}
110119

111120
override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] =
112121
Future.fromTry(Try(storage.tryRead(persistenceId, criteria)))

akka-persistence-testkit/src/main/scala/akka/persistence/testkit/internal/SnapshotStorageEmulatorExtension.scala

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
package akka.persistence.testkit.internal
66

7-
import akka.actor.{ ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider }
7+
import java.util.concurrent.ConcurrentHashMap
8+
9+
import akka.actor.{ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider}
810
import akka.actor.Extension
911
import akka.annotation.InternalApi
1012
import akka.persistence.testkit.SnapshotStorage
@@ -14,17 +16,33 @@ import akka.persistence.testkit.scaladsl.SnapshotTestKit
1416
* INTERNAL API
1517
*/
1618
@InternalApi
17-
private[testkit] object SnapshotStorageEmulatorExtension extends ExtensionId[SnapshotStorage] with ExtensionIdProvider {
19+
private[testkit] object SnapshotStorageEmulatorExtension extends ExtensionId[SnapshotStorageEmulatorExtension] with ExtensionIdProvider {
1820

19-
override def get(system: ActorSystem): SnapshotStorage = super.get(system)
21+
override def get(system: ActorSystem): SnapshotStorageEmulatorExtension = super.get(system)
2022

21-
override def createExtension(system: ExtendedActorSystem): SnapshotStorage =
22-
if (SnapshotTestKit.Settings(system).serialize) {
23-
new SerializedSnapshotStorageImpl(system)
24-
} else {
25-
new SimpleSnapshotStorageImpl
26-
}
23+
override def createExtension(system: ExtendedActorSystem): SnapshotStorageEmulatorExtension =
24+
new SnapshotStorageEmulatorExtension(system)
2725

2826
override def lookup: ExtensionId[_ <: Extension] =
2927
SnapshotStorageEmulatorExtension
3028
}
29+
30+
/**
31+
* INTERNAL API
32+
*/
33+
@InternalApi
34+
final class SnapshotStorageEmulatorExtension(system: ExtendedActorSystem) extends Extension {
35+
private val stores = new ConcurrentHashMap[String, SnapshotStorage]()
36+
private lazy val shouldCreateSerializedSnapshotStorage = SnapshotTestKit.Settings(system).serialize
37+
38+
def storageFor(key: String): SnapshotStorage =
39+
stores.computeIfAbsent(key,
40+
_ => {
41+
// we don't really care about the key here, we just want separate instances
42+
if (shouldCreateSerializedSnapshotStorage) {
43+
new SerializedSnapshotStorageImpl(system)
44+
} else {
45+
new SimpleSnapshotStorageImpl
46+
}
47+
})
48+
}

akka-persistence-testkit/src/main/scala/akka/persistence/testkit/scaladsl/PersistenceTestKit.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,8 @@ class SnapshotTestKit(system: ActorSystem)
328328

329329
import SnapshotTestKit._
330330

331-
override protected val storage: SnapshotStorage = SnapshotStorageEmulatorExtension(system)
331+
override protected val storage: SnapshotStorage =
332+
SnapshotStorageEmulatorExtension(system).storageFor(PersistenceTestKitSnapshotPlugin.PluginId)
332333

333334
override def getItem(persistenceId: String, nextInd: Int): Option[Any] = {
334335
storage.firstInExpectNextQueue(persistenceId).map(reprToAny)
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package akka.persistence.testkit.scaladsl
2+
3+
import akka.Done
4+
import akka.actor.testkit.typed.scaladsl.LogCapturing
5+
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
6+
import akka.actor.typed.ActorRef
7+
import akka.actor.typed.Behavior
8+
import akka.actor.typed.scaladsl.adapter._
9+
import akka.persistence.JournalProtocol.RecoverySuccess
10+
import akka.persistence.JournalProtocol.ReplayMessages
11+
import akka.persistence.JournalProtocol.ReplayedMessage
12+
import akka.persistence.Persistence
13+
import akka.persistence.SelectedSnapshot
14+
import akka.persistence.SnapshotProtocol.LoadSnapshot
15+
import akka.persistence.SnapshotProtocol.LoadSnapshotResult
16+
import akka.persistence.SnapshotSelectionCriteria
17+
import akka.persistence.testkit.PersistenceTestKitPlugin
18+
import akka.persistence.testkit.PersistenceTestKitSnapshotPlugin
19+
import akka.persistence.typed.PersistenceId
20+
import akka.persistence.typed.scaladsl.Effect
21+
import akka.persistence.typed.scaladsl.EventSourcedBehavior
22+
import akka.persistence.typed.scaladsl.RetentionCriteria
23+
import com.typesafe.config.ConfigFactory
24+
import org.scalatest.Inside
25+
import org.scalatest.wordspec.AnyWordSpecLike
26+
27+
object RuntimeJournalsSpec {
28+
29+
private object Actor {
30+
sealed trait Command
31+
case class Save(text: String, replyTo: ActorRef[Done]) extends Command
32+
case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends Command
33+
case object Stop extends Command
34+
35+
def apply(persistenceId: String, journal: String): Behavior[Command] =
36+
EventSourcedBehavior[Command, String, String](
37+
PersistenceId.ofUniqueId(persistenceId),
38+
"",
39+
(state, cmd) =>
40+
cmd match {
41+
case Save(text, replyTo) =>
42+
Effect.persist(text).thenRun(_ => replyTo ! Done)
43+
case ShowMeWhatYouGot(replyTo) =>
44+
replyTo ! state
45+
Effect.none
46+
case Stop =>
47+
Effect.stop()
48+
},
49+
(state, evt) => Seq(state, evt).filter(_.nonEmpty).mkString("|"))
50+
.withRetention(RetentionCriteria.snapshotEvery(1, Int.MaxValue))
51+
.withJournalPluginId(s"$journal.journal")
52+
.withJournalPluginConfig(Some(config(journal)))
53+
.withSnapshotPluginId(s"$journal.snapshot")
54+
.withSnapshotPluginConfig(Some(config(journal)))
55+
56+
}
57+
58+
private def config(journal: String) = {
59+
ConfigFactory.parseString(s"""
60+
$journal {
61+
journal.class = "${classOf[PersistenceTestKitPlugin].getName}"
62+
snapshot.class = "${classOf[PersistenceTestKitSnapshotPlugin].getName}"
63+
}
64+
""")
65+
}
66+
}
67+
68+
class RuntimeJournalsSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing with Inside {
69+
70+
import RuntimeJournalsSpec._
71+
72+
"The testkit journal and snapshot store plugins" must {
73+
74+
"be possible to configure at runtime and use in multiple isolated instances" in {
75+
val probe = createTestProbe[Any]()
76+
77+
{
78+
// one actor in each journal with same id
79+
val j1 = spawn(Actor("id1", "journal1"))
80+
val j2 = spawn(Actor("id1", "journal2"))
81+
j1 ! Actor.Save("j1m1", probe.ref)
82+
probe.receiveMessage()
83+
j2 ! Actor.Save("j2m1", probe.ref)
84+
probe.receiveMessage()
85+
}
86+
87+
{
88+
def assertJournal(journal: String, expectedEvent: String) = {
89+
val ref = Persistence(system).journalFor(s"$journal.journal", config(journal))
90+
ref.tell(ReplayMessages(0, Long.MaxValue, Long.MaxValue, "id1", probe.ref.toClassic), probe.ref.toClassic)
91+
inside(probe.receiveMessage()) {
92+
case ReplayedMessage(persistentRepr) =>
93+
persistentRepr.persistenceId shouldBe "id1"
94+
persistentRepr.payload shouldBe expectedEvent
95+
}
96+
probe.expectMessage(RecoverySuccess(1))
97+
}
98+
99+
assertJournal("journal1", "j1m1")
100+
assertJournal("journal2", "j2m1")
101+
}
102+
103+
{
104+
def assertSnapshot(journal: String, expectedShapshot: String) = {
105+
val ref = Persistence(system).snapshotStoreFor(s"$journal.snapshot", config(journal))
106+
ref.tell(LoadSnapshot("id1", SnapshotSelectionCriteria.Latest, Long.MaxValue), probe.ref.toClassic)
107+
inside(probe.receiveMessage()) {
108+
case LoadSnapshotResult(Some(SelectedSnapshot(_, snapshot)), _) =>
109+
snapshot shouldBe expectedShapshot
110+
}
111+
}
112+
113+
assertSnapshot("journal1", "j1m1")
114+
assertSnapshot("journal2", "j2m1")
115+
}
116+
}
117+
}
118+
}
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withJournalPluginConfig")
2+
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.typed.scaladsl.EventSourcedBehavior.withSnapshotPluginConfig")

akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/BehaviorSetup.scala

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,11 @@ import akka.persistence.typed.scaladsl.ReplicationInterceptor
2323
import akka.persistence.typed.scaladsl.RetentionCriteria
2424
import akka.persistence.typed.telemetry.EventSourcedBehaviorInstrumentation
2525
import akka.persistence.typed.scaladsl.SnapshotWhenPredicate
26+
import akka.util.Helpers.ConfigOps
2627
import akka.util.OptionVal
28+
import com.typesafe.config.ConfigFactory
29+
30+
import scala.concurrent.duration.FiniteDuration
2731

2832
/**
2933
* INTERNAL API
@@ -70,8 +74,11 @@ private[akka] final class BehaviorSetup[C, E, S](
7074

7175
val persistence: Persistence = Persistence(context.system.toClassic)
7276

73-
val journal: ClassicActorRef = persistence.journalFor(settings.journalPluginId)
74-
val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(settings.snapshotPluginId)
77+
val journal: ClassicActorRef =
78+
persistence.journalFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty))
79+
val snapshotStore: ClassicActorRef = persistence.snapshotStoreFor(
80+
settings.snapshotPluginId,
81+
settings.snapshotPluginConfig.getOrElse(ConfigFactory.empty))
7582

7683
val (isSnapshotOptional: Boolean, isOnlyOneSnapshot: Boolean) = {
7784
val snapshotStoreConfig = Persistence(context.system.classicSystem).configFor(snapshotStore)
@@ -125,16 +132,19 @@ private[akka] final class BehaviorSetup[C, E, S](
125132

126133
private var recoveryTimer: OptionVal[Cancellable] = OptionVal.None
127134

135+
val recoveryEventTimeout: FiniteDuration = persistence
136+
.journalConfigFor(settings.journalPluginId, settings.journalPluginConfig.getOrElse(ConfigFactory.empty))
137+
.getMillisDuration("recovery-event-timeout")
138+
128139
def startRecoveryTimer(snapshot: Boolean): Unit = {
129140
cancelRecoveryTimer()
130141
implicit val ec: ExecutionContext = context.executionContext
131142
val timer =
132143
if (snapshot)
133-
context.scheduleOnce(settings.recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true))
144+
context.scheduleOnce(recoveryEventTimeout, context.self, RecoveryTickEvent(snapshot = true))
134145
else
135-
context.system.scheduler.scheduleWithFixedDelay(settings.recoveryEventTimeout, settings.recoveryEventTimeout) {
136-
() =>
137-
context.self ! RecoveryTickEvent(snapshot = false)
146+
context.system.scheduler.scheduleWithFixedDelay(recoveryEventTimeout, recoveryEventTimeout) { () =>
147+
context.self ! RecoveryTickEvent(snapshot = false)
138148
}
139149
recoveryTimer = OptionVal.Some(timer)
140150
}

akka-persistence-typed/src/main/scala/akka/persistence/typed/internal/EventSourcedBehaviorImpl.scala

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package akka.persistence.typed.internal
77
import java.util.Optional
88
import java.util.UUID
99
import java.util.concurrent.atomic.AtomicInteger
10+
1011
import org.slf4j.LoggerFactory
1112
import akka.Done
1213
import akka.actor.typed
@@ -44,6 +45,7 @@ import akka.persistence.typed.scaladsl._
4445
import akka.persistence.typed.scaladsl.{ Recovery => TypedRecovery }
4546
import akka.persistence.typed.scaladsl.RetentionCriteria
4647
import akka.persistence.typed.telemetry.EventSourcedBehaviorInstrumentationProvider
48+
import com.typesafe.config.Config
4749

4850
@InternalApi
4951
private[akka] object EventSourcedBehaviorImpl {
@@ -93,6 +95,8 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
9395
loggerClass: Class[_],
9496
journalPluginId: Option[String] = None,
9597
snapshotPluginId: Option[String] = None,
98+
journalPluginConfig: Option[Config] = None,
99+
snapshotPluginConfig: Option[Config] = None,
96100
tagger: (State, Event) => Set[String] = (_: State, _: Event) => Set.empty[String],
97101
eventAdapter: EventAdapter[Event, Any] = NoOpEventAdapter.instance[Event],
98102
snapshotAdapter: SnapshotAdapter[State] = NoOpSnapshotAdapter.instance[State],
@@ -139,7 +143,9 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
139143
ctx.system,
140144
journalPluginId.getOrElse(""),
141145
snapshotPluginId.getOrElse(""),
142-
customStashCapacity)
146+
customStashCapacity,
147+
journalPluginConfig,
148+
snapshotPluginConfig)
143149

144150
// stashState outside supervise because StashState should survive restarts due to persist failures
145151
val stashState = new StashState(ctx.asInstanceOf[ActorContext[InternalProtocol]], settings)
@@ -271,6 +277,14 @@ private[akka] final case class EventSourcedBehaviorImpl[Command, Event, State](
271277
copy(snapshotPluginId = if (id != "") Some(id) else None)
272278
}
273279

280+
override def withJournalPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = {
281+
copy(journalPluginConfig = config)
282+
}
283+
284+
override def withSnapshotPluginConfig(config: Option[Config]): EventSourcedBehavior[Command, Event, State] = {
285+
copy(snapshotPluginConfig = config)
286+
}
287+
274288
override def withSnapshotSelectionCriteria(
275289
selection: SnapshotSelectionCriteria): EventSourcedBehavior[Command, Event, State] = {
276290
copy(recovery = Recovery(selection.toClassic))

0 commit comments

Comments
 (0)