@@ -101,12 +101,15 @@ object PartitionFlow {
101101
102102 def stateOf (createdAt : Timestamp , key : String ): F [PartitionKey [F ]] =
103103 cache.getOrUpdateResource(key) {
104+ val keyLog = Log [F ].prefixed(key)
104105 for {
106+ _ <- Resource .eval(keyLog.debug(" not found in cache" ))
105107 context <- KeyContext .resource[F ](
106108 removeFromCache = cache.remove(key).flatten.void,
107- log = Log [ F ].prefixed(key)
109+ log = keyLog
108110 )
109111 keyState <- keyStateOf(topicPartition, key, createdAt, context)
112+ _ <- Resource .eval(keyLog.debug(" computed KeyState" ))
110113 } yield PartitionKey (keyState, context)
111114 }
112115
@@ -132,6 +135,8 @@ object PartitionFlow {
132135 } yield ()
133136
134137 def processRecords (records : NonEmptyList [ConsRecord ]) = for {
138+ _ <- Log [F ].debug(s " processing ${records.size} records from partition ${topicPartition.partition}" )
139+
135140 clock <- Clock [F ].instant
136141 keys = records groupBy (_.key map (_.value)) collect {
137142 // we deliberately ignore records without a key to simplify the code
@@ -172,6 +177,8 @@ object PartitionFlow {
172177 offset = maximumOffset
173178 )
174179 )
180+
181+ _ <- Log [F ].debug(s " finished processing records from partition ${topicPartition.partition}" )
175182 } yield ()
176183
177184 def triggerTimers = for {
0 commit comments