Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,6 @@ class ProcessInstance(
// setting up receive timeout to stop actor, when it's not stopped by IdleStop message
context.setReceiveTimeout(settings.idleTTL.getOrElse(15 minutes) * 2)

override val log: DiagnosticLoggingAdapter = Logging.getLogger(this)

// Actor lifecycle hook: logs that this process instance actor has started, tagged with its id.
override def preStart(): Unit = {
  log.info(s"ProcessInstance started: $recipeInstanceId")
}
Expand Down Expand Up @@ -659,6 +657,27 @@ class ProcessInstance(
* It finds which transitions are enabled and executes those.
*/
def step(instance: Instance[RecipeInstanceState]): (Instance[RecipeInstanceState], Set[Job[RecipeInstanceState]]) = {

// Emit a sampled warning once the journal sequence number exceeds 500, logging only every
// 50th step to avoid flooding the log; a climbing sequenceNr here suggests a recipe that
// keeps firing transitions without terminating.
if (instance.sequenceNr > 500 && instance.sequenceNr % 50 == 0) {
  // Snapshot the current MDC so the extra diagnostic key does not leak into later log lines.
  val originalMdcBackup = log.mdc
  try {
    log.mdc(originalMdcBackup ++ Map(
      "sequenceNr" -> instance.sequenceNr
    ))
    log.warning(
      s"High iteration count detected: " +
      s"recipe='${compiledRecipe.name}', " +
      s"instance='$recipeInstanceId', " +
      s"sequence=${instance.sequenceNr}. " +
      s"Investigate possible non-terminating loops."
    )
  } finally {
    // Always restore the original MDC
    log.mdc(originalMdcBackup)
  }
}

runtime.allEnabledJobs.run(instance).value match {
case (updatedInstance, jobs) =>
if (jobs.isEmpty && updatedInstance.activeJobs.isEmpty)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package com.ing.baker.runtime.akka.actor.process_instance

import akka.NotUsed
import akka.actor.{ActorSystem, NoSerializationVerificationNeeded}
import akka.event.{DiagnosticLoggingAdapter, Logging}
import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
import akka.persistence.{PersistentActor, RecoveryCompleted}
import akka.sensors.actor.PersistentActorMetrics
Expand All @@ -16,6 +17,7 @@ import com.ing.baker.runtime.scaladsl.{EventInstance, RecipeInstanceState}
import com.ing.baker.runtime.serialization.Encryption
import com.ing.baker.types.Value
import com.typesafe.scalalogging.LazyLogging
import scalapb.GeneratedMessage

object ProcessInstanceEventSourcing extends LazyLogging {

Expand Down Expand Up @@ -289,6 +291,59 @@ abstract class ProcessInstanceEventSourcing(

protected implicit val system: ActorSystem = context.system

override val log: DiagnosticLoggingAdapter = Logging.getLogger(this)

/** Size milestones that govern when a growing persisted event log triggers warnings. */
private object EventSizingMonitor {
  /** The first warning fires once the cumulative serialized size reaches 20 MB. */
  val InitialWarningThreshold: Long = 20L * 1024L * 1024L

  /** After the first warning, each further warning fires per additional 1 MB. */
  val WarningLogIncrement: Long = 1L * 1024L * 1024L
}

// Running total of serialized event bytes seen for this instance (persisted and replayed); in-memory only.
private var cumulativeEventLogSize: Long = 0L
// Cumulative size at which the next large-event-log warning will be emitted.
private var nextWarningThreshold: Long = EventSizingMonitor.InitialWarningThreshold

/**
  * Folds a newly observed event's serialized size into the running total and, whenever the
  * total reaches the current milestone, logs a warning and moves the milestone forward.
  */
private def trackAndLogEventSize(newEventSize: Int): Unit = {
  cumulativeEventLogSize = cumulativeEventLogSize + newEventSize
  // Inlined milestone check; warn at most once per threshold crossing.
  if (cumulativeEventLogSize >= nextWarningThreshold) {
    logLargeEventLogWarning()
    advanceWarningThreshold()
  }
}

// True when the cumulative serialized size has reached the next warning milestone.
private def hasCrossedWarningThreshold: Boolean =
  cumulativeEventLogSize >= nextWarningThreshold

/**
  * Emits a warning that this instance's event log has grown large, enriching the log
  * statement's MDC with the persistence id and current size, and restoring the previous
  * MDC afterwards so the extra keys do not leak into unrelated log lines.
  */
private def logLargeEventLogWarning(): Unit = {
  val sizeInMB = cumulativeEventLogSize / (1024 * 1024)

  val previousMdc = log.mdc
  val warningContext = Map(
    "persistenceId" -> persistenceId,
    "eventLogSizeMB" -> sizeInMB
  )
  try {
    log.mdc(previousMdc ++ warningContext)
    log.warning(
      s"Process instance '$persistenceId' has a large event log size: " +
      s"$sizeInMB MB. This may impact performance and recovery times."
    )
  } finally {
    // Restore the MDC even if the warning itself throws.
    log.mdc(previousMdc)
  }
}

/**
  * Moves the warning milestone forward until it sits strictly above the current cumulative size.
  *
  * Looping (rather than adding a single increment) covers the case where one large batch of
  * events pushes the total across several milestones at once, so the next warning only fires
  * at the first milestone genuinely ahead of the current size.
  */
private def advanceWarningThreshold(): Unit =
  while (hasCrossedWarningThreshold)
    nextWarningThreshold += EventSizingMonitor.WarningLogIncrement

// Fold function applying a persisted Event to an Instance to produce the updated state;
// used by applyToRecoveringState while replaying the journal.
protected val eventSource: Instance[RecipeInstanceState] => Event => Instance[RecipeInstanceState] =
  ProcessInstanceEventSourcing.apply[RecipeInstanceState, EventInstance](eventSourceFn)

Expand All @@ -298,17 +353,24 @@ abstract class ProcessInstanceEventSourcing(

/**
  * Serializes a single event against the given instance state, tracks its serialized size
  * for the large-event-log monitor, persists it, and invokes `fn(e)` once it is stored.
  */
def persistEvent[O](instance: Instance[RecipeInstanceState], e: Event)(fn: Event => O): Unit = {
  val serialized = serializer.serializeEvent(e)(instance)
  trackAndLogEventSize(serialized.serializedSize)
  persist(serialized)(_ => fn(e))
}

/**
  * Serializes a batch of events against the given instance state, tracks each serialized
  * size for the large-event-log monitor, persists them all, and invokes `fn(events)` as
  * each persisted event's handler fires.
  */
def persistAllEvents[O](instance: Instance[RecipeInstanceState], events: List[Event])(fn: List[Event] => O): Unit = {
  val serializedEvents = events.map { e => serializer.serializeEvent(e)(instance) }
  serializedEvents.foreach(event => trackAndLogEventSize(event.serializedSize))

  // Fixed typo: `{ _ -> fn(events) }` used the ArrowAssoc tuple arrow, building a discarded
  // (event, O) pair instead of a plain lambda; `_ =>` matches persistEvent's handler style.
  persistAll(serializedEvents) { _ => fn(events) }
}

// Accumulates replayed state during recovery, starting from the uninitialized Petri-net instance.
private var recoveringState: Instance[RecipeInstanceState] = Instance.uninitialized[RecipeInstanceState](petriNet)

private def applyToRecoveringState(e: AnyRef): Unit = {
// Applies one replayed journal message: records its serialized size (so recovery also feeds
// the large-event-log monitor), deserializes it against the state recovered so far, and
// folds it into recoveringState via eventSource.
private def applyToRecoveringState(e: GeneratedMessage with AnyRef): Unit = {
  val eventSize = e.serializedSize
  trackAndLogEventSize(eventSize)

  val deserializedEvent = serializer.deserializeEvent(e)(recoveringState)
  recoveringState = eventSource(recoveringState)(deserializedEvent)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
/**
* Serializes an EventSourcing.Event to a persistence.protobuf.Event.
*/
def serializeEvent(e: ProcessInstanceEventSourcing.Event): Instance[S] => AnyRef =
def serializeEvent(e: ProcessInstanceEventSourcing.Event): Instance[S] => scalapb.GeneratedMessage =
_ => e match {
case e: InitializedEvent => serializeInitialized(e)
case e: TransitionFiredEvent => serializeTransitionFired(e)
Expand Down