@@ -3,7 +3,7 @@ package com.ing.baker.runtime.akka.actor.process_index
33import akka .actor .{ActorRef , NoSerializationVerificationNeeded , Props }
44import akka .cluster .sharding .ShardRegion .Passivate
55import akka .event .{DiagnosticLoggingAdapter , Logging }
6- import akka .pattern .{BackoffOpts , BackoffSupervisor , ask }
6+ import akka .pattern .{BackoffOpts , BackoffSupervisor , ask , pipe }
77import akka .persistence ._
88import akka .sensors .actor .PersistentActorMetrics
99import cats .data .{EitherT , OptionT }
@@ -40,6 +40,7 @@ import com.typesafe.config.Config
4040import scala .collection .mutable
4141import scala .concurrent .duration ._
4242import scala .concurrent .{Await , ExecutionContext , Future }
43+ import scala .util .control .NonFatal
4344import scala .util .{Failure , Success }
4445
4546
@@ -128,6 +129,10 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
128129
129130 override val log : DiagnosticLoggingAdapter = Logging .getLogger(logSource = this )
130131
132+ // --- Internal messages for the asynchronous initialization flow ---
133+ private case class InitializationConfirmed (originalSender : ActorRef , recipeId : String , recipeInstanceId : String , compiledRecipe : CompiledRecipe , createdTime : Long ) extends NoSerializationVerificationNeeded
134+ private case class InitializationRejected (originalSender : ActorRef , recipeInstanceId : String , cause : Throwable ) extends NoSerializationVerificationNeeded
135+
131136 private val startTime = System .currentTimeMillis()
132137
133138 override def preStart (): Unit = {
@@ -342,6 +347,60 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
342347 } yield (transition, jobId)
343348 }
344349
350+ /**
351+ * Creates the Initialize command for a new process instance.
352+ *
353+ * @return A tuple containing the Initialize command and the creation timestamp.
354+ */
355+ private def createInitializationCommand (
356+ recipeId : String ,
357+ recipeInstanceId : String ,
358+ recipeInstanceMetadata : Map [String , String ],
359+ compiledRecipe : CompiledRecipe ): (Initialize , Long ) = {
360+
361+ val createdTime = System .currentTimeMillis()
362+
363+ val ingredientsMap =
364+ if (recipeInstanceMetadata.isEmpty) Map .empty[String , Value ]
365+ else Map (RecipeInstanceMetadataName -> com.ing.baker.types.Converters .toValue(recipeInstanceMetadata))
366+
367+ val processState = RecipeInstanceState (
368+ recipeId = recipeId,
369+ recipeInstanceId = recipeInstanceId,
370+ ingredients = ingredientsMap,
371+ recipeInstanceMetadata = recipeInstanceMetadata,
372+ events = List .empty)
373+
374+ val initializeCmd = Initialize (compiledRecipe.initialMarking, processState)
375+
376+ (initializeCmd, createdTime)
377+ }
378+
379+ /**
380+ * Handles the successful initialization of a process instance.
381+ *
382+ * This involves persisting the ActorCreated event, updating the in-memory index,
383+ * logging the creation event, and replying to the original sender.
384+ */
385+ private def handleSuccessfulInitialization (
386+ originalSender : ActorRef ,
387+ recipeId : String ,
388+ recipeInstanceId : String ,
389+ compiledRecipe : CompiledRecipe ,
390+ createdTime : Long ): Unit = {
391+
392+ persistWithSnapshot(ActorCreated (recipeId, recipeInstanceId, createdTime)) { _ =>
393+ // This callback runs after the event is successfully persisted.
394+ val actorMetadata = ActorMetadata (recipeId, recipeInstanceId, createdTime, Active )
395+ index += recipeInstanceId -> actorMetadata
396+
397+ val creationEvent = RecipeInstanceCreated (createdTime, recipeId, compiledRecipe.name, recipeInstanceId)
398+ LogAndSendEvent .recipeInstanceCreated(creationEvent, context.system.eventStream)
399+
400+ originalSender ! Initialized (Marking .empty)
401+ }
402+ }
403+
345404 override def receiveCommand : Receive = {
346405 case SaveSnapshotSuccess (metadata) =>
347406 log.debug(" Snapshot saved & cleaning old processes" )
@@ -390,72 +449,48 @@ class ProcessIndex(recipeInstanceIdleTimeout: Option[FiniteDuration],
390449 }
391450
392451 case CreateProcess (recipeId, recipeInstanceId, recipeInstanceMetadata) =>
393- context.child(recipeInstanceId) match {
394- case None if ! index.contains(recipeInstanceId) =>
395-
396- // First check if the recipe exists
397- getCompiledRecipe(recipeId) match {
398- case Some (compiledRecipe) =>
399-
400- val createdTime = System .currentTimeMillis()
452+ val originalSender = sender()
401453
402- // this persists the fact that we created a process instance
403- persistWithSnapshot(ActorCreated (recipeId, recipeInstanceId, createdTime)) { _ =>
454+ index.get(recipeInstanceId) match {
455+ case Some (metadata) if metadata.isDeleted =>
456+ originalSender ! ProcessDeleted (recipeInstanceId)
404457
405- // after that we actually create the ProcessInstance actor
406- val processState = RecipeInstanceState (
407- recipeId = recipeId,
408- recipeInstanceId = recipeInstanceId,
409- ingredients =
410- if (recipeInstanceMetadata.isEmpty) Map .empty[String , Value ]
411- else Map (RecipeInstanceMetadataName -> com.ing.baker.types.Converters .toValue(recipeInstanceMetadata)),
412- recipeInstanceMetadata = recipeInstanceMetadata,
413- events = List .empty)
414- val initializeCmd = Initialize (compiledRecipe.initialMarking, processState)
458+ case Some (_) =>
459+ originalSender ! ProcessAlreadyExists (recipeInstanceId)
415460
416- // TODO ensure the initialiseCMD is accepted before we add it ot the index
461+ case None =>
462+ getCompiledRecipe(recipeId) match {
463+ case Some (compiledRecipe) =>
464+ val processActor = context.child(recipeInstanceId).getOrElse {
417465 createProcessActor(recipeInstanceId, compiledRecipe)
418- .forward(initializeCmd)
419-
420- val actorMetadata = ActorMetadata (recipeId, recipeInstanceId, createdTime, Active )
466+ }
421467
422- LogAndSendEvent .recipeInstanceCreated(
423- RecipeInstanceCreated (System .currentTimeMillis(), recipeId, compiledRecipe.name, recipeInstanceId),
424- context.system.eventStream)
468+ val (initializeCmd, createdTime) = createInitializationCommand(
469+ recipeId, recipeInstanceId, recipeInstanceMetadata, compiledRecipe)
425470
426- index += recipeInstanceId -> actorMetadata
427- }
471+ (processActor ? initializeCmd)(processInquireTimeout).map {
472+ case _ : Initialized | _ : AlreadyInitialized =>
473+ InitializationConfirmed (originalSender, recipeId, recipeInstanceId, compiledRecipe, createdTime)
474+ case other =>
475+ val err = new IllegalStateException (s " ProcessInstance for $recipeInstanceId replied with unexpected message: $other" )
476+ InitializationRejected (originalSender, recipeInstanceId, err)
477+ }
478+ .recover { case NonFatal (e) => InitializationRejected (originalSender, recipeInstanceId, e) }
479+ .pipeTo(self)
428480
429481 case None =>
430- sender() ! NoRecipeFound (recipeId)
431- }
432- case _ if index.get(recipeInstanceId).exists(_.isDeleted) =>
433- sender() ! ProcessDeleted (recipeInstanceId)
434- case None =>
435- // Temporary solution for the situation that the initializeCmd is not send in the original Bake
436- getCompiledRecipe(recipeId) match {
437- case Some (compiledRecipe) =>
438- val processState = RecipeInstanceState (recipeId, recipeInstanceId, Map .empty[String , Value ], recipeInstanceMetadata, List .empty)
439- val initializeCmd = Initialize (compiledRecipe.initialMarking, processState)
440- createProcessActor(recipeInstanceId, compiledRecipe) ! initializeCmd
441- sender() ! ProcessAlreadyExists (recipeInstanceId)
442- case None =>
443- // Kept the ProcessAlreadyExists since this was the original error
444- sender() ! ProcessAlreadyExists (recipeInstanceId)
445- }
446- case Some (actorRef : ActorRef ) =>
447- // Temporary solution for the situation that the initializeCmd is not send in the original Bake
448- getCompiledRecipe(recipeId) match {
449- case Some (compiledRecipe) =>
450- val processState = RecipeInstanceState (recipeId, recipeInstanceId, Map .empty[String , Value ], recipeInstanceMetadata, List .empty)
451- val initializeCmd = Initialize (compiledRecipe.initialMarking, processState)
452- actorRef ! initializeCmd
453- sender() ! ProcessAlreadyExists (recipeInstanceId)
454- case None =>
455- sender() ! NoRecipeFound (recipeId)
482+ originalSender ! NoRecipeFound (recipeId)
456483 }
457484 }
458485
486+ case msg : InitializationConfirmed =>
487+ handleSuccessfulInitialization(
488+ msg.originalSender, msg.recipeId, msg.recipeInstanceId, msg.compiledRecipe, msg.createdTime)
489+
490+ case msg : InitializationRejected =>
491+ log.error(msg.cause, s " Initialization of process ${msg.recipeInstanceId} failed. " )
492+ msg.originalSender ! akka.actor.Status .Failure (msg.cause)
493+
459494 case command@ ProcessEvent (recipeInstanceId, event, correlationId, _, _) =>
460495 run ({ responseHandler =>
461496 for {
0 commit comments