diff --git a/.chloggen/support-arrayof-string.yaml b/.chloggen/support-arrayof-string.yaml new file mode 100644 index 000000000000..fef4ad772830 --- /dev/null +++ b/.chloggen/support-arrayof-string.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/kafka + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support configuring a list of topics and exclude_topics; deprecate topic and exclude_topic + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [44477] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index aba804a278a2..11f8d9f8ca26 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -42,25 +42,37 @@ The following settings can be optionally configured: - `protocol_version` (default = 2.1.0): Kafka protocol version. - `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup - `logs` - - `topic` (default = otlp\_logs): The name of the Kafka topic from which to consume logs. + - `topic` (Deprecated [v0.142.0]: use `topics`) + (default = otlp\logs): If this is set, it will take precedence over default value of `topics` + - `topics` (default = otlp\_logs): List of kafka topics from which to consume logs - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings). - - `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. + - `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`) + (default = ""): If this is set, it will take precedence over default value of `exclude_topics` + - `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. - `metrics` - - `topic` (default = otlp\_metrics): The name of the Kafka topic from which to consume metrics. + - `topic` (Deprecated [v0.142.0]: use `topics`) + (default = otlp\_metrics): If this is set, it will take precedence over default value of `topics` + - `topics` (default = otlp\_metrics): List of Kafka topic from which to consume metrics. - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings). - - `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. + - `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`) + (default = ""): If this is set, it will take precedence over default value of `exclude_topics` + - `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. - `traces` - - `topic` (default = otlp\_spans): The name of the Kafka topic from which to consume traces. + - `topic` (Deprecated [v0.142.0]: use `topics`) + (default = otlp\_spans): If this is set, it will take precedence over default value of `topics` + - `topics` (default = otlp\_spans): List of Kafka topic from which to consume traces. - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings). - - `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. + - `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`) + (default = ""): If this is set, it will take precedence over default value of `exclude_topics` + - `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. - `profiles` - - `topic` (default = otlp\_profiles): The name of the Kafka topic from which to consume profiles. + - `topic` (Deprecated [v0.142.0]: use `topics`) + (default = otlp\_profiles): If this is set, it will take precedence over default value of `topics` + - `topics` (default = otlp\_profiles): List of Kafka topic from which to consume profiles. - `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings). - - `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. -- `topic` (Deprecated [v0.124.0]: use `logs::topic`, `traces::topic`, or `metrics::topic`). - If this is set, it will take precedence over the default value for those fields. -- `encoding` (Deprecated [v0.124.0]: use `logs::encoding`, `traces::encoding`, or `metrics::encoding`). - If this is set, it will take precedence over the default value for those fields. + - `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`) + (default = ""): If this is set, it will take precedence over default value of `exclude_topics` + - `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex. - `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from - `client_id` (default = otel-collector): The consumer client ID that receiver will use - `rack_id` (default = ""): The rack identifier for this client. When set and brokers are configured with a rack-aware replica selector, the client will prefer fetching from the closest replica. diff --git a/receiver/kafkareceiver/config.go b/receiver/kafkareceiver/config.go index eed203ef6845..e0d7382b6b27 100644 --- a/receiver/kafkareceiver/config.go +++ b/receiver/kafkareceiver/config.go @@ -53,6 +53,71 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { return err } + // Check if deprecated fields have been explicitly set + // give them precedence + var zeroConfig Config + if err := conf.Unmarshal(&zeroConfig); err != nil { + return err + } + + // handle deprecated topic and exclude_topic for log signal + if zeroConfig.Logs.Topic != "" { + if len(zeroConfig.Logs.Topics) == 0 { + c.Logs.Topics = []string{zeroConfig.Logs.Topic} + c.Logs.Topic = "" + } + } + if zeroConfig.Logs.ExcludeTopic != "" { + if len(zeroConfig.Logs.ExcludeTopics) == 0 { + c.Logs.ExcludeTopics = []string{zeroConfig.Logs.ExcludeTopic} + c.Logs.ExcludeTopic = "" + } + } + + // handle deprecated topic and exclude_topic for metric signal + if zeroConfig.Metrics.Topic != "" { + if len(zeroConfig.Logs.Topics) == 0 { + c.Metrics.Topics = []string{zeroConfig.Metrics.Topic} + c.Metrics.Topic = "" + } + } + + if zeroConfig.Metrics.ExcludeTopic != "" { + if len(zeroConfig.Logs.ExcludeTopics) == 0 { + c.Metrics.ExcludeTopics = []string{zeroConfig.Metrics.ExcludeTopic} + c.Metrics.ExcludeTopic = "" + } + } + + // handle deprecated topic and exclude_topic for trace signal + if zeroConfig.Traces.Topic != "" { + if len(zeroConfig.Logs.Topics) == 0 { + c.Traces.Topics = []string{zeroConfig.Traces.Topic} + c.Traces.Topic = "" + } + } + + if zeroConfig.Traces.ExcludeTopic != "" { + if len(zeroConfig.Logs.ExcludeTopics) == 0 { + c.Traces.ExcludeTopics = []string{zeroConfig.Traces.ExcludeTopic} + c.Traces.ExcludeTopic = "" + } + } + + // handle deprecated topic and exclude_topic for profile signal + if zeroConfig.Profiles.Topic != "" { + if len(zeroConfig.Logs.Topics) == 0 { + c.Profiles.Topics = []string{zeroConfig.Profiles.Topic} + c.Profiles.Topic = "" + } + } + if zeroConfig.Profiles.ExcludeTopic != "" { + if len(zeroConfig.Logs.ExcludeTopics) == 0 { + c.Profiles.ExcludeTopics = []string{zeroConfig.Profiles.ExcludeTopic} + c.Profiles.ExcludeTopic = "" + } + } + // Set OnPermanentError default value to inherit from OnError for backward compatibility // Only if OnPermanentError was not explicitly set in the config rawConf := conf.Get("message_marking") @@ -72,46 +137,89 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error { // Validate checks the receiver configuration is valid. func (c *Config) Validate() error { + if c.Logs.Topic != "" && len(c.Logs.Topics) != 0 { + return fmt.Errorf("both logs.topic and logs.topics cannot be set") + } + if c.Metrics.Topic != "" && len(c.Metrics.Topics) != 0 { + return fmt.Errorf("both metrics.topic and metrics.topics cannot be set") + } + if c.Traces.Topic != "" && len(c.Traces.Topics) != 0 { + return fmt.Errorf("both traces.topic and traces.topics cannot be set") + } + if c.Profiles.Topic != "" && len(c.Profiles.Topics) != 0 { + return fmt.Errorf("both profiles.topic and profiles.topics cannot be set") + } + + if c.Logs.ExcludeTopic != "" && len(c.Logs.ExcludeTopics) != 0 { + return fmt.Errorf("both logs.exclude_topic and logs.exclude_topics cannot be set") + } + if c.Metrics.ExcludeTopic != "" && len(c.Metrics.ExcludeTopics) != 0 { + return fmt.Errorf("both metrics.exclude_topic and metrics.exclude_topics cannot be set") + } + if c.Traces.ExcludeTopic != "" && len(c.Traces.ExcludeTopics) != 0 { + return fmt.Errorf("both traces.exclude_topic and traces.exclude_topic cannot be set") + } + if c.Profiles.Topic != "" && len(c.Profiles.Topics) != 0 { + return fmt.Errorf("both profiles.exclude_topic and profiles.exclude_topics cannot be set") + } + // Validate that exclude_topic is only used with regex topic patterns - if err := validateExcludeTopic("logs", c.Logs.Topic, c.Logs.ExcludeTopic); err != nil { + if err := validateExcludeTopic("logs", c.Logs.Topics, c.Logs.ExcludeTopics); err != nil { return err } - if err := validateExcludeTopic("metrics", c.Metrics.Topic, c.Metrics.ExcludeTopic); err != nil { + if err := validateExcludeTopic("metrics", c.Metrics.Topics, c.Metrics.ExcludeTopics); err != nil { return err } - if err := validateExcludeTopic("traces", c.Traces.Topic, c.Traces.ExcludeTopic); err != nil { + if err := validateExcludeTopic("traces", c.Traces.Topics, c.Traces.ExcludeTopics); err != nil { return err } - if err := validateExcludeTopic("profiles", c.Profiles.Topic, c.Profiles.ExcludeTopic); err != nil { + if err := validateExcludeTopic("profiles", c.Profiles.Topics, c.Profiles.ExcludeTopics); err != nil { return err } return nil } -// validateExcludeTopic checks that exclude_topic is only configured when topic uses regex pattern -func validateExcludeTopic(signalType, topic, excludeTopic string) error { - if excludeTopic == "" { +// validateExcludeTopic checks that exclude_topic is only configured when topics uses regex pattern +func validateExcludeTopic(signalType string, topics, excludeTopics []string) error { + if len(excludeTopics) == 0 { return nil // No exclude_topic configured, nothing to validate } - if !strings.HasPrefix(topic, "^") { + + // if none of the configured topic uses regex return error + var usesRegex bool + for _, topic := range topics { + if strings.HasPrefix(topic, "^") { + usesRegex = true + break + } + } + + if !usesRegex { return fmt.Errorf( - "%s.exclude_topic is configured but %s.topic does not use regex pattern (must start with '^')", + "%s.exclude_topics is configured but none of the configured %s.topics use regex pattern (must start with '^')", signalType, signalType, ) } - // Validate that exclude_topic is a valid regex pattern - if _, err := regexp.Compile(excludeTopic); err != nil { - return fmt.Errorf( - "%s.exclude_topic contains invalid regex pattern: %w", - signalType, err, - ) + + for _, excludeTopic := range excludeTopics { + // Validate that exclude_topic is a valid regex pattern + if _, err := regexp.Compile(excludeTopic); err != nil { + return fmt.Errorf( + "%s.exclude_topic contains invalid regex pattern: %w", + signalType, err, + ) + } } + return nil } // TopicEncodingConfig holds signal-specific topic and encoding configuration. type TopicEncodingConfig struct { - // Topic holds the name of the Kafka topic from which messages of the + // Deprecated [v0.142.0]: Use Topics + Topic string `mapstructure:"topic"` + + // Topics holds the name of the Kafka topics from which messages of the // signal type should be consumed. // // The default depends on the signal type: @@ -119,15 +227,18 @@ type TopicEncodingConfig struct { // - "otlp_metrics" for metrics // - "otlp_logs" for logs // - "otlp_profiles" for profiles - Topic string `mapstructure:"topic"` + Topics []string `mapstructure:"topics"` // Encoding holds the expected encoding of messages for the signal type // // Defaults to "otlp_proto". Encoding string `mapstructure:"encoding"` - // Optional exclude topic option, used only in regex mode. + // Deprecated [v0.142.0]: Use ExcludeTopics ExcludeTopic string `mapstructure:"exclude_topic"` + + // Optional exclude topics option, used only in regex mode. + ExcludeTopics []string `mapstructure:"exclude_topics"` } type MessageMarking struct { diff --git a/receiver/kafkareceiver/config_test.go b/receiver/kafkareceiver/config_test.go index 1493fea1eac2..1a65ba2e0b80 100644 --- a/receiver/kafkareceiver/config_test.go +++ b/receiver/kafkareceiver/config_test.go @@ -31,6 +31,44 @@ func TestLoadConfig(t *testing.T) { expected component.Config expectedErr error }{ + { + id: component.NewIDWithName(metadata.Type, "legacy_topic"), + expected: &Config{ + ClientConfig: func() configkafka.ClientConfig { + config := configkafka.NewDefaultClientConfig() + return config + }(), + ConsumerConfig: func() configkafka.ConsumerConfig { + config := configkafka.NewDefaultConsumerConfig() + return config + }(), + Logs: TopicEncodingConfig{ + // if deprecated topic is set and topics is not set + // give precedence to topic + Topic: "legacy_logs", + Topics: []string{"legacy_logs"}, + Encoding: "otlp_proto", + }, + Metrics: TopicEncodingConfig{ + Topic: "legacy_metric", + Topics: []string{"legacy_metric"}, + Encoding: "otlp_proto", + }, + Traces: TopicEncodingConfig{ + Topic: "legacy_spans", + Topics: []string{"legacy_spans"}, + Encoding: "otlp_proto", + }, + Profiles: TopicEncodingConfig{ + Topic: "legacy_profile", + Topics: []string{"legacy_profile"}, + Encoding: "otlp_proto", + }, + ErrorBackOff: configretry.BackOffConfig{ + Enabled: false, + }, + }, + }, { id: component.NewIDWithName(metadata.Type, "logs"), expected: &Config{ @@ -61,19 +99,19 @@ func TestLoadConfig(t *testing.T) { return config }(), Logs: TopicEncodingConfig{ - Topic: "logs", + Topics: []string{"logs"}, // topics is given precedence if it is set Encoding: "direct", }, Metrics: TopicEncodingConfig{ - Topic: "otlp_metrics", + Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ - Topic: "otlp_spans", + Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ - Topic: "otlp_profiles", + Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, ErrorBackOff: configretry.BackOffConfig{ @@ -96,19 +134,19 @@ func TestLoadConfig(t *testing.T) { return config }(), Logs: TopicEncodingConfig{ - Topic: "otlp_logs", + Topics: []string{"otlp_logs"}, Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ - Topic: "otlp_metrics", + Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ - Topic: "otlp_spans", + Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ - Topic: "otlp_profiles", + Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, ErrorBackOff: configretry.BackOffConfig{ @@ -122,19 +160,19 @@ func TestLoadConfig(t *testing.T) { ClientConfig: configkafka.NewDefaultClientConfig(), ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ - Topic: "otlp_logs", + Topics: []string{"otlp_logs"}, Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ - Topic: "otlp_metrics", + Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ - Topic: "otlp_spans", + Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ - Topic: "otlp_profiles", + Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, MessageMarking: MessageMarking{ @@ -153,19 +191,19 @@ func TestLoadConfig(t *testing.T) { ClientConfig: configkafka.NewDefaultClientConfig(), ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ - Topic: "otlp_logs", + Topics: []string{"otlp_logs"}, Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ - Topic: "otlp_metrics", + Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ - Topic: "otlp_spans", + Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ - Topic: "otlp_profiles", + Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, MessageMarking: MessageMarking{ @@ -184,19 +222,19 @@ func TestLoadConfig(t *testing.T) { ClientConfig: configkafka.NewDefaultClientConfig(), ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ - Topic: "otlp_logs", + Topics: []string{"otlp_logs"}, Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ - Topic: "otlp_metrics", + Topics: []string{"otlp_metrics"}, Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ - Topic: "otlp_spans", + Topics: []string{"otlp_spans"}, Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ - Topic: "otlp_profiles", + Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, MessageMarking: MessageMarking{ @@ -215,22 +253,22 @@ func TestLoadConfig(t *testing.T) { ClientConfig: configkafka.NewDefaultClientConfig(), ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ - Topic: "^logs-.*", - ExcludeTopic: "^logs-(test|dev)$", - Encoding: "otlp_proto", + Topics: []string{"^logs-.*"}, + ExcludeTopics: []string{"^logs-(test|dev)$"}, + Encoding: "otlp_proto", }, Metrics: TopicEncodingConfig{ - Topic: "^metrics-.*", - ExcludeTopic: "^metrics-internal-.*$", - Encoding: "otlp_proto", + Topics: []string{"^metrics-.*"}, + ExcludeTopics: []string{"^metrics-internal-.*$"}, + Encoding: "otlp_proto", }, Traces: TopicEncodingConfig{ - Topic: "^traces-.*", - ExcludeTopic: "^traces-debug-.*$", - Encoding: "otlp_proto", + Topics: []string{"^traces-.*"}, + ExcludeTopics: []string{"^traces-debug-.*$"}, + Encoding: "otlp_proto", }, Profiles: TopicEncodingConfig{ - Topic: "otlp_profiles", + Topics: []string{"otlp_profiles"}, Encoding: "otlp_proto", }, ErrorBackOff: configretry.BackOffConfig{ @@ -265,9 +303,9 @@ func TestConfigValidate(t *testing.T) { name: "valid config with regex and exclude_topic", config: &Config{ Logs: TopicEncodingConfig{ - Topic: "^logs-.*", - ExcludeTopic: "^logs-test$", - Encoding: "otlp_proto", + Topics: []string{"^logs-.*"}, + ExcludeTopics: []string{"^logs-test$"}, + Encoding: "otlp_proto", }, }, expectedErr: "", @@ -276,51 +314,73 @@ func TestConfigValidate(t *testing.T) { name: "invalid config with non-regex topic and exclude_topic for logs", config: &Config{ Logs: TopicEncodingConfig{ - Topic: "logs", - ExcludeTopic: "^logs-test$", - Encoding: "otlp_proto", + Topics: []string{"logs"}, + ExcludeTopics: []string{"^logs-test$"}, + Encoding: "otlp_proto", }, }, - expectedErr: "logs.exclude_topic is configured but logs.topic does not use regex pattern (must start with '^')", + expectedErr: "logs.exclude_topics is configured but none of the configured logs.topics use regex pattern (must start with '^')", }, { name: "invalid config with non-regex topic and exclude_topic for metrics", config: &Config{ Metrics: TopicEncodingConfig{ - Topic: "metrics", - ExcludeTopic: "^metrics-test$", - Encoding: "otlp_proto", + Topics: []string{"metrics"}, + ExcludeTopics: []string{"^metrics-test$"}, + Encoding: "otlp_proto", }, }, - expectedErr: "metrics.exclude_topic is configured but metrics.topic does not use regex pattern (must start with '^')", + expectedErr: "metrics.exclude_topics is configured but none of the configured metrics.topics use regex pattern (must start with '^')", }, { name: "invalid config with non-regex topic and exclude_topic for traces", config: &Config{ Traces: TopicEncodingConfig{ - Topic: "traces", - ExcludeTopic: "^traces-test$", - Encoding: "otlp_proto", + Topics: []string{"traces"}, + ExcludeTopics: []string{"^traces-test$"}, + Encoding: "otlp_proto", + }, + }, + expectedErr: "traces.exclude_topics is configured but none of the configured traces.topics use regex pattern (must start with '^')", + }, + { + name: "invalid config when both topic and topics are set", + config: &Config{ + Logs: TopicEncodingConfig{ + Topic: "legacy_log", + Topics: []string{"logs"}, + Encoding: "otlp_proto", + }, + }, + expectedErr: "both logs.topic and logs.topics cannot be set", + }, + { + name: "invalid config when both exclude_topic and exclude_topics are set", + config: &Config{ + Logs: TopicEncodingConfig{ + ExcludeTopic: "^logs-[invalid(regex", + ExcludeTopics: []string{"^logs-[invalid(regex"}, + Encoding: "otlp_proto", }, }, - expectedErr: "traces.exclude_topic is configured but traces.topic does not use regex pattern (must start with '^')", + expectedErr: "both logs.exclude_topic and logs.exclude_topics cannot be set", }, { name: "invalid config with non-regex topic and exclude_topic for profiles", config: &Config{ Profiles: TopicEncodingConfig{ - Topic: "profiles", - ExcludeTopic: "^profiles-test$", - Encoding: "otlp_proto", + Topics: []string{"profiles"}, + ExcludeTopics: []string{"^profiles-test$"}, + Encoding: "otlp_proto", }, }, - expectedErr: "profiles.exclude_topic is configured but profiles.topic does not use regex pattern (must start with '^')", + expectedErr: "profiles.exclude_topics is configured but none of the configured profiles.topics use regex pattern (must start with '^')", }, { name: "valid config without exclude_topic", config: &Config{ Logs: TopicEncodingConfig{ - Topic: "logs", + Topics: []string{"logs"}, Encoding: "otlp_proto", }, }, @@ -330,9 +390,9 @@ func TestConfigValidate(t *testing.T) { name: "invalid config with invalid regex in exclude_topic", config: &Config{ Logs: TopicEncodingConfig{ - Topic: "^logs-.*", - ExcludeTopic: "^logs-[invalid(regex", - Encoding: "otlp_proto", + Topics: []string{"^logs-.*"}, + ExcludeTopics: []string{"^logs-[invalid(regex"}, + Encoding: "otlp_proto", }, }, expectedErr: "logs.exclude_topic contains invalid regex pattern", diff --git a/receiver/kafkareceiver/consumer_bench_test.go b/receiver/kafkareceiver/consumer_bench_test.go index e53231621a79..72144dc9c1e4 100644 --- a/receiver/kafkareceiver/consumer_bench_test.go +++ b/receiver/kafkareceiver/consumer_bench_test.go @@ -73,9 +73,9 @@ func newBenchConfigClient(b *testing.B, topic string, partitions int32, messageMarking MessageMarking, ) (*Config, *kgo.Client) { client, cfg := mustNewFakeCluster(b, kfake.SeedTopics(partitions, topic)) - cfg.Logs.Topic = topic - cfg.Traces.Topic = topic - cfg.Metrics.Topic = topic + cfg.Logs.Topics = []string{topic} + cfg.Traces.Topics = []string{topic} + cfg.Metrics.Topics = []string{topic} cfg.GroupID = b.Name() cfg.InitialOffset = "earliest" cfg.AutoCommit = autoCommit diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index f6b2246c2fb3..450b5ee039c0 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -47,19 +47,19 @@ func createDefaultConfig() component.Config { ClientConfig: configkafka.NewDefaultClientConfig(), ConsumerConfig: configkafka.NewDefaultConsumerConfig(), Logs: TopicEncodingConfig{ - Topic: defaultLogsTopic, + Topics: []string{defaultLogsTopic}, Encoding: defaultLogsEncoding, }, Metrics: TopicEncodingConfig{ - Topic: defaultMetricsTopic, + Topics: []string{defaultMetricsTopic}, Encoding: defaultMetricsEncoding, }, Traces: TopicEncodingConfig{ - Topic: defaultTracesTopic, + Topics: []string{defaultTracesTopic}, Encoding: defaultTracesEncoding, }, Profiles: TopicEncodingConfig{ - Topic: defaultProfilesTopic, + Topics: []string{defaultProfilesTopic}, Encoding: defaultProfilesEncoding, }, MessageMarking: MessageMarking{ diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 7a315d79d5a6..7ff6ef9cb3dd 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -75,6 +75,12 @@ func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consume if err != nil { return nil, err } + if config.Logs.Topic != "" { + set.Logger.Warn("logs.topic is deprecated, please use logs.topics instead") + } + if config.Logs.ExcludeTopic != "" { + set.Logger.Warn("logs.exclude_topic is deprecated, please use logs.exclude_topics instead") + } return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { return processMessage(ctx, message, config, set.Logger, telBldr, &logsHandler{ @@ -87,7 +93,7 @@ func newLogsReceiver(config *Config, set receiver.Settings, nextConsumer consume ) }, nil } - return newReceiver(config, set, []string{config.Logs.Topic}, []string{config.Logs.ExcludeTopic}, newConsumeMessageFunc) + return newReceiver(config, set, config.Logs.Topics, config.Logs.ExcludeTopics, newConsumeMessageFunc) } func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Metrics) (receiver.Metrics, error) { @@ -99,6 +105,13 @@ func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer cons if err != nil { return nil, err } + if config.Metrics.Topic != "" { + set.Logger.Warn("metrics.topic is deprecated, please use metrics.topics instead") + } + if config.Metrics.ExcludeTopic != "" { + set.Logger.Warn("metrics.exclude_topic is deprecated, please use metrics.exclude_topics instead") + } + return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { return processMessage(ctx, message, config, set.Logger, telBldr, &metricsHandler{ @@ -111,7 +124,7 @@ func newMetricsReceiver(config *Config, set receiver.Settings, nextConsumer cons ) }, nil } - return newReceiver(config, set, []string{config.Metrics.Topic}, []string{config.Metrics.ExcludeTopic}, newConsumeMessageFunc) + return newReceiver(config, set, config.Metrics.Topics, config.Metrics.ExcludeTopics, newConsumeMessageFunc) } func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consumer.Traces) (receiver.Traces, error) { @@ -123,7 +136,16 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu if err != nil { return nil, err } + + if config.Traces.Topic != "" { + set.Logger.Warn("traces.topic is deprecated, please use traces.topics instead") + } + if config.Traces.ExcludeTopic != "" { + set.Logger.Warn("traces.exclude_topic is deprecated, please use traces.exclude_topics instead") + } + return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { + return processMessage(ctx, message, config, set.Logger, telBldr, &tracesHandler{ unmarshaler: unmarshaler, @@ -135,7 +157,7 @@ func newTracesReceiver(config *Config, set receiver.Settings, nextConsumer consu ) }, nil } - return newReceiver(config, set, []string{config.Traces.Topic}, []string{config.Traces.ExcludeTopic}, consumeFn) + return newReceiver(config, set, config.Traces.Topics, config.Traces.ExcludeTopics, consumeFn) } func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xconsumer.Profiles) (xreceiver.Profiles, error) { @@ -147,6 +169,13 @@ func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xco if err != nil { return nil, err } + if config.Profiles.Topic != "" { + set.Logger.Warn("profiles.topic is deprecated, please use profiles.topics instead") + } + if config.Profiles.ExcludeTopic != "" { + set.Logger.Warn("profiles.exclude_topic is deprecated, please use profiles.exclude_topics instead") + } + return func(ctx context.Context, message kafkaMessage, attrs attribute.Set) error { return processMessage(ctx, message, config, set.Logger, telBldr, &profilesHandler{ @@ -159,7 +188,7 @@ func newProfilesReceiver(config *Config, set receiver.Settings, nextConsumer xco ) }, nil } - return newReceiver(config, set, []string{config.Profiles.Topic}, []string{config.Profiles.ExcludeTopic}, consumeFn) + return newReceiver(config, set, config.Profiles.Topics, config.Profiles.ExcludeTopics, consumeFn) } func newReceiver( diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index d0c4c9f099a3..c0ac10c77963 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -766,8 +766,8 @@ func TestExcludeTopicWithSarama(t *testing.T) { _, receiverConfig := mustNewFakeCluster(t, kfake.SeedTopics(1, "otlp_spans")) // Configure exclude_topic - only supported with franz-go - receiverConfig.Traces.Topic = "^otlp_spans.*" - receiverConfig.Traces.ExcludeTopic = "^otlp_spans-test$" + receiverConfig.Traces.Topics = []string{"^otlp_spans.*"} + receiverConfig.Traces.ExcludeTopics = []string{"^otlp_spans-test$"} set, _, _ := mustNewSettings(t) _, err := newTracesReceiver(receiverConfig, set, &consumertest.TracesSink{}) diff --git a/receiver/kafkareceiver/testdata/config.yaml b/receiver/kafkareceiver/testdata/config.yaml index 765b6bcbb4ba..a915f5c487a5 100644 --- a/receiver/kafkareceiver/testdata/config.yaml +++ b/receiver/kafkareceiver/testdata/config.yaml @@ -1,6 +1,21 @@ +kafka/legacy_topic: + metrics: + topic: legacy_metric + encoding: otlp_proto + traces: + topic: legacy_spans + encoding: otlp_proto + logs: + topic: legacy_logs + encoding: otlp_proto + profiles: + topic: legacy_profile + encoding: otlp_proto + kafka/logs: logs: - topic: logs + topics: + - logs encoding: direct session_timeout: 45s heartbeat_interval: 15s @@ -29,65 +44,40 @@ kafka/logs: max_interval: 10s max_elapsed_time: 1m multiplier: 1.5 + kafka/rebalance_strategy: - metrics: - topic: otlp_metrics - encoding: otlp_proto - traces: - topic: otlp_spans - encoding: otlp_proto - logs: - topic: otlp_logs - encoding: otlp_proto group_rebalance_strategy: sticky group_instance_id: test-instance + kafka/message_marking: - metrics: - topic: otlp_metrics - encoding: otlp_proto - traces: - topic: otlp_spans - encoding: otlp_proto - logs: - topic: otlp_logs - encoding: otlp_proto message_marking: after: true on_error: true on_permanent_error: false + kafka/message_marking_not_specified: - metrics: - topic: otlp_metrics - encoding: otlp_proto - traces: - topic: otlp_spans - encoding: otlp_proto - logs: - topic: otlp_logs - encoding: otlp_proto + kafka/message_marking_on_permanent_error_inherited: - metrics: - topic: otlp_metrics - encoding: otlp_proto - traces: - topic: otlp_spans - encoding: otlp_proto - logs: - topic: otlp_logs - encoding: otlp_proto message_marking: after: true on_error: true + kafka/regex_topic_with_exclusion: logs: - topic: "^logs-.*" - exclude_topic: "^logs-(test|dev)$" + topics: + - "^logs-.*" + exclude_topics: + - "^logs-(test|dev)$" encoding: otlp_proto metrics: - topic: "^metrics-.*" - exclude_topic: "^metrics-internal-.*$" + topics: + - "^metrics-.*" + exclude_topics: + - "^metrics-internal-.*$" encoding: otlp_proto traces: - topic: "^traces-.*" - exclude_topic: "^traces-debug-.*$" + topics: + - "^traces-.*" + exclude_topics: + - "^traces-debug-.*$" encoding: otlp_proto