Skip to content

Commit 1cf6efb

Browse files
author
ku76uh
committed
Add logs to detect high usage.
1 parent 3f0e76a commit 1cf6efb

File tree

3 files changed

+85
-4
lines changed

3 files changed

+85
-4
lines changed

core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstance.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,6 @@ class ProcessInstance(
135135
// setting up receive timeout to stop actor, when it's not stopped by IdleStop message
136136
context.setReceiveTimeout(settings.idleTTL.getOrElse(15 minutes) * 2)
137137

138-
override val log: DiagnosticLoggingAdapter = Logging.getLogger(this)
139-
140138
override def preStart(): Unit = {
141139
log.info(s"ProcessInstance started: $recipeInstanceId")
142140
}
@@ -662,6 +660,27 @@ class ProcessInstance(
662660
* It finds which transitions are enabled and executes those.
663661
*/
664662
def step(instance: Instance[RecipeInstanceState]): (Instance[RecipeInstanceState], Set[Job[RecipeInstanceState]]) = {
663+
664+
// Log if sequenceNr is high
665+
if (instance.sequenceNr > 100 && instance.sequenceNr % 50 == 0) {
666+
val originalMdcBackup = log.mdc
667+
try {
668+
log.mdc(originalMdcBackup ++ Map(
669+
"sequenceNr" -> instance.sequenceNr
670+
))
671+
log.warning(
672+
s"High iteration count detected: " +
673+
s"recipe='${compiledRecipe.name}', " +
674+
s"instance='$recipeInstanceId', " +
675+
s"sequence=${instance.sequenceNr}. " +
676+
s"Investigate possible non-terminating loops."
677+
)
678+
} finally {
679+
// Always restore the original MDC
680+
log.mdc(originalMdcBackup)
681+
}
682+
}
683+
665684
runtime.allEnabledJobs.run(instance).value match {
666685
case (updatedInstance, jobs) =>
667686
if (jobs.isEmpty && updatedInstance.activeJobs.isEmpty)

core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceEventSourcing.scala

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.ing.baker.runtime.akka.actor.process_instance
22

33
import akka.NotUsed
44
import akka.actor.{ActorSystem, NoSerializationVerificationNeeded}
5+
import akka.event.{DiagnosticLoggingAdapter, Logging}
56
import akka.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
67
import akka.persistence.{PersistentActor, RecoveryCompleted}
78
import akka.sensors.actor.PersistentActorMetrics
@@ -16,6 +17,7 @@ import com.ing.baker.runtime.scaladsl.{EventInstance, RecipeInstanceState}
1617
import com.ing.baker.runtime.serialization.Encryption
1718
import com.ing.baker.types.Value
1819
import com.typesafe.scalalogging.LazyLogging
20+
import scalapb.GeneratedMessage
1921

2022
object ProcessInstanceEventSourcing extends LazyLogging {
2123

@@ -289,6 +291,59 @@ abstract class ProcessInstanceEventSourcing(
289291

290292
protected implicit val system: ActorSystem = context.system
291293

294+
override val log: DiagnosticLoggingAdapter = Logging.getLogger(this)
295+
296+
private object EventSizingMonitor {
297+
private def megabytes(mb: Long): Long = mb * 1024 * 1024
298+
299+
val InitialWarningThreshold: Long = megabytes(20)
300+
val WarningLogIncrement: Long = megabytes(1)
301+
}
302+
303+
private var cumulativeEventLogSize: Long = 0L
304+
private var nextWarningThreshold: Long = EventSizingMonitor.InitialWarningThreshold
305+
306+
private def trackAndLogEventSize(newEventSize: Int): Unit = {
307+
cumulativeEventLogSize += newEventSize
308+
309+
if (hasCrossedWarningThreshold) {
310+
logLargeEventLogWarning()
311+
advanceWarningThreshold()
312+
}
313+
}
314+
315+
private def hasCrossedWarningThreshold: Boolean =
316+
cumulativeEventLogSize >= nextWarningThreshold
317+
318+
private def logLargeEventLogWarning(): Unit = {
319+
val sizeInMB = cumulativeEventLogSize / (1024 * 1024)
320+
321+
val originalMdcBackup = log.mdc
322+
try {
323+
log.mdc(originalMdcBackup ++ Map(
324+
"persistenceId" -> persistenceId,
325+
"eventLogSizeMB" -> sizeInMB
326+
))
327+
log.warning(
328+
s"Process instance '$persistenceId' has a large event log size: " +
329+
s"$sizeInMB MB. This may impact performance and recovery times."
330+
)
331+
} finally {
332+
log.mdc(originalMdcBackup)
333+
}
334+
}
335+
336+
/**
337+
* Advances the warning threshold to the next increment.
338+
* This ensures we don't log on every single subsequent event, only when the next milestone is reached.
339+
* The loop handles cases where a large batch of events causes the size to cross multiple thresholds at once.
340+
*/
341+
private def advanceWarningThreshold(): Unit = {
342+
while (cumulativeEventLogSize >= nextWarningThreshold) {
343+
nextWarningThreshold += EventSizingMonitor.WarningLogIncrement
344+
}
345+
}
346+
292347
protected val eventSource: Instance[RecipeInstanceState] => Event => Instance[RecipeInstanceState] =
293348
ProcessInstanceEventSourcing.apply[RecipeInstanceState, EventInstance](eventSourceFn)
294349

@@ -298,17 +353,24 @@ abstract class ProcessInstanceEventSourcing(
298353

299354
def persistEvent[O](instance: Instance[RecipeInstanceState], e: Event)(fn: Event => O): Unit = {
300355
val serializedEvent = serializer.serializeEvent(e)(instance)
356+
trackAndLogEventSize(serializedEvent.toByteArray.length)
357+
301358
persist(serializedEvent) { _ => fn(e) }
302359
}
303360

304361
def persistAllEvents[O](instance: Instance[RecipeInstanceState], events: List[Event])(fn: List[Event] => O): Unit = {
305362
val serializedEvents = events.map {e => serializer.serializeEvent(e)(instance)}
363+
serializedEvents.foreach(event => trackAndLogEventSize(event.toByteArray.length))
364+
306365
persistAll(serializedEvents) { _ -> fn(events) }
307366
}
308367

309368
private var recoveringState: Instance[RecipeInstanceState] = Instance.uninitialized[RecipeInstanceState](petriNet)
310369

311-
private def applyToRecoveringState(e: AnyRef): Unit = {
370+
private def applyToRecoveringState(e: GeneratedMessage): Unit = {
371+
val eventSize = e.toByteArray.length
372+
trackAndLogEventSize(eventSize)
373+
312374
val deserializedEvent = serializer.deserializeEvent(e)(recoveringState)
313375
recoveringState = eventSource(recoveringState)(deserializedEvent)
314376
}

core/akka-runtime/src/main/scala/com/ing/baker/runtime/akka/actor/process_instance/ProcessInstanceSerialization.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class ProcessInstanceSerialization[S, E](provider: AkkaSerializerProvider) {
4949
/**
5050
* Serializes an EventSourcing.Event to a persistence.protobuf.Event.
5151
*/
52-
def serializeEvent(e: ProcessInstanceEventSourcing.Event): Instance[S] => AnyRef =
52+
def serializeEvent(e: ProcessInstanceEventSourcing.Event): Instance[S] => scalapb.GeneratedMessage =
5353
_ => e match {
5454
case e: InitializedEvent => serializeInitialized(e)
5555
case e: TransitionFiredEvent => serializeTransitionFired(e)

0 commit comments

Comments
 (0)