Skip to content

Commit cf7e0c9

Browse files
authored
Fix spillover records reset after poll loop (#1398)
1 parent 3627d96 commit cf7e0c9

File tree

1 file changed

+20
-23
lines changed

1 file changed

+20
-23
lines changed

modules/core/src/main/scala/fs2/kafka/internal/KafkaConsumerActor.scala

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -450,31 +450,28 @@ private[kafka] object KafkaConsumerActor {
450450
*/
451451
def resetSpilloverAfterPoll(
452452
spillover: Map[TopicPartition, Chunk[CommittableConsumerRecord[F, K, V]]]
453-
): State[F, K, V] =
454-
if (spillover.isEmpty)
455-
this
456-
else {
457-
require(spillover.forall(kv => partitionState.contains(kv._1)))
458-
459-
val newPartitionState: PartitionStateMap[F, K, V] = partitionState.map {
460-
case (partition, partitionState) =>
461-
(
462-
partition,
463-
spillover
464-
.get(partition)
465-
.map(spillover => partitionState.copy(spillover = spillover))
466-
.getOrElse(
467-
if (partitionState.spillover.isEmpty)
468-
partitionState
469-
else
470-
partitionState.copy(spillover = Chunk.empty)
471-
)
472-
)
473-
}
474-
475-
copy(partitionState = newPartitionState)
453+
): State[F, K, V] = {
454+
require(spillover.forall(kv => partitionState.contains(kv._1)))
455+
456+
val newPartitionState: PartitionStateMap[F, K, V] = partitionState.map {
457+
case (partition, partitionState) =>
458+
(
459+
partition,
460+
spillover
461+
.get(partition)
462+
.map(spillover => partitionState.copy(spillover = spillover))
463+
.getOrElse(
464+
if (partitionState.spillover.isEmpty)
465+
partitionState
466+
else
467+
partitionState.copy(spillover = Chunk.empty)
468+
)
469+
)
476470
}
477471

472+
copy(partitionState = newPartitionState)
473+
}
474+
478475
/**
479476
* Resets pending commits after a poll operation.
480477
*

0 commit comments

Comments
 (0)