Skip to content

Commit 0010bdf

Browse files
author
Vladislav Filatov
committed
Fine-grained logging for PartitionFlow owned classes.
1 parent 4a3887e commit 0010bdf

File tree

10 files changed

+163
-118
lines changed

10 files changed

+163
-118
lines changed

core/src/main/scala/com/evolutiongaming/kafka/flow/KeyContext.scala

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ trait KeyContext[F[_]] {
1717
def hold(offset: Offset): F[Unit]
1818
def remove: F[Unit]
1919
def log: Log[F]
20+
def key: String
2021
}
2122
object KeyContext {
2223

@@ -27,29 +28,33 @@ object KeyContext {
2728
def holding = none[Offset].pure[F]
2829
def hold(offset: Offset) = ().pure[F]
2930
def remove = ().pure[F]
31+
val key = ""
3032
}
3133

32-
def of[F[_]: Ref.Make: Monad: Log](removeFromCache: F[Unit]): F[KeyContext[F]] =
34+
def of[F[_]: Ref.Make: Monad: Log](removeFromCache: F[Unit], key: String): F[KeyContext[F]] =
3335
Ref.of[F, Option[Offset]](None) map { storage =>
34-
KeyContext(storage.stateInstance, removeFromCache)
36+
KeyContext(storage.stateInstance, removeFromCache, key)
3537
}
3638

3739
def apply[F[_]: Monad: Log](
3840
storage: Stateful[F, Option[Offset]],
39-
removeFromCache: F[Unit]
41+
removeFromCache: F[Unit],
42+
_key: String
4043
): KeyContext[F] = new KeyContext[F] {
4144
def holding = storage.get
4245
def hold(offset: Offset) = storage.set(Some(offset))
4346
def remove = storage.set(None) *> removeFromCache
4447
def log = Log[F]
48+
val key = _key
4549
}
4650

4751
def resource[F[_]: Ref.Make: Monad](
4852
removeFromCache: F[Unit],
49-
log: Log[F]
53+
log: Log[F],
54+
key: String
5055
): Resource[F, KeyContext[F]] = {
5156
implicit val _log = log
52-
Resource.eval(of(removeFromCache))
57+
Resource.eval(of(removeFromCache, key))
5358
}
5459

5560
}

core/src/main/scala/com/evolutiongaming/kafka/flow/KeyFlowOf.scala

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package com.evolutiongaming.kafka.flow
22

33
import cats.Monad
44
import cats.effect.{Ref, Resource}
5+
import com.evolutiongaming.catshelper.LogOf
56
import com.evolutiongaming.kafka.flow.persistence.Persistence
67
import com.evolutiongaming.kafka.flow.registry.EntityRegistry
78
import com.evolutiongaming.kafka.flow.timer.{TimerContext, TimerFlowOf}
@@ -15,7 +16,7 @@ trait KeyFlowOf[F[_], S, A] {
1516
timers: TimerContext[F],
1617
additionalPersist: AdditionalStatePersist[F, S, A],
1718
registry: EntityRegistry[F, KafkaKey, S],
18-
): Resource[F, KeyFlow[F, A]]
19+
)(implicit logOf: LogOf[F]): Resource[F, KeyFlow[F, A]]
1920

