Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add experimental observability metrics for manual reader in `go.opentelemetry.io/otel/sdk/metric`. (#7524)
- Add experimental observability metrics for periodic reader in `go.opentelemetry.io/otel/sdk/metric`. (#7571)
- Support `OTEL_EXPORTER_OTLP_LOGS_INSECURE` and `OTEL_EXPORTER_OTLP_INSECURE` environmental variables in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp`. (#7608)
- Add `Enabled` method to the `Processor` interface in `go.opentelemetry.io/otel/sdk/log`.
All `Processor` implementations now include an `Enabled` method. (#7639)

### Fixed

Expand All @@ -40,6 +42,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
### Removed

- Drop support for [Go 1.23]. (#7274)
- Remove the `FilterProcessor` interface in `go.opentelemetry.io/otel/sdk/log`.
The `Enabled` method has been added to the `Processor` interface instead.
All `Processor` implementations must now implement the `Enabled` method.
Custom processors that do not filter records can implement `Enabled` to return `true`. (#7639)

### Changed

Expand Down
5 changes: 5 additions & 0 deletions sdk/log/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ func (b *BatchProcessor) OnEmit(_ context.Context, r *Record) error {
return nil
}

// Enabled returns true, indicating this Processor will process all records.
func (*BatchProcessor) Enabled(context.Context, EnabledParameters) bool {
return true
}

// Shutdown flushes queued log records and shuts down the decorated exporter.
func (b *BatchProcessor) Shutdown(ctx context.Context) error {
if b.stopped.Swap(true) || b.q == nil {
Expand Down
6 changes: 3 additions & 3 deletions sdk/log/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (timestampProcessor) OnEmit(_ context.Context, r *Record) error {
return nil
}

func (timestampProcessor) Enabled(context.Context, Record) bool {
func (timestampProcessor) Enabled(context.Context, EnabledParameters) bool {
return true
}

Expand All @@ -155,7 +155,7 @@ func (attrAddProcessor) OnEmit(_ context.Context, r *Record) error {
return nil
}

func (attrAddProcessor) Enabled(context.Context, Record) bool {
func (attrAddProcessor) Enabled(context.Context, EnabledParameters) bool {
return true
}

Expand All @@ -174,7 +174,7 @@ func (attrSetDecorator) OnEmit(_ context.Context, r *Record) error {
return nil
}

func (attrSetDecorator) Enabled(context.Context, Record) bool {
func (attrSetDecorator) Enabled(context.Context, EnabledParameters) bool {
return true
}

Expand Down
27 changes: 12 additions & 15 deletions sdk/log/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"fmt"
"strings"
"sync"

logapi "go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/log/global"
Expand Down Expand Up @@ -52,7 +51,7 @@ func Example() {
}

// Use a processor that filters out records based on the provided context.
func ExampleFilterProcessor() {
func ExampleProcessor_contextFilter() {
// Existing processor that emits telemetry.
var processor log.Processor = log.NewBatchProcessor(nil)

Expand Down Expand Up @@ -82,15 +81,8 @@ func WithIgnoreLogs(ctx context.Context) context.Context {
// [WithIgnoreLogs] is passed to its methods.
type ContextFilterProcessor struct {
log.Processor

lazyFilter sync.Once
// Support the FilterProcessor interface for the embedded processor.
filter log.FilterProcessor
}

// Compile time check.
var _ log.FilterProcessor = (*ContextFilterProcessor)(nil)

func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record) error {
if ignoreLogs(ctx) {
return nil
Expand All @@ -99,12 +91,7 @@ func (p *ContextFilterProcessor) OnEmit(ctx context.Context, record *log.Record)
}

func (p *ContextFilterProcessor) Enabled(ctx context.Context, param log.EnabledParameters) bool {
p.lazyFilter.Do(func() {
if f, ok := p.Processor.(log.FilterProcessor); ok {
p.filter = f
}
})
return !ignoreLogs(ctx) && (p.filter == nil || p.filter.Enabled(ctx, param))
return !ignoreLogs(ctx) && p.Processor.Enabled(ctx, param)
}

func ignoreLogs(ctx context.Context) bool {
Expand Down Expand Up @@ -150,6 +137,11 @@ func (*EventNameProcessor) OnEmit(_ context.Context, record *log.Record) error {
return nil
}

// Enabled returns true, indicating this Processor will process all records.
func (*EventNameProcessor) Enabled(context.Context, log.EnabledParameters) bool {
return true
}

// Shutdown returns nil.
func (*EventNameProcessor) Shutdown(context.Context) error {
return nil
Expand Down Expand Up @@ -193,6 +185,11 @@ func (*RedactTokensProcessor) OnEmit(_ context.Context, record *log.Record) erro
return nil
}

// Enabled returns true, indicating this Processor will process all records.
func (*RedactTokensProcessor) Enabled(context.Context, log.EnabledParameters) bool {
return true
}

// Shutdown returns nil.
func (*RedactTokensProcessor) Shutdown(context.Context) error {
return nil
Expand Down
61 changes: 0 additions & 61 deletions sdk/log/filter_processor.go

This file was deleted.

20 changes: 5 additions & 15 deletions sdk/log/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ func (l *logger) Emit(ctx context.Context, r log.Record) {
}

// Enabled returns true if at least one Processor held by the LoggerProvider
// that created the logger will process param for the provided context and param.
// that created the logger will process for the provided context and param.
//
// If it is not possible to definitively determine the param will be
// If it is not possible to definitively determine the record will be
// processed, true will be returned by default. A value of false will only be
// returned if it can be positively verified that no Processor will process.
func (l *logger) Enabled(ctx context.Context, param log.EnabledParameters) bool {
Expand All @@ -66,23 +66,13 @@ func (l *logger) Enabled(ctx context.Context, param log.EnabledParameters) bool
EventName: param.EventName,
}

// If there are more Processors than FilterProcessors,
// which means not all Processors are FilterProcessors,
// we cannot be sure that all Processors will drop the record.
// Therefore, return true.
//
// If all Processors are FilterProcessors, check if any is enabled.
return len(l.provider.processors) > len(l.provider.fltrProcessors) || anyEnabled(ctx, p, l.provider.fltrProcessors)
}

func anyEnabled(ctx context.Context, param EnabledParameters, fltrs []FilterProcessor) bool {
for _, f := range fltrs {
if f.Enabled(ctx, param) {
for _, processor := range l.provider.processors {
if processor.Enabled(ctx, p) {
// At least one Processor will process the Record.
return true
}
}
// No Processor will process the record
// No Processor will process the record.
return false
}

Expand Down
44 changes: 41 additions & 3 deletions sdk/log/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,16 @@ package log // import "go.opentelemetry.io/otel/sdk/log"

import (
"context"

"go.opentelemetry.io/otel/log"
"go.opentelemetry.io/otel/sdk/instrumentation"
)

// Processor handles the processing of log records.
//
// Any of the Processor's methods may be called concurrently with itself
// or with other methods. It is the responsibility of the Processor to manage
// this concurrency.
//
// See [FilterProcessor] for information about how a Processor can support filtering.
type Processor interface {
// OnEmit is called when a Record is emitted.
//
Expand All @@ -32,11 +33,41 @@ type Processor interface {
// they were registered using WithProcessor.
// Implementations may synchronously modify the record so that the changes
// are visible in the next registered processor.
// Notice that Record is not concurrent safe. Therefore, asynchronous
//
// Note that Record is not concurrent safe. Therefore, asynchronous
// processing may cause race conditions. Use Record.Clone
// to create a copy that shares no state with the original.
OnEmit(ctx context.Context, record *Record) error

// Enabled reports whether the Processor will process for the given context
// and param.
//
// The passed param is likely to be partial record information being
// provided (e.g. a param with only the Severity set).
Comment on lines +22 to +23
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment text seems to apply more to the previous version. EnabledParameters contains at most the Severity and EventName.

Copy link
Member Author

@pellared pellared Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which is still partial record information. This may contain more fields in the future. Still we can rephrase it if you have a proposal.

Yet, I prefer doing it in a separate PR. This PR tries to keep the existing documentation.

// If a Processor needs more information than is provided, it
// is said to be in an indeterminate state (see below).
//
// The returned value will be true when the Processor will process for the
// provided context and param, and will be false if the Processor will not
// process. The returned value may be true or false in an indeterminate state.
// An implementation should default to returning true for an indeterminate
// state, but may return false if valid reasons in particular circumstances
// exist (e.g. performance, correctness).
Comment on lines +24 to +32
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't see why this belongs as documentation in the interface. What would I as a user do based on this information?

Copy link
Member Author

@pellared pellared Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for users implementing this interface.

I will try rephrasing but I prefer doing it in a separate PR. This PR tries to keep the existing documentation.

//
// The param should not be held by the implementation. A copy should be
// made if the param needs to be held after the call returns.
//
// Processor implementations are expected to re-evaluate the [Record] passed
// to OnEmit. It is not expected that the caller to OnEmit will
// use the result from Enabled prior to calling OnEmit.
Comment on lines +34 to +39
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This all seems to apply only to the previous implementation?

Copy link
Member Author

@pellared pellared Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation has not changed. However, I think you are right that this comment is unnecessary (param is immutable).

I will propose removing but I prefer doing it in a separate PR. This PR tries to keep the existing documentation.

//
// The SDK's Logger.Enabled returns false if all the registered processors
// return false. Otherwise, it returns true.
Comment on lines +41 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Belongs with Logger; does not belong on the generic interface.

Copy link
Member Author

@pellared pellared Nov 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue is that the Logger implantation is not exported so I am not sure where is a better place to put this information. Do you have any proposal?

//
// Implementations of this method need to be safe for a user to call
// concurrently.
Enabled(ctx context.Context, param EnabledParameters) bool

// Shutdown is called when the SDK shuts down. Any cleanup or release of
// resources held by the exporter should be done in this call.
//
Expand All @@ -54,3 +85,10 @@ type Processor interface {
// appropriate error should be returned in these situations.
ForceFlush(ctx context.Context) error
}

// EnabledParameters represents payload for [Processor]'s Enabled method.
type EnabledParameters struct {
InstrumentationScope instrumentation.Scope
Severity log.Severity
EventName string
}
18 changes: 5 additions & 13 deletions sdk/log/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ const (
)

type providerConfig struct {
resource *resource.Resource
processors []Processor
fltrProcessors []FilterProcessor
attrCntLim setting[int]
attrValLenLim setting[int]
allowDupKeys setting[bool]
resource *resource.Resource
processors []Processor
attrCntLim setting[int]
attrValLenLim setting[int]
allowDupKeys setting[bool]
}

func newProviderConfig(opts []LoggerProviderOption) providerConfig {
Expand Down Expand Up @@ -65,7 +64,6 @@ type LoggerProvider struct {

resource *resource.Resource
processors []Processor
fltrProcessors []FilterProcessor
attributeCountLimit int
attributeValueLengthLimit int
allowDupKeys bool
Expand All @@ -92,7 +90,6 @@ func NewLoggerProvider(opts ...LoggerProviderOption) *LoggerProvider {
return &LoggerProvider{
resource: cfg.resource,
processors: cfg.processors,
fltrProcessors: cfg.fltrProcessors,
attributeCountLimit: cfg.attrCntLim.Value,
attributeValueLengthLimit: cfg.attrValLenLim.Value,
allowDupKeys: cfg.allowDupKeys.Value,
Expand Down Expand Up @@ -208,14 +205,9 @@ func WithResource(res *resource.Resource) LoggerProviderOption {
//
// For production, use [NewBatchProcessor] to batch log records before they are exported.
// For testing and debugging, use [NewSimpleProcessor] to synchronously export log records.
//
// See [FilterProcessor] for information about how a Processor can support filtering.
func WithProcessor(processor Processor) LoggerProviderOption {
return loggerProviderOptionFunc(func(cfg providerConfig) providerConfig {
cfg.processors = append(cfg.processors, processor)
if f, ok := processor.(FilterProcessor); ok {
cfg.fltrProcessors = append(cfg.fltrProcessors, f)
}
return cfg
})
}
Expand Down
6 changes: 4 additions & 2 deletions sdk/log/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (p *processor) OnEmit(_ context.Context, r *Record) error {
return nil
}

func (*processor) Enabled(context.Context, EnabledParameters) bool {
return true
}

func (p *processor) Shutdown(context.Context) error {
p.shutdownCalls++
return p.Err
Expand All @@ -66,8 +70,6 @@ type fltrProcessor struct {
params []EnabledParameters
}

var _ FilterProcessor = (*fltrProcessor)(nil)

func newFltrProcessor(name string, enabled bool) *fltrProcessor {
return &fltrProcessor{
processor: newProcessor(name),
Expand Down
5 changes: 5 additions & 0 deletions sdk/log/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ func (s *SimpleProcessor) OnEmit(ctx context.Context, r *Record) error {
return s.exporter.Export(ctx, *records)
}

// Enabled returns true, indicating this Processor will process all records.
func (*SimpleProcessor) Enabled(context.Context, EnabledParameters) bool {
return true
}

// Shutdown shuts down the exporter.
func (s *SimpleProcessor) Shutdown(ctx context.Context) error {
if s.exporter == nil {
Expand Down