Skip to content

Consumption stopped with slow consumer #386

@abialas

Description

@abialas

I have a simple but slow consumer which consumes 1 record at time:

  private Flux<Void> consumeRecords(Flux<ReceiverRecord<String, Value>> records) {
    return records
        .concatMap(receiverRecord -> handleReceivedRecord(receiverRecord))
        .concatMap(this::ackOffset)
        .doOnEach(logOnError(logErrorReceiveRecord()))
        .retryWhen(RETRY_FAILED_CONSUMER_SPEC);
  }

Processing time of method handleReceivedRecord is less than 500ms. I understand this consumer is slow and needs to be fixed (because of concurrency).
However, in my test I produce just about 3000 records in 1 minute to the topic the above consumer is consuming from. Initially it consumes fine but after some time I see consumer is not consuming anymore. There is no error log or similar.

In the logs I see such messages:

Rebalance during back pressure, re-pausing new assignments
Rebalancing; waiting for 104 records in pipeline

and I have to restart consumer instance to fix this. It is also worth to mention that when I disable scaling up of consumers it works fine.

Expected Behavior

Consuming records from topic should not stop.

Actual Behavior

Consuming records from topic is stuck and restart is required.

Your Environment

  • Reactor Core: 3.5.10
  • Reactor Kafka: 1.3.18
  • JVM version (java -version): 21.0.2

Metadata

Metadata

Assignees

No one assigned

    Labels

    ❓need-triageThis issue needs triage, hasn't been looked at by a team member yet

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions