Skip to content
27 changes: 27 additions & 0 deletions .chloggen/support-arrayof-string.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/kafka

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support configuring a list of topics and exclude_topics; deprecate topic and exclude_topic

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [44477]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
36 changes: 24 additions & 12 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,37 @@ The following settings can be optionally configured:
- `protocol_version` (default = 2.1.0): Kafka protocol version.
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
- `logs`
- `topic` (default = otlp\_logs): The name of the Kafka topic from which to consume logs.
- `topic` (Deprecated [v0.142.0]: use `topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `topic` (Deprecated [v0.142.0]: use `topics`)
- `topic` (Deprecated [v0.141.0]: use `topics`)

(default = otlp\logs): If this is set, it will take precedence over default value of `topics`
- `topics` (default = otlp\_logs): List of kafka topics from which to consume logs
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
- `exclude_topic` (Deprecated [v0.141.0]: use `exclude_topics`)

(default = ""): If this is set, it will take precedence over default value of `exclude_topics`
- `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `metrics`
- `topic` (default = otlp\_metrics): The name of the Kafka topic from which to consume metrics.
- `topic` (Deprecated [v0.142.0]: use `topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `topic` (Deprecated [v0.142.0]: use `topics`)
- `topic` (Deprecated [v0.141.0]: use `topics`)

(default = otlp\_metrics): If this is set, it will take precedence over default value of `topics`
- `topics` (default = otlp\_metrics): List of Kafka topic from which to consume metrics.
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
- `exclude_topic` (Deprecated [v0.141.0]: use `exclude_topics`)

(default = ""): If this is set, it will take precedence over default value of `exclude_topics`
- `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `traces`
- `topic` (default = otlp\_spans): The name of the Kafka topic from which to consume traces.
- `topic` (Deprecated [v0.142.0]: use `topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `topic` (Deprecated [v0.142.0]: use `topics`)
- `topic` (Deprecated [v0.141.0]: use `topics`)

(default = otlp\_spans): If this is set, it will take precedence over default value of `topics`
- `topics` (default = otlp\_spans): List of Kafka topic from which to consume traces.
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
- `exclude_topic` (Deprecated [v0.141.0]: use `exclude_topics`)

(default = ""): If this is set, it will take precedence over default value of `exclude_topics`
- `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `profiles`
- `topic` (default = otlp\_profiles): The name of the Kafka topic from which to consume profiles.
- `topic` (Deprecated [v0.142.0]: use `topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `topic` (Deprecated [v0.142.0]: use `topics`)
- `topic` (Deprecated [v0.141.0]: use `topics`)

(default = otlp\_profiles): If this is set, it will take precedence over default value of `topics`
- `topics` (default = otlp\_profiles): List of Kafka topic from which to consume profiles.
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `topic` (Deprecated [v0.124.0]: use `logs::topic`, `traces::topic`, or `metrics::topic`).
If this is set, it will take precedence over the default value for those fields.
- `encoding` (Deprecated [v0.124.0]: use `logs::encoding`, `traces::encoding`, or `metrics::encoding`).
If this is set, it will take precedence over the default value for those fields.
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
- `exclude_topic` (Deprecated [v0.141.0]: use `exclude_topics`)

(default = ""): If this is set, it will take precedence over default value of `exclude_topics`
- `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from
- `client_id` (default = otel-collector): The consumer client ID that receiver will use
- `rack_id` (default = ""): The rack identifier for this client. When set and brokers are configured with a rack-aware replica selector, the client will prefer fetching from the closest replica.
Expand Down
129 changes: 111 additions & 18 deletions receiver/kafkareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,79 @@
return err
}

// Check if deprecated fields have been explicitly set
// give them precedence
var zeroConfig Config
if err := conf.Unmarshal(&zeroConfig); err != nil {
return err
}

// handle deprecated topic and exclude_topic for log signal
if zeroConfig.Logs.Topic != "" {
if len(zeroConfig.Logs.Topics) == 0 {

Check failure on line 65 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / scoped-tests-matrix (ubuntu-latest)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)

Check failure on line 65 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)

Check failure on line 65 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)
c.Logs.Topics = []string{zeroConfig.Logs.Topic}
} else {
return fmt.Errorf("both logs.topic and logs.topics cannot be set")

Check failure on line 68 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / scoped-tests-matrix (ubuntu-latest)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 68 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 68 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sort of things should be done in Validate

Copy link
Contributor Author

@khushijain21 khushijain21 Nov 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, because we were mutating an empty logs.topics in Unmarshall function here https://github.com/khushijain21/opentelemetry-collector-contrib/blob/kafkarec/receiver/kafkareceiver/config.go#L66 - doing this check in Validate would result in a false error. For ex: it would fail even when just "logs.topic" is set. (We are appending logs.topics above so).

We would have to clear logs.Topic in case we really want this to go in Validate.

So I guess we can keep it here if this is okay. Either way, your call

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

anyway, I moved it to validate in this commit 0fdab34

Copy link
Contributor Author

@khushijain21 khushijain21 Nov 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We would have to clear logs.Topic in case we really want this to go in Validate

I cleared the logs.Topic but when we unmarshal it here again https://github.com/khushijain21/opentelemetry-collector-contrib/blob/kafkarec/receiver/kafkareceiver/config.go#L135 - it sets log.topic back to what user configured.

Hence validate() method returns a false error. I think it is okay to do this check in Unmarshal

}
}
if zeroConfig.Logs.ExcludeTopic != "" {
if len(zeroConfig.Logs.ExcludeTopics) == 0 {

Check failure on line 72 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / scoped-tests-matrix (ubuntu-latest)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)

Check failure on line 72 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)

