Skip to content
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f5b98de
[receiver/kafka] Add support for topic and exclude_topic as an array
khushijain21 Nov 27, 2025
f65e86b
Merge branch 'main' into kafkarec
khushijain21 Nov 27, 2025
df16d03
fix commit
khushijain21 Nov 28, 2025
fb85547
Merge branch 'main' into kafkarec
khushijain21 Nov 28, 2025
9237a1b
Update receiver/kafkareceiver/config.go
khushijain21 Nov 28, 2025
88957f1
Update .chloggen/support-arrayof-string.yaml
khushijain21 Nov 28, 2025
e875d97
Update .chloggen/support-arrayof-string.yaml
khushijain21 Nov 28, 2025
bda0ef6
address review comments
khushijain21 Nov 28, 2025
7de8cc1
Merge branch 'main' into kafkarec
khushijain21 Nov 28, 2025
0fdab34
move to validate
khushijain21 Nov 28, 2025
add9ed8
fix test
khushijain21 Nov 28, 2025
b608202
Update receiver/kafkareceiver/README.md
khushijain21 Dec 1, 2025
6df431c
Update receiver/kafkareceiver/README.md
khushijain21 Dec 1, 2025
9756bf3
Update receiver/kafkareceiver/README.md
khushijain21 Dec 1, 2025
8109d33
Update receiver/kafkareceiver/README.md
khushijain21 Dec 1, 2025
91ab746
Update receiver/kafkareceiver/README.md
khushijain21 Dec 1, 2025
d8f527b
Update receiver/kafkareceiver/README.md
khushijain21 Dec 1, 2025
37134eb
Update receiver/kafkareceiver/README.md
khushijain21 Dec 1, 2025
c18168d
Update receiver/kafkareceiver/README.md
khushijain21 Dec 1, 2025
f172ad1
Merge branch 'main' into kafkarec
khushijain21 Dec 1, 2025
ae18185
fix lint
khushijain21 Dec 1, 2025
f6e8760
Merge branch 'main' into kafkarec
khushijain21 Dec 1, 2025
6aea572
fix test
khushijain21 Dec 1, 2025
1c74e61
fix version
khushijain21 Dec 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/support-arrayof-string.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: deprecation

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/kafka

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Support configuring a list of topics and exclude_topics; deprecate topic and exclude_topic

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [44477]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
36 changes: 24 additions & 12 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,37 @@ The following settings can be optionally configured:
- `protocol_version` (default = 2.1.0): Kafka protocol version.
- `resolve_canonical_bootstrap_servers_only` (default = false): Whether to resolve then reverse-lookup broker IPs during startup
- `logs`
- `topic` (default = otlp\_logs): The name of the Kafka topic from which to consume logs.
- `topic` (Deprecated [v0.142.0]: use `topics`)
(default = otlp\logs): If this is set, it will take precedence over default value of `topics`
- `topics` (default = otlp\_logs): List of kafka topics from which to consume logs
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
(default = ""): If this is set, it will take precedence over default value of `exclude_topics`
- `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `metrics`
- `topic` (default = otlp\_metrics): The name of the Kafka topic from which to consume metrics.
- `topic` (Deprecated [v0.142.0]: use `topics`)
(default = otlp\_metrics): If this is set, it will take precedence over default value of `topics`
- `topics` (default = otlp\_metrics): List of Kafka topic from which to consume metrics.
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
(default = ""): If this is set, it will take precedence over default value of `exclude_topics`
- `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `traces`
- `topic` (default = otlp\_spans): The name of the Kafka topic from which to consume traces.
- `topic` (Deprecated [v0.142.0]: use `topics`)
(default = otlp\_spans): If this is set, it will take precedence over default value of `topics`
- `topics` (default = otlp\_spans): List of Kafka topic from which to consume traces.
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
(default = ""): If this is set, it will take precedence over default value of `exclude_topics`
- `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `profiles`
- `topic` (default = otlp\_profiles): The name of the Kafka topic from which to consume profiles.
- `topic` (Deprecated [v0.142.0]: use `topics`)
(default = otlp\_profiles): If this is set, it will take precedence over default value of `topics`
- `topics` (default = otlp\_profiles): List of Kafka topic from which to consume profiles.
- `encoding` (default = otlp\_proto): The encoding for the Kafka topic. See [Supported encodings](#supported-encodings).
- `exclude_topic` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `topic` (Deprecated [v0.124.0]: use `logs::topic`, `traces::topic`, or `metrics::topic`).
If this is set, it will take precedence over the default value for those fields.
- `encoding` (Deprecated [v0.124.0]: use `logs::encoding`, `traces::encoding`, or `metrics::encoding`).
If this is set, it will take precedence over the default value for those fields.
- `exclude_topic` (Deprecated [v0.142.0]: use `exclude_topics`)
(default = ""): If this is set, it will take precedence over default value of `exclude_topics`
- `exclude_topics` (default = ""): When using regex topic patterns (prefix with `^`), this regex pattern excludes matching topics. Only works with franz-go client and when topic uses regex.
- `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from
- `client_id` (default = otel-collector): The consumer client ID that receiver will use
- `rack_id` (default = ""): The rack identifier for this client. When set and brokers are configured with a rack-aware replica selector, the client will prefer fetching from the closest replica.
Expand Down
147 changes: 129 additions & 18 deletions receiver/kafkareceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,71 @@
return err
}

