From ade87bbc7b4aa260b8e3e873492c730332783fd6 Mon Sep 17 00:00:00 2001
From: Kaise Cheng
Date: Fri, 3 Oct 2025 21:48:13 +0100
Subject: [PATCH 01/15] test: add support to test against Kafka v3 and v4

update download url
---
 .ci/run.sh                    | 27 +++++++++++---
 kafka_test_setup.sh           | 68 ++++++++++++++++++++++-------------
 kafka_test_teardown.sh        |  7 ++--
 spec/fixtures/jaas.config     |  2 +-
 spec/fixtures/jaas3.config    |  5 +++
 start_auth_schema_registry.sh |  5 ++-
 6 files changed, 80 insertions(+), 34 deletions(-)
 create mode 100644 spec/fixtures/jaas3.config

diff --git a/.ci/run.sh b/.ci/run.sh
index 814674b0..7080c8fb 100755
--- a/.ci/run.sh
+++ b/.ci/run.sh
@@ -5,10 +5,27 @@ env
 
 set -ex
 
-export KAFKA_VERSION=3.3.1
-./kafka_test_setup.sh
+# Define the Kafka:Confluent version pairs
+VERSIONS=(
+# "3.9.1:7.4.0"
+  "4.1.0:8.0.0"
+)
 
-bundle exec rspec -fd
-bundle exec rspec -fd --tag integration
+for pair in "${VERSIONS[@]}"; do
+  KAFKA_VERSION="${pair%%:*}"
+  CONFLUENT_VERSION="${pair##*:}"
 
-./kafka_test_teardown.sh
+  echo "=================================================="
+  echo " Testing with Kafka $KAFKA_VERSION / Confluent $CONFLUENT_VERSION"
+  echo "=================================================="
+
+  export KAFKA_VERSION
+  export CONFLUENT_VERSION
+
+  ./kafka_test_setup.sh
+
+  bundle exec rspec -fd
+  bundle exec rspec -fd --tag integration
+
+  ./kafka_test_teardown.sh
+done
\ No newline at end of file
diff --git a/kafka_test_setup.sh b/kafka_test_setup.sh
index 664e2b43..fb583ab2 100755
--- a/kafka_test_setup.sh
+++ b/kafka_test_setup.sh
@@ -9,6 +9,8 @@ else
   KAFKA_VERSION=3.4.1
 fi
 
+KAFKA_MAJOR_VERSION="${KAFKA_VERSION%%.*}"
+
 export _JAVA_OPTIONS="-Djava.net.preferIPv4Stack=true"
 
 rm -rf build
 mkdir build
 
 echo "Setup Kafka version $KAFKA_VERSION"
 if [ ! -e "kafka_2.13-$KAFKA_VERSION.tgz" ]; then
   echo "Kafka not present locally, downloading"
-  curl -s -o "kafka_2.13-$KAFKA_VERSION.tgz" "https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz"
+  curl -s -o "kafka_2.13-$KAFKA_VERSION.tgz" "https://downloads.apache.org/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz"
 fi
-cp kafka_2.13-$KAFKA_VERSION.tgz build/kafka.tgz
-mkdir build/kafka && tar xzf build/kafka.tgz -C build/kafka --strip-components 1
+cp "kafka_2.13-$KAFKA_VERSION.tgz" "build/kafka.tgz"
+mkdir "build/kafka" && tar xzf "build/kafka.tgz" -C "build/kafka" --strip-components 1
 
-echo "Starting ZooKeeper"
-build/kafka/bin/zookeeper-server-start.sh -daemon build/kafka/config/zookeeper.properties
-sleep 10
+if [ -f "build/kafka/bin/zookeeper-server-start.sh" ]; then
+  echo "Starting ZooKeeper"
+  build/kafka/bin/zookeeper-server-start.sh -daemon "build/kafka/config/zookeeper.properties"
+  sleep 10
+else
+  echo "Use KRaft for Kafka version $KAFKA_VERSION"
+
+  echo "log.dirs=${PWD}/build/kafka-logs" >> build/kafka/config/server.properties
+
+  build/kafka/bin/kafka-storage.sh format \
+    --cluster-id $(build/kafka/bin/kafka-storage.sh random-uuid) \
+    --config build/kafka/config/server.properties \
+    --ignore-formatted \
+    --standalone
+fi
 
 echo "Starting Kafka broker"
-build/kafka/bin/kafka-server-start.sh -daemon build/kafka/config/server.properties --override advertised.host.name=127.0.0.1 --override log.dirs="${PWD}/build/kafka-logs"
+build/kafka/bin/kafka-server-start.sh -daemon "build/kafka/config/server.properties" --override advertised.host.name=127.0.0.1 --override log.dirs="${PWD}/build/kafka-logs"
 sleep 10
 
 echo "Setup Confluent Platform"
@@ -36,14 +50,14 @@ if [ -n "${CONFLUENT_VERSION+1}" ]; then
   echo "CONFLUENT_VERSION is $CONFLUENT_VERSION"
 else
   CONFLUENT_VERSION=7.4.0
 fi
-if [ ! -e confluent-community-$CONFLUENT_VERSION.tar.gz ]; then
+if [ ! -e "confluent-community-$CONFLUENT_VERSION.tar.gz" ]; then
   echo "Confluent Platform not present locally, downloading"
   CONFLUENT_MINOR=$(echo "$CONFLUENT_VERSION" | sed -n 's/^\([[:digit:]]*\.[[:digit:]]*\)\.[[:digit:]]*$/\1/p')
   echo "CONFLUENT_MINOR is $CONFLUENT_MINOR"
-  curl -s -o confluent-community-$CONFLUENT_VERSION.tar.gz http://packages.confluent.io/archive/$CONFLUENT_MINOR/confluent-community-$CONFLUENT_VERSION.tar.gz
+  curl -s -o "confluent-community-$CONFLUENT_VERSION.tar.gz" "http://packages.confluent.io/archive/$CONFLUENT_MINOR/confluent-community-$CONFLUENT_VERSION.tar.gz"
 fi
-cp confluent-community-$CONFLUENT_VERSION.tar.gz build/confluent_platform.tar.gz
-mkdir build/confluent_platform && tar xzf build/confluent_platform.tar.gz -C build/confluent_platform --strip-components 1
+cp "confluent-community-$CONFLUENT_VERSION.tar.gz" "build/confluent_platform.tar.gz"
+mkdir "build/confluent_platform" && tar xzf "build/confluent_platform.tar.gz" -C "build/confluent_platform" --strip-components 1
 
 echo "Configuring TLS on Schema registry"
 rm -Rf tls_repository
@@ -51,20 +65,24 @@ mkdir tls_repository
 ./setup_keystore_and_truststore.sh
 # configure schema-registry to handle https on 8083 port
 if [[ "$OSTYPE" == "darwin"* ]]; then
