Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .chloggen/feat_44564.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/kafka

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "`receiver.kafkareceiver.UseFranzGo` feature gate moved to Stable and is now always enabled"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [44564]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The franz-go client is now the default and only Kafka client library for the Kafka receiver.
The feature gate `receiver.kafkareceiver.UseFranzGo` has been promoted to Stable status and cannot be disabled.
Users can no longer opt out of using the franz-go client in favor of the legacy Sarama client.
The Sarama code and the feature gate will be removed completely after v0.143.0.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
8 changes: 4 additions & 4 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ If used in conjunction with the `kafkaexporter` configured with `include_metadat
## Getting Started

> [!NOTE]
> You can opt out of using the [`franz-go`](https://github.com/twmb/franz-go) client by disabling the feature gate
> `receiver.kafkareceiver.UseFranzGo` when you run the OpenTelemetry Collector. See the following page
> for more details: [Feature Gates](https://github.com/open-telemetry/opentelemetry-collector/tree/main/featuregate#controlling-gates)
> The Kafka receiver uses the [`franz-go`](https://github.com/twmb/franz-go) client library, which provides
> better performance and support for modern Kafka features. The `receiver.kafkareceiver.UseFranzGo` feature
> gate is now stable and always enabled (as of v0.141.0). The legacy Sarama client will be removed after v0.143.0.
>
> The `franz-go` client supports directly consuming from multiple topics by specifying a regex expression.
> To enable this feature, prefix your topic with the `^` character. This is identical to how the `librdkafka`
Expand Down Expand Up @@ -68,7 +68,7 @@ The following settings can be optionally configured:
- `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`.
- `session_timeout` (default = `10s`): The request timeout for detecting client failures when using Kafka’s group management facilities.
- `heartbeat_interval` (default = `3s`): The expected time between heartbeats to the consumer coordinator when using Kafka’s group management facilities.
- `group_rebalance_strategy` (default = `cooperative-sticky` (franz-go), 'range' (Sarama)): This strategy is used to assign partitions to consumers within a consumer group. This setting determines how Kafka distributes topic partitions among the consumers in the group during rebalances. Supported strategies are:
- `group_rebalance_strategy` (default = `cooperative-sticky`): This strategy is used to assign partitions to consumers within a consumer group. This setting determines how Kafka distributes topic partitions among the consumers in the group during rebalances. Supported strategies are:
- `range`: This strategy assigns partitions to consumers based on a range. It aims to distribute partitions evenly across consumers, but it can lead to uneven distribution if the number of partitions is not a multiple of the number of consumers. For more information, refer to the Kafka RangeAssignor documentation, see [RangeAssignor](https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RangeAssignor.html).
- `roundrobin`: This strategy assigns partitions to consumers in a round-robin fashion. It ensures a more even distribution of partitions across consumers, especially when the number of partitions is not a multiple of the number of consumers. For more information, refer to the Kafka RoundRobinAssignor documentation, see [RoundRobinAssignor](https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/RoundRobinAssignor.html).
- `sticky`: This strategy aims to maintain the same partition assignments during rebalances as much as possible. It minimizes the number of partition movements, which can be beneficial for stateful consumers. For more information, refer to the Kafka StickyAssignor documentation, see [StickyAssignor](https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/StickyAssignor.html).
Expand Down
3 changes: 2 additions & 1 deletion receiver/kafkareceiver/consumer_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ const franzGoConsumerFeatureGateName = "receiver.kafkareceiver.UseFranzGo"
// the Kafka receiver will use the franz-go client, which is more performant and has
// better support for modern Kafka features.
var franzGoConsumerFeatureGate = featuregate.GlobalRegistry().MustRegister(
franzGoConsumerFeatureGateName, featuregate.StageBeta,
franzGoConsumerFeatureGateName, featuregate.StageStable,
featuregate.WithRegisterDescription("When enabled, the Kafka receiver will use the franz-go client to consume messages."),
featuregate.WithRegisterFromVersion("v0.129.0"),
featuregate.WithRegisterToVersion("v0.143.0"),
)

type topicPartition struct {
Expand Down
12 changes: 6 additions & 6 deletions receiver/kafkareceiver/consumer_franz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/pdata/testdata"
"go.opentelemetry.io/collector/receiver/receiverhelper"
Expand All @@ -30,11 +29,12 @@ import (
)

func setFranzGo(tb testing.TB, value bool) {
currentFranzState := franzGoConsumerFeatureGate.IsEnabled()
require.NoError(tb, featuregate.GlobalRegistry().Set(franzGoConsumerFeatureGate.ID(), value))
tb.Cleanup(func() {
require.NoError(tb, featuregate.GlobalRegistry().Set(franzGoConsumerFeatureGate.ID(), currentFranzState))
})
// Feature gate is now stable and always enabled.
// This function is kept for backward compatibility with existing tests
// but no longer modifies the feature gate state.
if !value {
tb.Skip("Sarama client tests are skipped as the franz-go feature gate is now stable and always enabled")
}
}

func TestConsumerShutdownConsuming(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions receiver/kafkareceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestCreateDefaultConfig(t *testing.T) {

func TestCreateTraces(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.Brokers = []string{"localhost:9092"}
cfg.ProtocolVersion = "2.0.0"
r, err := createTracesReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestWithTracesUnmarshalers(t *testing.T) {

func TestCreateMetrics(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.Brokers = []string{"localhost:9092"}
cfg.ProtocolVersion = "2.0.0"
r, err := createMetricsReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestWithMetricsUnmarshalers(t *testing.T) {

func TestCreateLogs(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.Brokers = []string{"localhost:9092"}
cfg.ProtocolVersion = "2.0.0"
r, err := createLogsReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -214,7 +214,7 @@ func TestWithLogsUnmarshalers(t *testing.T) {

func TestCreateProfiles(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.Brokers = []string{"localhost:9092"}
cfg.ProtocolVersion = "2.0.0"
r, err := createProfilesReceiver(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, nil)
require.NoError(t, err)
Expand Down
5 changes: 3 additions & 2 deletions receiver/kafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ func init() {
}

func runTestForClients(t *testing.T, fn func(t *testing.T)) {
clients := []string{"Sarama", "Franz"}
// Only run tests with Franz client since the feature gate is now stable.
// Sarama tests will be removed in a future version.
clients := []string{"Franz"}
for _, client := range clients {
t.Run(client, func(t *testing.T) {
setFranzGo(t, client == "Franz")
fn(t)
})
}
Expand Down
Loading