27 changes: 22 additions & 5 deletions .ci/run.sh
@@ -5,10 +5,27 @@ env

set -ex

export KAFKA_VERSION=3.3.1
./kafka_test_setup.sh
# Define the Kafka:Confluent version pairs
VERSIONS=(
# "3.9.1:7.4.0"
"4.1.0:8.0.0"
)

bundle exec rspec -fd
bundle exec rspec -fd --tag integration
for pair in "${VERSIONS[@]}"; do
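# Split each "kafka:confluent" pair: ${pair%%:*} keeps the text before the first ':', ${pair##*:} the text after the last ':'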
KAFKA_VERSION="${pair%%:*}"
CONFLUENT_VERSION="${pair##*:}"

./kafka_test_teardown.sh
echo "=================================================="
echo " Testing with Kafka $KAFKA_VERSION / Confluent $CONFLUENT_VERSION"
echo "=================================================="

export KAFKA_VERSION
export CONFLUENT_VERSION

./kafka_test_setup.sh

bundle exec rspec -fd
bundle exec rspec -fd --tag integration

./kafka_test_teardown.sh
done
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,10 @@
## 12.0.0
- Update kafka client to 4.1.0 and transitive dependencies [#205](https://github.com/logstash-plugins/logstash-integration-kafka/pull/205)
- Breaking Change: partitioner options `default` and `uniform_sticky` are removed
- `linger_ms` default value changed from 0 to 5
- Add `group_protocol` option for configuring the Kafka consumer rebalance protocol
- Setting `group_protocol => consumer` opts in to the new consumer group protocol

## 11.7.0
- Add `reconnect_backoff_max_ms` option for configuring kafka client [#204](https://github.com/logstash-plugins/logstash-integration-kafka/pull/204)

11 changes: 7 additions & 4 deletions build.gradle
@@ -39,8 +39,8 @@ java {

// given https://docs.confluent.io/current/installation/versions-interoperability.html matrix
// Confluent Platform 7.9.x is Apache Kafka 3.9.x
String confluentKafkaVersion = '7.9.1'
String apacheKafkaVersion = '3.9.1'
String confluentKafkaVersion = '8.0.0'
String apacheKafkaVersion = '4.1.0'

repositories {
mavenCentral()
@@ -64,10 +64,13 @@ dependencies {
implementation("io.confluent:kafka-schema-registry-client:${confluentKafkaVersion}") {
exclude group: 'org.apache.kafka', module:'kafka-clients'
}
// dependency of kafka-avro-serializer. jackson-datatype-jdk8 is a dependency of kafka-schema-registry-client
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.16.0'

implementation "org.apache.kafka:kafka-clients:${apacheKafkaVersion}"
// slf4j, zstd, lz4-java, snappy are dependencies from "kafka-clients"
// slf4j, zstd, lz4-java, and snappy are dependencies from "kafka-clients"
implementation 'org.slf4j:slf4j-api:1.7.36'
implementation 'com.github.luben:zstd-jni:1.5.6-8'
implementation 'com.github.luben:zstd-jni:1.5.6-10'
implementation 'org.lz4:lz4-java:1.8.0'
implementation 'org.xerial.snappy:snappy-java:1.1.10.7'
}
2 changes: 1 addition & 1 deletion docs/index.asciidoc
@@ -1,7 +1,7 @@
:plugin: kafka
:type: integration
:no_codec:
:kafka_client: 3.9.1
:kafka_client: 4.1.0

///////////////////////////////////////////
START - GENERATED VARIABLES, DO NOT EDIT!
20 changes: 18 additions & 2 deletions docs/input-kafka.asciidoc
@@ -2,8 +2,8 @@
:plugin: kafka
:type: input
:default_codec: plain
:kafka_client: 3.9.1
:kafka_client_doc: 39
:kafka_client: 4.1.0
:kafka_client_doc: 41

///////////////////////////////////////////
START - GENERATED VARIABLES, DO NOT EDIT!
@@ -132,6 +132,7 @@ See the https://kafka.apache.org/{kafka_client_doc}/documentation for more details
| <<plugins-{type}s-{plugin}-fetch_min_bytes>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-group_id>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-group_instance_id>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-group_protocol>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-heartbeat_interval_ms>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-isolation_level>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-jaas_path>> |a valid filesystem path|No
@@ -412,6 +413,21 @@ You can set this value to use information such as a hostname, an IP, or anything
NOTE: In cases when multiple threads are configured and `consumer_threads` is greater than one, a suffix is appended to
the `group_instance_id` to avoid collisions.

[id="plugins-{type}s-{plugin}-group_protocol"]
===== `group_protocol`

* Value can be either of: `classic`, `consumer`
* Default value is `classic`.

Specifies the consumer group rebalance protocol used by the Kafka client.

`classic` is the default protocol. During a rebalance, all consumer instances pause message processing until partition assignments are complete.

`consumer` is an incremental rebalance protocol introduced in Kafka 4. It avoids global synchronization barriers by only pausing partitions that are reassigned.
When using `consumer`, the following settings **cannot be configured**:
`partition_assignment_strategy`, `heartbeat_interval_ms`, and `session_timeout_ms`.
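
A minimal input configuration opting in to the new protocol might look like the following sketch (the bootstrap server, topic, and group id are placeholders):

[source,ruby]
----
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics            => ["example_topic"]
    group_id          => "example_group"
    group_protocol    => "consumer"
    # partition_assignment_strategy, heartbeat_interval_ms, and session_timeout_ms
    # must be left unset when this protocol is selected
  }
}
----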


[id="plugins-{type}s-{plugin}-heartbeat_interval_ms"]
===== `heartbeat_interval_ms`

13 changes: 5 additions & 8 deletions docs/output-kafka.asciidoc
@@ -2,8 +2,8 @@
:plugin: kafka
:type: output
:default_codec: plain
:kafka_client: 3.9.1
:kafka_client_doc: 39
:kafka_client: 4.1.0
:kafka_client_doc: 41

///////////////////////////////////////////
START - GENERATED VARIABLES, DO NOT EDIT!
@@ -285,7 +285,7 @@ Serializer class for the key of the message
===== `linger_ms`

* Value type is <<number,number>>
* Default value is `0`
* Default value is `5`

The producer groups together any records that arrive in between request
transmissions into a single batched request. Normally this occurs only under
@@ -349,14 +349,11 @@ The max time in milliseconds before a metadata refresh is forced.
* Value type is <<string,string>>
* There is no default value for this setting.

The default behavior is to hash the `message_key` of an event to get the partition.
When no message key is present, the plugin picks a partition in a round-robin fashion.
When this value is not set, the plugin uses the partitioning strategy built into the Kafka client. Read more about `partitioner.class` in the Kafka documentation.

Available options for choosing a partitioning strategy are as follows:
The available option is:

* `default` use the default partitioner as described above
* `round_robin` distributes writes to all partitions equally, regardless of `message_key`
* `uniform_sticky` sticks to a partition for the duration of a batch, then randomly picks a new one
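
For illustration, a minimal output sketch that forces round-robin distribution (the bootstrap server and topic are placeholders; omit `partitioner` entirely to keep the Kafka client's built-in strategy):

[source,ruby]
----
output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id          => "example_topic"
    # Ignores message_key and spreads writes across all partitions
    partitioner       => "round_robin"
  }
}
----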

[id="plugins-{type}s-{plugin}-receive_buffer_bytes"]
===== `receive_buffer_bytes`
72 changes: 45 additions & 27 deletions kafka_test_setup.sh
@@ -6,9 +6,11 @@ set -ex
if [ -n "${KAFKA_VERSION+1}" ]; then
echo "KAFKA_VERSION is $KAFKA_VERSION"
else
KAFKA_VERSION=3.4.1
KAFKA_VERSION=4.1.0
fi

KAFKA_MAJOR_VERSION="${KAFKA_VERSION%%.*}"

export _JAVA_OPTIONS="-Djava.net.preferIPv4Stack=true"

rm -rf build
@@ -17,54 +19,70 @@ mkdir build
echo "Setup Kafka version $KAFKA_VERSION"
if [ ! -e "kafka_2.13-$KAFKA_VERSION.tgz" ]; then
echo "Kafka not present locally, downloading"
curl -s -o "kafka_2.13-$KAFKA_VERSION.tgz" "https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz"
curl -s -o "kafka_2.13-$KAFKA_VERSION.tgz" "https://downloads.apache.org/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz"
fi
cp kafka_2.13-$KAFKA_VERSION.tgz build/kafka.tgz
mkdir build/kafka && tar xzf build/kafka.tgz -C build/kafka --strip-components 1
cp "kafka_2.13-$KAFKA_VERSION.tgz" "build/kafka.tgz"
mkdir "build/kafka" && tar xzf "build/kafka.tgz" -C "build/kafka" --strip-components 1

echo "Starting ZooKeeper"
build/kafka/bin/zookeeper-server-start.sh -daemon build/kafka/config/zookeeper.properties
sleep 10
if [ -f "build/kafka/bin/zookeeper-server-start.sh" ]; then
echo "Starting ZooKeeper"
build/kafka/bin/zookeeper-server-start.sh -daemon "build/kafka/config/zookeeper.properties"
sleep 10
else
echo "Use KRaft for Kafka version $KAFKA_VERSION"

echo "log.dirs=${PWD}/build/kafka-logs" >> build/kafka/config/server.properties

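# KRaft mode has no ZooKeeper; format the log directories with a generated cluster id before the first broker start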
build/kafka/bin/kafka-storage.sh format \
--cluster-id $(build/kafka/bin/kafka-storage.sh random-uuid) \
--config build/kafka/config/server.properties \
--ignore-formatted \
--standalone
fi
echo "Starting Kafka broker"
build/kafka/bin/kafka-server-start.sh -daemon build/kafka/config/server.properties --override advertised.host.name=127.0.0.1 --override log.dirs="${PWD}/build/kafka-logs"
build/kafka/bin/kafka-server-start.sh -daemon "build/kafka/config/server.properties" --override advertised.host.name=127.0.0.1 --override log.dirs="${PWD}/build/kafka-logs"
sleep 10

echo "Setup Confluent Platform"
# check if CONFLUENT_VERSION env var is set
if [ -n "${CONFLUENT_VERSION+1}" ]; then
echo "CONFLUENT_VERSION is $CONFLUENT_VERSION"
else
CONFLUENT_VERSION=7.4.0
CONFLUENT_VERSION=8.0.0
fi
if [ ! -e confluent-community-$CONFLUENT_VERSION.tar.gz ]; then
if [ ! -e "confluent-community-$CONFLUENT_VERSION.tar.gz" ]; then
echo "Confluent Platform not present locally, downloading"
CONFLUENT_MINOR=$(echo "$CONFLUENT_VERSION" | sed -n 's/^\([[:digit:]]*\.[[:digit:]]*\)\.[[:digit:]]*$/\1/p')
echo "CONFLUENT_MINOR is $CONFLUENT_MINOR"
curl -s -o confluent-community-$CONFLUENT_VERSION.tar.gz http://packages.confluent.io/archive/$CONFLUENT_MINOR/confluent-community-$CONFLUENT_VERSION.tar.gz
curl -s -o "confluent-community-$CONFLUENT_VERSION.tar.gz" "http://packages.confluent.io/archive/$CONFLUENT_MINOR/confluent-community-$CONFLUENT_VERSION.tar.gz"
fi
cp confluent-community-$CONFLUENT_VERSION.tar.gz build/confluent_platform.tar.gz
mkdir build/confluent_platform && tar xzf build/confluent_platform.tar.gz -C build/confluent_platform --strip-components 1
cp "confluent-community-$CONFLUENT_VERSION.tar.gz" "build/confluent_platform.tar.gz"
mkdir "build/confluent_platform" && tar xzf "build/confluent_platform.tar.gz" -C "build/confluent_platform" --strip-components 1

echo "Configuring TLS on Schema registry"
rm -Rf tls_repository
mkdir tls_repository
./setup_keystore_and_truststore.sh
# configure schema-registry to handle https on 8083 port
if [[ "$OSTYPE" == "darwin"* ]]; then
sed -i '' 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' build/confluent_platform/etc/schema-registry/schema-registry.properties
sed -i '' 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' "build/confluent_platform/etc/schema-registry/schema-registry.properties"
else
sed -i 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' build/confluent_platform/etc/schema-registry/schema-registry.properties
sed -i 's/http:\/\/0.0.0.0:8081/http:\/\/0.0.0.0:8081, https:\/\/0.0.0.0:8083/g' "build/confluent_platform/etc/schema-registry/schema-registry.properties"
fi
echo "ssl.keystore.location=`pwd`/tls_repository/schema_reg.jks" >> "build/confluent_platform/etc/schema-registry/schema-registry.properties"
echo "ssl.keystore.password=changeit" >> "build/confluent_platform/etc/schema-registry/schema-registry.properties"
echo "ssl.key.password=changeit" >> "build/confluent_platform/etc/schema-registry/schema-registry.properties"

cp "build/confluent_platform/etc/schema-registry/schema-registry.properties" "build/confluent_platform/etc/schema-registry/authed-schema-registry.properties"
echo "authentication.method=BASIC" >> "build/confluent_platform/etc/schema-registry/authed-schema-registry.properties"
echo "authentication.roles=admin,developer,user,sr-user" >> "build/confluent_platform/etc/schema-registry/authed-schema-registry.properties"
echo "authentication.realm=SchemaRegistry-Props" >> "build/confluent_platform/etc/schema-registry/authed-schema-registry.properties"
cp spec/fixtures/jaas.config "build/confluent_platform/etc/schema-registry"
if [[ "$KAFKA_MAJOR_VERSION" -eq 3 ]]; then
cp "spec/fixtures/jaas$KAFKA_MAJOR_VERSION.config" "build/confluent_platform/etc/schema-registry/jaas.config"
Comment on lines +81 to +82 (Contributor Author):
CI only tests against Kafka 4. Keeping old jaas.config for development testing against Kafka 3.

fi
echo "ssl.keystore.location=`pwd`/tls_repository/schema_reg.jks" >> build/confluent_platform/etc/schema-registry/schema-registry.properties
echo "ssl.keystore.password=changeit" >> build/confluent_platform/etc/schema-registry/schema-registry.properties
echo "ssl.key.password=changeit" >> build/confluent_platform/etc/schema-registry/schema-registry.properties

cp build/confluent_platform/etc/schema-registry/schema-registry.properties build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
echo "authentication.method=BASIC" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
echo "authentication.roles=admin,developer,user,sr-user" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
echo "authentication.realm=SchemaRegistry-Props" >> build/confluent_platform/etc/schema-registry/authed-schema-registry.properties
cp spec/fixtures/jaas.config build/confluent_platform/etc/schema-registry
cp spec/fixtures/pwd build/confluent_platform/etc/schema-registry
cp spec/fixtures/pwd "build/confluent_platform/etc/schema-registry"

echo "Setting up test topics with test data"
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_topic_plain --bootstrap-server localhost:9092
@@ -82,8 +100,8 @@ build/kafka/bin/kafka-topics.sh --create --partitions 1 --replication-factor 1 -
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_partitioner_topic --bootstrap-server localhost:9092
build/kafka/bin/kafka-topics.sh --create --partitions 3 --replication-factor 1 --topic logstash_integration_static_membership_topic --bootstrap-server localhost:9092
curl -s -o build/apache_logs.txt https://s3.amazonaws.com/data.elasticsearch.org/apache_logs/apache_logs.txt
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_plain --broker-list localhost:9092
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_snappy --broker-list localhost:9092 --compression-codec snappy
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_lz4 --broker-list localhost:9092 --compression-codec lz4
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_plain --bootstrap-server localhost:9092
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_snappy --bootstrap-server localhost:9092 --compression-codec snappy
cat build/apache_logs.txt | build/kafka/bin/kafka-console-producer.sh --topic logstash_integration_topic_lz4 --bootstrap-server localhost:9092 --compression-codec lz4

echo "Setup complete, running specs"
9 changes: 6 additions & 3 deletions kafka_test_teardown.sh
@@ -4,12 +4,15 @@ set -ex

echo "Unregistering test topics"
build/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 'logstash_integration_.*'
build/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 'topic_avro.*'
build/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --delete --topic 'topic_avro.*' 2>&1

echo "Stopping Kafka broker"
build/kafka/bin/kafka-server-stop.sh
echo "Stopping zookeeper"
build/kafka/bin/zookeeper-server-stop.sh

if [ -f "build/kafka/bin/zookeeper-server-stop.sh" ]; then
echo "Stopping ZooKeeper"
build/kafka/bin/zookeeper-server-stop.sh
fi

echo "Clean TLS folder"
rm -Rf tls_repository
30 changes: 29 additions & 1 deletion lib/logstash/inputs/kafka.rb
@@ -129,6 +129,12 @@ class LogStash::Inputs::Kafka < LogStash::Inputs::Base
# consumer created by each thread an artificial suffix is appended to the user provided `group_instance_id`
# to avoid clashing.
config :group_instance_id, :validate => :string
# `classic` is the "stop-the-world" rebalance protocol.
# Any consumer restart or failure triggers a full-group rebalance, pausing processing for all consumers.
# `consumer` is an incremental rebalance protocol that avoids global sync barriers,
# pausing only the partitions that are reassigned.
# It cannot be set along with `partition_assignment_strategy`, `heartbeat_interval_ms`, and `session_timeout_ms`.
config :group_protocol, :validate => ["classic", "consumer"], :default => "classic"
# The expected time between heartbeats to the consumer coordinator. Heartbeats are used to ensure
# that the consumer's session stays active and to facilitate rebalancing when new
# consumers join or leave the group. The value must be set lower than
@@ -293,6 +299,8 @@ def register
reassign_dns_lookup
@pattern ||= java.util.regex.Pattern.compile(@topics_pattern) unless @topics_pattern.nil?
check_schema_registry_parameters

set_group_protocol!
end

METADATA_NONE = Set[].freeze
@@ -450,6 +458,7 @@ def create_consumer(client_id, group_instance_id)
props.put(kafka::FETCH_MIN_BYTES_CONFIG, fetch_min_bytes.to_s) unless fetch_min_bytes.nil?
props.put(kafka::GROUP_ID_CONFIG, group_id)
props.put(kafka::GROUP_INSTANCE_ID_CONFIG, group_instance_id) unless group_instance_id.nil?
props.put(kafka::GROUP_PROTOCOL_CONFIG, group_protocol)
props.put(kafka::HEARTBEAT_INTERVAL_MS_CONFIG, heartbeat_interval_ms.to_s) unless heartbeat_interval_ms.nil?
props.put(kafka::ISOLATION_LEVEL_CONFIG, isolation_level)
props.put(kafka::KEY_DESERIALIZER_CLASS_CONFIG, key_deserializer_class)
@@ -471,7 +480,7 @@ def create_consumer(client_id, group_instance_id)
props.put("security.protocol", security_protocol) unless security_protocol.nil?
if schema_registry_url
props.put(kafka::VALUE_DESERIALIZER_CLASS_CONFIG, Java::io.confluent.kafka.serializers.KafkaAvroDeserializer.java_class)
serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig
serdes_config = Java::io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig
props.put(serdes_config::SCHEMA_REGISTRY_URL_CONFIG, schema_registry_url.uri.to_s)
if schema_registry_proxy && !schema_registry_proxy.empty?
props.put(serdes_config::PROXY_HOST, @schema_registry_proxy_host)
@@ -513,6 +522,25 @@ def create_consumer(client_id, group_instance_id)
end
end

# In order to use group_protocol => consumer, heartbeat_interval_ms, session_timeout_ms and partition_assignment_strategy need to be unset
# If any of these are not using the default value of the plugin, we raise a configuration error
def set_group_protocol!
return unless group_protocol == "consumer"

heartbeat_overridden = heartbeat_interval_ms != self.class.get_config.dig("heartbeat_interval_ms", :default)
session_overridden = session_timeout_ms != self.class.get_config.dig("session_timeout_ms", :default)
strategy_defined = !partition_assignment_strategy.nil?

if strategy_defined || heartbeat_overridden || session_overridden
raise LogStash::ConfigurationError, "group_protocol cannot be set to 'consumer' "\
"when any of partition_assignment_strategy, heartbeat_interval_ms or session_timeout_ms is set"
end

@heartbeat_interval_ms = nil
@session_timeout_ms = nil
logger.debug("Settings 'heartbeat_interval_ms' and 'session_timeout_ms' have been reset since 'group_protocol' is configured as 'consumer'")
end

def partition_assignment_strategy_class
case partition_assignment_strategy
when 'range'