Check failure on line 72 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)
c.Logs.ExcludeTopics = []string{zeroConfig.Logs.ExcludeTopic}
} else {
return fmt.Errorf("both logs.exclude_topic and logs.exclude_topics cannot be set")

Check failure on line 75 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / scoped-tests-matrix (ubuntu-latest)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 75 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 75 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
}

// handle deprecated topic and exclude_topic for metric signal
if zeroConfig.Metrics.Topic != "" {
if len(zeroConfig.Logs.Topics) == 0 {

Check failure on line 81 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / scoped-tests-matrix (ubuntu-latest)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)

Check failure on line 81 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)

Check failure on line 81 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)
c.Metrics.Topics = []string{zeroConfig.Metrics.Topic}
} else {
return fmt.Errorf("both metrics.topic and metrics.topics cannot be set")

Check failure on line 84 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / scoped-tests-matrix (ubuntu-latest)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 84 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 84 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
}

if zeroConfig.Metrics.ExcludeTopic != "" {
if len(zeroConfig.Logs.ExcludeTopics) == 0 {

Check failure on line 89 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / scoped-tests-matrix (ubuntu-latest)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)

Check failure on line 89 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)

Check failure on line 89 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)
c.Metrics.ExcludeTopics = []string{zeroConfig.Metrics.ExcludeTopic}
} else {
return fmt.Errorf("both metrics.exclude_topic and metrics.exclude_topics cannot be set")

Check failure on line 92 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / scoped-tests-matrix (ubuntu-latest)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 92 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 92 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
}

// handle deprecated topic and exclude_topic for trace signal
if zeroConfig.Traces.Topic != "" {
if len(zeroConfig.Logs.Topics) == 0 {

Check failure on line 98 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / scoped-tests-matrix (ubuntu-latest)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)

Check failure on line 98 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)

Check failure on line 98 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

early-return: if c { ... } else { ... return } can be simplified to if !c { ... return } ... (revive)
c.Traces.Topics = []string{zeroConfig.Traces.Topic}
} else {
return fmt.Errorf("both traces.topic and traces.topics cannot be set")

Check failure on line 101 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / scoped-tests-matrix (ubuntu-latest)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 101 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 101 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
}

if zeroConfig.Traces.ExcludeTopic != "" {
if len(zeroConfig.Logs.ExcludeTopics) == 0 {
c.Traces.ExcludeTopics = []string{zeroConfig.Traces.ExcludeTopic}
} else {
return fmt.Errorf("both traces.exclude_topic and traces.exclude_topics cannot be set")
}
}

// handle deprecated topic and exclude_topic for profile signal
if zeroConfig.Profiles.Topic != "" {
if len(zeroConfig.Logs.Topics) == 0 {
c.Profiles.Topics = []string{zeroConfig.Profiles.Topic}
} else {
return fmt.Errorf("both profiles.topic and profiles.topics cannot be set")
}
}
if zeroConfig.Profiles.ExcludeTopic != "" {
if len(zeroConfig.Logs.ExcludeTopics) == 0 {
c.Profiles.ExcludeTopics = []string{zeroConfig.Profiles.ExcludeTopic}
} else {
return fmt.Errorf("both profiles.exclude_topic and profiles.exclude_topics cannot be set")
}
}