-  sed -i '' 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' build/confluent_platform/etc/schema-registry/schema-registry.properties
+  sed -i '' 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' "build/confluent_platform/etc/schema-registry/schema-registry.properties"
 else
-  sed -i 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' build/confluent_platform/etc/schema-registry/schema-registry.properties
+  sed -i 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' "build/confluent_platform/etc/schema-registry/schema-registry.properties"
+fi
+echo "ssl.keystore.location=`pwd`/tls_repository/schema_reg.jks" >> "build/confluent_platform/etc/schema-registry/schema-registry.properties"
+echo "ssl.keystore.password=changeit" >> "build/confluent_platform/etc/schema-registry/schema-registry.properties"
+echo "ssl.key.password=changeit" >> "build/confluent_platform/etc/schema-registry/schema-registry.properties"
+
+cp "build/confluent_platform/etc/schema-registry/schema-registry.properties" "build/confluent_platform/etc/schema-registry/authed-schema-registry.properties"
+echo "authentication.method=BASIC" >> "build/confluent_platform/etc/schema-registry/authed-schema-registry.properties"
+echo "authentication.roles=admin,developer,user,sr-user" >> "build/confluent_platform/etc/schema-registry/authed-schema-registry.properties"
+echo "authentication.realm=SchemaRegistry-Props" >> "build/confluent_platform/etc/schema-registry/authed-schema-registry.properties"
+cp spec/fixtures/jaas.config "build/confluent_platform/etc/schema-registry"
+if [[ "$KAFKA_MAJOR_VERSION" -eq 3 ]]; then
+  cp "spec/fixtures/jaas$KAFKA_MAJOR_VERSION.config" "build/confluent_platform/etc/schema-registry/jaas.config"
 fi
-echo "ssl.keystore.location=`pwd`/tls_repository/schema_reg.jks" >> build/confluent_platform/etc/schema-registry/schema-registry.properties
-echo "ssl.keystore.password=changeit" >> build/confluent_platform/etc/schema-registry/schema-registry.properties
-echo "ssl.key.password=changeit" >> build/confluent_platform/etc/schema-registry/schema-registry.properties
-cp build/confluent_platform/etc/schema-registry/schema-registry.properties build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
-echo "authentication.method=BASIC" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
-echo "authentication.roles=admin,developer,user,sr-user" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
-echo "authentication.realm=SchemaRegistry-Props" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
-cp spec/fixtures/jaas.config build/confluent_platform/etc/schema-registry
-cp spec/fixtures/pwd build/confluent_platform/etc/schema-registry
+cp spec/fixtures/pwd "build/confluent_platform/etc/schema-registry"
 
 echo "Setting up test topics with test data"
 build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_plain --bootstrap-server localhost:9092
@@ -82,8 +100,8 @@ build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 -
 build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_partitioner_topic --bootstrap-server localhost:9092
 build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_static_membership_topic --bootstrap-server localhost:9092
 curl -s -o build/apache_logs.txt https://s3.amazonaws.com/data.elasticsearch.org/apache_logs/apache_logs.txt
-cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_plain --broker-list localhost:9092
-cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_snappy --broker-list localhost:9092 --compression-codec snappy
-cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_lz4 --broker-list localhost:9092 --compression-codec lz4
+cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_plain --bootstrap-server localhost:9092
+cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_snappy --bootstrap-server localhost:9092 --compression-codec snappy
+cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_lz4 --bootstrap-server localhost:9092 --compression-codec lz4
 
 echo "Setup complete, running specs"
diff --git a/kafka_test_teardown.sh b/kafka_test_teardown.sh
index b3fe9d4f..0801efed 100755
--- a/kafka_test_teardown.sh
+++ b/kafka_test_teardown.sh
@@ -8,8 +8,11 @@ build/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 'logstash_integration_.*'
 
 echo "Stopping Kafka broker"
 build/kafka/bin/kafka-server-stop.sh
-echo "Stopping zookeeper"
-build/kafka/bin/zookeeper-server-stop.sh
+
+if [ -f "build/kafka/bin/zookeeper-server-stop.sh" ]; then
+  echo "Stopping ZooKeeper"
+  build/kafka/bin/zookeeper-server-stop.sh
+fi
 
 echo "Clean TLS folder"
 rm -Rf tls_repository
diff --git a/spec/fixtures/jaas.config b/spec/fixtures/jaas.config
index 0d13fa37..f1c29ac7 100644
--- a/spec/fixtures/jaas.config
+++ b/spec/fixtures/jaas.config
@@ -1,5 +1,5 @@
 SchemaRegistry-Props {
-  org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
+  org.eclipse.jetty.security.jaas.spi.PropertyFileLoginModule required
   file="build/confluent_platform/etc/schema-registry/pwd"
   debug="true";
 };
diff --git a/spec/fixtures/jaas3.config b/spec/fixtures/jaas3.config
new file mode 100644
index 00000000..0d13fa37
--- /dev/null
+++ b/spec/fixtures/jaas3.config
@@ -0,0 +1,5 @@
+SchemaRegistry-Props {
+  org.eclipse.jetty.jaas.spi.PropertyFileLoginModule required
+  file="build/confluent_platform/etc/schema-registry/pwd"
+  debug="true";
+};
diff --git a/start_auth_schema_registry.sh b/start_auth_schema_registry.sh
index c2f48fed..a1d6497d 100755
--- a/start_auth_schema_registry.sh
+++ b/start_auth_schema_registry.sh
@@ -3,4 +3,7 @@ set -ex
 
 echo "Starting authed SchemaRegistry"
-SCHEMA_REGISTRY_OPTS=-Djava.security.auth.login.config=build/confluent_platform/etc/schema-registry/jaas.config build/confluent_platform/bin/schema-registry-start build/confluent_platform/etc/schema-registry/authed-schema-registry.properties > /dev/null 2>&1 &
\ No newline at end of file
+SCHEMA_REGISTRY_OPTS="-Djava.security.auth.login.config=build/confluent_platform/etc/schema-registry/jaas.config" \
+  build/confluent_platform/bin/schema-registry-start \
+  build/confluent_platform/etc/schema-registry/authed-schema-registry.properties \
+  > /dev/null 2>&1 &
\ No newline at end of file

From f05f8b2642d908967db16b36304ce0a94c225b2e Mon Sep 17 00:00:00 2001
From: Kaise Cheng
Date: Mon, 6 Oct 2025 11:14:11 +0100
Subject: [PATCH 02/15] bump version & changelog

---
 CHANGELOG.md                       | 6 ++++++
 logstash-integration-kafka.gemspec | 2 +-
 2 files changed, 7 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f71bab15..6bf93f25 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 12.0.0
