kafka ban consumer plug-in resubscribe will override ConsumerRebalanceListener #1803

@lisong2010

Description

What happened?

  1. For Kafka, disabling and restoring consumption is implemented by resubscribing. However, KafkaConsumerController#disableConsumption resubscribes by calling kafkaConsumer.subscribe(subtractTopics) directly. This ignores the case where the business system passed a ConsumerRebalanceListener in its original subscribe call: after the resubscription the listener is no longer registered, so its callback logic is lost.

How can we reproduce it (as minimally and precisely as possible)?

consumer.subscribe(Lists.newArrayList("test-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Never logged again once the plug-in resubscribes without the listener
        log.info("onPartitionsRevoked begin");
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Never logged again once the plug-in resubscribes without the listener
        log.info("onPartitionsAssigned begin");
    }
});
  2. Secondly, consumers that subscribe with subscribe(Pattern pattern) are not supported by the resubscription logic.
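
One possible direction for a fix, sketched under assumptions: remember how the application originally subscribed (topics or pattern, plus the listener) and reuse that information when resubscribing. SubscriptionCache and its methods are hypothetical names, not part of Sermant or Kafka. The type parameter L stands in for ConsumerRebalanceListener so that the sketch compiles without kafka-clients on the classpath.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: records the arguments of the application's subscribe call
 * so that a later resubscription can pass the same listener again.
 */
final class SubscriptionCache<L> {
    private final Set<String> topics = new LinkedHashSet<>();
    private Pattern pattern;   // non-null only after a pattern subscription
    private L listener;        // the application's rebalance listener, or null if none was given

    /** Record a subscribe(Collection<String>, listener) style call. */
    void recordTopics(Collection<String> subscribed, L rebalanceListener) {
        topics.clear();
        topics.addAll(subscribed);
        pattern = null;
        listener = rebalanceListener;
    }

    /** Record a subscribe(Pattern, listener) style call. */
    void recordPattern(Pattern subscribedPattern, L rebalanceListener) {
        topics.clear();
        pattern = subscribedPattern;
        listener = rebalanceListener;
    }

    L listener() {
        return listener;
    }

    Pattern pattern() {
        return pattern;
    }

    boolean isPatternSubscription() {
        return pattern != null;
    }

    /** Topics to resubscribe to once the prohibited topics are removed. */
    Set<String> remainingTopics(Collection<String> prohibited) {
        Set<String> remaining = new LinkedHashSet<>(topics);
        remaining.removeAll(prohibited);
        return remaining;
    }

    public static void main(String[] args) {
        SubscriptionCache<String> cache = new SubscriptionCache<>();
        // A String stands in for the listener object in this demo.
        cache.recordTopics(Arrays.asList("test-topic", "other-topic"), "business-listener");
        System.out.println(cache.remainingTopics(Arrays.asList("test-topic")));
        System.out.println(cache.listener());
    }
}
```

With such a cache, the resubscription could call kafkaConsumer.subscribe(cache.remainingTopics(prohibited), cache.listener()) when a listener was recorded, and fall back to the single-argument overload otherwise. How prohibited topics should be excluded from a pattern subscription is left open here; the sketch only records the pattern so that the resubscription can choose the matching overload.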

Anything else we need to know?

No response

Sermant version

Latest version

Metadata

Assignees

No one assigned

Labels

kind/bug: Something isn't working
