Skip to content
27 changes: 27 additions & 0 deletions .chloggen/support-arrayof-string.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/kafka

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support configuring a list of topics and exclude_topics

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [44477]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
36 changes: 24 additions & 12 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,37 @@ The following settings can be optionally configured:
- `protocol_version` (default = 2.1.0): Kafka protocol version.
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
- `logs`
- `topic` (default = otlp\_logs): The name of the Kafka topic from which to consume logs.
- `topic` (Deprecated [v0.142.0]: use `topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `topic` (Deprecated [v0.142.0]: use `topics`)
- `topic` (Deprecated [v0.141.0]: use `topics`)

(default = otlp\logs): If this is set, it will take precedence over default value of `topics`
- `topics` (default = otlp\_logs): List of kafka topics from which to consume logs
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
- `exclude_topic` (Deprecated [v0.141.0]: use `exclude_topics`)

(default = ""): If this is set, it will take precedence over default value of `exclude_topics`
- `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `metrics`
- `topic` (default = otlp\_metrics): The name of the Kafka topic from which to consume metrics.
- `topic` (Deprecated [v0.142.0]: use `topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `topic` (Deprecated [v0.142.0]: use `topics`)
- `topic` (Deprecated [v0.141.0]: use `topics`)

(default = otlp\_metrics): If this is set, it will take precedence over default value of `topics`
- `topics` (default = otlp\_metrics): List of Kafka topic from which to consume metrics.
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
- `exclude_topic` (Deprecated [v0.141.0]: use `exclude_topics`)

(default = ""): If this is set, it will take precedence over default value of `exclude_topics`
- `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `traces`
- `topic` (default = otlp\_spans): The name of the Kafka topic from which to consume traces.
- `topic` (Deprecated [v0.142.0]: use `topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `topic` (Deprecated [v0.142.0]: use `topics`)
- `topic` (Deprecated [v0.141.0]: use `topics`)

(default = otlp\_spans): If this is set, it will take precedence over default value of `topics`
- `topics` (default = otlp\_spans): List of Kafka topic from which to consume traces.
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
- `exclude_topic` (Deprecated [v0.141.0]: use `exclude_topics`)

(default = ""): If this is set, it will take precedence over default value of `exclude_topics`
- `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `profiles`
- `topic` (default = otlp\_profiles): The name of the Kafka topic from which to consume profiles.
- `topic` (Deprecated [v0.142.0]: use `topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `topic` (Deprecated [v0.142.0]: use `topics`)
- `topic` (Deprecated [v0.141.0]: use `topics`)

(default = otlp\_profiles): If this is set, it will take precedence over default value of `topics`
- `topics` (default = otlp\_profiles): List of Kafka topic from which to consume profiles.
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `topic` (Deprecated [v0.124.0]: use `logs::topic`, `traces::topic`, or `metrics::topic`).
If this is set, it will take precedence over the default value for those fields.
- `encoding` (Deprecated [v0.124.0]: use `logs::encoding`, `traces::encoding`, or `metrics::encoding`).
If this is set, it will take precedence over the default value for those fields.
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
- `exclude_topic` (Deprecated [v0.141.0]: use `exclude_topics`)

(default = ""): If this is set, it will take precedence over default value of `exclude_topics`
- `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from
- `client_id` (default = otel-collector): The consumer client ID that receiver will use
- `rack_id` (default = ""): The rack identifier for this client. When set and brokers are configured with a rack-aware replica selector, the client will prefer fetching from the closest replica.
Expand Down
92 changes: 75 additions & 17 deletions receiver/kafkareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,45 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error {
return err
}

// Check if deprecated fields have been explicitly set
// give them precedence
var zeroConfig Config
if err := conf.Unmarshal(&zeroConfig); err != nil {
return err
}

// handle deprecated topic and exclude_topic for log signal
if zeroConfig.Logs.Topic != "" && len(zeroConfig.Logs.Topics) == 0 {
c.Logs.Topics = []string{zeroConfig.Logs.Topic}
}
if zeroConfig.Logs.ExcludeTopic != "" && len(zeroConfig.Logs.ExcludeTopics) == 0 {
c.Logs.ExcludeTopics = []string{zeroConfig.Logs.ExcludeTopic}
}

// handle deprecated topic and exclude_topic for metric signal
if zeroConfig.Metrics.Topic != "" && len(zeroConfig.Logs.Topics) == 0 {
c.Metrics.Topics = []string{zeroConfig.Metrics.Topic}
}
if zeroConfig.Metrics.ExcludeTopic != "" && len(zeroConfig.Logs.ExcludeTopics) == 0 {
c.Metrics.ExcludeTopics = []string{zeroConfig.Metrics.ExcludeTopic}
}

// handle deprecated topic and exclude_topic for trace signal
if zeroConfig.Traces.Topic != "" && len(zeroConfig.Logs.Topics) == 0 {
c.Traces.Topics = []string{zeroConfig.Traces.Topic}
}
if zeroConfig.Traces.ExcludeTopic != "" && len(zeroConfig.Logs.ExcludeTopics) == 0 {
c.Traces.ExcludeTopics = []string{zeroConfig.Traces.ExcludeTopic}
}

// handle deprecated topic and exclude_topic for profile signal
if zeroConfig.Profiles.Topic != "" && len(zeroConfig.Logs.Topics) == 0 {
c.Profiles.Topics = []string{zeroConfig.Profiles.Topic}
}
if zeroConfig.Profiles.ExcludeTopic != "" && len(zeroConfig.Logs.ExcludeTopics) == 0 {
c.Profiles.ExcludeTopics = []string{zeroConfig.Profiles.ExcludeTopic}
}

// Set OnPermanentError default value to inherit from OnError for backward compatibility
// Only if OnPermanentError was not explicitly set in the config
rawConf := conf.Get("message_marking")
Expand All @@ -73,61 +112,80 @@ func (c *Config) Unmarshal(conf *confmap.Conf) error {
// Validate checks the receiver configuration is valid.
func (c *Config) Validate() error {
// Validate that exclude_topic is only used with regex topic patterns
if err := validateExcludeTopic("logs", c.Logs.Topic, c.Logs.ExcludeTopic); err != nil {
if err := validateExcludeTopic("logs", c.Logs.Topics, c.Logs.ExcludeTopics); err != nil {
return err
}
if err := validateExcludeTopic("metrics", c.Metrics.Topic, c.Metrics.ExcludeTopic); err != nil {
if err := validateExcludeTopic("metrics", c.Metrics.Topics, c.Metrics.ExcludeTopics); err != nil {
return err
}
if err := validateExcludeTopic("traces", c.Traces.Topic, c.Traces.ExcludeTopic); err != nil {
if err := validateExcludeTopic("traces", c.Traces.Topics, c.Traces.ExcludeTopics); err != nil {
return err
}
if err := validateExcludeTopic("profiles", c.Profiles.Topic, c.Profiles.ExcludeTopic); err != nil {
if err := validateExcludeTopic("profiles", c.Profiles.Topics, c.Profiles.ExcludeTopics); err != nil {
return err
}
return nil
}

// validateExcludeTopic checks that exclude_topic is only configured when topic uses regex pattern
func validateExcludeTopic(signalType, topic, excludeTopic string) error {
if excludeTopic == "" {
func validateExcludeTopic(signalType string, topics, excludeTopics []string) error {
if len(excludeTopics) == 0 {
return nil // No exclude_topic configured, nothing to validate
}
if !strings.HasPrefix(topic, "^") {

// if none of the configured topic uses regex return error
var usesRegex bool
for _, topic := range topics {
if strings.HasPrefix(topic, "^") {
usesRegex = true
}
}

if !usesRegex {
return fmt.Errorf(
"%s.exclude_topic is configured but %s.topic does not use regex pattern (must start with '^')",
"%s.exclude_topics is configured but none of the configured %s.topics use regex pattern (must start with '^')",
signalType, signalType,
)
}
// Validate that exclude_topic is a valid regex pattern
if _, err := regexp.Compile(excludeTopic); err != nil {
return fmt.Errorf(
"%s.exclude_topic contains invalid regex pattern: %w",
signalType, err,
)

for _, excludeTopic := range excludeTopics {
// Validate that exclude_topic is a valid regex pattern
if _, err := regexp.Compile(excludeTopic); err != nil {
return fmt.Errorf(
"%s.exclude_topic contains invalid regex pattern: %w",
signalType, err,
)
}
}

return nil
}

// TopicEncodingConfig holds signal-specific topic and encoding configuration.
type TopicEncodingConfig struct {
// Topic holds the name of the Kafka topic from which messages of the
// Deprecated [v0.142.0]: Use Topics
Topic string `mapstructure:"topic"`

// Topics holds the name of the Kafka topic from which messages of the
// signal type should be consumed.
//
// The default depends on the signal type:
// - "otlp_spans" for traces
// - "otlp_metrics" for metrics
// - "otlp_logs" for logs
// - "otlp_profiles" for profiles
Topic string `mapstructure:"topic"`
Topics []string `mapstructure:"topics"`

// Encoding holds the expected encoding of messages for the signal type
//
// Defaults to "otlp_proto".
Encoding string `mapstructure:"encoding"`

// Optional exclude topic option, used only in regex mode.
// Deprecated [v0.142.0]: Use ExcludeTopics
ExcludeTopic string `mapstructure:"exclude_topic"`

// Optional exclude topics option, used only in regex mode.
ExcludeTopics []string `mapstructure:"exclude_topics"`
}

type MessageMarking struct {
Expand Down
Loading
Loading