2021
}
2122
object KeyFlowOf {
@@ -50,11 +51,19 @@ object KeyFlowOf {
5051
timerFlowOf: TimerFlowOf[F],
5152
fold: EnhancedFold[F, S, A],
5253
tick: TickOption[F, S],
53-
): KeyFlowOf[F, S, A] = { (key, context, persistence, timers, additionalPersist, registry) =>
54-
implicit val _context = context
55-
timerFlowOf(context, persistence, timers) flatMap { timerFlow =>
56-
KeyFlow.of(key, fold, tick, persistence, additionalPersist, timerFlow, registry)
54+
): KeyFlowOf[F, S, A] = new KeyFlowOf[F, S, A] {
55+
override def apply(
56+
key: KafkaKey,
57+
context: KeyContext[F],
58+
persistence: Persistence[F, S, A],
59+
timers: TimerContext[F],
60+
additionalPersist: AdditionalStatePersist[F, S, A],
61+
registry: EntityRegistry[F, KafkaKey, S]
62+
)(implicit logOf: LogOf[F]): Resource[F, KeyFlow[F, A]] = {
63+
implicit val _context = context
64+
timerFlowOf(context, persistence, timers) flatMap { timerFlow =>
65+
KeyFlow.of(key, fold, tick, persistence, additionalPersist, timerFlow, registry)
66+
}
5767
}
5868
}
59-
6069
}

core/src/main/scala/com/evolutiongaming/kafka/flow/KeyStateOf.scala

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package com.evolutiongaming.kafka.flow
33
import cats.Applicative
44
import cats.effect.{Resource, Sync}
55
import cats.syntax.all.*
6+
import com.evolutiongaming.catshelper.LogOf
67
import com.evolutiongaming.kafka.flow.key.KeysOf
78
import com.evolutiongaming.kafka.flow.persistence.{PersistenceOf, SnapshotPersistenceOf}
89
import com.evolutiongaming.kafka.flow.registry.EntityRegistry
@@ -20,7 +21,7 @@ trait KeyStateOf[F[_]] { self =>
2021
key: String,
2122
createdAt: Timestamp,
2223
context: KeyContext[F]
23-
): Resource[F, KeyState[F, ConsumerRecord[String, ByteVector]]]
24+
)(implicit logOf: LogOf[F]): Resource[F, KeyState[F, ConsumerRecord[String, ByteVector]]]
2425

2526
/** Restores a state for all keys present in persistence.
2627
*
@@ -71,7 +72,9 @@ object KeyStateOf {
7172
registry: EntityRegistry[F, KafkaKey, S],
7273
): KeyStateOf[F] = new KeyStateOf[F] {
7374

74-
def apply(topicPartition: TopicPartition, key: String, createdAt: Timestamp, context: KeyContext[F]) = {
75+
def apply(topicPartition: TopicPartition, key: String, createdAt: Timestamp, context: KeyContext[F])(
76+
implicit logOf: LogOf[F]
77+
) = {
7578
implicit val _context = context
7679
val kafkaKey = KafkaKey(
7780
applicationId = applicationId,
@@ -217,7 +220,9 @@ object KeyStateOf {
217220
registry: EntityRegistry[F, KafkaKey, S],
218221
): KeyStateOf[F] = new KeyStateOf[F] {
219222

220-
def apply(topicPartition: TopicPartition, key: String, createdAt: Timestamp, context: KeyContext[F]) = {
223+
def apply(topicPartition: TopicPartition, key: String, createdAt: Timestamp, context: KeyContext[F])(
224+
implicit logOf: LogOf[F]
225+
) = {
221226
val kafkaKey = KafkaKey(
222227
applicationId = applicationId,
223228
groupId = groupId,

core/src/main/scala/com/evolutiongaming/kafka/flow/PartitionFlow.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ object PartitionFlow {
6666
}
6767
}
6868

69-
def of[F[_]: Async](
69+
def of[F[_]: Async: LogOf](
7070
topicPartition: TopicPartition,
7171
assignedAt: Offset,
7272
keyStateOf: KeyStateOf[F],
@@ -97,7 +97,7 @@ object PartitionFlow {
9797
} yield flow
9898

9999
// TODO: put most `Ref` variables into one state class?
100-
def of[F[_]: Async](
100+
def of[F[_]: Async: LogOf](
101101
topicPartition: TopicPartition,
102102
keyStateOf: KeyStateOf[F],
103103
committedOffset: Ref[F, Offset],
@@ -116,7 +116,8 @@ object PartitionFlow {
116116
for {
117117
context <- KeyContext.resource[F](
118118
removeFromCache = cache.remove(key).flatten.void,
119-
log = log.prefixed(key)
119+
log = log.prefixed(key),
120+
key = key
120121
)
121122
keyState <- keyStateOf(topicPartition, key, createdAt, context)
122123
} yield PartitionKey(keyState, context)

0 commit comments

Comments
 (0)