Skip to content

Commit ed7716b

Browse files
authored
Improve handling of pending commits (#1368)
1 parent 18159a4 commit ed7716b

File tree

2 files changed

+2
-2
lines changed

2 files changed

+2
-2
lines changed

modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ final private[kafka] class KafkaConsumerActor[F[_], K, V](
9393
private[this] def commit(request: Request.Commit[F]): F[Unit] =
9494
ref.flatModify { state =>
9595
val commitF = commitAsync(request.offsets, request.callback)
96-
if (state.rebalancing) {
96+
if (state.rebalancing || state.pendingCommits.nonEmpty) {
9797
val newState =
9898
state.withPendingCommit(commitF >> logging.log(CommittedPendingCommit(request)))
9999
(newState, logging.log(StoredPendingCommit(request, newState)))

modules/core/src/main/scala/fs2/kafka/internal/LogEntry.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ private[kafka] object LogEntry {
145145

146146
override def level: LogLevel = Debug
147147

148-
override def message: String = s"Committed pending commits [$pendingCommit]."
148+
override def message: String = s"Committed pending commit [$pendingCommit]."
149149

150150
}
151151

0 commit comments

Comments
 (0)