// Check if deprecated fields have been explicitly set
// give them precedence
var zeroConfig Config
if err := conf.Unmarshal(&zeroConfig); err != nil {
return err
}

// handle deprecated topic and exclude_topic for log signal
if zeroConfig.Logs.Topic != "" {
if len(zeroConfig.Logs.Topics) == 0 {
c.Logs.Topics = []string{zeroConfig.Logs.Topic}
c.Logs.Topic = ""
}
}
if zeroConfig.Logs.ExcludeTopic != "" {
if len(zeroConfig.Logs.ExcludeTopics) == 0 {
c.Logs.ExcludeTopics = []string{zeroConfig.Logs.ExcludeTopic}
c.Logs.ExcludeTopic = ""
}
}

// handle deprecated topic and exclude_topic for metric signal
if zeroConfig.Metrics.Topic != "" {
if len(zeroConfig.Logs.Topics) == 0 {
c.Metrics.Topics = []string{zeroConfig.Metrics.Topic}
c.Metrics.Topic = ""
}
}

if zeroConfig.Metrics.ExcludeTopic != "" {
if len(zeroConfig.Logs.ExcludeTopics) == 0 {
c.Metrics.ExcludeTopics = []string{zeroConfig.Metrics.ExcludeTopic}
c.Metrics.ExcludeTopic = ""
}
}

// handle deprecated topic and exclude_topic for trace signal
if zeroConfig.Traces.Topic != "" {
if len(zeroConfig.Logs.Topics) == 0 {
c.Traces.Topics = []string{zeroConfig.Traces.Topic}
c.Traces.Topic = ""
}
}

if zeroConfig.Traces.ExcludeTopic != "" {
if len(zeroConfig.Logs.ExcludeTopics) == 0 {
c.Traces.ExcludeTopics = []string{zeroConfig.Traces.ExcludeTopic}
c.Traces.ExcludeTopic = ""
}
}

// handle deprecated topic and exclude_topic for profile signal
if zeroConfig.Profiles.Topic != "" {
if len(zeroConfig.Logs.Topics) == 0 {
c.Profiles.Topics = []string{zeroConfig.Profiles.Topic}
c.Profiles.Topic = ""
}
}
if zeroConfig.Profiles.ExcludeTopic != "" {
if len(zeroConfig.Logs.ExcludeTopics) == 0 {
c.Profiles.ExcludeTopics = []string{zeroConfig.Profiles.ExcludeTopic}
c.Profiles.ExcludeTopic = ""
}
}

// Set OnPermanentError default value to inherit from OnError for backward compatibility
// Only if OnPermanentError was not explicitly set in the config
rawConf := conf.Get("message_marking")
Expand All @@ -72,62 +137,108 @@

