From f5b98de35fc0707e8f73b337299277c794c0e731 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Thu, 27 Nov 2025 15:21:03 +0530 Subject: [PATCH 1/8] [receiver/kafka] Add support for topic and exclude_topic as an array --- .chloggen/support-arrayof-string.yaml | 27 ++++ receiver/kafkareceiver/README.md | 36 +++-- receiver/kafkareceiver/config.go | 92 ++++++++++--- receiver/kafkareceiver/config_test.go | 129 +++++++++++++----- receiver/kafkareceiver/consumer_bench_test.go | 6 +- receiver/kafkareceiver/factory.go | 4 + receiver/kafkareceiver/kafka_receiver.go | 6 +- receiver/kafkareceiver/testdata/config.yaml | 14 +- 8 files changed, 246 insertions(+), 68 deletions(-) create mode 100644 .chloggen/support-arrayof-string.yaml diff --git a/.chloggen/support-arrayof-string.yaml b/.chloggen/support-arrayof-string.yaml new file mode 100644 index 0000000000000..ccb36e6806312 --- /dev/null +++ b/.chloggen/support-arrayof-string.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/kafka + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support configuring a list of topics and exclude_topics + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [44477] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index aba804a278a27..11f8d9f8ca266 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -42,25 +42,37 @@ The following settings can be optionally configured: - `protocol_version` (default = 2.1.0): Kafka protocol version. - `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup - `logs` - - `topic` (default = otlp\_logs): The name of the Kafka topic from which to consume logs. + - `topic` (Deprecated [v0.142.0]: use `topics`) + (default = otlp\logs): If this is set, it will take precedence over default value of `topics` + - `topics` (default = otlp\_logs): List of kafka topics from which to consume logs - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings). - - `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. + - `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`) + (default = ""): If this is set, it will take precedence over default value of `exclude_topics` + - `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. - `metrics` - - `topic` (default = otlp\_metrics): The name of the Kafka topic from which to consume metrics. + - `topic` (Deprecated [v0.142.0]: use `topics`) + (default = otlp\_metrics): If this is set, it will take precedence over default value of `topics` + - `topics` (default = otlp\_metrics): List of Kafka topic from which to consume metrics. - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings). - - `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. + - `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`) + (default = ""): If this is set, it will take precedence over default value of `exclude_topics` + - `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. - `traces` - - `topic` (default = otlp\_spans): The name of the Kafka topic from which to consume traces. + - `topic` (Deprecated [v0.142.0]: use `topics`) + (default = otlp\_spans): If this is set, it will take precedence over default value of `topics` + - `topics` (default = otlp\_spans): List of Kafka topic from which to consume traces. - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings). - - `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. + - `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`) + (default = ""): If this is set, it will take precedence over default value of `exclude_topics` + - `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. - `profiles` - - `topic` (default = otlp\_profiles): The name of the Kafka topic from which to consume profiles. + - `topic` (Deprecated [v0.142.0]: use `topics`) + (default = otlp\_profiles): If this is set, it will take precedence over default value of `topics` + - `topics` (default = otlp\_profiles): List of Kafka topic from which to consume profiles. - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings). - - `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. -- `topic` (Deprecated [v0.124.0]: use `logs::topic`, `traces::topic`, or `metrics::topic`). - If this is set, it will take precedence over the default value for those fields. -- `encoding` (Deprecated [v0.124.0]: use `logs::encoding`, `traces::encoding`, or `metrics::encoding`). - If this is set, it will take precedence over the default value for those fields. + - `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`) + (default = ""): If this is set, it will take precedence over default value of `exclude_topics` + - `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. - `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from - `client_id` (default = otel-collector): The consumer client ID that receiver will use - `rack_id` (default = ""): The rack identifier for this client. When set and brokers are configured with a rack-aware replica selector, the client will prefer fetching from the closest replica. diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index eed203ef68452..bb44d24e1f25d 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -53,6 +53,45 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { return err } + // Check if deprecated fields have been explicitly set + // give them precedence + var zeroConfig Config + if err := conf.Unmarshal(&zeroConfig); err != nil { + return err + } + + // handle deprecated topic and exclude_topic for log signal + if zeroConfig.Logs.Topic != "" && len(zeroConfig.Logs.Topics) == 0 { + c.Logs.Topics = []string{zeroConfig.Logs.Topic} + } + if zeroConfig.Logs.ExcludeTopic != "" && len(zeroConfig.Logs.ExcludeTopics) == 0 { + c.Logs.ExcludeTopics = []string{zeroConfig.Logs.ExcludeTopic} + } + + // handle deprecated topic and exclude_topic for metric signal + if zeroConfig.Metrics.Topic != "" && len(zeroConfig.Logs.Topics) == 0 { + c.Metrics.Topics = []string{zeroConfig.Metrics.Topic} + } + if zeroConfig.Metrics.ExcludeTopic != "" && len(zeroConfig.Logs.ExcludeTopics) == 0 { + c.Metrics.ExcludeTopics = []string{zeroConfig.Metrics.ExcludeTopic} + } + + // handle deprecated topic and exclude_topic for trace signal + if zeroConfig.Traces.Topic != "" && len(zeroConfig.Logs.Topics) == 0 { + c.Traces.Topics = []string{zeroConfig.Traces.Topic} + } + if zeroConfig.Traces.ExcludeTopic != "" && len(zeroConfig.Logs.ExcludeTopics) == 0 { + c.Traces.ExcludeTopics = []string{zeroConfig.Traces.ExcludeTopic} + } + + // handle deprecated topic and exclude_topic for profile signal + if zeroConfig.Profiles.Topic != "" && len(zeroConfig.Logs.Topics) == 0 { + c.Profiles.Topics = []string{zeroConfig.Profiles.Topic} + } + if zeroConfig.Profiles.ExcludeTopic != "" && len(zeroConfig.Logs.ExcludeTopics) == 0 { + c.Profiles.ExcludeTopics = []string{zeroConfig.Profiles.ExcludeTopic} + } + // Set OnPermanentError default value to inherit from OnError for backward compatibility // Only if OnPermanentError was not explicitly set in the config rawConf := conf.Get("message_marking") @@ -73,45 +112,61 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { // Validate checks the receiver configuration is valid. func (c *Config) Validate() error { // Validate that exclude_topic is only used with regex topic patterns - if err := validateExcludeTopic("logs", c.Logs.Topic, c.Logs.ExcludeTopic); err != nil { + if err := validateExcludeTopic("logs", c.Logs.Topics, c.Logs.ExcludeTopics); err != nil { return err } - if err := validateExcludeTopic("metrics", c.Metrics.Topic, c.Metrics.ExcludeTopic); err != nil { + if err := validateExcludeTopic("metrics", c.Metrics.Topics, c.Metrics.ExcludeTopics); err != nil { return err } - if err := validateExcludeTopic("traces", c.Traces.Topic, c.Traces.ExcludeTopic); err != nil { + if err := validateExcludeTopic("traces", c.Traces.Topics, c.Traces.ExcludeTopics); err != nil { return err } - if err := validateExcludeTopic("profiles", c.Profiles.Topic, c.Profiles.ExcludeTopic); err != nil { + if err := validateExcludeTopic("profiles", c.Profiles.Topics, c.Profiles.ExcludeTopics); err != nil { return err } return nil } // validateExcludeTopic checks that exclude_topic is only configured when topic uses regex pattern -func validateExcludeTopic(signalType, topic, excludeTopic string) error { - if excludeTopic == "" { +func validateExcludeTopic(signalType string, topics, excludeTopics []string) error { + if len(excludeTopics) == 0 { return nil // No exclude_topic configured, nothing to validate } - if !strings.HasPrefix(topic, "^") { + + // if none of the configured topic uses regex return error + var usesRegex bool + for _, topic := range topics { + if strings.HasPrefix(topic, "^") { + usesRegex = true + } + } + + if !usesRegex { return fmt.Errorf( - "%s.exclude_topic is configured but %s.topic does not use regex pattern (must start with '^')", + "%s.exclude_topics is configured but none of the configured %s.topics use regex pattern (must start with '^')", signalType, signalType, ) } - // Validate that exclude_topic is a valid regex pattern - if _, err := regexp.Compile(excludeTopic); err != nil { - return fmt.Errorf( - "%s.exclude_topic contains invalid regex pattern: %w", - signalType, err, - ) + + for _, excludeTopic := range excludeTopics { + // Validate that exclude_topic is a valid regex pattern + if _, err := regexp.Compile(excludeTopic); err != nil { + return fmt.Errorf( + "%s.exclude_topic contains invalid regex pattern: %w", + signalType, err, + ) + } } + return nil } // TopicEncodingConfig holds signal-specific topic and encoding configuration. type TopicEncodingConfig struct { - // Topic holds the name of the Kafka topic from which messages of the + // Deprecated [v0.142.0]: Use Topics + Topic string `mapstructure:"topic"` + + // Topics holds the name of the Kafka topic from which messages of the // signal type should be consumed. // // The default depends on the signal type: @@ -119,15 +174,18 @@ type TopicEncodingConfig struct { // - "otlp_metrics" for metrics // - "otlp_logs" for logs // - "otlp_profiles" for profiles - Topic string `mapstructure:"topic"` + Topics []string `mapstructure:"topics"` // Encoding holds the expected encoding of messages for the signal type // // Defaults to "otlp_proto". Encoding string `mapstructure:"encoding"` - // Optional exclude topic option, used only in regex mode. + // Deprecated [v0.142.0]: Use ExcludeTopics ExcludeTopic string `mapstructure:"exclude_topic"` + + // Optional exclude topics option, used only in regex mode. + ExcludeTopics []string `mapstructure:"exclude_topics"` } type MessageMarking struct { diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index 1493fea1eac2a..c00d1f612aede 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -31,6 +31,44 @@ func TestLoadConfig(t *testing.T) { expected component.Config expectedErr error }{ + { + id: component.NewIDWithName(metadata.Type, "legacy_topic"), + expected: &Config{ + ClientConfig: func() configkafka.ClientConfig { + config := configkafka.NewDefaultClientConfig() + return config + }(), + ConsumerConfig: func() configkafka.ConsumerConfig { + config := configkafka.NewDefaultConsumerConfig() + return config + }(), + Logs: TopicEncodingConfig{ + // if deprecated topic is set and topics is not set + // give precedence to topic + Topic: "legacy_logs", + Topics: []string{"legacy_logs"}, + Encoding: "otlp_proto", + }, + Metrics: TopicEncodingConfig{ + Topic: "legacy_metric", + Topics: []string{"legacy_metric"}, + Encoding: "otlp_proto", + }, + Traces: TopicEncodingConfig{ + Topic: "legacy_spans", + Topics: []string{"legacy_spans"}, + Encoding: "otlp_proto", + }, + Profiles: TopicEncodingConfig{ + Topic: "otlp_profiles", + Topics: []string{"otlp_profiles"}, + Encoding: "otlp_proto", + }, + ErrorBackOff: configretry.BackOffConfig{ + Enabled: false, + }, + }, + }, { id: component.NewIDWithName(metadata.Type, "logs"), expected: &Config{ @@ -61,19 +99,23 @@ func TestLoadConfig(t *testing.T) { return config }(), Logs: TopicEncodingConfig{ - Topic: "logs", + Topic: "legacy_topic_log", + Topics: []string{"logs"}, // topics is given precedence if it is set Encoding: "direct", }, Metrics: TopicEncodingConfig{ Topic: "otlp_metrics", + Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ Topic: "otlp_spans", + Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ Topic: "otlp_profiles", + Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, ErrorBackOff: configretry.BackOffConfig{ @@ -97,18 +139,22 @@ func TestLoadConfig(t *testing.T) { }(), Logs: TopicEncodingConfig{ Topic: "otlp_logs", + Topics: []string{"otlp_logs"}, Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ Topic: "otlp_metrics", + Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ Topic: "otlp_spans", + Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ Topic: "otlp_profiles", + Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, ErrorBackOff: configretry.BackOffConfig{ @@ -123,18 +169,22 @@ func TestLoadConfig(t *testing.T) { ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ Topic: "otlp_logs", + Topics: []string{"otlp_logs"}, Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ Topic: "otlp_metrics", + Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ Topic: "otlp_spans", + Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ Topic: "otlp_profiles", + Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, MessageMarking: MessageMarking{ @@ -154,18 +204,22 @@ func TestLoadConfig(t *testing.T) { ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ Topic: "otlp_logs", + Topics: []string{"otlp_logs"}, Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ Topic: "otlp_metrics", + Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ Topic: "otlp_spans", + Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ Topic: "otlp_profiles", + Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, MessageMarking: MessageMarking{ @@ -185,18 +239,22 @@ func TestLoadConfig(t *testing.T) { ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ Topic: "otlp_logs", + Topics: []string{"otlp_logs"}, Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ Topic: "otlp_metrics", + Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ Topic: "otlp_spans", + Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ Topic: "otlp_profiles", + Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, MessageMarking: MessageMarking{ @@ -215,22 +273,29 @@ func TestLoadConfig(t *testing.T) { ClientConfig: configkafka.NewDefaultClientConfig(), ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ - Topic: "^logs-.*", - ExcludeTopic: "^logs-(test|dev)$", - Encoding: "otlp_proto", + Topic: "^logs-.*", + Topics: []string{"^logs-.*"}, + ExcludeTopic: "^logs-(test|dev)$", + ExcludeTopics: []string{"^logs-(test|dev)$"}, + Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ - Topic: "^metrics-.*", - ExcludeTopic: "^metrics-internal-.*$", - Encoding: "otlp_proto", + Topic: "^metrics-.*", + Topics: []string{"^metrics-.*"}, + ExcludeTopic: "^metrics-internal-.*$", + ExcludeTopics: []string{"^metrics-internal-.*$"}, + Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ - Topic: "^traces-.*", - ExcludeTopic: "^traces-debug-.*$", - Encoding: "otlp_proto", + Topic: "^traces-.*", + Topics: []string{"^traces-.*"}, + ExcludeTopic: "^traces-debug-.*$", + ExcludeTopics: []string{"^traces-debug-.*$"}, + Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ Topic: "otlp_profiles", + Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, ErrorBackOff: configretry.BackOffConfig{ @@ -265,9 +330,9 @@ func TestConfigValidate(t *testing.T) { name: "valid config with regex and exclude_topic", config: &Config{ Logs: TopicEncodingConfig{ - Topic: "^logs-.*", - ExcludeTopic: "^logs-test$", - Encoding: "otlp_proto", + Topics: []string{"^logs-.*"}, + ExcludeTopics: []string{"^logs-test$"}, + Encoding: "otlp_proto", }, }, expectedErr: "", @@ -276,45 +341,45 @@ func TestConfigValidate(t *testing.T) { name: "invalid config with non-regex topic and exclude_topic for logs", config: &Config{ Logs: TopicEncodingConfig{ - Topic: "logs", - ExcludeTopic: "^logs-test$", - Encoding: "otlp_proto", + Topics: []string{"logs"}, + ExcludeTopics: []string{"^logs-test$"}, + Encoding: "otlp_proto", }, }, - expectedErr: "logs.exclude_topic is configured but logs.topic does not use regex pattern (must start with '^')", + expectedErr: "logs.exclude_topics is configured but none of the configured logs.topics use regex pattern (must start with '^')", }, { name: "invalid config with non-regex topic and exclude_topic for metrics", config: &Config{ Metrics: TopicEncodingConfig{ - Topic: "metrics", - ExcludeTopic: "^metrics-test$", - Encoding: "otlp_proto", + Topics: []string{"metrics"}, + ExcludeTopics: []string{"^metrics-test$"}, + Encoding: "otlp_proto", }, }, - expectedErr: "metrics.exclude_topic is configured but metrics.topic does not use regex pattern (must start with '^')", + expectedErr: "metrics.exclude_topics is configured but none of the configured metrics.topics use regex pattern (must start with '^')", }, { name: "invalid config with non-regex topic and exclude_topic for traces", config: &Config{ Traces: TopicEncodingConfig{ - Topic: "traces", - ExcludeTopic: "^traces-test$", - Encoding: "otlp_proto", + Topics: []string{"traces"}, + ExcludeTopics: []string{"^traces-test$"}, + Encoding: "otlp_proto", }, }, - expectedErr: "traces.exclude_topic is configured but traces.topic does not use regex pattern (must start with '^')", + expectedErr: "traces.exclude_topics is configured but none of the configured traces.topics use regex pattern (must start with '^')", }, { name: "invalid config with non-regex topic and exclude_topic for profiles", config: &Config{ Profiles: TopicEncodingConfig{ - Topic: "profiles", - ExcludeTopic: "^profiles-test$", - Encoding: "otlp_proto", + Topics: []string{"profiles"}, + ExcludeTopics: []string{"^profiles-test$"}, + Encoding: "otlp_proto", }, }, - expectedErr: "profiles.exclude_topic is configured but profiles.topic does not use regex pattern (must start with '^')", + expectedErr: "profiles.exclude_topics is configured but none of the configured profiles.topics use regex pattern (must start with '^')", }, { name: "valid config without exclude_topic", @@ -330,9 +395,9 @@ func TestConfigValidate(t *testing.T) { name: "invalid config with invalid regex in exclude_topic", config: &Config{ Logs: TopicEncodingConfig{ - Topic: "^logs-.*", - ExcludeTopic: "^logs-[invalid(regex", - Encoding: "otlp_proto", + Topics: []string{"^logs-.*"}, + ExcludeTopics: []string{"^logs-[invalid(regex"}, + Encoding: "otlp_proto", }, }, expectedErr: "logs.exclude_topic contains invalid regex pattern", diff --git a/receiver/kafkareceiver/consumer_bench_test.go b/receiver/kafkareceiver/consumer_bench_test.go index e53231621a793..72144dc9c1e44 100644 --- a/receiver/kafkareceiver/consumer_bench_test.go +++ b/receiver/kafkareceiver/consumer_bench_test.go @@ -73,9 +73,9 @@ func newBenchConfigClient(b *testing.B, topic string, partitions int32, messageMarking MessageMarking, ) (*Config, *kgo.Client) { client, cfg := mustNewFakeCluster(b, kfake.SeedTopics(partitions, topic)) - cfg.Logs.Topic = topic - cfg.Traces.Topic = topic - cfg.Metrics.Topic = topic + cfg.Logs.Topics = []string{topic} + cfg.Traces.Topics = []string{topic} + cfg.Metrics.Topics = []string{topic} cfg.GroupID = b.Name() cfg.InitialOffset = "earliest" cfg.AutoCommit = autoCommit diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index f6b2246c2fb36..23dbe4a3fe268 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -48,18 +48,22 @@ func createDefaultConfig() component.Config { ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ Topic: defaultLogsTopic, + Topics: []string{defaultLogsTopic}, Encoding: defaultLogsEncoding, }, Metrics: TopicEncodingConfig{ Topic: defaultMetricsTopic, + Topics: []string{defaultMetricsTopic}, Encoding: defaultMetricsEncoding, }, Traces: TopicEncodingConfig{ Topic: defaultTracesTopic, + Topics: []string{defaultTracesTopic}, Encoding: defaultTracesEncoding, }, Profiles: TopicEncodingConfig{ Topic: defaultProfilesTopic, + Topics: []string{defaultProfilesTopic}, Encoding: defaultProfilesEncoding, }, MessageMarking: MessageMarking{ diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 7a315d79d5a6b..618e1fd5dedc5 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -87,7 +87,7 @@ func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consume ) }, nil } - return newReceiver(config, set, []string{config.Logs.Topic}, []string{config.Logs.ExcludeTopic}, newConsumeMessageFunc) + return newReceiver(config, set, config.Logs.Topics, config.Logs.ExcludeTopics, newConsumeMessageFunc) } func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Metrics) (receiver.Metrics, error) { @@ -111,7 +111,7 @@ func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer cons ) }, nil } - return newReceiver(config, set, []string{config.Metrics.Topic}, []string{config.Metrics.ExcludeTopic}, newConsumeMessageFunc) + return newReceiver(config, set, config.Metrics.Topics, config.Metrics.ExcludeTopics, newConsumeMessageFunc) } func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Traces) (receiver.Traces, error) { @@ -135,7 +135,7 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu ) }, nil } - return newReceiver(config, set, []string{config.Traces.Topic}, []string{config.Traces.ExcludeTopic}, consumeFn) + return newReceiver(config, set, config.Traces.Topics, config.Traces.ExcludeTopics, consumeFn) } func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xconsumer.Profiles) (xreceiver.Profiles, error) { diff --git a/receiver/kafkareceiver/testdata/config.yaml b/receiver/kafkareceiver/testdata/config.yaml index 765b6bcbb4ba2..51c43356a6a78 100644 --- a/receiver/kafkareceiver/testdata/config.yaml +++ b/receiver/kafkareceiver/testdata/config.yaml @@ -1,6 +1,18 @@ +kafka/legacy_topic: + metrics: + topic: legacy_metric + encoding: otlp_proto + traces: + topic: legacy_spans + encoding: otlp_proto + logs: + topic: legacy_logs + encoding: otlp_proto kafka/logs: logs: - topic: logs + topic: legacy_topic_log + topics: + - logs encoding: direct session_timeout: 45s heartbeat_interval: 15s From df16d030c24d3be61ab63d239b2c7963e8de34ab Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 28 Nov 2025 11:34:08 +0530 Subject: [PATCH 2/8] fix commit --- receiver/kafkareceiver/kafka_receiver_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index d0c4c9f099a33..c0ac10c779634 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -766,8 +766,8 @@ func TestExcludeTopicWithSarama(t *testing.T) { _, receiverConfig := mustNewFakeCluster(t, kfake.SeedTopics(1, "otlp_spans")) // Configure exclude_topic - only supported with franz-go - receiverConfig.Traces.Topic = "^otlp_spans.*" - receiverConfig.Traces.ExcludeTopic = "^otlp_spans-test$" + receiverConfig.Traces.Topics = []string{"^otlp_spans.*"} + receiverConfig.Traces.ExcludeTopics = []string{"^otlp_spans-test$"} set, _, _ := mustNewSettings(t) _, err := newTracesReceiver(receiverConfig, set, &consumertest.TracesSink{}) From 9237a1b541010e84c69e9b7471cc7dc42d22cb12 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 28 Nov 2025 14:33:30 +0530 Subject: [PATCH 3/8] Update receiver/kafkareceiver/config.go Co-authored-by: Andrew Wilkins --- receiver/kafkareceiver/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index bb44d24e1f25d..0f08e4b94ea5c 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -166,7 +166,7 @@ type TopicEncodingConfig struct { // Deprecated [v0.142.0]: Use Topics Topic string `mapstructure:"topic"` - // Topics holds the name of the Kafka topic from which messages of the + // Topics holds the name of the Kafka topics from which messages of the // signal type should be consumed. // // The default depends on the signal type: From 88957f11ff26fdd95c6679855494c030af5c3f15 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 28 Nov 2025 14:33:38 +0530 Subject: [PATCH 4/8] Update .chloggen/support-arrayof-string.yaml Co-authored-by: Andrew Wilkins --- .chloggen/support-arrayof-string.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/support-arrayof-string.yaml b/.chloggen/support-arrayof-string.yaml index ccb36e6806312..dbf62a992de2a 100644 --- a/.chloggen/support-arrayof-string.yaml +++ b/.chloggen/support-arrayof-string.yaml @@ -1,7 +1,7 @@ # Use this changelog template to create an entry for release notes. # One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' -change_type: enhancement +change_type: deprecation # The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) component: receiver/kafka From e875d9723e2328d5ffc663a3677ddb5f73552988 Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 28 Nov 2025 14:33:45 +0530 Subject: [PATCH 5/8] Update .chloggen/support-arrayof-string.yaml Co-authored-by: Andrew Wilkins --- .chloggen/support-arrayof-string.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.chloggen/support-arrayof-string.yaml b/.chloggen/support-arrayof-string.yaml index dbf62a992de2a..fef4ad7728303 100644 --- a/.chloggen/support-arrayof-string.yaml +++ b/.chloggen/support-arrayof-string.yaml @@ -7,7 +7,7 @@ change_type: deprecation component: receiver/kafka # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Support configuring a list of topics and exclude_topics +note: Support configuring a list of topics and exclude_topics; deprecate topic and exclude_topic # Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. issues: [44477] From bda0ef64523008391bd1b1445534bec1bce4185c Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 28 Nov 2025 15:34:42 +0530 Subject: [PATCH 6/8] address review comments --- receiver/kafkareceiver/config.go | 69 ++++++++++++++++----- receiver/kafkareceiver/config_test.go | 33 +--------- receiver/kafkareceiver/factory.go | 4 -- receiver/kafkareceiver/kafka_receiver.go | 2 +- receiver/kafkareceiver/testdata/config.yaml | 64 +++++++------------ 5 files changed, 77 insertions(+), 95 deletions(-) diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 0f08e4b94ea5c..e625613fb05c1 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -61,35 +61,69 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { } // handle deprecated topic and exclude_topic for log signal - if zeroConfig.Logs.Topic != "" && len(zeroConfig.Logs.Topics) == 0 { - c.Logs.Topics = []string{zeroConfig.Logs.Topic} + if zeroConfig.Logs.Topic != "" { + if len(zeroConfig.Logs.Topics) == 0 { + c.Logs.Topics = []string{zeroConfig.Logs.Topic} + } else { + return fmt.Errorf("both logs.topic and logs.topics cannot be set") + } } - if zeroConfig.Logs.ExcludeTopic != "" && len(zeroConfig.Logs.ExcludeTopics) == 0 { - c.Logs.ExcludeTopics = []string{zeroConfig.Logs.ExcludeTopic} + if zeroConfig.Logs.ExcludeTopic != "" { + if len(zeroConfig.Logs.ExcludeTopics) == 0 { + c.Logs.ExcludeTopics = []string{zeroConfig.Logs.ExcludeTopic} + } else { + return fmt.Errorf("both logs.exclude_topic and logs.exclude_topics cannot be set") + } } // handle deprecated topic and exclude_topic for metric signal - if zeroConfig.Metrics.Topic != "" && len(zeroConfig.Logs.Topics) == 0 { - c.Metrics.Topics = []string{zeroConfig.Metrics.Topic} + if zeroConfig.Metrics.Topic != "" { + if len(zeroConfig.Logs.Topics) == 0 { + c.Metrics.Topics = []string{zeroConfig.Metrics.Topic} + } else { + return fmt.Errorf("both metrics.topic and metrics.topics cannot be set") + } } - if zeroConfig.Metrics.ExcludeTopic != "" && len(zeroConfig.Logs.ExcludeTopics) == 0 { - c.Metrics.ExcludeTopics = []string{zeroConfig.Metrics.ExcludeTopic} + + if zeroConfig.Metrics.ExcludeTopic != "" { + if len(zeroConfig.Logs.ExcludeTopics) == 0 { + c.Metrics.ExcludeTopics = []string{zeroConfig.Metrics.ExcludeTopic} + } else { + return fmt.Errorf("both metrics.exclude_topic and metrics.exclude_topics cannot be set") + } } // handle deprecated topic and exclude_topic for trace signal - if zeroConfig.Traces.Topic != "" && len(zeroConfig.Logs.Topics) == 0 { - c.Traces.Topics = []string{zeroConfig.Traces.Topic} + if zeroConfig.Traces.Topic != "" { + if len(zeroConfig.Logs.Topics) == 0 { + c.Traces.Topics = []string{zeroConfig.Traces.Topic} + } else { + return fmt.Errorf("both traces.topic and traces.topics cannot be set") + } } - if zeroConfig.Traces.ExcludeTopic != "" && len(zeroConfig.Logs.ExcludeTopics) == 0 { - c.Traces.ExcludeTopics = []string{zeroConfig.Traces.ExcludeTopic} + + if zeroConfig.Traces.ExcludeTopic != "" { + if len(zeroConfig.Logs.ExcludeTopics) == 0 { + c.Traces.ExcludeTopics = []string{zeroConfig.Traces.ExcludeTopic} + } else { + return fmt.Errorf("both traces.exclude_topic and traces.exclude_topics cannot be set") + } } // handle deprecated topic and exclude_topic for profile signal - if zeroConfig.Profiles.Topic != "" && len(zeroConfig.Logs.Topics) == 0 { - c.Profiles.Topics = []string{zeroConfig.Profiles.Topic} + if zeroConfig.Profiles.Topic != "" { + if len(zeroConfig.Logs.Topics) == 0 { + c.Profiles.Topics = []string{zeroConfig.Profiles.Topic} + } else { + return fmt.Errorf("both profiles.topic and profiles.topics cannot be set") + } } - if zeroConfig.Profiles.ExcludeTopic != "" && len(zeroConfig.Logs.ExcludeTopics) == 0 { - c.Profiles.ExcludeTopics = []string{zeroConfig.Profiles.ExcludeTopic} + if zeroConfig.Profiles.ExcludeTopic != "" { + if len(zeroConfig.Logs.ExcludeTopics) == 0 { + c.Profiles.ExcludeTopics = []string{zeroConfig.Profiles.ExcludeTopic} + } else { + return fmt.Errorf("both profiles.exclude_topic and profiles.exclude_topics cannot be set") + } } // Set OnPermanentError default value to inherit from OnError for backward compatibility @@ -127,7 +161,7 @@ func (c *Config) Validate() error { return nil } -// validateExcludeTopic checks that exclude_topic is only configured when topic uses regex pattern +// validateExcludeTopic checks that exclude_topic is only configured when topics uses regex pattern func validateExcludeTopic(signalType string, topics, excludeTopics []string) error { if len(excludeTopics) == 0 { return nil // No exclude_topic configured, nothing to validate @@ -138,6 +172,7 @@ func validateExcludeTopic(signalType string, topics, excludeTopics []string) err for _, topic := range topics { if strings.HasPrefix(topic, "^") { usesRegex = true + break } } diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index c00d1f612aede..5d868c44df021 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -60,8 +60,8 @@ func TestLoadConfig(t *testing.T) { Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ - Topic: "otlp_profiles", - Topics: []string{"otlp_profiles"}, + Topic: "legacy_profile", + Topics: []string{"legacy_profile"}, Encoding: "otlp_proto", }, ErrorBackOff: configretry.BackOffConfig{ @@ -99,22 +99,18 @@ func TestLoadConfig(t *testing.T) { return config }(), Logs: TopicEncodingConfig{ - Topic: "legacy_topic_log", Topics: []string{"logs"}, // topics is given precedence if it is set Encoding: "direct", }, Metrics: TopicEncodingConfig{ - Topic: "otlp_metrics", Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ - Topic: "otlp_spans", Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ - Topic: "otlp_profiles", Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, @@ -138,22 +134,18 @@ func TestLoadConfig(t *testing.T) { return config }(), Logs: TopicEncodingConfig{ - Topic: "otlp_logs", Topics: []string{"otlp_logs"}, Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ - Topic: "otlp_metrics", Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ - Topic: "otlp_spans", Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ - Topic: "otlp_profiles", Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, @@ -168,22 +160,18 @@ func TestLoadConfig(t *testing.T) { ClientConfig: configkafka.NewDefaultClientConfig(), ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ - Topic: "otlp_logs", Topics: []string{"otlp_logs"}, Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ - Topic: "otlp_metrics", Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ - Topic: "otlp_spans", Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ - Topic: "otlp_profiles", Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, @@ -203,22 +191,18 @@ func TestLoadConfig(t *testing.T) { ClientConfig: configkafka.NewDefaultClientConfig(), ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ - Topic: "otlp_logs", Topics: []string{"otlp_logs"}, Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ - Topic: "otlp_metrics", Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ - Topic: "otlp_spans", Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ - Topic: "otlp_profiles", Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, @@ -238,22 +222,18 @@ func TestLoadConfig(t *testing.T) { ClientConfig: configkafka.NewDefaultClientConfig(), ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ - Topic: "otlp_logs", Topics: []string{"otlp_logs"}, Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ - Topic: "otlp_metrics", Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ - Topic: "otlp_spans", Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ - Topic: "otlp_profiles", Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, @@ -273,28 +253,21 @@ func TestLoadConfig(t *testing.T) { ClientConfig: configkafka.NewDefaultClientConfig(), ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ - Topic: "^logs-.*", Topics: []string{"^logs-.*"}, - ExcludeTopic: "^logs-(test|dev)$", ExcludeTopics: []string{"^logs-(test|dev)$"}, Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ - Topic: "^metrics-.*", Topics: []string{"^metrics-.*"}, - ExcludeTopic: "^metrics-internal-.*$", ExcludeTopics: []string{"^metrics-internal-.*$"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ - Topic: "^traces-.*", Topics: []string{"^traces-.*"}, - ExcludeTopic: "^traces-debug-.*$", ExcludeTopics: []string{"^traces-debug-.*$"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ - Topic: "otlp_profiles", Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, @@ -385,7 +358,7 @@ func TestConfigValidate(t *testing.T) { name: "valid config without exclude_topic", config: &Config{ Logs: TopicEncodingConfig{ - Topic: "logs", + Topics: []string{"logs"}, Encoding: "otlp_proto", }, }, diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index 23dbe4a3fe268..450b5ee039c05 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -47,22 +47,18 @@ func createDefaultConfig() component.Config { ClientConfig: configkafka.NewDefaultClientConfig(), ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ - Topic: defaultLogsTopic, Topics: []string{defaultLogsTopic}, Encoding: defaultLogsEncoding, }, Metrics: TopicEncodingConfig{ - Topic: defaultMetricsTopic, Topics: []string{defaultMetricsTopic}, Encoding: defaultMetricsEncoding, }, Traces: TopicEncodingConfig{ - Topic: defaultTracesTopic, Topics: []string{defaultTracesTopic}, Encoding: defaultTracesEncoding, }, Profiles: TopicEncodingConfig{ - Topic: defaultProfilesTopic, Topics: []string{defaultProfilesTopic}, Encoding: defaultProfilesEncoding, }, diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 618e1fd5dedc5..4f77a51e176aa 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -159,7 +159,7 @@ func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xco ) }, nil } - return newReceiver(config, set, []string{config.Profiles.Topic}, []string{config.Profiles.ExcludeTopic}, consumeFn) + return newReceiver(config, set, config.Profiles.Topics, config.Profiles.ExcludeTopics, consumeFn) } func newReceiver( diff --git a/receiver/kafkareceiver/testdata/config.yaml b/receiver/kafkareceiver/testdata/config.yaml index 51c43356a6a78..a915f5c487a5b 100644 --- a/receiver/kafkareceiver/testdata/config.yaml +++ b/receiver/kafkareceiver/testdata/config.yaml @@ -8,9 +8,12 @@ kafka/legacy_topic: logs: topic: legacy_logs encoding: otlp_proto + profiles: + topic: legacy_profile + encoding: otlp_proto + kafka/logs: logs: - topic: legacy_topic_log topics: - logs encoding: direct @@ -41,65 +44,40 @@ kafka/logs: max_interval: 10s max_elapsed_time: 1m multiplier: 1.5 + kafka/rebalance_strategy: - metrics: - topic: otlp_metrics - encoding: otlp_proto - traces: - topic: otlp_spans - encoding: otlp_proto - logs: - topic: otlp_logs - encoding: otlp_proto group_rebalance_strategy: sticky group_instance_id: test-instance + kafka/message_marking: - metrics: - topic: otlp_metrics - encoding: otlp_proto - traces: - topic: otlp_spans - encoding: otlp_proto - logs: - topic: otlp_logs - encoding: otlp_proto message_marking: after: true on_error: true on_permanent_error: false + kafka/message_marking_not_specified: - metrics: - topic: otlp_metrics - encoding: otlp_proto - traces: - topic: otlp_spans - encoding: otlp_proto - logs: - topic: otlp_logs - encoding: otlp_proto + kafka/message_marking_on_permanent_error_inherited: - metrics: - topic: otlp_metrics - encoding: otlp_proto - traces: - topic: otlp_spans - encoding: otlp_proto - logs: - topic: otlp_logs - encoding: otlp_proto message_marking: after: true on_error: true + kafka/regex_topic_with_exclusion: logs: - topic: "^logs-.*" - exclude_topic: "^logs-(test|dev)$" + topics: + - "^logs-.*" + exclude_topics: + - "^logs-(test|dev)$" encoding: otlp_proto metrics: - topic: "^metrics-.*" - exclude_topic: "^metrics-internal-.*$" + topics: + - "^metrics-.*" + exclude_topics: + - "^metrics-internal-.*$" encoding: otlp_proto traces: - topic: "^traces-.*" - exclude_topic: "^traces-debug-.*$" + topics: + - "^traces-.*" + exclude_topics: + - "^traces-debug-.*$" encoding: otlp_proto From 0fdab343e2ab39f835da8aba1d98200d86bd74ab Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 28 Nov 2025 16:38:00 +0530 Subject: [PATCH 7/8] move to validate --- receiver/kafkareceiver/config.go | 50 ++++++++++++++++-------- receiver/kafkareceiver/config_test.go | 22 +++++++++++ receiver/kafkareceiver/kafka_receiver.go | 29 ++++++++++++++ 3 files changed, 85 insertions(+), 16 deletions(-) diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index e625613fb05c1..0922aca8d372d 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -64,15 +64,13 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { if zeroConfig.Logs.Topic != "" { if len(zeroConfig.Logs.Topics) == 0 { c.Logs.Topics = []string{zeroConfig.Logs.Topic} - } else { - return fmt.Errorf("both logs.topic and logs.topics cannot be set") + c.Logs.Topic = "" } } if zeroConfig.Logs.ExcludeTopic != "" { if len(zeroConfig.Logs.ExcludeTopics) == 0 { c.Logs.ExcludeTopics = []string{zeroConfig.Logs.ExcludeTopic} - } else { - return fmt.Errorf("both logs.exclude_topic and logs.exclude_topics cannot be set") + c.Logs.ExcludeTopic = "" } } @@ -80,16 +78,14 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { if zeroConfig.Metrics.Topic != "" { if len(zeroConfig.Logs.Topics) == 0 { c.Metrics.Topics = []string{zeroConfig.Metrics.Topic} - } else { - return fmt.Errorf("both metrics.topic and metrics.topics cannot be set") + c.Metrics.Topic = "" } } if zeroConfig.Metrics.ExcludeTopic != "" { if len(zeroConfig.Logs.ExcludeTopics) == 0 { c.Metrics.ExcludeTopics = []string{zeroConfig.Metrics.ExcludeTopic} - } else { - return fmt.Errorf("both metrics.exclude_topic and metrics.exclude_topics cannot be set") + c.Metrics.ExcludeTopic = "" } } @@ -97,16 +93,14 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { if zeroConfig.Traces.Topic != "" { if len(zeroConfig.Logs.Topics) == 0 { c.Traces.Topics = []string{zeroConfig.Traces.Topic} - } else { - return fmt.Errorf("both traces.topic and traces.topics cannot be set") + c.Traces.Topic = "" } } if zeroConfig.Traces.ExcludeTopic != "" { if len(zeroConfig.Logs.ExcludeTopics) == 0 { c.Traces.ExcludeTopics = []string{zeroConfig.Traces.ExcludeTopic} - } else { - return fmt.Errorf("both traces.exclude_topic and traces.exclude_topics cannot be set") + c.Traces.ExcludeTopic = "" } } @@ -114,15 +108,13 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { if zeroConfig.Profiles.Topic != "" { if len(zeroConfig.Logs.Topics) == 0 { c.Profiles.Topics = []string{zeroConfig.Profiles.Topic} - } else { - return fmt.Errorf("both profiles.topic and profiles.topics cannot be set") + c.Profiles.Topic = "" } } if zeroConfig.Profiles.ExcludeTopic != "" { if len(zeroConfig.Logs.ExcludeTopics) == 0 { c.Profiles.ExcludeTopics = []string{zeroConfig.Profiles.ExcludeTopic} - } else { - return fmt.Errorf("both profiles.exclude_topic and profiles.exclude_topics cannot be set") + c.Profiles.ExcludeTopic = "" } } @@ -145,6 +137,32 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { // Validate checks the receiver configuration is valid. func (c *Config) Validate() error { + if c.Logs.Topic != "" && len(c.Logs.Topics) != 0 { + return fmt.Errorf("both logs.topic and logs.topics cannot be set") + } + if c.Metrics.Topic != "" && len(c.Metrics.Topics) != 0 { + return fmt.Errorf("both metrics.topic and metrics.topics cannot be set") + } + if c.Traces.Topic != "" && len(c.Traces.Topics) != 0 { + return fmt.Errorf("both traces.topic and traces.topics cannot be set") + } + if c.Profiles.Topic != "" && len(c.Profiles.Topics) != 0 { + return fmt.Errorf("both profiles.topic and profiles.topics cannot be set") + } + + if c.Logs.ExcludeTopic != "" && len(c.Logs.ExcludeTopics) != 0 { + return fmt.Errorf("both logs.exlude_topic and logs.exlude_topics cannot be set") + } + if c.Metrics.ExcludeTopic != "" && len(c.Metrics.ExcludeTopics) != 0 { + return fmt.Errorf("both metrics.exclude_topic and metrics.exclude_topics cannot be set") + } + if c.Traces.ExcludeTopic != "" && len(c.Traces.ExcludeTopics) != 0 { + return fmt.Errorf("both traces.exclude_topic and traces.exclude_topic cannot be set") + } + if c.Profiles.Topic != "" && len(c.Profiles.Topics) != 0 { + return fmt.Errorf("both profiles.exclude_topic and profiles.exclude_topics cannot be set") + } + // Validate that exclude_topic is only used with regex topic patterns if err := validateExcludeTopic("logs", c.Logs.Topics, c.Logs.ExcludeTopics); err != nil { return err diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index 5d868c44df021..1a65ba2e0b801 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -343,6 +343,28 @@ func TestConfigValidate(t *testing.T) { }, expectedErr: "traces.exclude_topics is configured but none of the configured traces.topics use regex pattern (must start with '^')", }, + { + name: "invalid config when both topic and topics are set", + config: &Config{ + Logs: TopicEncodingConfig{ + Topic: "legacy_log", + Topics: []string{"logs"}, + Encoding: "otlp_proto", + }, + }, + expectedErr: "both logs.topic and logs.topics cannot be set", + }, + { + name: "invalid config when both exclude_topic and exclude_topics are set", + config: &Config{ + Logs: TopicEncodingConfig{ + ExcludeTopic: "^logs-[invalid(regex", + ExcludeTopics: []string{"^logs-[invalid(regex"}, + Encoding: "otlp_proto", + }, + }, + expectedErr: "both logs.exclude_topic and logs.exclude_topics cannot be set", + }, { name: "invalid config with non-regex topic and exclude_topic for profiles", config: &Config{ diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 4f77a51e176aa..7ff6ef9cb3ddc 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -75,6 +75,12 @@ func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consume if err != nil { return nil, err } + if config.Logs.Topic != "" { + set.Logger.Warn("logs.topic is deprecated, please use logs.topics instead") + } + if config.Logs.ExcludeTopic != "" { + set.Logger.Warn("logs.exclude_topic is deprecated, please use logs.exclude_topics instead") + } return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { return processMessage(ctx, message, config, set.Logger, telBldr, &logsHandler{ @@ -99,6 +105,13 @@ func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer cons if err != nil { return nil, err } + if config.Metrics.Topic != "" { + set.Logger.Warn("metrics.topic is deprecated, please use metrics.topics instead") + } + if config.Metrics.ExcludeTopic != "" { + set.Logger.Warn("metrics.exclude_topic is deprecated, please use metrics.exclude_topics instead") + } + return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { return processMessage(ctx, message, config, set.Logger, telBldr, &metricsHandler{ @@ -123,7 +136,16 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu if err != nil { return nil, err } + + if config.Traces.Topic != "" { + set.Logger.Warn("traces.topic is deprecated, please use traces.topics instead") + } + if config.Traces.ExcludeTopic != "" { + set.Logger.Warn("traces.exclude_topic is deprecated, please use traces.exclude_topics instead") + } + return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { + return processMessage(ctx, message, config, set.Logger, telBldr, &tracesHandler{ unmarshaler: unmarshaler, @@ -147,6 +169,13 @@ func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xco if err != nil { return nil, err } + if config.Profiles.Topic != "" { + set.Logger.Warn("profiles.topic is deprecated, please use profiles.topics instead") + } + if config.Profiles.ExcludeTopic != "" { + set.Logger.Warn("profiles.exclude_topic is deprecated, please use profiles.exclude_topics instead") + } + return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { return processMessage(ctx, message, config, set.Logger, telBldr, &profilesHandler{ From add9ed8ec7ebc84594eb4a0f7483f008a35838ca Mon Sep 17 00:00:00 2001 From: Khushi Jain Date: Fri, 28 Nov 2025 16:44:52 +0530 Subject: [PATCH 8/8] fix test --- receiver/kafkareceiver/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index 0922aca8d372d..e0d7382b6b275 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -151,7 +151,7 @@ func (c *Config) Validate() error { } if c.Logs.ExcludeTopic != "" && len(c.Logs.ExcludeTopics) != 0 { - return fmt.Errorf("both logs.exlude_topic and logs.exlude_topics cannot be set") + return fmt.Errorf("both logs.exclude_topic and logs.exclude_topics cannot be set") } if c.Metrics.ExcludeTopic != "" && len(c.Metrics.ExcludeTopics) != 0 { return fmt.Errorf("both metrics.exclude_topic and metrics.exclude_topics cannot be set")