diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index 0a48aed74dd..eb9e4c0d9ca 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -27,12 +27,14 @@ type periodicReaderConfig struct { interval time.Duration timeout time.Duration producers []Producer + context context.Context } // newPeriodicReaderConfig returns a periodicReaderConfig configured with // options. func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig { c := periodicReaderConfig{ + context: context.Background(), interval: envDuration(envInterval, defaultInterval), timeout: envDuration(envTimeout, defaultTimeout), } @@ -94,6 +96,17 @@ func WithInterval(d time.Duration) PeriodicReaderOption { }) } +// WithContext allows setting a context to be used when calling the collector. +// If no context is set, the PeriodicReader will use a background context. +// This can be used to pass context to the called collectors, such as logging +// or tracing information. +func WithContext(ctx context.Context) PeriodicReaderOption { + return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig { + conf.context = ctx + return conf + }) +} + // NewPeriodicReader returns a Reader that collects and exports metric data to // the exporter at a defined interval. By default, the returned Reader will // collect and export data every 60 seconds, and will cancel any attempts that @@ -105,7 +118,7 @@ func WithInterval(d time.Duration) PeriodicReaderOption { // exporter. That is left to the user to accomplish. func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) *PeriodicReader { conf := newPeriodicReaderConfig(options) - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(conf.context) r := &PeriodicReader{ interval: conf.interval, timeout: conf.timeout, diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index be67b2b5ac0..74c3800b45f 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -280,7 +280,7 @@ func TestPeriodicReaderRun(t *testing.T) { }, } - r := NewPeriodicReader(exp, WithProducer(testExternalProducer{})) + r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}), WithContext(t.Context())) r.register(testSDKProducer{}) trigger <- time.Now() assert.Equal(t, assert.AnError, <-eh.Err) @@ -308,7 +308,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush", func(t *testing.T) { exp, called := expFunc(t) - r := NewPeriodicReader(exp, WithProducer(testExternalProducer{})) + r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}), WithContext(t.Context())) r.register(testSDKProducer{}) assert.Equal(t, assert.AnError, r.ForceFlush(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") @@ -320,7 +320,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush timeout on producer", func(t *testing.T) { exp, called := expFunc(t) timeout := time.Millisecond - r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{})) + r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{}), WithContext(t.Context())) r.register(testSDKProducer{ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { select { @@ -343,7 +343,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("ForceFlush timeout on external producer", func(t *testing.T) { exp, called := expFunc(t) timeout := time.Millisecond - r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{ + r := NewPeriodicReader(exp, WithTimeout(timeout), WithContext(t.Context()), WithProducer(testExternalProducer{ produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { select { case <-time.After(timeout + time.Second): @@ -364,7 +364,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown", func(t *testing.T) { exp, called := expFunc(t) - r := NewPeriodicReader(exp, WithProducer(testExternalProducer{})) + r := NewPeriodicReader(exp, WithProducer(testExternalProducer{}), WithContext(t.Context())) r.register(testSDKProducer{}) assert.Equal(t, assert.AnError, r.Shutdown(context.Background()), "export error not returned") assert.True(t, *called, "exporter Export method not called, pending telemetry not flushed") @@ -373,7 +373,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown timeout on producer", func(t *testing.T) { exp, called := expFunc(t) timeout := time.Millisecond - r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{})) + r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{}), WithContext(t.Context())) r.register(testSDKProducer{ produceFunc: func(ctx context.Context, rm *metricdata.ResourceMetrics) error { select { @@ -393,7 +393,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { t.Run("Shutdown timeout on external producer", func(t *testing.T) { exp, called := expFunc(t) timeout := time.Millisecond - r := NewPeriodicReader(exp, WithTimeout(timeout), WithProducer(testExternalProducer{ + r := NewPeriodicReader(exp, WithTimeout(timeout), WithContext(t.Context()), WithProducer(testExternalProducer{ produceFunc: func(ctx context.Context) ([]metricdata.ScopeMetrics, error) { select { case <-time.After(timeout + time.Second): @@ -412,7 +412,7 @@ func TestPeriodicReaderFlushesPending(t *testing.T) { func TestPeriodicReaderMultipleForceFlush(t *testing.T) { ctx := context.Background() - r := NewPeriodicReader(new(fnExporter), WithProducer(testExternalProducer{})) + r := NewPeriodicReader(new(fnExporter), WithContext(t.Context()), WithProducer(testExternalProducer{})) r.register(testSDKProducer{}) require.NoError(t, r.ForceFlush(ctx)) require.NoError(t, r.ForceFlush(ctx)) @@ -420,7 +420,7 @@ func TestPeriodicReaderMultipleForceFlush(t *testing.T) { } func BenchmarkPeriodicReader(b *testing.B) { - r := NewPeriodicReader(new(fnExporter)) + r := NewPeriodicReader(new(fnExporter), WithContext(b.Context())) b.Run("Collect", benchReaderCollectFunc(r)) require.NoError(b, r.Shutdown(context.Background())) } @@ -453,7 +453,7 @@ func TestPeriodicReaderTemporality(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { var undefinedInstrument InstrumentKind - rdr := NewPeriodicReader(tt.exporter) + rdr := NewPeriodicReader(tt.exporter, WithContext(t.Context())) assert.Equal(t, tt.wantTemporality.String(), rdr.temporality(undefinedInstrument).String()) }) } @@ -482,7 +482,7 @@ func TestPeriodicReaderCollect(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - rdr := NewPeriodicReader(new(fnExporter)) + rdr := NewPeriodicReader(new(fnExporter), WithContext(t.Context())) mp := NewMeterProvider(WithReader(rdr)) meter := mp.Meter("test")