// Set OnPermanentError default value to inherit from OnError for backward compatibility
// Only if OnPermanentError was not explicitly set in the config
rawConf := conf.Get("message_marking")
Expand All @@ -73,61 +146,81 @@
// Validate checks the receiver configuration is valid.
func (c *Config) Validate() error {
// Validate that exclude_topic is only used with regex topic patterns
if err := validateExcludeTopic("logs", c.Logs.Topic, c.Logs.ExcludeTopic); err != nil {
if err := validateExcludeTopic("logs", c.Logs.Topics, c.Logs.ExcludeTopics); err != nil {
return err
}
if err := validateExcludeTopic("metrics", c.Metrics.Topic, c.Metrics.ExcludeTopic); err != nil {
if err := validateExcludeTopic("metrics", c.Metrics.Topics, c.Metrics.ExcludeTopics); err != nil {
return err
}
if err := validateExcludeTopic("traces", c.Traces.Topic, c.Traces.ExcludeTopic); err != nil {
if err := validateExcludeTopic("traces", c.Traces.Topics, c.Traces.ExcludeTopics); err != nil {
return err
}
if err := validateExcludeTopic("profiles", c.Profiles.Topic, c.Profiles.ExcludeTopic); err != nil {
if err := validateExcludeTopic("profiles", c.Profiles.Topics, c.Profiles.ExcludeTopics); err != nil {
return err
}
return nil
}

// validateExcludeTopic checks that exclude_topic is only configured when topic uses regex pattern
func validateExcludeTopic(signalType, topic, excludeTopic string) error {
if excludeTopic == "" {
// validateExcludeTopic checks that exclude_topic is only configured when topics uses regex pattern
func validateExcludeTopic(signalType string, topics, excludeTopics []string) error {
if len(excludeTopics) == 0 {
return nil // No exclude_topic configured, nothing to validate
}
if !strings.HasPrefix(topic, "^") {

// if none of the configured topic uses regex return error
var usesRegex bool
for _, topic := range topics {
if strings.HasPrefix(topic, "^") {
usesRegex = true
break
}
}

if !usesRegex {
return fmt.Errorf(
"%s.exclude_topic is configured but %s.topic does not use regex pattern (must start with '^')",
"%s.exclude_topics is configured but none of the configured %s.topics use regex pattern (must start with '^')",
signalType, signalType,
)
}
// Validate that exclude_topic is a valid regex pattern
if _, err := regexp.Compile(excludeTopic); err != nil {
return fmt.Errorf(
"%s.exclude_topic contains invalid regex pattern: %w",
signalType, err,
)

for _, excludeTopic := range excludeTopics {
// Validate that exclude_topic is a valid regex pattern
if _, err := regexp.Compile(excludeTopic); err != nil {
return fmt.Errorf(
"%s.exclude_topic contains invalid regex pattern: %w",
signalType, err,
)
}
}

return nil
}

// TopicEncodingConfig holds signal-specific topic and encoding configuration.
type TopicEncodingConfig struct {
// Topic holds the name of the Kafka topic from which messages of the
// Deprecated [v0.142.0]: Use Topics
Topic string `mapstructure:"topic"`

// Topics holds the name of the Kafka topics from which messages of the
// signal type should be consumed.
//
// The default depends on the signal type:
// - "otlp_spans" for traces
// - "otlp_metrics" for metrics
// - "otlp_logs" for logs
// - "otlp_profiles" for profiles
Topic string `mapstructure:"topic"`
Topics []string `mapstructure:"topics"`

// Encoding holds the expected encoding of messages for the signal type
//
// Defaults to "otlp_proto".
Encoding string `mapstructure:"encoding"`

// Optional exclude topic option, used only in regex mode.
// Deprecated [v0.142.0]: Use ExcludeTopics
ExcludeTopic string `mapstructure:"exclude_topic"`

// Optional exclude topics option, used only in regex mode.
ExcludeTopics []string `mapstructure:"exclude_topics"`
}

type MessageMarking struct {
Expand Down
Loading
Loading