// Validate checks the receiver configuration is valid.
func (c *Config) Validate() error {
if c.Logs.Topic != "" && len(c.Logs.Topics) != 0 {
return fmt.Errorf("both logs.topic and logs.topics cannot be set")

Check failure on line 141 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 141 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
if c.Metrics.Topic != "" && len(c.Metrics.Topics) != 0 {
return fmt.Errorf("both metrics.topic and metrics.topics cannot be set")

Check failure on line 144 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 144 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
if c.Traces.Topic != "" && len(c.Traces.Topics) != 0 {
return fmt.Errorf("both traces.topic and traces.topics cannot be set")

Check failure on line 147 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 147 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
if c.Profiles.Topic != "" && len(c.Profiles.Topics) != 0 {
return fmt.Errorf("both profiles.topic and profiles.topics cannot be set")

Check failure on line 150 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 150 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}

if c.Logs.ExcludeTopic != "" && len(c.Logs.ExcludeTopics) != 0 {
return fmt.Errorf("both logs.exclude_topic and logs.exclude_topics cannot be set")

Check failure on line 154 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 154 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
if c.Metrics.ExcludeTopic != "" && len(c.Metrics.ExcludeTopics) != 0 {
return fmt.Errorf("both metrics.exclude_topic and metrics.exclude_topics cannot be set")

Check failure on line 157 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 157 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
if c.Traces.ExcludeTopic != "" && len(c.Traces.ExcludeTopics) != 0 {
return fmt.Errorf("both traces.exclude_topic and traces.exclude_topic cannot be set")

Check failure on line 160 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 160 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
if c.Profiles.Topic != "" && len(c.Profiles.Topics) != 0 {
return fmt.Errorf("both profiles.exclude_topic and profiles.exclude_topics cannot be set")

Check failure on line 163 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 163 in receiver/kafkareceiver/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, receiver-1)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}

// Validate that exclude_topic is only used with regex topic patterns
if err := validateExcludeTopic("logs", c.Logs.Topic, c.Logs.ExcludeTopic); err != nil {
if err := validateExcludeTopic("logs", c.Logs.Topics, c.Logs.ExcludeTopics); err != nil {
return err
}
if err := validateExcludeTopic("metrics", c.Metrics.Topic, c.Metrics.ExcludeTopic); err != nil {
if err := validateExcludeTopic("metrics", c.Metrics.Topics, c.Metrics.ExcludeTopics); err != nil {
return err
}
if err := validateExcludeTopic("traces", c.Traces.Topic, c.Traces.ExcludeTopic); err != nil {
if err := validateExcludeTopic("traces", c.Traces.Topics, c.Traces.ExcludeTopics); err != nil {
return err
}
if err := validateExcludeTopic("profiles", c.Profiles.Topic, c.Profiles.ExcludeTopic); err != nil {
if err := validateExcludeTopic("profiles", c.Profiles.Topics, c.Profiles.ExcludeTopics); err != nil {
return err
}
return nil
}

// validateExcludeTopic checks that exclude_topic is only configured when topic uses regex pattern
func validateExcludeTopic(signalType, topic, excludeTopic string) error {
if excludeTopic == "" {
// validateExcludeTopic checks that exclude_topic is only configured when topics uses regex pattern
func validateExcludeTopic(signalType string, topics, excludeTopics []string) error {
if len(excludeTopics) == 0 {
return nil // No exclude_topic configured, nothing to validate
}
if !strings.HasPrefix(topic, "^") {

// if none of the configured topic uses regex return error
var usesRegex bool
for _, topic := range topics {
if strings.HasPrefix(topic, "^") {
usesRegex = true
break
}
}

if !usesRegex {
return fmt.Errorf(
"%s.exclude_topic is configured but %s.topic does not use regex pattern (must start with '^')",
"%s.exclude_topics is configured but none of the configured %s.topics use regex pattern (must start with '^')",
signalType, signalType,
)
}
// Validate that exclude_topic is a valid regex pattern
if _, err := regexp.Compile(excludeTopic); err != nil {
return fmt.Errorf(
"%s.exclude_topic contains invalid regex pattern: %w",
signalType, err,
)

for _, excludeTopic := range excludeTopics {
// Validate that exclude_topic is a valid regex pattern
if _, err := regexp.Compile(excludeTopic); err != nil {
return fmt.Errorf(
"%s.exclude_topic contains invalid regex pattern: %w",
signalType, err,
)
}
}

return nil
}

// TopicEncodingConfig holds signal-specific topic and encoding configuration.
type TopicEncodingConfig struct {
// Topic holds the name of the Kafka topic from which messages of the
// Deprecated [v0.142.0]: Use Topics
Topic string `mapstructure:"topic"`

// Topics holds the name of the Kafka topics from which messages of the
// signal type should be consumed.
//
// The default depends on the signal type:
// - "otlp_spans" for traces
// - "otlp_metrics" for metrics
// - "otlp_logs" for logs
// - "otlp_profiles" for profiles
Topic string `mapstructure:"topic"`
Topics []string `mapstructure:"topics"`

// Encoding holds the expected encoding of messages for the signal type
//
// Defaults to "otlp_proto".
Encoding string `mapstructure:"encoding"`

// Optional exclude topic option, used only in regex mode.
// Deprecated [v0.142.0]: Use ExcludeTopics
ExcludeTopic string `mapstructure:"exclude_topic"`

// Optional exclude topics option, used only in regex mode.
ExcludeTopics []string `mapstructure:"exclude_topics"`
}

type MessageMarking struct {
Expand Down
Loading
Loading