@@ -2,8 +2,11 @@ package com.evolutiongaming.kafka.flow.timer
22
33import cats .{Applicative , Monad , MonadThrow }
44import cats .effect .Resource
5+ import cats .effect .syntax .all .*
56import cats .effect .kernel .Resource .ExitCase
67import cats .syntax .all .*
8+ import com .evolutiongaming .catshelper .Log .Mdc
9+ import com .evolutiongaming .catshelper .LogOf
710import com .evolutiongaming .kafka .flow .KeyContext
811import com .evolutiongaming .kafka .flow .persistence .FlushBuffers
912
@@ -15,8 +18,7 @@ trait TimerFlowOf[F[_]] {
1518 context : KeyContext [F ],
1619 persistence : FlushBuffers [F ],
1720 timers : TimerContext [F ]
18- ): Resource [F , TimerFlow [F ]]
19-
21+ )(implicit logOf : LogOf [F ]): Resource [F , TimerFlow [F ]]
2022}
2123object TimerFlowOf {
2224
@@ -37,43 +39,48 @@ object TimerFlowOf {
3739 maxOffsetDifference : Int = 100000 ,
3840 maxIdle : FiniteDuration = 10 .minutes,
3941 flushOnRevoke : Boolean = false ,
40- ): TimerFlowOf [F ] = { (context, persistence, timers) =>
41- def register (touchedAt : Timestamp ) =
42- timers.registerProcessing(touchedAt.clock plusMillis fireEvery.toMillis)
43-
44- val acquire = Resource .eval {
45- for {
46- current <- timers.current
47- persistedAt <- timers.persistedAt
48- committedAt = persistedAt getOrElse current
49- _ <- context.hold(committedAt.offset)
50- _ <- register(committedAt)
51- } yield new TimerFlow [F ] {
52- def onTimer = for {
53- current <- timers.current
54- processedAt <- timers.processedAt
55- touchedAt = processedAt getOrElse committedAt
56- expiredAt = touchedAt.clock plusMillis maxIdle.toMillis
57- expired = current.clock isAfter expiredAt
58- offsetDifference = current.offset.value - touchedAt.offset.value
59- _ <-
60- if (expired || offsetDifference > maxOffsetDifference) {
61- context.log.info(s " flush, offset difference: $offsetDifference" ) *>
62- persistence.flush *>
63- context.remove
64- } else {
65- register(touchedAt)
66- }
67- } yield ()
42+ ): TimerFlowOf [F ] =
43+ new TimerFlowOf [F ] {
44+ override def apply (context : KeyContext [F ], persistence : FlushBuffers [F ], timers : TimerContext [F ])(
45+ implicit logOf : LogOf [F ]
46+ ): Resource [F , TimerFlow [F ]] = {
47+ def register (touchedAt : Timestamp ) =
48+ timers.registerProcessing(touchedAt.clock plusMillis fireEvery.toMillis)
49+
50+ val acquire = Resource .eval {
51+ for {
52+ log <- logOf(classOf [TimerFlowOf [F ]]).map(_.withMdc(Mdc .Eager (" key" -> context.key)))
53+ current <- timers.current
54+ persistedAt <- timers.persistedAt
55+ committedAt = persistedAt getOrElse current
56+ _ <- context.hold(committedAt.offset)
57+ _ <- register(committedAt)
58+ } yield new TimerFlow [F ] {
59+ def onTimer = for {
60+ current <- timers.current
61+ processedAt <- timers.processedAt
62+ touchedAt = processedAt getOrElse committedAt
63+ expiredAt = touchedAt.clock plusMillis maxIdle.toMillis
64+ expired = current.clock isAfter expiredAt
65+ offsetDifference = current.offset.value - touchedAt.offset.value
66+ _ <-
67+ if (expired || offsetDifference > maxOffsetDifference) {
68+ log.info(s " flush, offset difference: $offsetDifference" ) *>
69+ persistence.flush *>
70+ context.remove
71+ } else {
72+ register(touchedAt)
73+ }
74+ } yield ()
75+ }
76+ }
77+
78+ val cancel = flushOnCancel.apply(context, persistence, timers)
79+
80+ if (flushOnRevoke) acquire <* cancel else acquire
6881 }
6982 }
7083
71- val cancel = flushOnCancel.apply(context, persistence, timers)
72-
73- if (flushOnRevoke) acquire <* cancel else acquire
74-
75- }
76-
7784 /** Performs flush periodically.
7885 *
7986 * The flush will be called every `persitEvery` FiniteDuration.
@@ -100,72 +107,80 @@ object TimerFlowOf {
100107 persistEvery : FiniteDuration = 1 .minute,
101108 flushOnRevoke : Boolean = false ,
102109 ignorePersistErrors : Boolean = false ,
103- ): TimerFlowOf [F ] = { (context, persistence, timers) =>
104- def register (current : Timestamp ): F [Unit ] =
105- timers.registerProcessing(current.clock plusMillis fireEvery.toMillis)
106-
107- val acquire = Resource .eval {
108- for {
109- current <- timers.current
110- persistedAt <- timers.persistedAt
111- committedAt = persistedAt getOrElse current
112- _ <- context.hold(committedAt.offset)
113- _ <- register(current)
114- } yield new TimerFlow [F ] {
115- def onTimer : F [Unit ] = for {
116- current <- timers.current
117- persistedAt <- timers.persistedAt
118- flushedAt = persistedAt getOrElse committedAt
119- triggerFlushAt = flushedAt.clock plusMillis persistEvery.toMillis
120- _ <-
121- MonadThrow [F ].whenA((current.clock compareTo triggerFlushAt) >= 0 ) {
122- persistence.flush.attempt.flatMap {
123- case Left (err) if ignorePersistErrors =>
124- // 'context' will continue holding the previous offset from the last time the state was persisted
125- // and offsets committed (or just the last committed offset if no state has ever been persisted before).
126- // Thus, when calculating the next offset to commit in `PartitionFlow#offsetToCommit` it will take
127- // the minimal one (previous) and won't commit any offsets
128- context
129- .log
130- .info(s " Failed to persist state, the error is ignored and offsets won't be committed, error: $err" )
131- case Left (err) =>
132- err.raiseError[F , Unit ]
133- case Right (_) =>
134- context.hold(current.offset)
110+ ): TimerFlowOf [F ] = new TimerFlowOf [F ] {
111+ override def apply (context : KeyContext [F ], persistence : FlushBuffers [F ], timers : TimerContext [F ])(
112+ implicit logOf : LogOf [F ]
113+ ): Resource [F , TimerFlow [F ]] = {
114+ def register (current : Timestamp ): F [Unit ] =
115+ timers.registerProcessing(current.clock plusMillis fireEvery.toMillis)
116+
117+ val acquire = Resource .eval {
118+ for {
119+ current <- timers.current
120+ persistedAt <- timers.persistedAt
121+ committedAt = persistedAt getOrElse current
122+ _ <- context.hold(committedAt.offset)
123+ _ <- register(current)
124+ } yield new TimerFlow [F ] {
125+ def onTimer : F [Unit ] = for {
126+ log <- logOf(classOf [TimerFlowOf [F ]]).map(_.withMdc(Mdc .Eager (" key" -> context.key)))
127+ current <- timers.current
128+ persistedAt <- timers.persistedAt
129+ flushedAt = persistedAt getOrElse committedAt
130+ triggerFlushAt = flushedAt.clock plusMillis persistEvery.toMillis
131+ _ <-
132+ MonadThrow [F ].whenA((current.clock compareTo triggerFlushAt) >= 0 ) {
133+ persistence.flush.attempt.flatMap {
134+ case Left (err) if ignorePersistErrors =>
135+ // 'context' will continue holding the previous offset from the last time the state was persisted
136+ // and offsets committed (or just the last committed offset if no state has ever been persisted before).
137+ // Thus, when calculating the next offset to commit in `PartitionFlow#offsetToCommit` it will take
138+ // the minimal one (previous) and won't commit any offsets
139+ log.info(
140+ s " Failed to persist state, the error is ignored and offsets won't be committed, error: $err"
141+ )
142+ case Left (err) =>
143+ err.raiseError[F , Unit ]
144+ case Right (_) =>
145+ context.hold(current.offset)
146+ }
135147 }
136- }
137- _ <- register(current )
138- } yield ()
148+ _ <- register(current)
149+ } yield ( )
150+ }
139151 }
140- }
141-
142- val cancel = flushOnCancel.apply(context, persistence, timers)
143152
144- if (flushOnRevoke) acquire <* cancel else acquire
153+ val cancel = flushOnCancel.apply(context, persistence, timers)
145154
155+ if (flushOnRevoke) acquire <* cancel else acquire
156+ }
146157 }
147158
148159 /** Performs flush when `Resource` is cancelled only */
149- def flushOnCancel [F [_]: Monad ]: TimerFlowOf [F ] = { (context, persistence, _) =>
150- val cancel = context.holding flatMap { holding =>
151- Applicative [F ].whenA(holding.isDefined) {
152- context.log.info(s " flush on revoke, holding offset: $holding" ) *>
153- persistence.flush *>
154- context.remove
160+ def flushOnCancel [F [_]: Monad ]: TimerFlowOf [F ] = new TimerFlowOf [F ] {
161+ override def apply (context : KeyContext [F ], persistence : FlushBuffers [F ], timers : TimerContext [F ])(
162+ implicit logOf : LogOf [F ]
163+ ): Resource [F , TimerFlow [F ]] =
164+ logOf(classOf [TimerFlowOf [F ]]).toResource.map(_.withMdc(Mdc .Eager (" key" -> context.key))).flatMap { log =>
165+ val cancel = context.holding flatMap { holding =>
166+ Applicative [F ].whenA(holding.isDefined) {
167+ log.info(s " flush on revoke, holding offset: $holding" ) *>
168+ persistence.flush *>
169+ context.remove
170+ }
171+ }
172+
173+ Resource .makeCase(TimerFlow .empty.pure) {
174+ case (_, ExitCase .Succeeded ) =>
175+ cancel
176+ case (_, ExitCase .Canceled ) =>
177+ cancel
178+ // there is no point to try flushing if it failed with an error
179+ // the state might not be consistend and storage not accessible
180+ // plus this is a concurrent operation, and we do not want anything
181+ // to happen concurrently for a specific key
182+ case (_, _) => ().pure[F ]
183+ }
155184 }
156- }
157-
158- Resource .makeCase(TimerFlow .empty.pure) {
159- case (_, ExitCase .Succeeded ) =>
160- cancel
161- case (_, ExitCase .Canceled ) =>
162- cancel
163- // there is no point to try flushing if it failed with an error
164- // the state might not be consistend and storage not accessible
165- // plus this is a concurrent operation, and we do not want anything
166- // to happen concurrently for a specific key
167- case (_, _) => ().pure[F ]
168- }
169185 }
170-
171186}
0 commit comments