From 1ac56132509f2b64824249a346c2ae05988f2a66 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Thu, 27 Nov 2025 22:33:13 +0000 Subject: [PATCH 1/3] [receiver/kafka] move franz-go feature gate to stable Signed-off-by: Paulo Dias --- .chloggen/feat_44564.yaml | 31 +++++++++++++++++++ receiver/kafkareceiver/README.md | 8 ++--- receiver/kafkareceiver/consumer_franz.go | 3 +- receiver/kafkareceiver/consumer_franz_test.go | 12 +++---- receiver/kafkareceiver/kafka_receiver_test.go | 5 +-- 5 files changed, 46 insertions(+), 13 deletions(-) create mode 100644 .chloggen/feat_44564.yaml diff --git a/.chloggen/feat_44564.yaml b/.chloggen/feat_44564.yaml new file mode 100644 index 0000000000000..863183486bda6 --- /dev/null +++ b/.chloggen/feat_44564.yaml @@ -0,0 +1,31 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/kafka + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "`receiver.kafkareceiver.UseFranzGo` feature gate moved to Stable and is now always enabled" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [44564] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + The franz-go client is now the default and only Kafka client library for the Kafka receiver. + The feature gate `receiver.kafkareceiver.UseFranzGo` has been promoted to Stable status and cannot be disabled. + Users can no longer opt out of using the franz-go client in favor of the legacy Sarama client. + The Sarama code and the feature gate will be removed completely after v0.143.0. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index aba804a278a27..c3373d6085d1b 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -23,9 +23,9 @@ If used in conjunction with the `kafkaexporter` configured with `include_metadat ## Getting Started > [!NOTE] -> You can opt out of using the [`franz-go`](https://github.com/twmb/franz-go) client by disabling the feature gate -> `receiver.kafkareceiver.UseFranzGo` when you run the OpenTelemetry Collector. See the following page -> for more details: [Feature Gates](https://github.com/open-telemetry/opentelemetry-collector/tree/main/featuregate#controlling-gates) +> The Kafka receiver uses the [`franz-go`](https://github.com/twmb/franz-go) client library, which provides +> better performance and support for modern Kafka features. The `receiver.kafkareceiver.UseFranzGo` feature +> gate is now stable and always enabled (as of v0.141.0). The legacy Sarama client will be removed after v0.143.0. > > The `franz-go` client supports directly consuming from multiple topics by specifying a regex expression. > To enable this feature, prefix your topic with the `^` character. This is identical to how the `librdkafka` @@ -68,7 +68,7 @@ The following settings can be optionally configured: - `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`. - `session_timeout` (default = `10s`): The request timeout for detecting client failures when using Kafka’s group management facilities. - `heartbeat_interval` (default = `3s`): The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities. -- `group_rebalance_strategy` (default = `cooperative-sticky` (franz-go), 'range' (Sarama)): This strategy is used to assign partitions to consumers within a consumer group. This setting determines how Kafka distributes topic partitions among the consumers in the group during rebalances. Supported strategies are: +- `group_rebalance_strategy` (default = `cooperative-sticky`): This strategy is used to assign partitions to consumers within a consumer group. This setting determines how Kafka distributes topic partitions among the consumers in the group during rebalances. Supported strategies are: - `range`: This strategy assigns partitions to consumers based on a range. It aims to distribute partitions evenly across consumers, but it can lead to uneven distribution if the number of partitions is not a multiple of the number of consumers. For more information, refer to the Kafka RangeAssignor documentation, see [RangeAssignor](https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html). - `roundrobin`: This strategy assigns partitions to consumers in a round-robin fashion. It ensures a more even distribution of partitions across consumers, especially when the number of partitions is not a multiple of the number of consumers. For more information, refer to the Kafka RoundRobinAssignor documentation, see [RoundRobinAssignor](https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html). - `sticky`: This strategy aims to maintain the same partition assignments during rebalances as much as possible. It minimizes the number of partition movements, which can be beneficial for stateful consumers. For more information, refer to the Kafka StickyAssignor documentation, see [StickyAssignor](https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html). diff --git a/receiver/kafkareceiver/consumer_franz.go b/receiver/kafkareceiver/consumer_franz.go index 81851136538ea..d06f711c51f05 100644 --- a/receiver/kafkareceiver/consumer_franz.go +++ b/receiver/kafkareceiver/consumer_franz.go @@ -35,9 +35,10 @@ const franzGoConsumerFeatureGateName = "receiver.kafkareceiver.UseFranzGo" // the Kafka receiver will use the franz-go client, which is more performant and has // better support for modern Kafka features. var franzGoConsumerFeatureGate = featuregate.GlobalRegistry().MustRegister( - franzGoConsumerFeatureGateName, featuregate.StageBeta, + franzGoConsumerFeatureGateName, featuregate.StageStable, featuregate.WithRegisterDescription("When enabled, the Kafka receiver will use the franz-go client to consume messages."), featuregate.WithRegisterFromVersion("v0.129.0"), + featuregate.WithRegisterToVersion("v0.141.0"), ) type topicPartition struct { diff --git a/receiver/kafkareceiver/consumer_franz_test.go b/receiver/kafkareceiver/consumer_franz_test.go index b146134e8d083..b8c58983d1bb3 100644 --- a/receiver/kafkareceiver/consumer_franz_test.go +++ b/receiver/kafkareceiver/consumer_franz_test.go @@ -19,7 +19,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configretry" - "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/testdata" "go.opentelemetry.io/collector/receiver/receiverhelper" @@ -30,11 +29,12 @@ import ( ) func setFranzGo(tb testing.TB, value bool) { - currentFranzState := franzGoConsumerFeatureGate.IsEnabled() - require.NoError(tb, featuregate.GlobalRegistry().Set(franzGoConsumerFeatureGate.ID(), value)) - tb.Cleanup(func() { - require.NoError(tb, featuregate.GlobalRegistry().Set(franzGoConsumerFeatureGate.ID(), currentFranzState)) - }) + // Feature gate is now stable and always enabled. + // This function is kept for backward compatibility with existing tests + // but no longer modifies the feature gate state. + if !value { + tb.Skip("Sarama client tests are skipped as the franz-go feature gate is now stable and always enabled") + } } func TestConsumerShutdownConsuming(t *testing.T) { diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index d0c4c9f099a33..c718578bc855a 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -59,10 +59,11 @@ func init() { } func runTestForClients(t *testing.T, fn func(t *testing.T)) { - clients := []string{"Sarama", "Franz"} + // Only run tests with Franz client since the feature gate is now stable. + // Sarama tests will be removed in a future version. + clients := []string{"Franz"} for _, client := range clients { t.Run(client, func(t *testing.T) { - setFranzGo(t, client == "Franz") fn(t) }) } From c0421c0fb3062ffc492e2f61f948165751d5ee32 Mon Sep 17 00:00:00 2001 From: Paulo Dias Date: Thu, 27 Nov 2025 23:33:28 +0000 Subject: [PATCH 2/3] fix: fix windows CI Signed-off-by: Paulo Dias --- receiver/kafkareceiver/factory_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index 6395c62e59c3f..bc9389f5512c1 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -58,7 +58,7 @@ func TestCreateDefaultConfig(t *testing.T) { func TestCreateTraces(t *testing.T) { cfg := createDefaultConfig().(*Config) - cfg.Brokers = []string{"invalid:9092"} + cfg.Brokers = []string{"localhost:9092"} cfg.ProtocolVersion = "2.0.0" r, err := createTracesReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, nil) require.NoError(t, err) @@ -110,7 +110,7 @@ func TestWithTracesUnmarshalers(t *testing.T) { func TestCreateMetrics(t *testing.T) { cfg := createDefaultConfig().(*Config) - cfg.Brokers = []string{"invalid:9092"} + cfg.Brokers = []string{"localhost:9092"} cfg.ProtocolVersion = "2.0.0" r, err := createMetricsReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, nil) require.NoError(t, err) @@ -162,7 +162,7 @@ func TestWithMetricsUnmarshalers(t *testing.T) { func TestCreateLogs(t *testing.T) { cfg := createDefaultConfig().(*Config) - cfg.Brokers = []string{"invalid:9092"} + cfg.Brokers = []string{"localhost:9092"} cfg.ProtocolVersion = "2.0.0" r, err := createLogsReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, nil) require.NoError(t, err) @@ -214,7 +214,7 @@ func TestWithLogsUnmarshalers(t *testing.T) { func TestCreateProfiles(t *testing.T) { cfg := createDefaultConfig().(*Config) - cfg.Brokers = []string{"invalid:9092"} + cfg.Brokers = []string{"localhost:9092"} cfg.ProtocolVersion = "2.0.0" r, err := createProfilesReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, nil) require.NoError(t, err) From 499af3ea65694e9f169a0744077ca524c73033b9 Mon Sep 17 00:00:00 2001 From: Paulo Dias <44772900+paulojmdias@users.noreply.github.com> Date: Fri, 28 Nov 2025 07:57:35 +0000 Subject: [PATCH 3/3] Update consumer_franz.go Co-authored-by: Andrew Wilkins --- receiver/kafkareceiver/consumer_franz.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/kafkareceiver/consumer_franz.go b/receiver/kafkareceiver/consumer_franz.go index d06f711c51f05..f5f5298ad4463 100644 --- a/receiver/kafkareceiver/consumer_franz.go +++ b/receiver/kafkareceiver/consumer_franz.go @@ -38,7 +38,7 @@ var franzGoConsumerFeatureGate = featuregate.GlobalRegistry().MustRegister( franzGoConsumerFeatureGateName, featuregate.StageStable, featuregate.WithRegisterDescription("When enabled, the Kafka receiver will use the franz-go client to consume messages."), featuregate.WithRegisterFromVersion("v0.129.0"), - featuregate.WithRegisterToVersion("v0.141.0"), + featuregate.WithRegisterToVersion("v0.143.0"), ) type topicPartition struct {