+  - Update kafka client to 4.1.0 and transitive dependencies [#205](https://github.com/logstash-plugins/logstash-integration-kafka/pull/205)
+    - `partitioner => "default"` option is removed
+    - Add `group_protocol` option for configuring Kafka consumer rebalance protocol
+    - Setting `group_protocol => consumer` opts in to the new consumer group protocol
+
 ## 11.7.0
   - Add `reconnect_backoff_max_ms` option for configuring kafka client [#204](https://github.com/logstash-plugins/logstash-integration-kafka/pull/204)
 
diff --git a/logstash-integration-kafka.gemspec b/logstash-integration-kafka.gemspec
index 34519a66..6b5d855f 100644
--- a/logstash-integration-kafka.gemspec
+++ b/logstash-integration-kafka.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-integration-kafka'
-  s.version = '11.7.0'
+  s.version = '12.0.0'
   s.licenses = ['Apache-2.0']
   s.summary = "Integration with Kafka - input and output plugins"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+

From cfc1f842a426afb171e1aa5b1463cebdb7bf8060 Mon Sep 17 00:00:00 2001
From: Kaise Cheng
Date: Mon, 6 Oct 2025 17:42:11 +0100
Subject: [PATCH 03/15] updated kafka-client and dependencies

- added jackson-datatype-jdk8
- Avro class renamed
  AbstractKafkaAvroSerDeConfig is renamed to AbstractKafkaSchemaSerDeConfig
- kafka-output `partitioner` classes are removed
  UniformStickyPartitioner and DefaultPartitioner are removed
  only `round_robin` and `uniform_sticky` are available
- kafka-client DeleteTopicsResult values() is deprecated
  used topicNameValues() instead
- doc changed the default behaviour of `partitioner`
---
 build.gradle                           |  9 +++++----
 docs/index.asciidoc                    |  2 +-
 docs/input-kafka.asciidoc              |  4 ++--
 docs/output-kafka.asciidoc             | 11 ++++++-----
 lib/logstash/inputs/kafka.rb           |  2 +-
 lib/logstash/outputs/kafka.rb          | 11 +++++++----
 spec/integration/inputs/kafka_spec.rb  |  6 +++---
 spec/integration/outputs/kafka_spec.rb |  2 +-
 8 files changed, 26 insertions(+), 21 deletions(-)

diff --git a/build.gradle b/build.gradle
index d5b0ab9f..218a6bac 100644
--- a/build.gradle
+++ b/build.gradle
@@ -39,8 +39,8 @@ java {
 
 // given https://docs.confluent.io/current/installation/versions-interoperability.html matrix
 // Confluent Platform 7.9.x is Apache Kafka 3.9.x
-String confluentKafkaVersion = '7.9.1'
-String apacheKafkaVersion = '3.9.1'
+String confluentKafkaVersion = '8.0.0'
+String apacheKafkaVersion = '4.1.0'
 
 repositories {
   mavenCentral()
@@ -65,11 +65,12 @@ dependencies {
     exclude group: 'org.apache.kafka', module:'kafka-clients'
   }
   implementation "org.apache.kafka:kafka-clients:${apacheKafkaVersion}"
-  // slf4j, zstd, lz4-java, snappy are dependencies from "kafka-clients"
+  // slf4j, zstd, lz4-java, snappy, jackson-datatype-jdk8 are dependencies from "kafka-clients"
   implementation 'org.slf4j:slf4j-api:1.7.36'
-  implementation 'com.github.luben:zstd-jni:1.5.6-8'
+  implementation 'com.github.luben:zstd-jni:1.5.6-10'
   implementation 'org.lz4:lz4-java:1.8.0'
   implementation 'org.xerial.snappy:snappy-java:1.1.10.7'
+  implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.20.0'
 }
 
 task generateGemJarRequiresFile {
   doLast {
diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index d82615b6..f3669b63 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -1,7 +1,7 @@
 :plugin: kafka
 :type: integration
 :no_codec:
-:kafka_client: 3.9.1
+:kafka_client: 4.1.0
 
 ///////////////////////////////////////////
 START - GENERATED VARIABLES, DO NOT EDIT!
diff --git a/docs/input-kafka.asciidoc b/docs/input-kafka.asciidoc
index 2136b746..342faae2 100644
--- a/docs/input-kafka.asciidoc
+++ b/docs/input-kafka.asciidoc
@@ -2,8 +2,8 @@
 :plugin: kafka
 :type: input
 :default_codec: plain
-:kafka_client: 3.9.1
-:kafka_client_doc: 39
+:kafka_client: 4.1.0
+:kafka_client_doc: 41
 
 ///////////////////////////////////////////
 START - GENERATED VARIABLES, DO NOT EDIT!
diff --git a/docs/output-kafka.asciidoc b/docs/output-kafka.asciidoc
index 3ca797e7..63bd3e64 100644
--- a/docs/output-kafka.asciidoc
+++ b/docs/output-kafka.asciidoc
@@ -2,8 +2,8 @@
 :plugin: kafka
 :type: output
 :default_codec: plain
-:kafka_client: 3.9.1
-:kafka_client_doc: 39
+:kafka_client: 4.1.0
+:kafka_client_doc: 41
 
 ///////////////////////////////////////////
 START - GENERATED VARIABLES, DO NOT EDIT!
@@ -349,12 +349,13 @@ The max time in milliseconds before a metadata refresh is forced.
 
 * Value type is <>
 * There is no default value for this setting.
 
-The default behavior is to hash the `message_key` of an event to get the partition.
-When no message key is present, the plugin picks a partition in a round-robin fashion.
+By not setting this value, the plugin uses the default behavior of the Kafka client, which is `uniform_sticky`.
+If a record explicitly specifies a partition, that partition is always used.
+Otherwise, the producer selects a sticky partition, which remains the same for the duration of a batch and changes once the batch is full or flushed.
 
 Available options for choosing a partitioning strategy are as follows:
 
-* `default` use the default partitioner as described above
+* `default` is an alias for `uniform_sticky`
 * `round_robin` distributes writes to all partitions equally, regardless of `message_key`
 * `uniform_sticky` sticks to a partition for the duration of a batch than randomly picks a new one
 
diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb
index 467f28d4..ea73bfcd 100644
--- a/lib/logstash/inputs/kafka.rb
+++ b/lib/logstash/inputs/kafka.rb
@@ -471,7 +471,7 @@ def create_consumer(client_id, group_instance_id)
       props.put("security.protocol", security_protocol) unless security_protocol.nil?
       if schema_registry_url
         props.put(kafka::VALUE_DESERIALIZER_CLASS_CONFIG, Java::io.confluent.kafka.serializers.KafkaAvroDeserializer.java_class)
-        serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
+        serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
         props.put(serdes_config::SCHEMA_REGISTRY_URL_CONFIG, schema_registry_url.uri.to_s)
         if schema_registry_proxy && !schema_registry_proxy.empty?
           props.put(serdes_config::PROXY_HOST, @schema_registry_proxy_host)
diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index 0bc2532a..8ee417a0 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -369,8 +369,8 @@ def create_producer
       props.put(kafka::LINGER_MS_CONFIG, linger_ms.to_s)
       props.put(kafka::MAX_REQUEST_SIZE_CONFIG, max_request_size.to_s)
       props.put(kafka::METADATA_MAX_AGE_CONFIG, metadata_max_age_ms.to_s) unless metadata_max_age_ms.nil?
-      unless partitioner.nil?
-        props.put(kafka::PARTITIONER_CLASS_CONFIG, partitioner = partitioner_class)
+      partitioner_class&.tap do |partitioner|
+        props.put(kafka::PARTITIONER_CLASS_CONFIG, partitioner)
         logger.debug('producer configured using partitioner', :partitioner_class => partitioner)
       end
       props.put(kafka::RECEIVE_BUFFER_CONFIG, receive_buffer_bytes.to_s) unless receive_buffer_bytes.nil?
@@ -405,13 +405,16 @@ def create_producer
   end
 
   def partitioner_class
+    return nil if partitioner.nil?
+
     case partitioner
     when 'round_robin'
       'org.apache.kafka.clients.producer.RoundRobinPartitioner'
     when 'uniform_sticky'
-      'org.apache.kafka.clients.producer.UniformStickyPartitioner'
+      nil
     when 'default'
-      'org.apache.kafka.clients.producer.internals.DefaultPartitioner'
+      # The default is `uniform_sticky` since Kafka client 3.0
+      nil
     else
       unless partitioner.index('.')
         raise LogStash::ConfigurationError, "unsupported partitioner: #{partitioner.inspect}"
diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb
index 7c44f474..92c55a8b 100644
--- a/spec/integration/inputs/kafka_spec.rb
+++ b/spec/integration/inputs/kafka_spec.rb
@@ -486,7 +486,7 @@ def shutdown_schema_registry
 def delete_topic_if_exists(topic_name, user = nil, password = nil)
   props = java.util.Properties.new
   props.put(Java::org.apache.kafka.clients.admin.AdminClientConfig::BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
-  serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
+  serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
   unless user.nil?
     props.put(serdes_config::BASIC_AUTH_CREDENTIALS_SOURCE, 'USER_INFO')
     props.put(serdes_config::USER_INFO_CONFIG, "#{user}:#{password}")
@@ -495,7 +495,7 @@ def delete_topic_if_exists(topic_name, user = nil, password = nil)
   topics_list = admin_client.listTopics().names().get()
   if topics_list.contains(topic_name)
     result = admin_client.deleteTopics([topic_name])
-    result.values.get(topic_name).get()
+    result.topicNameValues().get(topic_name).get()
   end
 end
 
@@ -503,7 +503,7 @@ def write_some_data_to(topic_name, user = nil, password = nil)
   props = java.util.Properties.new
   config = org.apache.kafka.clients.producer.ProducerConfig
 
-  serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
+  serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
 
   props.put(serdes_config::SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081")
   props.put(config::BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
diff --git a/spec/integration/outputs/kafka_spec.rb b/spec/integration/outputs/kafka_spec.rb
index c5f5d72d..77a8ecd5 100644
--- a/spec/integration/outputs/kafka_spec.rb
+++ b/spec/integration/outputs/kafka_spec.rb
@@ -164,7 +164,7 @@ let(:test_topic) { 'logstash_integration_topic3' }
 
     before :each do
-      config = base_config.merge("topic_id" => test_topic, "partitioner" => 'org.apache.kafka.clients.producer.UniformStickyPartitioner')
+      config = base_config.merge("topic_id" => test_topic, "partitioner" => 'org.apache.kafka.clients.producer.RoundRobinPartitioner')
       load_kafka_data(config) do # let's have a bit more (diverse) dataset
         num_events.times.collect do
           LogStash::Event.new.tap do |e|

From 19ec2d556c34b0eba7887dfab585fe45ab8289cf Mon Sep 17 00:00:00 2001
From: Kaise Cheng
Date: Tue, 7 Oct 2025 12:53:06 +0100
Subject: [PATCH 04/15] add `group_protocol` option

---
 docs/input-kafka.asciidoc             | 16 ++++++
 kafka_test_setup.sh                   |  4 +-
 kafka_test_teardown.sh                |  2 +-
 lib/logstash/inputs/kafka.rb          | 28 ++++++++++
 spec/integration/inputs/kafka_spec.rb | 77 +++++++++++++++++++++++++++
 5 files changed, 124 insertions(+), 3 deletions(-)

diff --git a/docs/input-kafka.asciidoc b/docs/input-kafka.asciidoc
index 342faae2..332a1f00 100644
--- a/docs/input-kafka.asciidoc
+++ b/docs/input-kafka.asciidoc
@@ -132,6 +132,7 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more detai
 | <> |<>|No
 | <> |<>|No
 | <> |<>|No
+| <> |<>|No
 | <> |<>|No
 | <> |<>|No
 | <> |a valid filesystem path|No
@@ -412,6 +413,21 @@ You can set this value to use information such as a hostname, an IP, or anything
 
 NOTE: In cases when multiple threads are configured and `consumer_threads` is greater than one, a suffix is appended to the `group_instance_id` to avoid collisions.
 
+[id="plugins-{type}s-{plugin}-group_protocol"]
+===== `group_protocol`
+
+* Value can be either of: `classic`, `consumer`
+* Default value is `classic`.
+
+Specifies the consumer group rebalance protocol used by the Kafka client.
+
+`classic` is the default protocol. During a rebalance, all consumer instances pause message processing until partition assignments are complete.
+
+`consumer` An incremental rebalance protocol introduced in Kafka 4. It avoids global synchronization barriers by only pausing partitions that are reassigned.
+When using `consumer`, the following settings **cannot be configured**:
+`partition_assignment_strategy`, `heartbeat_interval_ms`, and `session_timeout_ms`.
+
+
 [id="plugins-{type}s-{plugin}-heartbeat_interval_ms"]
 ===== `heartbeat_interval_ms`
 
diff --git a/kafka_test_setup.sh b/kafka_test_setup.sh
index fb583ab2..abb09eda 100755
--- a/kafka_test_setup.sh
+++ b/kafka_test_setup.sh
@@ -6,7 +6,7 @@ set -ex
 if [ -n "${KAFKA_VERSION+1}" ]; then
   echo "KAFKA_VERSION is $KAFKA_VERSION"
 else
-  KAFKA_VERSION=3.4.1
+  KAFKA_VERSION=4.1.0
 fi
 
 KAFKA_MAJOR_VERSION="${KAFKA_VERSION%%.*}"
@@ -48,7 +48,7 @@ echo "Setup Confluent Platform"
 if [ -n "${CONFLUENT_VERSION+1}" ]; then
   echo "CONFLUENT_VERSION is $CONFLUENT_VERSION"
 else
-  CONFLUENT_VERSION=7.4.0
+  CONFLUENT_VERSION=8.0.0
 fi
 if [ ! -e "confluent-community-$CONFLUENT_VERSION.tar.gz" ]; then
   echo "Confluent Platform not present locally, downloading"
diff --git a/kafka_test_teardown.sh b/kafka_test_teardown.sh
index 0801efed..ba7ff361 100755
--- a/kafka_test_teardown.sh
+++ b/kafka_test_teardown.sh
@@ -4,7 +4,7 @@ set -ex
 
 echo "Unregistering test topics"
 build/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 'logstash_integration_.*'
-build/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 'topic_avro.*'
+build/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 'topic_avro.*' 2>&1
 
 echo "Stopping Kafka broker"
 build/kafka/bin/kafka-server-stop.sh
diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb
index ea73bfcd..d2f4fa18 100644
--- a/lib/logstash/inputs/kafka.rb
+++ b/lib/logstash/inputs/kafka.rb
@@ -129,6 +129,12 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
   # consumer crated by each thread an artificial suffix is appended to the user provided `group_instance_id`
   # to avoid clashing.
   config :group_instance_id, :validate => :string
+  # `classic` uses "stop-the-world" rebalances.
+  # Any consumer restart or failure triggers a full-group rebalance, pausing processing for all consumers.
+  # `consumer` is an incremental rebalance protocol that avoids global sync barriers,
+  # pausing only the partitions that are reassigned.
+  # It cannot be set along with `partition_assignment_strategy`, `heartbeat_interval_ms` and `session_timeout_ms`
+  config :group_protocol, :validate => ["classic", "consumer"], :default => "classic"
   # The expected time between heartbeats to the consumer coordinator. Heartbeats are used to ensure
   # that the consumer's session stays active and to facilitate rebalancing when new
   # consumers join or leave the group. The value must be set lower than
@@ -293,6 +299,8 @@ def register
     reassign_dns_lookup
     @pattern ||= java.util.regex.Pattern.compile(@topics_pattern) unless @topics_pattern.nil?
     check_schema_registry_parameters
+
+    set_group_protocol!
   end
 
   METADATA_NONE = Set[].freeze
@@ -450,6 +458,7 @@ def create_consumer(client_id, group_instance_id)
       props.put(kafka::FETCH_MIN_BYTES_CONFIG, fetch_min_bytes.to_s) unless fetch_min_bytes.nil?
       props.put(kafka::GROUP_ID_CONFIG, group_id)
       props.put(kafka::GROUP_INSTANCE_ID_CONFIG, group_instance_id) unless group_instance_id.nil?
+      props.put(kafka::GROUP_PROTOCOL_CONFIG, group_protocol)
      props.put(kafka::HEARTBEAT_INTERVAL_MS_CONFIG, heartbeat_interval_ms.to_s) unless heartbeat_interval_ms.nil?
       props.put(kafka::ISOLATION_LEVEL_CONFIG, isolation_level)
       props.put(kafka::KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
@@ -513,6 +522,25 @@ def create_consumer(client_id, group_instance_id)
     end
   end
 
+  # In order to use group_protocol => consumer, heartbeat_interval_ms, session_timeout_ms and partition_assignment_strategy need to be unset
+  # If any of these are not using the default value of the plugin, we raise a configuration error
+  def set_group_protocol!
+    return unless group_protocol == "consumer"
+
+    heartbeat_overridden = heartbeat_interval_ms != self.class.get_config.dig("heartbeat_interval_ms", :default)
+    session_overridden = session_timeout_ms != self.class.get_config.dig("session_timeout_ms", :default)
+    strategy_defined = !partition_assignment_strategy.nil?
+
+    if strategy_defined || heartbeat_overridden || session_overridden
+      raise LogStash::ConfigurationError, "group_protocol cannot be set to 'consumer' "\
+        "when any of partition_assignment_strategy, heartbeat_interval_ms or session_timeout_ms is set"
+    end
+
+    @heartbeat_interval_ms = nil
+    @session_timeout_ms = nil
+    logger.debug("Reset `heartbeat_interval_ms` and `session_timeout_ms` for the consumer `group_protocol`")
+  end
+
   def partition_assignment_strategy_class
     case partition_assignment_strategy
     when 'range'
diff --git a/spec/integration/inputs/kafka_spec.rb b/spec/integration/inputs/kafka_spec.rb
index 92c55a8b..3bb8d67d 100644
--- a/spec/integration/inputs/kafka_spec.rb
+++ b/spec/integration/inputs/kafka_spec.rb
@@ -187,6 +187,83 @@ def send_message(record)
     end
   end
 
+  context 'setting group_protocol' do
+    let(:test_topic) { 'logstash_integration_partitioner_topic' }
+    let(:consumer_config) do
+      plain_config.merge(
+        "topics" => [test_topic],
+        'group_protocol' => group_protocol,
+        "partition_assignment_strategy" => partition_assignment_strategy,
+        "heartbeat_interval_ms" => heartbeat_interval_ms,
+        "session_timeout_ms" => session_timeout_ms
+      )
+    end
+    let(:group_protocol) { nil }
+    let(:partition_assignment_strategy) { nil }
+    let(:heartbeat_interval_ms) { LogStash::Inputs::Kafka.get_config().dig("heartbeat_interval_ms", :default) }
+    let(:session_timeout_ms) { LogStash::Inputs::Kafka.get_config().dig("session_timeout_ms", :default) }
+
+    describe "group_protocol = classic" do
+      let(:group_protocol) { 'classic' }
+
+      it 'passes register check' do
+        kafka_input = LogStash::Inputs::Kafka.new(consumer_config)
+        expect {
+          kafka_input.register
+        }.to_not raise_error
+
+        expect( kafka_input.instance_variable_get(:@heartbeat_interval_ms)).eql?(heartbeat_interval_ms)
+        expect( kafka_input.instance_variable_get(:@session_timeout_ms)).eql?(session_timeout_ms)
+      end
+    end
+
+    describe "group_protocol = consumer" do
+      let(:group_protocol) { 'consumer' }
+
+      describe "passes register check with supported config" do
+        it 'reset unsupported config to nil' do
+          kafka_input = LogStash::Inputs::Kafka.new(consumer_config)
+          expect {
+            kafka_input.register
+          }.to_not raise_error
+
+          expect( kafka_input.instance_variable_get(:@heartbeat_interval_ms)).to be_nil
+          expect( kafka_input.instance_variable_get(:@session_timeout_ms)).to be_nil
+        end
+      end
+
+      {
+        partition_assignment_strategy: 'range',
+        heartbeat_interval_ms: 2000,
+        session_timeout_ms: 6000
+      }.each do |config_key, config_value|
+        context "with unsupported config #{config_key}" do
+          let(config_key) { config_value }
+
+          it 'raises LogStash::ConfigurationError' do
+            kafka_input = LogStash::Inputs::Kafka.new(consumer_config)
+            expect {
+              kafka_input.register
+            }.to raise_error(LogStash::ConfigurationError, /group_protocol cannot be set to.*consumer.*/)
+          end
+        end
+      end
+
+      context "with valid config" do
+        let(:test_topic) { 'logstash_integration_topic_plain' }
+        let(:manual_commit_config) do
+          consumer_config.merge(
+            'enable_auto_commit' => 'false'
+          )
+        end
+        it 'consume data' do
+          queue = consume_messages(manual_commit_config, timeout: timeout_seconds, event_count: num_events)
+          expect(queue.length).to eq(num_events)
+        end
+      end
+    end
+  end
+
   context "static membership 'group.instance.id' setting" do
     let(:base_config) do
       {

From b6cbe0b3efe87502076735315dbdf202ec736f23 Mon Sep 17 00:00:00 2001
From: Kaise Cheng
Date: Wed, 8 Oct 2025 12:50:15 +0100
Subject: [PATCH 05/15] remove `default` partitioner

---
 docs/output-kafka.asciidoc    | 9 +++------
 lib/logstash/outputs/kafka.rb | 5 +----
 2 files changed, 4 insertions(+), 10 deletions(-)

diff --git a/docs/output-kafka.asciidoc b/docs/output-kafka.asciidoc
index 63bd3e64..ba5f79af 100644
--- a/docs/output-kafka.asciidoc
+++ b/docs/output-kafka.asciidoc
@@ -349,15 +349,12 @@ The max time in milliseconds before a metadata refresh is forced.
 
 * Value type is <>
 * There is no default value for this setting.
 
-By not setting this value, the plugin uses the default behavior of the Kafka client, which is `uniform_sticky`.
-If a record explicitly specifies a partition, that partition is always used.
-Otherwise, the producer selects a sticky partition, which remains the same for the duration of a batch and changes once the batch is full or flushed.
+By not setting this value, the plugin uses the default strategy provided by the Kafka client.
 
-Available options for choosing a partitioning strategy are as follows:
+Available options are as follows:
 
-* `default` is an alias for `uniform_sticky`
 * `round_robin` distributes writes to all partitions equally, regardless of `message_key`
-* `uniform_sticky` sticks to a partition for the duration of a batch than randomly picks a new one
+* `uniform_sticky` switches to a new partition after every batch.size bytes are produced, ensuring uniform distribution across partitions. This is the default strategy of Kafka client.
 
 [id="plugins-{type}s-{plugin}-receive_buffer_bytes"]
 ===== `receive_buffer_bytes`
diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index 8ee417a0..f8901976 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -110,7 +110,7 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
   config :message_headers, :validate => :hash, :default => {}
   # the timeout setting for initial metadata request to fetch topic metadata.
   config :metadata_fetch_timeout_ms, :validate => :number, :default => 60_000
-  # Partitioner to use - can be `default`, `uniform_sticky`, `round_robin` or a fully qualified class name of a custom partitioner.
+  # Partitioner to use - can be `uniform_sticky`, `round_robin` or a fully qualified class name of a custom partitioner.
   config :partitioner, :validate => :string
   # The size of the TCP receive buffer to use when reading data
   config :receive_buffer_bytes, :validate => :number, :default => 32_768 # (32KB) Kafka default
@@ -412,9 +412,6 @@ def partitioner_class
       'org.apache.kafka.clients.producer.RoundRobinPartitioner'
     when 'uniform_sticky'
       nil
-    when 'default'
-      # The default is `uniform_sticky` since Kafka client 3.0
-      nil
     else
       unless partitioner.index('.')
         raise LogStash::ConfigurationError, "unsupported partitioner: #{partitioner.inspect}"

From 535c5395698c560a4802460260ac637f1422a167 Mon Sep 17 00:00:00 2001
From: Kaise Cheng
Date: Wed, 8 Oct 2025 22:55:03 +0100
Subject: [PATCH 06/15] test: fix default partitioner

---
 spec/integration/outputs/kafka_spec.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/spec/integration/outputs/kafka_spec.rb b/spec/integration/outputs/kafka_spec.rb
index 77a8ecd5..683373b0 100644
--- a/spec/integration/outputs/kafka_spec.rb
+++ b/spec/integration/outputs/kafka_spec.rb
@@ -221,7 +221,7 @@ load_kafka_data(config)
     end
 
-    [ 'default', 'round_robin', 'uniform_sticky' ].each do |partitioner|
+    [ 'round_robin', 'uniform_sticky' ].each do |partitioner|
       describe partitioner do
         let(:partitioner) { partitioner }
         it 'loads data' do

From cdf94c04c3f329fa2b5563751f1636c7cfb4686f Mon Sep 17 00:00:00 2001
From: Kaise Cheng
Date: Thu, 9 Oct 2025 11:00:26 +0100
Subject: [PATCH 07/15] refactor partitioner

---
 lib/logstash/outputs/kafka.rb | 17 +++++------------
 1 file changed, 5 insertions(+), 12 deletions(-)

diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index f8901976..48dcb53a 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -405,19 +405,12 @@ def create_producer
   end
 
   def partitioner_class
-    return nil if partitioner.nil?
+    return nil if partitioner.nil? || partitioner == 'uniform_sticky'
+    return 'org.apache.kafka.clients.producer.RoundRobinPartitioner' if partitioner == 'round_robin'
 
-    case partitioner
-    when 'round_robin'
-      'org.apache.kafka.clients.producer.RoundRobinPartitioner'
-    when 'uniform_sticky'
-      nil
-    else
-      unless partitioner.index('.')
-        raise LogStash::ConfigurationError, "unsupported partitioner: #{partitioner.inspect}"
-      end
-      partitioner # assume a fully qualified class-name
-    end
+    raise LogStash::ConfigurationError, "unsupported partitioner: #{partitioner.inspect}" unless partitioner.include?('.')
+
+    partitioner
   end
 
 end #class LogStash::Outputs::Kafka

From ff1aa50955b38013208fda48bb02f8d2f82c6d57 Mon Sep 17 00:00:00 2001
From: kaisecheng <69120390+kaisecheng@users.noreply.github.com>
Date: Thu, 9 Oct 2025 13:35:35 +0100
Subject: [PATCH 08/15] Update docs/output-kafka.asciidoc

---
 docs/output-kafka.asciidoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/output-kafka.asciidoc b/docs/output-kafka.asciidoc
index ba5f79af..74027eb0 100644
--- a/docs/output-kafka.asciidoc
+++ b/docs/output-kafka.asciidoc
@@ -354,7 +354,7 @@ By not setting this value, the plugin uses the default strategy provided by the
 
 Available options are as follows:
 
 * `round_robin` distributes writes to all partitions equally, regardless of `message_key`
-* `uniform_sticky` switches to a new partition after every batch.size bytes are produced, ensuring uniform distribution across partitions. This is the default strategy of Kafka client.
+* `uniform_sticky` hashes the `message_key` of an event to get the partition. When no message key is present, the plugin switches to a new partition after every batch.size bytes are produced, ensuring uniform distribution across partitions. This is the default strategy of Kafka client.
 
 [id="plugins-{type}s-{plugin}-receive_buffer_bytes"]
 ===== `receive_buffer_bytes`

From 23bd2239c577069f439159bd90e44e23f6316b28 Mon Sep 17 00:00:00 2001
From: Kaise Cheng
Date: Fri, 10 Oct 2025 17:43:42 +0100
Subject: [PATCH 09/15] update linger_ms default to 5

Align with Kafka client 4.0 default value change
---
 CHANGELOG.md                  | 1 +
 docs/output-kafka.asciidoc    | 2 +-
 lib/logstash/outputs/kafka.rb | 2 +-
 3 files changed, 3 insertions(+), 2 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6bf93f25..f7dd84ce 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,7 @@
 ## 12.0.0
   - Update kafka client to 4.1.0 and transitive dependencies [#205](https://github.com/logstash-plugins/logstash-integration-kafka/pull/205)
     - `partitioner => "default"` option is removed
+    - `linger_ms` default value changed from 0 to 5
     - Add `group_protocol` option for configuring Kafka consumer rebalance protocol
     - Setting `group_protocol => consumer` opts in to the new consumer group protocol
 
diff --git a/docs/output-kafka.asciidoc b/docs/output-kafka.asciidoc
index ba5f79af..8b34f82f 100644
--- a/docs/output-kafka.asciidoc
+++ b/docs/output-kafka.asciidoc
@@ -285,7 +285,7 @@ Serializer class for the key of the message
 ===== `linger_ms`
 
 * Value type is <>
-* Default value is `0`
+* Default value is `5`
 
 The producer groups together any records that arrive in between request transmissions into a single
 batched request. Normally this occurs only under
diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index 48dcb53a..97bd15d4 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -101,7 +101,7 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
   # This setting accomplishes this by adding a small amount of artificial delay—that is,
   # rather than immediately sending out a record the producer will wait for up to the given delay
   # to allow other records to be sent so that the sends can be batched together.
-  config :linger_ms, :validate => :number, :default => 0 # Kafka default
+  config :linger_ms, :validate => :number, :default => 5 # Kafka default
   # The maximum size of a request
   config :max_request_size, :validate => :number, :default => 1_048_576 # (1MB) Kafka default
   # The key for the message

From ab598749ca9e71391fd633b4ffd2d3c00fb9e721 Mon Sep 17 00:00:00 2001
From: Kaise Cheng
Date: Wed, 15 Oct 2025 13:34:05 +0100
Subject: [PATCH 10/15] align partitioner options with Kafka client

---
 CHANGELOG.md                           |  2 +-
 docs/output-kafka.asciidoc             |  5 ++---
 lib/logstash/outputs/kafka.rb          |  4 ++--
 spec/integration/outputs/kafka_spec.rb | 11 +++--------
 4 files changed, 8 insertions(+), 14 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index f7dd84ce..a11537fd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,6 @@
 ## 12.0.0
   - Update kafka client to 4.1.0 and transitive dependencies [#205](https://github.com/logstash-plugins/logstash-integration-kafka/pull/205)
-    - `partitioner => "default"` option is removed
+    - partitioner options `default` and `uniform_sticky` are removed
     - `linger_ms` default value changed from 0 to 5
     - Add `group_protocol` option for configuring Kafka consumer rebalance protocol
     - Setting `group_protocol => consumer` opts in to the new consumer group protocol
diff --git a/docs/output-kafka.asciidoc b/docs/output-kafka.asciidoc
index 4cd89202..b6f6ea5e 100644
--- a/docs/output-kafka.asciidoc
+++ b/docs/output-kafka.asciidoc
@@ -349,12 +349,11 @@ The max time in milliseconds before a metadata refresh is forced.
 
 * Value type is <>
 * There is no default value for this setting.
 
-By not setting this value, the plugin uses the default strategy provided by the Kafka client.
+By not setting this value, the plugin uses the built-in partitioning strategy provided by the Kafka client. Read more about the "partitioner.class" in the Kafka documentation.
 
-Available options are as follows:
+Available option is as follows:
 
 * `round_robin` distributes writes to all partitions equally, regardless of `message_key`
-* `uniform_sticky` hashes the `message_key` of an event to get the partition. When no message key is present, the plugin switches to a new partition after every batch.size bytes are produced, ensuring uniform distribution across partitions. This is the default strategy of Kafka client.
 
 [id="plugins-{type}s-{plugin}-receive_buffer_bytes"]
 ===== `receive_buffer_bytes`
diff --git a/lib/logstash/outputs/kafka.rb b/lib/logstash/outputs/kafka.rb
index 97bd15d4..f9b5e236 100644
--- a/lib/logstash/outputs/kafka.rb
+++ b/lib/logstash/outputs/kafka.rb
@@ -110,7 +110,7 @@ class LogStash::Outputs::Kafka < LogStash::Outputs::Base
   config :message_headers, :validate => :hash, :default => {}
   # the timeout setting for initial metadata request to fetch topic metadata.
   config :metadata_fetch_timeout_ms, :validate => :number, :default => 60_000
-  # Partitioner to use - can be `uniform_sticky`, `round_robin` or a fully qualified class name of a custom partitioner.
+  # Partitioner to use - can be `round_robin` or a fully qualified class name of a custom partitioner.
   config :partitioner, :validate => :string
   # The size of the TCP receive buffer to use when reading data
   config :receive_buffer_bytes, :validate => :number, :default => 32_768 # (32KB) Kafka default
@@ -405,7 +405,7 @@ def create_producer
   end
 
   def partitioner_class
-    return nil if partitioner.nil? || partitioner == 'uniform_sticky'
+    return nil if partitioner.nil?
     return 'org.apache.kafka.clients.producer.RoundRobinPartitioner' if partitioner == 'round_robin'
 
     raise LogStash::ConfigurationError, "unsupported partitioner: #{partitioner.inspect}" unless partitioner.include?('.')
diff --git a/spec/integration/outputs/kafka_spec.rb b/spec/integration/outputs/kafka_spec.rb
index 683373b0..83797aa3 100644
--- a/spec/integration/outputs/kafka_spec.rb
+++ b/spec/integration/outputs/kafka_spec.rb
@@ -212,7 +212,7 @@ context 'setting partitioner' do
     let(:test_topic) { 'logstash_integration_partitioner_topic' }
-    let(:partitioner) { nil }
+    let(:partitioner) { 'round_robin' }
 
     before :each do
       @messages_offset = fetch_messages_from_all_partitions
@@ -221,13 +221,8 @@ load_kafka_data(config)
     end
 
-    [ 'round_robin', 'uniform_sticky' ].each do |partitioner|
-      describe partitioner do
-        let(:partitioner) { partitioner }
-        it 'loads data' do
-          expect(fetch_messages_from_all_partitions - @messages_offset).to eql num_events
-        end
-      end
+    it 'loads data' do
+      expect(fetch_messages_from_all_partitions - @messages_offset).to eql num_events
     end
 
     def fetch_messages_from_all_partitions

From 99c076dbe7a3ffebe664826cd2698cb63d403d66 Mon Sep 17 00:00:00 2001
From: kaisecheng <69120390+kaisecheng@users.noreply.github.com>
Date: Wed, 15 Oct 2025 18:52:10 +0100
Subject: [PATCH 11/15] Update lib/logstash/inputs/kafka.rb
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: João Duarte
---
 lib/logstash/inputs/kafka.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/logstash/inputs/kafka.rb b/lib/logstash/inputs/kafka.rb
index d2f4fa18..eba9362a 100644
--- a/lib/logstash/inputs/kafka.rb
+++ b/lib/logstash/inputs/kafka.rb
@@ -538,7 +538,7 @@ def set_group_protocol!
 
     @heartbeat_interval_ms = nil
     @session_timeout_ms = nil
-    logger.debug("Reset `heartbeat_interval_ms` and `session_timeout_ms` for the consumer `group_protocol`")
+    logger.debug("Settings 'heartbeat_interval_ms' and 'session_timeout_ms' have been reset since 'group_protocol' is configured as 'consumer'")
   end
 
   def partition_assignment_strategy_class

From 08e1542c07939720845343fafedd537f11b93768 Mon Sep 17 00:00:00 2001
From: Kaise Cheng
Date: Wed, 15 Oct 2025 18:48:18 +0100
Subject: [PATCH 12/15] update comment on dependency

---
 build.gradle | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/build.gradle b/build.gradle
index 218a6bac..69ba9309 100644
--- a/build.gradle
+++ b/build.gradle
@@ -65,11 +65,12 @@ dependencies {
     exclude group: 'org.apache.kafka', module:'kafka-clients'
   }
   implementation "org.apache.kafka:kafka-clients:${apacheKafkaVersion}"
-  // slf4j, zstd, lz4-java, snappy, jackson-datatype-jdk8 are dependencies from "kafka-clients"
+  // slf4j, zstd, lz4-java, and snappy are dependencies from "kafka-clients"
   implementation 'org.slf4j:slf4j-api:1.7.36'
   implementation 'com.github.luben:zstd-jni:1.5.6-10'
   implementation 'org.lz4:lz4-java:1.8.0'
   implementation 'org.xerial.snappy:snappy-java:1.1.10.7'
+  // jackson-datatype-jdk8 is a dependency of kafka-avro-serializer
   implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.20.0'
 }
 
 task generateGemJarRequiresFile {
   doLast {

From 557f5166db587798aede52b67f18eb37dbf5ca5e Mon Sep 17 00:00:00 2001
From: Kaise Cheng
Date: Wed, 15 Oct 2025 18:51:50 +0100
Subject: [PATCH 13/15] update release note

---
 CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a11537fd..c3d21be6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,6 +1,6 @@
 ## 12.0.0
   - Update kafka client to 4.1.0 and transitive dependencies [#205](https://github.com/logstash-plugins/logstash-integration-kafka/pull/205)
-    - partitioner options `default` and `uniform_sticky` are removed
+    - Breaking Change: partitioner options `default` and `uniform_sticky` are removed
     - `linger_ms` default value changed from 0 to 5
     - Add `group_protocol` option for configuring Kafka consumer rebalance protocol
     - Setting `group_protocol => consumer` opts in to the new consumer group protocol

From c3256e8de477b7aeb81dd01741f98c9b4a3cc8ac Mon Sep 17 00:00:00 2001
From: Kaise Cheng
Date: Wed, 15 Oct 2025 19:54:59 +0100
Subject: [PATCH 14/15] update dependency to match confluent version

---
 build.gradle | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/build.gradle b/build.gradle
index 69ba9309..1e120517 100644
--- a/build.gradle
+++ b/build.gradle
@@ -64,14 +64,15 @@ dependencies {
   implementation("io.confluent:kafka-schema-registry-client:${confluentKafkaVersion}") {
     exclude group: 'org.apache.kafka', module:'kafka-clients'
   }
+  // dependency of kafka-avro-serializer. jackson-datatype-jdk8 is a dependency of kafka-schema-registry-client
+  implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.16.0'
+
   implementation "org.apache.kafka:kafka-clients:${apacheKafkaVersion}"
   // slf4j, zstd, lz4-java, and snappy are dependencies from "kafka-clients"
   implementation 'org.slf4j:slf4j-api:1.7.36'
   implementation 'com.github.luben:zstd-jni:1.5.6-10'
   implementation 'org.lz4:lz4-java:1.8.0'
   implementation 'org.xerial.snappy:snappy-java:1.1.10.7'
-  // jackson-datatype-jdk8 is a dependency of kafka-avro-serializer
-  implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.20.0'
 }
 
 task generateGemJarRequiresFile {
   doLast {

From 96e7338a5566ed8edc43de7f663ee0abed69f440 Mon Sep 17 00:00:00 2001
From: kaisecheng <69120390+kaisecheng@users.noreply.github.com>
Date: Thu, 16 Oct 2025 10:27:51 +0100
Subject: [PATCH 15/15] Update docs/input-kafka.asciidoc
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: João Duarte
---
 docs/input-kafka.asciidoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/input-kafka.asciidoc b/docs/input-kafka.asciidoc
index 332a1f00..a9da63b4 100644
--- a/docs/input-kafka.asciidoc
+++ b/docs/input-kafka.asciidoc
@@ -423,7 +423,7 @@ Specifies the consumer group rebalance protocol used by the Kafka client.
 
 `classic` is the default protocol. During a rebalance, all consumer instances pause message processing until partition assignments are complete.
 
-`consumer` An incremental rebalance protocol introduced in Kafka 4. It avoids global synchronization barriers by only pausing partitions that are reassigned.
+`consumer` is an incremental rebalance protocol introduced in Kafka 4. It avoids global synchronization barriers by only pausing partitions that are reassigned.
 When using `consumer`, the following settings **cannot be configured**:
 `partition_assignment_strategy`, `heartbeat_interval_ms`, and `session_timeout_ms`.