Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ env

set -ex

export KAFKA_VERSION=3.3.1
export KAFKA_VERSION=3.9.1
./kafka_test_setup.sh

bundle exec rspec -fd
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 11.8.0
- Deprecate `partitioner => default` option [#206](https://github.com/logstash-plugins/logstash-integration-kafka/pull/206)

## 11.7.0
- Add `reconnect_backoff_max_ms` option for configuring kafka client [#204](https://github.com/logstash-plugins/logstash-integration-kafka/pull/204)

Expand Down
11 changes: 6 additions & 5 deletions docs/output-kafka.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -349,14 +349,15 @@ The max time in milliseconds before a metadata refresh is forced.
* Value type is <<string,string>>
* There is no default value for this setting.

The default behavior is to hash the `message_key` of an event to get the partition.
When no message key is present, the plugin picks a partition in a round-robin fashion.
By not setting this value, the plugin uses the default strategy provided by the Kafka client.

Available options for choosing a partitioning strategy are as follows:
Available options are as follows:

* `default` use the default partitioner as described above
* `default` hashes the `message_key` of an event to get the partition. When no message key is present, the plugin picks a partition in a round-robin fashion.
* `round_robin` distributes writes to all partitions equally, regardless of `message_key`
* `uniform_sticky` sticks to a partition for the duration of a batch than randomly picks a new one
* `uniform_sticky` hashes the `message_key` of an event to get the partition. When no message key is present, the plugin sticks to a partition for the duration of a batch than randomly picks a new one.

NOTE: `default` is deprecated and will be removed in v12.0.0.

[id="plugins-{type}s-{plugin}-receive_buffer_bytes"]
===== `receive_buffer_bytes`
Expand Down
4 changes: 2 additions & 2 deletions kafka_test_setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ set -ex
if [ -n "${KAFKA_VERSION+1}" ]; then
echo "KAFKA_VERSION is $KAFKA_VERSION"
else
KAFKA_VERSION=3.4.1
KAFKA_VERSION=3.9.1
fi

export _JAVA_OPTIONS="-Djava.net.preferIPv4Stack=true"
Expand All @@ -17,7 +17,7 @@ mkdir build
echo "Setup Kafka version $KAFKA_VERSION"
if [ ! -e "kafka_2.13-$KAFKA_VERSION.tgz" ]; then
echo "Kafka not present locally, downloading"
curl -s -o "kafka_2.13-$KAFKA_VERSION.tgz" "https://archive.apache.org/dist/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz"
curl -s -o "kafka_2.13-$KAFKA_VERSION.tgz" "https://downloads.apache.org/kafka/$KAFKA_VERSION/kafka_2.13-$KAFKA_VERSION.tgz"
fi
cp kafka_2.13-$KAFKA_VERSION.tgz build/kafka.tgz
mkdir build/kafka && tar xzf build/kafka.tgz -C build/kafka --strip-components 1
Expand Down
3 changes: 3 additions & 0 deletions lib/logstash/outputs/kafka.rb
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ def partitioner_class
when 'uniform_sticky'
'org.apache.kafka.clients.producer.UniformStickyPartitioner'
when 'default'
logger.warn('Producer `partitioner` is configured with the deprecated option `default`. ' \
'DefaultPartitioner is removed in kafka-client 4.0 and the `default` option will be removed in v12.0.0. ' \
'Please update your configuration to use `uniform_sticky` or `round_robin`. ')
'org.apache.kafka.clients.producer.internals.DefaultPartitioner'
else
unless partitioner.index('.')
Expand Down
2 changes: 1 addition & 1 deletion logstash-integration-kafka.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-kafka'
s.version = '11.7.0'
s.version = '11.8.0'
s.licenses = ['Apache-2.0']
s.summary = "Integration with Kafka - input and output plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline "+
Expand Down