Skip to content

Commit 004d5fe

Browse files
mltheuserku76uh
andauthored
Extend awaitEvent to waitForNext event instance (#1932)
* Extend awaitEvent to waitForNext event instance * Updated documentation. --------- Co-authored-by: ku76uh <malte.heuser@ing.com>
1 parent 57f48b5 commit 004d5fe

File tree

20 files changed

+163
-54
lines changed

20 files changed

+163
-54
lines changed

core/akka-runtime/src/main/protobuf/process_index.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ message AwaitCompleted {
204204
message AwaitEvent {
205205
optional string recipeInstanceId = 1;
206206
optional string eventName = 2;
207+
optional bool waitForNext = 3;
207208
}
208209

209210
message ProcessSensoryEvent {

core/akka-runtime/src/main/protobuf/process_instance.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,7 @@ message Completed {
283283

284284
message AwaitEvent {
285285
optional string eventName = 1;
286+
optional bool waitForNext = 2;
286287
}
287288

288289
message EventOccurred {

core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/AkkaBaker.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -608,9 +608,9 @@ class AkkaBaker private[runtime](config: AkkaBakerConfig) extends scaladsl.Baker
608608
}
609609
}
610610

611-
override def awaitEvent(recipeInstanceId: String, eventName: String, timeout: FiniteDuration): Future[Unit] = {
611+
override def awaitEvent(recipeInstanceId: String, eventName: String, timeout: FiniteDuration, waitForNext: Boolean = false): Future[Unit] = {
612612
processIndexActor
613-
.ask(ProcessIndexProtocol.AwaitEvent(recipeInstanceId, eventName))(timeout)
613+
.ask(ProcessIndexProtocol.AwaitEvent(recipeInstanceId, eventName, waitForNext))(timeout)
614614
.javaTimeoutToBakerTimeout("awaitEvent")
615615
.flatMap {
616616
case EventOccurred => Future.successful()

core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndex.scala

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -634,45 +634,26 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
634634
actorRef.forward(ProcessInstanceProtocol.AwaitCompleted)
635635
}
636636

637-
case ProcessIndexProtocol.AwaitEvent(recipeInstanceId, eventName) =>
637+
case ProcessIndexProtocol.AwaitEvent(recipeInstanceId, eventName, waitForNext) =>
638638
val originalSender = sender()
639639

640-
// This program describes the logic as a sequence of validation steps.
641-
// If any step fails (returns a Left), the entire program short-circuits
642-
// and the failure is handled in unsafeRunAsync.
643640
val program: FireEventIO[Unit] = for {
644-
// Fetch the process actor and its metadata. This will activate the actor if it's passivated.
645641
instanceAndMeta <- fetchInstance(recipeInstanceId)
646642
(processInstance, metadata) = instanceAndMeta
647-
648-
// Fetch the compiled recipe from the cache or manager.
649643
recipe <- fetchRecipe(metadata)
650-
651-
// Validate that the event name corresponds to a known sensory event in the recipe.
652-
// We don't need the result, we just care that it succeeds.
653644
_ <- validateAnyEventNameIsInRecipe(recipe, eventName, recipeInstanceId)
654-
655-
// If all validations pass, forward the AwaitEvent message to the ProcessInstance actor.
656-
// `forward` preserves the original sender, so the ProcessInstance actor's reply will go
657-
// to the actor that originally sent the AwaitEvent message.
658-
_ <- accept(processInstance.tell(ProcessInstanceProtocol.AwaitEvent(eventName), originalSender))
645+
_ <- accept(processInstance.tell(ProcessInstanceProtocol.AwaitEvent(eventName, waitForNext), originalSender))
659646
} yield ()
660647

661-
// Asynchronously execute the program and handle the result.
662648
program.value.unsafeRunAsync{
663649
case Left(exception) =>
664-
// This case represents an unexpected technical error within the IO program.
665650
log.error(exception, s"Unexpected error processing AwaitEvent for recipe instance '$recipeInstanceId'")
666651
originalSender ! akka.actor.Status.Failure(exception)
667652

668653
case Right(Left(rejection)) =>
669-
// This case represents a controlled, business-logic failure (e.g., event name not found, process doesn't exist).
670-
// We send the rejection object directly back to the sender.
671654
originalSender ! rejection
672655

673656
case Right(Right(())) =>
674-
// The command was successfully validated and forwarded to the ProcessInstance actor.
675-
// That actor is now responsible for handling the await logic. No further action is needed here.
676657
()
677658
}(IORuntime.global)
678659

core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndexProto.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -641,13 +641,14 @@ object ProcessIndexProto {
641641
val companion = protobuf.AwaitEvent
642642

643643
def toProto(a: ProcessIndexProtocol.AwaitEvent): protobuf.AwaitEvent =
644-
protobuf.AwaitEvent(Some(a.recipeInstanceId), Some(a.eventName))
644+
protobuf.AwaitEvent(Some(a.recipeInstanceId), Some(a.eventName), Some(a.waitForNext))
645645

646646
def fromProto(message: protobuf.AwaitEvent): Try[ProcessIndexProtocol.AwaitEvent] =
647647
for {
648648
recipeInstanceId <- versioned(message.recipeInstanceId, "recipeInstanceId")
649649
eventName <- versioned(message.eventName, "eventName")
650-
} yield ProcessIndexProtocol.AwaitEvent(recipeInstanceId, eventName)
650+
waitForNext = message.waitForNext.getOrElse(false)
651+
} yield ProcessIndexProtocol.AwaitEvent(recipeInstanceId, eventName, waitForNext)
651652
}
652653

653654
}

core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_index/ProcessIndexProtocol.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ object ProcessIndexProtocol {
4040

4141
case class AwaitCompleted(recipeInstanceId: String) extends ProcessIndexMessage
4242

43-
case class AwaitEvent(recipeInstanceId: String, eventName: String) extends ProcessIndexMessage
43+
case class AwaitEvent(recipeInstanceId: String, eventName: String, waitForNext: Boolean = false) extends ProcessIndexMessage
4444

4545
/**
4646
* Failure when attempting to resolve a blocked interaction, the event is not of valid type according with the recipe

core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -334,16 +334,13 @@ class ProcessInstance(
334334
}
335335
}
336336

337-
case ProcessInstanceProtocol.AwaitEvent(eventName) =>
338-
if (instance.state.eventNames.contains(eventName)) {
337+
case ProcessInstanceProtocol.AwaitEvent(eventName, waitForNext) =>
338+
if (!waitForNext && instance.state.eventNames.contains(eventName)) {
339339
sender() ! ProcessInstanceProtocol.EventOccurred
340340
} else {
341-
// Persist the listener addition
342341
val event = EventListenerAdded(eventName, sender().path.toSerializationFormat)
343-
// This persist is sync so no race conditions to worry about
344342
persistEvent(instance, event) { _ =>
345343
val updatedInstance = eventSource.apply(instance)(event)
346-
// Update actor's state
347344
context.become(running(updatedInstance, scheduledRetries))
348345
}
349346
}

core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceProto.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,12 +372,13 @@ object ProcessInstanceProto {
372372
val companion = protobuf.AwaitEvent
373373

374374
def toProto(a: ProcessInstanceProtocol.AwaitEvent): protobuf.AwaitEvent =
375-
protobuf.AwaitEvent(Some(a.eventName))
375+
protobuf.AwaitEvent(Some(a.eventName), Some(a.waitForNext))
376376

377377
def fromProto(message: protobuf.AwaitEvent): Try[ProcessInstanceProtocol.AwaitEvent] =
378378
for {
379379
eventName <- versioned(message.eventName, "eventName")
380-
} yield ProcessInstanceProtocol.AwaitEvent(eventName)
380+
waitForNext = message.waitForNext.getOrElse(false)
381+
} yield ProcessInstanceProtocol.AwaitEvent(eventName, waitForNext)
381382
}
382383

383384
implicit def processInstanceEventOccurredProto: ProtoMap[ProcessInstanceProtocol.EventOccurred.type, protobuf.EventOccurred] =

core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceProtocol.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ object ProcessInstanceProtocol {
2929
*/
3030
case object Completed extends BakerSerializable
3131

32-
case class AwaitEvent(eventName: String) extends Command
32+
case class AwaitEvent(eventName: String, waitForNext: Boolean = false) extends Command
3333

3434
/**
3535
* Response confirming the event has occurred.

core/baker-interface-kotlin/src/main/kotlin/com/ing/baker/runtime/kotlindsl/Baker.kt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -216,8 +216,9 @@ class Baker internal constructor(private val jBaker: Baker) : AutoCloseable {
216216
recipeInstanceId: String,
217217
eventName: String,
218218
timeout: Duration,
219-
): Unit {
220-
jBaker.awaitEvent(recipeInstanceId, eventName, timeout.toJavaDuration()).await()
219+
waitForNext: Boolean = false
220+
) {
221+
jBaker.awaitEvent(recipeInstanceId, eventName, timeout.toJavaDuration(), waitForNext).await()
221222
}
222223

223224
suspend fun awaitCompleted(

0 commit comments

Comments
 (0)