diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala index 1548a3c46..6c9ae1253 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala @@ -135,8 +135,6 @@ class ProcessInstance( // setting up receive timeout to stop actor, when it's not stopped by IdleStop message context.setReceiveTimeout(settings.idleTTL.getOrElse(15 minutes) * 2) - override val log: DiagnosticLoggingAdapter = Logging.getLogger(this) - override def preStart(): Unit = { log.info(s"ProcessInstance started: $recipeInstanceId") } @@ -659,6 +657,27 @@ class ProcessInstance( * It finds which transitions are enabled and executes those. */ def step(instance: Instance[RecipeInstanceState]): (Instance[RecipeInstanceState], Set[Job[RecipeInstanceState]]) = { + + // Log if sequenceNr is high + if (instance.sequenceNr > 500 && instance.sequenceNr % 50 == 0) { + val originalMdcBackup = log.mdc + try { + log.mdc(originalMdcBackup ++ Map( + "sequenceNr" -> instance.sequenceNr + )) + log.warning( + s"High iteration count detected: " + + s"recipe='${compiledRecipe.name}', " + + s"instance='$recipeInstanceId', " + + s"sequence=${instance.sequenceNr}. " + + s"Investigate possible non-terminating loops." + ) + } finally { + // Always restore the original MDC + log.mdc(originalMdcBackup) + } + } + runtime.allEnabledJobs.run(instance).value match { case (updatedInstance, jobs) => if (jobs.isEmpty && updatedInstance.activeJobs.isEmpty) diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceEventSourcing.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceEventSourcing.scala index 6b48d3b68..74a05c887 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceEventSourcing.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceEventSourcing.scala @@ -2,6 +2,7 @@ package com.ing.baker.runtime.akka.actor.process_instance import akka.NotUsed import akka.actor.{ActorSystem, NoSerializationVerificationNeeded} +import akka.event.{DiagnosticLoggingAdapter, Logging} import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery import akka.persistence.{PersistentActor, RecoveryCompleted} import akka.sensors.actor.PersistentActorMetrics @@ -16,6 +17,7 @@ import com.ing.baker.runtime.scaladsl.{EventInstance, RecipeInstanceState} import com.ing.baker.runtime.serialization.Encryption import com.ing.baker.types.Value import com.typesafe.scalalogging.LazyLogging +import scalapb.GeneratedMessage object ProcessInstanceEventSourcing extends LazyLogging { @@ -289,6 +291,59 @@ abstract class ProcessInstanceEventSourcing( protected implicit val system: ActorSystem = context.system + override val log: DiagnosticLoggingAdapter = Logging.getLogger(this) + + private object EventSizingMonitor { + private def megabytes(mb: Long): Long = mb * 1024 * 1024 + + val InitialWarningThreshold: Long = megabytes(20) + val WarningLogIncrement: Long = megabytes(1) + } + + private var cumulativeEventLogSize: Long = 0L + private var nextWarningThreshold: Long = EventSizingMonitor.InitialWarningThreshold + + private def trackAndLogEventSize(newEventSize: Int): Unit = { + cumulativeEventLogSize += newEventSize + + if (hasCrossedWarningThreshold) { + logLargeEventLogWarning() + advanceWarningThreshold() + } + } + + private def hasCrossedWarningThreshold: Boolean = + cumulativeEventLogSize >= nextWarningThreshold + + private def logLargeEventLogWarning(): Unit = { + val sizeInMB = cumulativeEventLogSize / (1024 * 1024) + + val originalMdcBackup = log.mdc + try { + log.mdc(originalMdcBackup ++ Map( + "persistenceId" -> persistenceId, + "eventLogSizeMB" -> sizeInMB + )) + log.warning( + s"Process instance '$persistenceId' has a large event log size: " + + s"$sizeInMB MB. This may impact performance and recovery times." + ) + } finally { + log.mdc(originalMdcBackup) + } + } + + /** + * Advances the warning threshold to the next increment. + * This ensures we don't log on every single subsequent event, only when the next milestone is reached. + * The loop handles cases where a large batch of events causes the size to cross multiple thresholds at once. + */ + private def advanceWarningThreshold(): Unit = { + while (cumulativeEventLogSize >= nextWarningThreshold) { + nextWarningThreshold += EventSizingMonitor.WarningLogIncrement + } + } + protected val eventSource: Instance[RecipeInstanceState] => Event => Instance[RecipeInstanceState] = ProcessInstanceEventSourcing.apply[RecipeInstanceState, EventInstance](eventSourceFn) @@ -298,17 +353,24 @@ abstract class ProcessInstanceEventSourcing( def persistEvent[O](instance: Instance[RecipeInstanceState], e: Event)(fn: Event => O): Unit = { val serializedEvent = serializer.serializeEvent(e)(instance) + trackAndLogEventSize(serializedEvent.serializedSize) + persist(serializedEvent) { _ => fn(e) } } def persistAllEvents[O](instance: Instance[RecipeInstanceState], events: List[Event])(fn: List[Event] => O): Unit = { val serializedEvents = events.map {e => serializer.serializeEvent(e)(instance)} + serializedEvents.foreach(event => trackAndLogEventSize(event.serializedSize)) + persistAll(serializedEvents) { _ -> fn(events) } } private var recoveringState: Instance[RecipeInstanceState] = Instance.uninitialized[RecipeInstanceState](petriNet) - private def applyToRecoveringState(e: AnyRef): Unit = { + private def applyToRecoveringState(e: GeneratedMessage with AnyRef): Unit = { + val eventSize = e.serializedSize + trackAndLogEventSize(eventSize) + val deserializedEvent = serializer.deserializeEvent(e)(recoveringState) recoveringState = eventSource(recoveringState)(deserializedEvent) } diff --git a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceSerialization.scala b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceSerialization.scala index ce4fe3baf..5cff8c907 100644 --- a/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceSerialization.scala +++ b/core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceSerialization.scala @@ -49,7 +49,7 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) { /** * Serializes an EventSourcing.Event to a persistence.protobuf.Event. */ - def serializeEvent(e: ProcessInstanceEventSourcing.Event): Instance[S] => AnyRef = + def serializeEvent(e: ProcessInstanceEventSourcing.Event): Instance[S] => scalapb.GeneratedMessage = _ => e match { case e: InitializedEvent => serializeInitialized(e) case e: TransitionFiredEvent => serializeTransitionFired(e)