Description
Expected Behavior
When rebalancing occurs during a commit and all offset commit retries are exhausted, the Reactor Kafka library should poll again and process uncommitted messages. The Kafka consumer should not be dropped and should continue processing the next batch of messages.
Actual Behavior
We have a distributed system with a Kafka topic of 200 partitions and multiple consumers. Rebalancing may be triggered by network issues, latency, or other causes. If an offset commit fails during rebalancing and the rebalance outlasts the retry period, the Kafka consumer is removed from the consumer group.
I reviewed the Reactor Kafka library and found that the asyncCleanup in the withHandler method stops the Consumer Event Loop. In a non-reactive Kafka consumer implementation, there is usually an infinite loop for poll(), where exceptions are caught, and the consumer continues to process the next set of messages. However, in reactive Kafka, the consumer event loop itself gets closed.
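For comparison, the classic blocking loop I am referring to behaves roughly like this (a minimal sketch against the plain kafka-clients API; the class name and processing logic are illustrative):

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RebalanceInProgressException;

public class BlockingPollLoop {

    // The consumer survives commit failures: the exception is caught and the
    // next poll() both completes the rebalance and redelivers uncommitted records.
    public static void run(KafkaConsumer<String, String> consumer) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                process(record);
            }
            try {
                consumer.commitSync();
            } catch (RebalanceInProgressException e) {
                // Swallow and keep polling; the loop itself is never torn down.
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        // application logic (illustrative)
    }
}
```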
I have tried the repeat/retry workaround and increased the offset commit retry attempts, but the issue persists.
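For reference, a resubscription-based variant of that workaround would build a fresh receiver (and therefore a fresh consumer and event loop) on every retry. This is only a sketch: `receiverOptions` and `processRecord` are assumed members of the surrounding class, and whether this avoids the consumer being dropped is exactly what is in question here.

```java
// Sketch: Flux.defer re-evaluates KafkaReceiver.create on each (re)subscription,
// so a retry after a fatal commit failure starts a brand-new consumer instead of
// reusing one whose event loop has already been stopped.
private Disposable pollWithFreshReceiver() {
    return Flux.defer(() -> KafkaReceiver.<String, String>create(receiverOptions).receive())
        .concatMap(this::processRecord)
        .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(5)))
        .repeat()
        .subscribe();
}
```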
DefaultKafkaReceiver.java
private <T> Flux<T> withHandler(AckMode ackMode, BiFunction<Scheduler, ConsumerHandler<K, V>, Flux<T>> function) {
    return Flux.usingWhen(
        Mono.fromCallable(() -> {
            ConsumerHandler<K, V> consumerHandler = new ConsumerHandler<>(
                receiverOptions,
                consumerFactory.createConsumer(receiverOptions),
                // Always use the currently set value
                e -> isRetriableException.test(e),
                ackMode
            );
            consumerHandlerRef.set(consumerHandler);
            return consumerHandler;
        }),
        handler -> Flux.using(
            () -> Schedulers.single(receiverOptions.schedulerSupplier().get()),
            scheduler -> function.apply(scheduler, handler),
            Scheduler::dispose
        ),
        // asyncCleanup: this is where the handler (and its event loop) is closed
        handler -> handler.close().doFinally(__ -> consumerHandlerRef.compareAndSet(handler, null))
    );
}
ConsumerHandler.java
public Mono<Void> close() {
    if (consumerListener != null) {
        consumerListener.consumerRemoved(consumerId, consumer);
    }
    // stops the ConsumerEventLoop and disposes the event scheduler
    return consumerEventLoop.stop().doFinally(__ -> eventScheduler.dispose());
}
Steps to Reproduce
private Disposable poll(KafkaReceiver<String, String> receiver) {
    return receiver.receive()
        .publishOn(elasticScheduler)
        .doOnError(KafkaConsumer::logEventConsumptionFailure)
        .retryWhen(getRetryStrategy())
        .onErrorResume(KafkaConsumer::handleErrorOnEventConsumption)
        .flatMap(this::toKafkaEvent)
        .flatMap(kafkaEvent ->
                stream.handleEvent(kafkaEvent)
                    .switchIfEmpty(Mono.defer(() -> Mono.just(kafkaEvent))),
            streamConfig.getConsumerParallelism())
        .flatMap(this::commitKafkaOffset, streamConfig.getConsumerParallelism())
        .doOnTerminate(() -> {
            isConsumerActive = false;
            log.error("Kafka consumer got terminated");
        })
        .repeat()
        .subscribe();
}
private Mono<KafkaEvent> commitKafkaOffset(KafkaEvent kafkaEvent) {
    return Mono.just(kafkaEvent)
        .doOnNext(action -> logOffsetCommit(kafkaEvent))
        .flatMap(event -> kafkaEvent.getMessageRecord().receiverOffset().commit())
        .retryWhen(Retry.backoff(streamConfig.maxCommitAttempts, Duration.ofSeconds(1)).transientErrors(true))
        .doOnError(exception -> logOffsetCommitFailure(kafkaEvent, (Exception) exception))
        .onErrorResume(exception -> Mono.empty())
        .then(Mono.just(kafkaEvent))
        .doOnNext(action -> logSuccessfulOffsetCommit(kafkaEvent));
}
private static void logEventConsumptionFailure(Throwable error) {
    log.error("Failed to consume events from topic {}, {}, {}",
        keyValue(Constant.ERROR, error),
        keyValue(Constant.PHASE, "CONSUMER_ERROR"),
        keyValue(Constant.EXCEPTION, LoggingUtil.formatExceptionAsMapForLogging(error)));
}
Kafka Properties
kafka.session.timeout.ms=300000
kafka.heartbeat.interval.ms=30000
kafka.request.timeout.ms=180000
kafka.max.poll.records=500
kafka.max.poll.interval.ms=300000
Kafka consumer retry config
max.commit.attempts=200
commit.retry.interval=5000
max.delay.rebalance.ms=240000
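For reference, the same settings expressed through the `ReceiverOptions` builder (a sketch; bootstrap servers, group id, and deserializers omitted):

```java
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 30000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);

ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create(props)
    .maxCommitAttempts(200)                        // max.commit.attempts
    .commitRetryInterval(Duration.ofMillis(5000))  // commit.retry.interval
    .maxDelayRebalance(Duration.ofMillis(240000)); // max.delay.rebalance.ms
```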
Logs
WARN reactor.kafka.receiver.internals.ConsumerEventLoop.ConsumerEventLoop$CommitEvent.handleFailure:497 - Commit failed
org.apache.kafka.common.errors.RebalanceInProgressException: Offset commit cannot be completed since the consumer is undergoing a rebalance for auto partition assignment. You can try completing the rebalance by calling poll() and then retry the operation.
INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ConsumerCoordinator.invokePartitionsLost:366 - [Consumer kafka-consumer-app, groupId=consumer-group-1] Lost previously assigned partitions kafka-topic-95
INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ConsumerCoordinator.invokePartitionsLost:370 - [Consumer kafka-consumer-app, groupId=consumer-group-1] The pause flag in partitions [kafka-topic-95] will be removed due to partition lost.
INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.AbstractCoordinator.maybeLeaveGroup:1133 - [Consumer kafka-consumer-app, groupId=consumer-group-1] Member kafka-consumer-app-62d7b238-3ecd-4631-9103-ffe8420e23e2 sending LeaveGroup request to coordinator kafka-broker-host:9092 (id: 2039073794 rack: null) due to the consumer is being closed
INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.AbstractCoordinator.resetStateAndGeneration:1025 - [Consumer kafka-consumer-app, groupId=consumer-group-1] Resetting generation and member id due to: consumer pro-actively leaving the group
INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.AbstractCoordinator.requestRejoin:1072 - [Consumer kafka-consumer-app, groupId=consumer-group-1] Request joining group due to: consumer pro-actively leaving the group
INFO org.apache.kafka.common.metrics.Metrics.Metrics.close:693 - Metrics scheduler closed
INFO org.apache.kafka.common.metrics.Metrics.Metrics.close:697 - Closing reporter org.apache.kafka.common.metrics.JmxReporter
INFO org.apache.kafka.common.metrics.Metrics.Metrics.close:703 - Metrics reporters closed
INFO org.apache.kafka.common.utils.AppInfoParser.AppInfoParser.unregisterAppInfo:83 - App info kafka.consumer for kafka-consumer-app unregistered
Possible Solution
The Consumer Event Loop should not be closed during cleanup. Instead, it should continue polling for messages.
Your Environment
- Reactor version(s) used: 1.3.22
- Other relevant libraries versions (eg. netty, ...): Spring Boot Webflux 3.2.7
- JVM version (java -version): 17