@@ -6,6 +6,7 @@ import cats.effect.kernel.Resource.ExitCase
66import cats .syntax .all .*
77import com .evolutiongaming .kafka .flow .KeyContext
88import com .evolutiongaming .kafka .flow .persistence .FlushBuffers
9+ import com .evolutiongaming .skafka .Offset
910
1011import scala .concurrent .duration .*
1112
@@ -56,8 +57,9 @@ object TimerFlowOf {
5657 expiredAt = touchedAt.clock plusMillis maxIdle.toMillis
5758 expired = current.clock isAfter expiredAt
5859 offsetDifference = current.offset.value - touchedAt.offset.value
60+ canUnload = expired || offsetDifference > maxOffsetDifference
5961 _ <-
60- if (expired || offsetDifference > maxOffsetDifference ) {
62+ if (canUnload ) {
6163 context.log.info(s " flush, offset difference: $offsetDifference" ) *>
6264 persistence.flush *>
6365 context.remove
@@ -76,7 +78,7 @@ object TimerFlowOf {
7678
7779 /** Performs flush periodically.
7880 *
79- * The flush will be called every `persitEvery ` FiniteDuration.
81+ * The flush will be called every `persistEvery ` FiniteDuration.
8082 *
8183 * Note that using `ignorePersistErrors` can cause the persisted state to become inconsistent with the committed
8284 * offset. For example, if 9 out of 10 snapshots were persisted successfully but the last persisting fails, no new
@@ -117,23 +119,14 @@ object TimerFlowOf {
117119 persistedAt <- timers.persistedAt
118120 flushedAt = persistedAt getOrElse committedAt
119121 triggerFlushAt = flushedAt.clock plusMillis persistEvery.toMillis
120- _ <-
121- MonadThrow [F ].whenA((current.clock compareTo triggerFlushAt) >= 0 ) {
122- persistence.flush.attempt.flatMap {
123- case Left (err) if ignorePersistErrors =>
124- // 'context' will continue holding the previous offset from the last time the state was persisted
125- // and offsets committed (or just the last committed offset if no state has ever been persisted before).
126- // Thus, when calculating the next offset to commit in `PartitionFlow#offsetToCommit` it will take
127- // the minimal one (previous) and won't commit any offsets
128- context
129- .log
130- .info(s " Failed to persist state, the error is ignored and offsets won't be committed, error: $err" )
131- case Left (err) =>
132- err.raiseError[F , Unit ]
133- case Right (_) =>
134- context.hold(current.offset)
135- }
136- }
122+ canPersist = (current.clock compareTo triggerFlushAt) >= 0
123+ _ <- MonadThrow [F ].whenA(canPersist)(
124+ persistence.attemptToPersist(
125+ ignorePersistErrors = ignorePersistErrors,
126+ context = context,
127+ currentOffset = current.offset
128+ )
129+ )
137130 _ <- register(current)
138131 } yield ()
139132 }
@@ -145,6 +138,73 @@ object TimerFlowOf {
145138
146139 }
147140
141+ /** Combines [[unloadOrphaned ]] with [[persistPeriodically ]] in a single TimerFlow
142+ *
143+ * @param fireEvery
144+ * the interval at which `onTimer` triggers
145+ * @param persistEvery
146+ * the interval at which the state will be persisted
147+ * @param maxOffsetDifference
148+ * How many events could have happened without updates to the state before persist is initiated.
149+ * @param maxIdle
150+ * How long since the state was recovered, or last record was processed should pass before persist is initiated.
151+ * @param flushOnRevoke
152+ * controls whether persistence flushing should happen on partition revocation
153+ * @param ignorePersistErrors
154+ * if true, a failure to persist the state will not fail the computation. Instead, an error message will be logged
155+ * and a new offset for the key will not be `held`, so as a result no new offset will be committed for the
156+ * partition.
157+ */
158+ def persistPeriodicallyAndUnloadOrphaned [F [_]: MonadThrow ](
159+ fireEvery : FiniteDuration = 1 .minute,
160+ persistEvery : FiniteDuration = 1 .minute,
161+ maxOffsetDifference : Int = 100000 ,
162+ maxIdle : FiniteDuration = 10 .minutes,
163+ flushOnRevoke : Boolean = false ,
164+ ignorePersistErrors : Boolean = false ,
165+ ): TimerFlowOf [F ] = (context, persistence, timers) => {
166+ def register (touchedAt : Timestamp ): F [Unit ] =
167+ timers.registerProcessing(touchedAt.clock plusMillis fireEvery.toMillis)
168+
169+ val acquire : Resource [F , TimerFlow [F ]] = Resource .eval {
170+ for {
171+ current <- timers.current
172+ persistedAt <- timers.persistedAt
173+ committedAt = persistedAt getOrElse current
174+ _ <- context.hold(committedAt.offset)
175+ _ <- register(committedAt)
176+ } yield new TimerFlow [F ] {
177+ def onTimer : F [Unit ] = for {
178+ current <- timers.current
179+ processedAt <- timers.processedAt
180+ touchedAt = processedAt getOrElse committedAt
181+ expiredAt = touchedAt.clock plusMillis maxIdle.toMillis
182+ offsetDifference = current.offset.value - touchedAt.offset.value
183+ flushedAt = persistedAt getOrElse committedAt
184+ triggerFlushAt = flushedAt.clock plusMillis persistEvery.toMillis
185+ expired = current.clock isAfter expiredAt
186+ canUnload = expired || offsetDifference > maxOffsetDifference
187+ canPersist = (current.clock compareTo triggerFlushAt) >= 0
188+ _ <- Applicative [F ].whenA(canPersist || canUnload)(
189+ persistence.attemptToPersist(
190+ ignorePersistErrors = ignorePersistErrors,
191+ context = context,
192+ currentOffset = current.offset
193+ )
194+ )
195+ _ <- Applicative [F ].whenA(canUnload)(
196+ context.log.info(s " flush, offset difference: $offsetDifference" ) *> context.remove
197+ )
198+ _ <- register(current)
199+ } yield ()
200+ }
201+ }
202+
203+ val cancel = flushOnCancel.apply(context, persistence, timers)
204+
205+ if (flushOnRevoke) acquire <* cancel else acquire
206+ }
207+
148208 /** Performs flush when `Resource` is cancelled only */
149209 def flushOnCancel [F [_]: Monad ]: TimerFlowOf [F ] = { (context, persistence, _) =>
150210 val cancel = context.holding flatMap { holding =>
@@ -168,4 +228,19 @@ object TimerFlowOf {
168228 }
169229 }
170230
231+ private implicit class AttemptToPersist [F [_]: MonadThrow ](persistence : FlushBuffers [F ]) {
232+ def attemptToPersist (ignorePersistErrors : Boolean , context : KeyContext [F ], currentOffset : Offset ): F [Unit ] =
233+ persistence.flush.attempt.flatMap {
234+ case Left (err) if ignorePersistErrors =>
235+ // 'context' will continue holding the previous offset from the last time the state was persisted
236+ // and offsets committed (or just the last committed offset if no state has ever been persisted before).
237+ // Thus, when calculating the next offset to commit in `PartitionFlow#offsetToCommit` it will take
238+ // the minimal one (previous) and won't commit any offsets
239+ context
240+ .log
241+ .info(s " Failed to persist state, the error is ignored and offsets won't be committed, error: $err" )
242+ case Left (err) => err.raiseError[F , Unit ]
243+ case Right (_) => context.hold(currentOffset)
244+ }
245+ }
171246}
0 commit comments