Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions .chloggen/44229.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'bug_fix'

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: processor/redaction

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Improve database sanitization with system-aware obfuscation, span name sanitization, and URL path parameter redaction.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [44229]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
- Database sanitization now validates span kind (CLIENT/SERVER/INTERNAL) and requires a db.system.name or db.system attribute for traces and metrics
- Implemented span name obfuscation for database operations based on db.system
- Added URL path parameter sanitization for span names with configurable pattern matching
- Improved query validation in database sanitizers
- Fixed an issue where spans named `...` could be generated when multiple sanitizers were enabled
- If span name sanitization fails, the original span name is kept

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
2 changes: 2 additions & 0 deletions processor/redactionprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,5 @@ The database sanitizer will:
- Preserve query structure while removing sensitive data

This provides an additional layer of protection when collecting telemetry that includes database operations.

**Trace and metric behaviour:** Database sanitization for spans and metric attributes only runs when the telemetry includes a `db.system.name` or `db.system` attribute and the span kind is `CLIENT` or `SERVER`. This prevents non-database spans from being rewritten. Logs automatically enable a sequential fallback internally, so database attributes without `db.system` can still be sanitized when they appear in log records.
10 changes: 8 additions & 2 deletions processor/redactionprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,14 @@ func createLogsProcessor(
next consumer.Logs,
) (processor.Logs, error) {
oCfg := cfg.(*Config)

red, err := newRedaction(ctx, oCfg, set.Logger)
logCfg := *oCfg
// Attributes are defined for metrics and traces:
// https://opentelemetry.io/docs/specs/semconv/database/
// For logs, we don't rely on the "db.system.name" attribute to
// do the sanitization.
logCfg.DBSanitizer.AllowFallbackWithoutSystem = true

red, err := newRedaction(ctx, &logCfg, set.Logger)
if err != nil {
return nil, fmt.Errorf("error creating a redaction processor: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion processor/redactionprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
go.opentelemetry.io/collector/processor v1.46.1-0.20251120204106-2e9c82787618
go.opentelemetry.io/collector/processor/processorhelper v0.140.1-0.20251120204106-2e9c82787618
go.opentelemetry.io/collector/processor/processortest v0.140.1-0.20251120204106-2e9c82787618
go.opentelemetry.io/otel v1.38.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/crypto v0.45.0
Expand Down Expand Up @@ -54,7 +55,6 @@ require (
go.opentelemetry.io/collector/pdata/testdata v0.140.1-0.20251120204106-2e9c82787618 // indirect
go.opentelemetry.io/collector/pipeline v1.46.1-0.20251120204106-2e9c82787618 // indirect
go.opentelemetry.io/collector/processor/xprocessor v0.140.1-0.20251120204106-2e9c82787618 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.opentelemetry.io/otel/sdk v1.38.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.38.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions processor/redactionprocessor/internal/db/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type DBSanitizerConfig struct {
MongoConfig MongoConfig `mapstructure:"mongo"`
OpenSearchConfig OpenSearchConfig `mapstructure:"opensearch"`
ESConfig ESConfig `mapstructure:"es"`
// AllowFallbackWithoutSystem enables sequential sanitization when `db.system` is missing.
// This is meant for logs contexts and is set internally, not via YAML.
AllowFallbackWithoutSystem bool `mapstructure:"-"`
}

type SQLConfig struct {
Expand Down
178 changes: 129 additions & 49 deletions processor/redactionprocessor/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,19 @@
package db // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/redactionprocessor/internal/db"

import (
"strings"

"github.com/DataDog/datadog-agent/pkg/obfuscate"
semconv128 "go.opentelemetry.io/otel/semconv/v1.28.0"
"go.uber.org/zap"
)

type Obfuscator struct {
obfuscators []databaseObfuscator
processAttributesEnabled bool
obfuscators []databaseObfuscator
processAttributesEnabled bool
logger *zap.Logger
allowFallbackWithoutSystem bool
DBSystem string
}

func createAttributes(attributes []string) map[string]bool {
Expand All @@ -20,7 +27,10 @@ func createAttributes(attributes []string) map[string]bool {
return attributesMap
}

func NewObfuscator(cfg DBSanitizerConfig) *Obfuscator {
func NewObfuscator(cfg DBSanitizerConfig, logger *zap.Logger) *Obfuscator {
if logger == nil {
logger = zap.NewNop()
}
o := obfuscate.NewObfuscator(obfuscate.Config{
SQL: obfuscate.SQLConfig{
ReplaceDigits: true,
Expand Down Expand Up @@ -49,83 +59,92 @@ func NewObfuscator(cfg DBSanitizerConfig) *Obfuscator {
processAttributesEnabled := false

if cfg.SQLConfig.Enabled {
attributes := createAttributes(cfg.SQLConfig.Attributes)
processAttributesEnabled = processAttributesEnabled || len(attributes) > 0
dbAttrs := newDBAttributes(cfg.SQLConfig.Attributes, []string{
semconv128.DBSystemOtherSQL.Value.AsString(),
semconv128.DBSystemMySQL.Value.AsString(),
semconv128.DBSystemPostgreSQL.Value.AsString(),
semconv128.DBSystemMariaDB.Value.AsString(),
semconv128.DBSystemSqlite.Value.AsString(),
})
processAttributesEnabled = processAttributesEnabled || len(dbAttrs.attributes) > 0
obfuscators = append(obfuscators, &sqlObfuscator{
dbAttributes: dbAttributes{
attributes: attributes,
},
obfuscator: o,
dbAttributes: dbAttrs,
obfuscator: o,
})
}

if cfg.RedisConfig.Enabled {
attributes := createAttributes(cfg.RedisConfig.Attributes)
processAttributesEnabled = processAttributesEnabled || len(attributes) > 0
dbAttrs := newDBAttributes(cfg.RedisConfig.Attributes, []string{
semconv128.DBSystemRedis.Value.AsString(),
})
processAttributesEnabled = processAttributesEnabled || len(dbAttrs.attributes) > 0
obfuscators = append(obfuscators, &redisObfuscator{
dbAttributes: dbAttributes{
attributes: attributes,
},
obfuscator: o,
dbAttributes: dbAttrs,
obfuscator: o,
})
}

if cfg.ValkeyConfig.Enabled {
attributes := createAttributes(cfg.ValkeyConfig.Attributes)
processAttributesEnabled = processAttributesEnabled || len(attributes) > 0
obfuscators = append(obfuscators, &valkeyObfuscator{
dbAttributes: dbAttributes{
attributes: attributes,
},
obfuscator: o,
dbAttrs := newDBAttributes(cfg.ValkeyConfig.Attributes, []string{
"valkey", // Not part of semantic conventions
})
processAttributesEnabled = processAttributesEnabled || len(dbAttrs.attributes) > 0
obfuscators = append(obfuscators, &redisObfuscator{
dbAttributes: dbAttrs,
obfuscator: o,
})
}

if cfg.MemcachedConfig.Enabled {
attributes := createAttributes(cfg.MemcachedConfig.Attributes)
processAttributesEnabled = processAttributesEnabled || len(attributes) > 0
dbAttrs := newDBAttributes(cfg.MemcachedConfig.Attributes, []string{
semconv128.DBSystemMemcached.Value.AsString(),
})
processAttributesEnabled = processAttributesEnabled || len(dbAttrs.attributes) > 0
obfuscators = append(obfuscators, &memcachedObfuscator{
dbAttributes: dbAttributes{
attributes: attributes,
},
obfuscator: o,
dbAttributes: dbAttrs,
obfuscator: o,
})
}

if cfg.MongoConfig.Enabled {
attributes := createAttributes(cfg.MongoConfig.Attributes)
processAttributesEnabled = processAttributesEnabled || len(attributes) > 0
dbAttrs := newDBAttributes(cfg.MongoConfig.Attributes, []string{
semconv128.DBSystemMongoDB.Value.AsString(),
})
processAttributesEnabled = processAttributesEnabled || len(dbAttrs.attributes) > 0
obfuscators = append(obfuscators, &mongoObfuscator{
dbAttributes: dbAttributes{
attributes: attributes,
},
obfuscator: o,
dbAttributes: dbAttrs,
obfuscator: o,
logger: logger,
})
}

if cfg.OpenSearchConfig.Enabled {
attributes := createAttributes([]string{})
dbAttrs := newDBAttributes([]string{}, []string{
"opensearch", // Not part of semantic conventions
})
obfuscators = append(obfuscators, &opensearchObfuscator{
dbAttributes: dbAttributes{
attributes: attributes,
},
obfuscator: o,
dbAttributes: dbAttrs,
obfuscator: o,
logger: logger,
})
}

if cfg.ESConfig.Enabled {
attributes := createAttributes([]string{})
dbAttrs := newDBAttributes([]string{}, []string{
semconv128.DBSystemElasticsearch.Value.AsString(),
})
obfuscators = append(obfuscators, &esObfuscator{
dbAttributes: dbAttributes{
attributes: attributes,
},
obfuscator: o,
dbAttributes: dbAttrs,
obfuscator: o,
logger: logger,
})
}

return &Obfuscator{
obfuscators: obfuscators,
processAttributesEnabled: processAttributesEnabled,
obfuscators: obfuscators,
processAttributesEnabled: processAttributesEnabled,
logger: logger,
allowFallbackWithoutSystem: cfg.AllowFallbackWithoutSystem,
}
}

Expand All @@ -144,14 +163,40 @@ func (o *Obfuscator) ObfuscateAttribute(attributeValue, attributeKey string) (st
if !o.HasSpecificAttributes() {
return attributeValue, nil
}

if o.DBSystem == "" {
if o.allowFallbackWithoutSystem {
return o.obfuscateSequentially(attributeValue, attributeKey)
}
return attributeValue, nil
}

for _, obfuscator := range o.obfuscators {
obfuscatedValue, err := obfuscator.ObfuscateAttribute(attributeValue, attributeKey)
if !obfuscator.SupportsSystem(o.DBSystem) {
continue
}
if !obfuscator.ShouldProcessAttribute(attributeKey) {
continue
}
return obfuscator.ObfuscateAttribute(attributeValue, attributeKey)
}

return attributeValue, nil
}

func (o *Obfuscator) obfuscateSequentially(attributeValue, attributeKey string) (string, error) {
result := attributeValue
for _, obfuscator := range o.obfuscators {
if !obfuscator.ShouldProcessAttribute(attributeKey) {
continue
}
obfuscatedValue, err := obfuscator.ObfuscateAttribute(result, attributeKey)
if err != nil {
return attributeValue, err
}
attributeValue = obfuscatedValue
result = obfuscatedValue
}
return attributeValue, nil
return result, nil
}

func (o *Obfuscator) HasSpecificAttributes() bool {
Expand All @@ -161,3 +206,38 @@ func (o *Obfuscator) HasSpecificAttributes() bool {
func (o *Obfuscator) HasObfuscators() bool {
return len(o.obfuscators) > 0
}

// ObfuscateWithSystem obfuscates val using the first registered obfuscator
// that reports support for dbSystem. The system name is lower-cased before
// it is matched and before it is handed to the selected obfuscator.
//
// val is returned unchanged with a nil error when no obfuscators are
// registered, when dbSystem is empty, or when no registered obfuscator
// supports the system.
func (o *Obfuscator) ObfuscateWithSystem(val, dbSystem string) (string, error) {
	if !o.HasObfuscators() || dbSystem == "" {
		return val, nil
	}

	system := strings.ToLower(dbSystem)
	for i := range o.obfuscators {
		// Only the first matching obfuscator is consulted; its result is final.
		if candidate := o.obfuscators[i]; candidate.SupportsSystem(system) {
			return candidate.ObfuscateWithSystem(val, system)
		}
	}
	return val, nil
}

// createSystems builds a lookup set of database system names keyed by their
// lower-cased form. It returns nil when systems is empty.
func createSystems(systems []string) map[string]bool {
	count := len(systems)
	if count == 0 {
		return nil
	}

	lookup := make(map[string]bool, count)
	for i := range systems {
		// Keys are normalized to lower case when the set is built.
		lookup[strings.ToLower(systems[i])] = true
	}
	return lookup
}

// newDBAttributes assembles a dbAttributes value from the attribute keys and
// database system names supplied. The attribute set is produced by
// createAttributes and the system set by createSystems.
func newDBAttributes(attributes, systems []string) dbAttributes {
	var result dbAttributes
	result.attributes = createAttributes(attributes)
	result.dbSystems = createSystems(systems)
	return result
}
Loading