Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import cats.effect.kernel.Resource.ExitCase
import cats.syntax.all.*
import com.evolutiongaming.kafka.flow.KeyContext
import com.evolutiongaming.kafka.flow.persistence.FlushBuffers
import com.evolutiongaming.skafka.Offset

import scala.concurrent.duration.*

Expand Down Expand Up @@ -56,8 +57,9 @@ object TimerFlowOf {
expiredAt = touchedAt.clock plusMillis maxIdle.toMillis
expired = current.clock isAfter expiredAt
offsetDifference = current.offset.value - touchedAt.offset.value
canUnload = expired || offsetDifference > maxOffsetDifference
_ <-
if (expired || offsetDifference > maxOffsetDifference) {
if (canUnload) {
context.log.info(s"flush, offset difference: $offsetDifference") *>
persistence.flush *>
context.remove
Expand All @@ -76,7 +78,7 @@ object TimerFlowOf {

/** Performs flush periodically.
*
* The flush will be called every `persistEvery` FiniteDuration.
*
* Note that using `ignorePersistErrors` can cause the persisted state to become inconsistent with the committed
* offset. For example, if 9 out of 10 snapshots were persisted successfully but the last persisting fails, no new
Expand Down Expand Up @@ -117,23 +119,14 @@ object TimerFlowOf {
persistedAt <- timers.persistedAt
flushedAt = persistedAt getOrElse committedAt
triggerFlushAt = flushedAt.clock plusMillis persistEvery.toMillis
_ <-
MonadThrow[F].whenA((current.clock compareTo triggerFlushAt) >= 0) {
persistence.flush.attempt.flatMap {
case Left(err) if ignorePersistErrors =>
// 'context' will continue holding the previous offset from the last time the state was persisted
// and offsets committed (or just the last committed offset if no state has ever been persisted before).
// Thus, when calculating the next offset to commit in `PartitionFlow#offsetToCommit` it will take
// the minimal one (previous) and won't commit any offsets
context
.log
.info(s"Failed to persist state, the error is ignored and offsets won't be committed, error: $err")
case Left(err) =>
err.raiseError[F, Unit]
case Right(_) =>
context.hold(current.offset)
}
}
canPersist = (current.clock compareTo triggerFlushAt) >= 0
_ <- MonadThrow[F].whenA(canPersist)(
persistence.attemptToPersist(
ignorePersistErrors = ignorePersistErrors,
context = context,
currentOffset = current.offset
)
)
_ <- register(current)
} yield ()
}
Expand All @@ -145,6 +138,73 @@ object TimerFlowOf {

}

/** Combines [[unloadOrphaned]] with [[persistPeriodically]] in a single TimerFlow.
  *
  * On every timer fire the state is flushed if either the persist interval has elapsed or the key is considered
  * orphaned (idle for longer than `maxIdle`, or more than `maxOffsetDifference` offsets behind). In the orphaned
  * case the key is additionally removed from the context after the flush attempt.
  *
  * @param fireEvery
  *   the interval at which `onTimer` triggers
  * @param persistEvery
  *   the interval at which the state will be persisted
  * @param maxOffsetDifference
  *   How many events could have happened without updates to the state before the state is flushed and the key is
  *   removed.
  * @param maxIdle
  *   How long since the state was recovered, or last record was processed should pass before the state is flushed
  *   and the key is removed.
  * @param flushOnRevoke
  *   controls whether persistence flushing should happen on partition revocation
  * @param ignorePersistErrors
  *   if true, a failure to persist the state will not fail the computation. Instead, an error message will be logged
  *   and a new offset for the key will not be `held`, so as a result no new offset will be committed for the
  *   partition.
  */
def persistPeriodicallyAndUnloadOrphaned[F[_]: MonadThrow](
  fireEvery: FiniteDuration = 1.minute,
  persistEvery: FiniteDuration = 1.minute,
  maxOffsetDifference: Int = 100000,
  maxIdle: FiniteDuration = 10.minutes,
  flushOnRevoke: Boolean = false,
  ignorePersistErrors: Boolean = false,
): TimerFlowOf[F] = (context, persistence, timers) => {
  // Schedules the next `onTimer` invocation `fireEvery` after the given timestamp.
  def register(touchedAt: Timestamp): F[Unit] =
    timers.registerProcessing(touchedAt.clock plusMillis fireEvery.toMillis)

  val acquire: Resource[F, TimerFlow[F]] = Resource.eval {
    for {
      current     <- timers.current
      persistedAt <- timers.persistedAt
      // Fall back to the current timestamp if the state has never been persisted.
      committedAt  = persistedAt getOrElse current
      _           <- context.hold(committedAt.offset)
      _           <- register(committedAt)
    } yield new TimerFlow[F] {
      def onTimer: F[Unit] = for {
        current     <- timers.current
        processedAt <- timers.processedAt
        // Re-read the persist timestamp on every fire, as `persistPeriodically` does. Relying on the value
        // captured at acquisition would keep `triggerFlushAt` fixed, so once the first `persistEvery` interval
        // had elapsed the state would be flushed on every timer fire regardless of `persistEvery`.
        persistedAt <- timers.persistedAt

        // Orphan detection: idle for too long, or too many offsets behind.
        touchedAt        = processedAt getOrElse committedAt
        expiredAt        = touchedAt.clock plusMillis maxIdle.toMillis
        expired          = current.clock isAfter expiredAt
        offsetDifference = current.offset.value - touchedAt.offset.value
        canUnload        = expired || offsetDifference > maxOffsetDifference

        // Periodic persistence: has `persistEvery` elapsed since the last flush?
        flushedAt      = persistedAt getOrElse committedAt
        triggerFlushAt = flushedAt.clock plusMillis persistEvery.toMillis
        canPersist     = (current.clock compareTo triggerFlushAt) >= 0

        _ <- Applicative[F].whenA(canPersist || canUnload)(
          persistence.attemptToPersist(
            ignorePersistErrors = ignorePersistErrors,
            context             = context,
            currentOffset       = current.offset
          )
        )
        // NOTE(review): with `ignorePersistErrors = true` the key is removed even when the flush above failed;
        // confirm this is the intended behaviour.
        _ <- Applicative[F].whenA(canUnload)(
          context.log.info(s"flush, offset difference: $offsetDifference") *> context.remove
        )
        _ <- register(current)
      } yield ()
    }
  }

  val cancel = flushOnCancel.apply(context, persistence, timers)

  if (flushOnRevoke) acquire <* cancel else acquire
}

/** Performs flush when `Resource` is cancelled only */
def flushOnCancel[F[_]: Monad]: TimerFlowOf[F] = { (context, persistence, _) =>
val cancel = context.holding flatMap { holding =>
Expand All @@ -168,4 +228,19 @@ object TimerFlowOf {
}
}

/** Extension syntax that adds an error-aware flush to [[FlushBuffers]]. */
private implicit class AttemptToPersist[F[_]: MonadThrow](persistence: FlushBuffers[F]) {

  /** Flushes the buffers and, on success, holds `currentOffset` in `context`.
    *
    * On failure the error is either logged and swallowed (`ignorePersistErrors = true`) or re-raised in `F`.
    */
  def attemptToPersist(ignorePersistErrors: Boolean, context: KeyContext[F], currentOffset: Offset): F[Unit] = {
    def onFailure(err: Throwable): F[Unit] =
      if (ignorePersistErrors) {
        // No new offset is held here, so 'context' keeps the offset from the last successful persist
        // (or the last committed offset if the state has never been persisted). As the minimal held offset
        // is the one picked in `PartitionFlow#offsetToCommit`, no new offsets will be committed.
        context
          .log
          .info(s"Failed to persist state, the error is ignored and offsets won't be committed, error: $err")
      } else {
        err.raiseError[F, Unit]
      }

    persistence
      .flush
      .attempt
      .flatMap(_.fold(onFailure, _ => context.hold(currentOffset)))
  }
}
}
Loading
Loading