From 08f5f8c019684688a9ae569e872e77c498f67fac Mon Sep 17 00:00:00 2001 From: Robert Wu Date: Sat, 11 Oct 2025 10:06:52 -0400 Subject: [PATCH 1/8] automatically generate inst files --- .../internal/counter/counter.go | 31 ++++++++ .../internal/counter/counter_test.go | 65 ++++++++++++++++ .../otlpmetric/otlpmetrichttp/internal/gen.go | 6 ++ .../otlpmetric/otlpmetrichttp/internal/x/x.go | 58 ++++++++++++++ .../otlpmetrichttp/internal/x/x_test.go | 75 +++++++++++++++++++ 5 files changed, 235 insertions(+) create mode 100644 exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter/counter.go create mode 100644 exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter/counter_test.go create mode 100644 exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/x.go create mode 100644 exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/x_test.go diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter/counter.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter/counter.go new file mode 100644 index 00000000000..267cb03d923 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter/counter.go @@ -0,0 +1,31 @@ +// Code generated by gotmpl. DO NOT MODIFY. +// source: internal/shared/counter/counter.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package counter provides a simple counter for generating unique IDs. +// +// This package is used to generate unique IDs while allowing testing packages +// to reset the counter. +package counter // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter" + +import "sync/atomic" + +// exporterN is a global 0-based count of the number of exporters created. +var exporterN atomic.Int64 + +// NextExporterID returns the next unique ID for an exporter. +func NextExporterID() int64 { + const inc = 1 + return exporterN.Add(inc) - inc +} + +// SetExporterID sets the exporter ID counter to v and returns the previous +// value. +// +// This function is useful for testing purposes, allowing you to reset the +// counter. It should not be used in production code. +func SetExporterID(v int64) int64 { + return exporterN.Swap(v) +} diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter/counter_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter/counter_test.go new file mode 100644 index 00000000000..f3e380d3325 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter/counter_test.go @@ -0,0 +1,65 @@ +// Code generated by gotmpl. DO NOT MODIFY. +// source: internal/shared/counter/counter_test.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package counter + +import ( + "sync" + "testing" +) + +func TestNextExporterID(t *testing.T) { + SetExporterID(0) + + var expected int64 + for range 10 { + id := NextExporterID() + if id != expected { + t.Errorf("NextExporterID() = %d; want %d", id, expected) + } + expected++ + } +} + +func TestSetExporterID(t *testing.T) { + SetExporterID(0) + + prev := SetExporterID(42) + if prev != 0 { + t.Errorf("SetExporterID(42) returned %d; want 0", prev) + } + + id := NextExporterID() + if id != 42 { + t.Errorf("NextExporterID() = %d; want 42", id) + } +} + +func TestNextExporterIDConcurrentSafe(t *testing.T) { + SetExporterID(0) + + const goroutines = 100 + const increments = 10 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for range goroutines { + go func() { + defer wg.Done() + for range increments { + NextExporterID() + } + }() + } + + wg.Wait() + + expected := int64(goroutines * increments) + if id := NextExporterID(); id != expected { + t.Errorf("NextExporterID() = %d; want %d", id, expected) + } +} \ No newline at end of file diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/gen.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/gen.go index a8b709268d1..02ffe47c65a 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/gen.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/gen.go @@ -30,3 +30,9 @@ package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/o //go:generate gotmpl --body=../../../../../internal/shared/otlp/otlpmetric/transform/error_test.go.tmpl "--data={}" --out=transform/error_test.go //go:generate gotmpl --body=../../../../../internal/shared/otlp/otlpmetric/transform/metricdata.go.tmpl "--data={}" --out=transform/metricdata.go //go:generate gotmpl --body=../../../../../internal/shared/otlp/otlpmetric/transform/metricdata_test.go.tmpl "--data={}" --out=transform/metricdata_test.go + +//go:generate gotmpl --body=../../../../../internal/shared/counter/counter.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter\" }" --out=counter/counter.go +//go:generate gotmpl --body=../../../../../internal/shared/counter/counter_test.go.tmpl "--data={}" --out=counter/counter_test.go + +//go:generate gotmpl --body=../../../../../internal/shared/x/x.go.tmpl "--data={ \"pkg\": \"go.opentelemetry.io/otel/exporters/otlp/otlpmetric\" }" --out=x/x.go +//go:generate gotmpl --body=../../../../../internal/shared/x/x_test.go.tmpl "--data={}" --out=x/x_test.go diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/x.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/x.go new file mode 100644 index 00000000000..4cdadad6bc7 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/x.go @@ -0,0 +1,58 @@ +// Code generated by gotmpl. DO NOT MODIFY. +// source: internal/shared/x/x.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package x documents experimental features for [go.opentelemetry.io/otel/exporters/otlp/otlpmetric]. +package x // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x" + +import ( + "os" +) + +// Feature is an experimental feature control flag. It provides a uniform way +// to interact with these feature flags and parse their values. +type Feature[T any] struct { + keys []string + parse func(v string) (T, bool) +} + +func newFeature[T any](suffix []string, parse func(string) (T, bool)) Feature[T] { + const envKeyRoot = "OTEL_GO_X_" + keys := make([]string, 0, len(suffix)) + for _, s := range suffix { + keys = append(keys, envKeyRoot+s) + } + return Feature[T]{ + keys: keys, + parse: parse, + } +} + +// Keys returns the environment variable keys that can be set to enable the +// feature. +func (f Feature[T]) Keys() []string { return f.keys } + +// Lookup returns the user configured value for the feature and true if the +// user has enabled the feature. Otherwise, if the feature is not enabled, a +// zero-value and false are returned. +func (f Feature[T]) Lookup() (v T, ok bool) { + // https://github.com/open-telemetry/opentelemetry-specification/blob/62effed618589a0bec416a87e559c0a9d96289bb/specification/configuration/sdk-environment-variables.md#parsing-empty-value + // + // > The SDK MUST interpret an empty value of an environment variable the + // > same way as when the variable is unset. + for _, key := range f.keys { + vRaw := os.Getenv(key) + if vRaw != "" { + return f.parse(vRaw) + } + } + return v, ok +} + +// Enabled reports whether the feature is enabled. +func (f Feature[T]) Enabled() bool { + _, ok := f.Lookup() + return ok +} diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/x_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/x_test.go new file mode 100644 index 00000000000..a715d7608a7 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/x_test.go @@ -0,0 +1,75 @@ +// Code generated by gotmpl. DO NOT MODIFY. +// source: internal/shared/x/x_text.go.tmpl + +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package x + +import ( + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const ( + mockKey = "OTEL_GO_X_MOCK_FEATURE" + mockKey2 = "OTEL_GO_X_MOCK_FEATURE2" +) + +var mockFeature = newFeature([]string{"MOCK_FEATURE", "MOCK_FEATURE2"}, func(v string) (string, bool) { + if strings.EqualFold(v, "true") { + return v, true + } + return "", false +}) + +func TestFeature(t *testing.T) { + require.Contains(t, mockFeature.Keys(), mockKey) + require.Contains(t, mockFeature.Keys(), mockKey2) + + t.Run("100", run(setenv(mockKey, "100"), assertDisabled(mockFeature))) + t.Run("true", run(setenv(mockKey, "true"), assertEnabled(mockFeature, "true"))) + t.Run("True", run(setenv(mockKey, "True"), assertEnabled(mockFeature, "True"))) + t.Run("false", run(setenv(mockKey, "false"), assertDisabled(mockFeature))) + t.Run("empty", run(assertDisabled(mockFeature))) +} + +func run(steps ...func(*testing.T)) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + for _, step := range steps { + step(t) + } + } +} + +func setenv(k, v string) func(t *testing.T) { //nolint:unparam // This is a reusable test utility function. + return func(t *testing.T) { t.Setenv(k, v) } +} + +func assertEnabled[T any](f Feature[T], want T) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + assert.True(t, f.Enabled(), "not enabled") + + v, ok := f.Lookup() + assert.True(t, ok, "Lookup state") + assert.Equal(t, want, v, "Lookup value") + } +} + +func assertDisabled[T any](f Feature[T]) func(*testing.T) { + var zero T + return func(t *testing.T) { + t.Helper() + + assert.False(t, f.Enabled(), "enabled") + + v, ok := f.Lookup() + assert.False(t, ok, "Lookup state") + assert.Equal(t, zero, v, "Lookup value") + } +} From 2f1342d930fabd1fdf363e37770c19dda0bc797c Mon Sep 17 00:00:00 2001 From: Robert Wu Date: Sat, 11 Oct 2025 12:53:34 -0400 Subject: [PATCH 2/8] add inst to client.go --- .../otlp/otlpmetric/otlpmetrichttp/client.go | 24 +- .../otlp/otlpmetric/otlpmetrichttp/go.mod | 2 +- .../internal/observ/instrumentation.go | 397 ++++++++++++++++++ .../otlpmetrichttp/internal/version.go | 8 + .../otlpmetrichttp/internal/x/observ.go | 22 + .../otlpmetrichttp/internal/x/observ_test.go | 21 + 6 files changed, 470 insertions(+), 4 deletions(-) create mode 100644 exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ/instrumentation.go create mode 100644 exporters/otlp/otlpmetric/otlpmetrichttp/internal/version.go create mode 100644 exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/observ.go create mode 100644 exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/observ_test.go diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go index 76381b56ee8..71dff79c16d 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go @@ -23,6 +23,8 @@ import ( "google.golang.org/protobuf/proto" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/retry" ) @@ -33,6 +35,9 @@ type client struct { compression Compression requestFunc retry.RequestFunc httpClient *http.Client + + instID int64 + inst *observ.Instrumentation } // Keep it in sync with golang's DefaultTransport from net/http! We @@ -98,12 +103,18 @@ func newClient(cfg oconf.Config) (*client, error) { } req.Header.Set("Content-Type", "application/x-protobuf") + // Initialize the instrumentation + instID := counter.NextExporterID() + inst, err := observ.NewInstrumentation(instID, cfg.Metrics.Endpoint) + return &client{ compression: Compression(cfg.Metrics.Compression), req: req, requestFunc: cfg.RetryConfig.RequestFunc(evaluate), httpClient: httpClient, - }, nil + instID: instID, + inst: inst, + }, err } // Shutdown shuts down the client, freeing all resources. @@ -138,6 +149,12 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou return err } + var statusCode int + if c.inst != nil { + op := c.inst.ExportMetrics(ctx, len(pbRequest.ResourceMetrics)) + defer func() { op.End(uploadErr, statusCode) }() + } + return errors.Join(uploadErr, c.requestFunc(ctx, func(iCtx context.Context) error { select { case <-iCtx.Done(): @@ -162,7 +179,8 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou }() } - if sc := resp.StatusCode; sc >= 200 && sc <= 299 { + statusCode = resp.StatusCode + if statusCode >= 200 && statusCode <= 299 { // Success, do not retry. // Read the partial success message, if any. @@ -207,7 +225,7 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou } bodyErr := fmt.Errorf("body: %s", respStr) - switch resp.StatusCode { + switch statusCode { case http.StatusTooManyRequests, http.StatusBadGateway, http.StatusServiceUnavailable, diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod index 2a0b242084c..5e4e87a69f4 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod @@ -9,6 +9,7 @@ require ( github.com/google/go-cmp v0.7.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.38.0 + go.opentelemetry.io/otel/metric v1.38.0 go.opentelemetry.io/otel/sdk v1.38.0 go.opentelemetry.io/otel/sdk/metric v1.38.0 go.opentelemetry.io/proto/otlp v1.8.0 @@ -24,7 +25,6 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect - go.opentelemetry.io/otel/metric v1.38.0 // indirect go.opentelemetry.io/otel/trace v1.38.0 // indirect golang.org/x/net v0.45.0 // indirect golang.org/x/sys v0.37.0 // indirect diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ/instrumentation.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ/instrumentation.go new file mode 100644 index 00000000000..c46630be7de --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ/instrumentation.go @@ -0,0 +1,397 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package observ provides experimental observability instrumentation for the +// otlpmetrichttp exporter. +package observ // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ" + +import ( + "context" + "errors" + "fmt" + "net" + "net/http" + "net/netip" + "strconv" + "strings" + "sync" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x" + "go.opentelemetry.io/otel/internal/global" + "go.opentelemetry.io/otel/metric" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" +) + +const ( + // ScopeName is the unique name of the meter used for instrumentation. + ScopeName = "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ" + + // SchemaURL is the schema URL of the metrics produced by this + // instrumentation. + SchemaURL = semconv.SchemaURL + + // Version is the current version of this instrumentation. + // + // This matches the version of the exporter. + Version = internal.Version +) + +var ( + measureAttrsPool = &sync.Pool{ + New: func() any { + const n = 1 + // component.name + 1 + // component.type + 1 + // server.addr + 1 + // server.port + 1 + // error.type + 1 // http.response.status_code + s := make([]attribute.KeyValue, 0, n) + // Return a pointer to a slice instead of a slice itself + // to avoid allocations on every call. + return &s + }, + } + + addOptPool = &sync.Pool{ + New: func() any { + const n = 1 // WithAttributeSet + o := make([]metric.AddOption, 0, n) + return &o + }, + } + + recordOptPool = &sync.Pool{ + New: func() any { + const n = 1 // WithAttributeSet + o := make([]metric.RecordOption, 0, n) + return &o + }, + } +) + +func get[T any](p *sync.Pool) *[]T { return p.Get().(*[]T) } + +func put[T any](p *sync.Pool, s *[]T) { + *s = (*s)[:0] // Reset. + p.Put(s) +} + +// ComponentName returns the component name for the exporter with the +// provided ID. +func ComponentName(id int64) string { + t := semconv.OTelComponentTypeOtlpHTTPMetricExporter.Value.AsString() + return fmt.Sprintf("%s/%d", t, id) +} + +// Instrumentation is experimental instrumentation for the exporter. +type Instrumentation struct { + inflightMetric metric.Int64UpDownCounter + exportedMetric metric.Int64Counter + opDuration metric.Float64Histogram + + attrs []attribute.KeyValue + addOpt metric.AddOption + recOpt metric.RecordOption +} + +// NewInstrumentation returns instrumentation for an OTLP over HTTP metric +// exporter with the provided ID and endpoint. It uses the global +// MeterProvider to create the instrumentation. +// +// The id should be the unique exporter instance ID. It is used +// to set the "component.name" attribute. +// +// The endpoint is the HTTP endpoint the exporter is exporting to. +// +// If the experimental observability is disabled, nil is returned. +func NewInstrumentation(id int64, endpoint string) (*Instrumentation, error) { + if !x.Observability.Enabled() { + return nil, nil + } + + attrs := BaseAttrs(id, endpoint) + i := &Instrumentation{ + attrs: attrs, + addOpt: metric.WithAttributeSet(attribute.NewSet(attrs...)), + + // Do not modify attrs (NewSet sorts in-place), make a new slice. + recOpt: metric.WithAttributeSet(attribute.NewSet(append( + // Default to OK status code (200). + []attribute.KeyValue{semconv.HTTPResponseStatusCode(http.StatusOK)}, + attrs..., + )...)), + } + + mp := otel.GetMeterProvider() + m := mp.Meter( + ScopeName, + metric.WithInstrumentationVersion(Version), + metric.WithSchemaURL(SchemaURL), + ) + + var err error + + inflightMetric, e := otelconv.NewSDKExporterMetricDataPointInflight(m) + if e != nil { + e = fmt.Errorf("failed to create inflight metric: %w", e) + err = errors.Join(err, e) + } + i.inflightMetric = inflightMetric.Inst() + + exportedMetric, e := otelconv.NewSDKExporterMetricDataPointExported(m) + if e != nil { + e = fmt.Errorf("failed to create exported metric: %w", e) + err = errors.Join(err, e) + } + i.exportedMetric = exportedMetric.Inst() + + opDuration, e := otelconv.NewSDKExporterOperationDuration(m) + if e != nil { + e = fmt.Errorf("failed to create operation duration metric: %w", e) + err = errors.Join(err, e) + } + i.opDuration = opDuration.Inst() + + return i, err +} + +// BaseAttrs returns the base attributes for the exporter with the provided ID +// and endpoint. +// +// The id should be the unique exporter instance ID. It is used +// to set the "component.name" attribute. +// +// The endpoint is the HTTP endpoint the exporter is exporting to. It should be +// in the format "host:port" or a full URL. +func BaseAttrs(id int64, endpoint string) []attribute.KeyValue { + host, port, err := parseEndpoint(endpoint) + if err != nil || (host == "" && port < 0) { + if err != nil { + global.Debug("failed to parse endpoint", "endpoint", endpoint, "error", err) + } + return []attribute.KeyValue{ + semconv.OTelComponentName(ComponentName(id)), + semconv.OTelComponentTypeOtlpHTTPMetricExporter, + } + } + + // Do not use append so the slice is exactly allocated. + + if port < 0 { + return []attribute.KeyValue{ + semconv.OTelComponentName(ComponentName(id)), + semconv.OTelComponentTypeOtlpHTTPMetricExporter, + semconv.ServerAddress(host), + } + } + + if host == "" { + return []attribute.KeyValue{ + semconv.OTelComponentName(ComponentName(id)), + semconv.OTelComponentTypeOtlpHTTPMetricExporter, + semconv.ServerPort(port), + } + } + + return []attribute.KeyValue{ + semconv.OTelComponentName(ComponentName(id)), + semconv.OTelComponentTypeOtlpHTTPMetricExporter, + semconv.ServerAddress(host), + semconv.ServerPort(port), + } +} + +// parseEndpoint parses the host and port from endpoint that has the form +// "host[:port]", or it returns an error if the endpoint is not parsable. +// +// If no port is specified, -1 is returned. +// +// If no host is specified, an empty string is returned. +func parseEndpoint(endpoint string) (string, int, error) { + // First check if the endpoint is just an IP address. + if ip := parseIP(endpoint); ip != "" { + return ip, -1, nil + } + + // If there's no colon, there is no port (IPv6 with no port checked above). + if !strings.Contains(endpoint, ":") { + return endpoint, -1, nil + } + + // Otherwise, parse as host:port. + host, portStr, err := net.SplitHostPort(endpoint) + if err != nil { + return "", -1, fmt.Errorf("invalid host:port %q: %w", endpoint, err) + } + + const base, bitSize = 10, 16 + port16, err := strconv.ParseUint(portStr, base, bitSize) + if err != nil { + return "", -1, fmt.Errorf("invalid port %q: %w", portStr, err) + } + port := int(port16) // port is guaranteed to be in the range [0, 65535]. + + return host, port, nil +} + +// parseIP attempts to parse the entire endpoint as an IP address. +// It returns the normalized string form of the IP if successful, +// or an empty string if parsing fails. +func parseIP(ip string) string { + // Strip leading and trailing brackets for IPv6 addresses. + if len(ip) >= 2 && ip[0] == '[' && ip[len(ip)-1] == ']' { + ip = ip[1 : len(ip)-1] + } + addr, err := netip.ParseAddr(ip) + if err != nil { + return "" + } + // Return the normalized string form of the IP. + return addr.String() +} + +// ExportMetrics instruments the UploadMetrics method of the client. It returns an +// [ExportOp] that must have its [ExportOp.End] method called when the +// operation end. +func (i *Instrumentation) ExportMetrics(ctx context.Context, nMetrics int) ExportOp { + start := time.Now() + + addOpt := get[metric.AddOption](addOptPool) + defer put(addOptPool, addOpt) + *addOpt = append(*addOpt, i.addOpt) + i.inflightMetric.Add(ctx, int64(nMetrics), *addOpt...) + + return ExportOp{ + ctx: ctx, + start: start, + nMetrics: int64(nMetrics), + inst: i, + } +} + +// ExportOp tracks the export operation being observed by +// [Instrumentation.ExportMetrics]. +type ExportOp struct { + ctx context.Context + start time.Time + nMetrics int64 + + inst *Instrumentation +} + +// End completes the observation of the operation being observed by a call to +// [Instrumentation.ExportMetrics]. +// +// Any error that is encountered is provided as err. +// The HTTP status code from the response is provided as status. +// +// If err is not nil, all metrics will be recorded as failures unless error is of +// type [internal.PartialSuccess]. In the case of a PartialSuccess, the number +// of successfully exported metrics will be determined by inspecting the +// RejectedItems field of the PartialSuccess. +func (e ExportOp) End(err error, status int) { + addOpt := get[metric.AddOption](addOptPool) + defer put(addOptPool, addOpt) + *addOpt = append(*addOpt, e.inst.addOpt) + + e.inst.inflightMetric.Add(e.ctx, -e.nMetrics, *addOpt...) + + success := successful(e.nMetrics, err) + // Record successfully exported metrics, even if the value is 0 which are + // meaningful to distribution aggregations. + e.inst.exportedMetric.Add(e.ctx, success, *addOpt...) + + if err != nil { + attrs := get[attribute.KeyValue](measureAttrsPool) + defer put(measureAttrsPool, attrs) + *attrs = append(*attrs, e.inst.attrs...) + *attrs = append(*attrs, semconv.ErrorType(err)) + + // Do not inefficiently make a copy of attrs by using + // WithAttributes instead of WithAttributeSet. + o := metric.WithAttributeSet(attribute.NewSet(*attrs...)) + // Reset addOpt with new attribute set. + *addOpt = append((*addOpt)[:0], o) + + e.inst.exportedMetric.Add(e.ctx, e.nMetrics-success, *addOpt...) + } + + recOpt := get[metric.RecordOption](recordOptPool) + defer put(recordOptPool, recOpt) + *recOpt = append(*recOpt, e.inst.recordOption(err, status)) + + d := time.Since(e.start).Seconds() + e.inst.opDuration.Record(e.ctx, d, *recOpt...) +} + +// recordOption returns a RecordOption with attributes representing the +// outcome of the operation being recorded. +// +// If err is nil and status is 200, the default recOpt of the +// Instrumentation is returned. +// +// Otherwise, a new RecordOption is returned with the base attributes of the +// Instrumentation plus the http.response.status_code attribute set to the +// provided status, and if err is not nil, the error.type attribute set +// to the type of the error. +func (i *Instrumentation) recordOption(err error, status int) metric.RecordOption { + if err == nil && status == http.StatusOK { + return i.recOpt + } + + attrs := get[attribute.KeyValue](measureAttrsPool) + defer put(measureAttrsPool, attrs) + *attrs = append(*attrs, i.attrs...) + + *attrs = append(*attrs, semconv.HTTPResponseStatusCode(status)) + if err != nil { + *attrs = append(*attrs, semconv.ErrorType(err)) + } + + // Do not inefficiently make a copy of attrs by using WithAttributes + // instead of WithAttributeSet. + return metric.WithAttributeSet(attribute.NewSet(*attrs...)) +} + +// successful returns the number of successfully exported metrics out of the n +// that were exported based on the provided error. +// +// If err is nil, n is returned. All metrics were successfully exported. +// +// If err is not nil and not an [internal.PartialSuccess] error, 0 is returned. +// It is assumed all metrics failed to be exported. +// +// If err is an [internal.PartialSuccess] error, the number of successfully +// exported metrics is computed by subtracting the RejectedItems field from n. If +// RejectedItems is negative, n is returned. If RejectedItems is greater than +// n, 0 is returned. +func successful(n int64, err error) int64 { + if err == nil { + return n // All metrics successfully exported. + } + // Split rejection calculation so successful is inlinable. + return n - rejected(n, err) +} + +var errPartialPool = &sync.Pool{ + New: func() any { return new(internal.PartialSuccess) }, +} + +// rejected returns how many out of the n metrics exporter were rejected based on +// the provided non-nil err. +func rejected(n int64, err error) int64 { + ps := errPartialPool.Get().(*internal.PartialSuccess) + defer errPartialPool.Put(ps) + // Check for partial success. + if errors.As(err, ps) { + // Bound RejectedItems to [0, n]. This should not be needed, + // but be defensive as this is from an external source. + return min(max(ps.RejectedItems, 0), n) + } + return n // All metrics rejected. +} diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/version.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/version.go new file mode 100644 index 00000000000..d7b0d396025 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/version.go @@ -0,0 +1,8 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal" + +// Version is the current release version of the OpenTelemetry OTLP HTTP metric +// exporter in use. +const Version = "1.38.0" diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/observ.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/observ.go new file mode 100644 index 00000000000..296946a3b7b --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/observ.go @@ -0,0 +1,22 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package x // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x" + +import "strings" + +// Observability is an experimental feature flag that determines if exporter +// observability metrics are enabled. +// +// To enable this feature set the OTEL_GO_X_OBSERVABILITY environment variable +// to the case-insensitive string value of "true" (i.e. "True" and "TRUE" +// will also enable this). +var Observability = newFeature( + []string{"OBSERVABILITY"}, + func(v string) (string, bool) { + if strings.EqualFold(v, "true") { + return v, true + } + return "", false + }, +) diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/observ_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/observ_test.go new file mode 100644 index 00000000000..a8d3fb06ed4 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/observ_test.go @@ -0,0 +1,21 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package x + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestObservability(t *testing.T) { + const key = "OTEL_GO_X_OBSERVABILITY" + require.Contains(t, Observability.Keys(), key) + + t.Run("100", run(setenv(key, "100"), assertDisabled(Observability))) + t.Run("true", run(setenv(key, "true"), assertEnabled(Observability, "true"))) + t.Run("True", run(setenv(key, "True"), assertEnabled(Observability, "True"))) + t.Run("false", run(setenv(key, "false"), assertDisabled(Observability))) + t.Run("empty", run(assertDisabled(Observability))) +} From eb26b0b05e2559be7aa6276963ed8a7b8491c719 Mon Sep 17 00:00:00 2001 From: Robert Wu Date: Sat, 11 Oct 2025 16:26:52 -0400 Subject: [PATCH 3/8] add unit test and benchmark test --- .../otlpmetric/otlpmetrichttp/client_test.go | 210 ++++++++++++++++++ 1 file changed, 210 insertions(+) diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go index 65d7f4ccf3c..3975ff35743 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go @@ -16,12 +16,23 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/otest" + "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + "go.opentelemetry.io/otel/sdk/resource" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" ) type clientShim struct { @@ -314,3 +325,202 @@ func TestConfig(t *testing.T) { assert.NoError(t, exCtx.Err()) }) } + +func TestClientInstrumentation(t *testing.T) { + // Enable instrumentation for this test. + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + // Reset client ID to be deterministic. + const id = 0 + counter.SetExporterID(id) + + // Save original meter provider and restore at end of test. + orig := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(orig) }) + + // Create a new meter provider to capture metrics. + reader := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(reader)) + otel.SetMeterProvider(mp) + + const n, msg = 2, "partially successful" + rCh := make(chan otest.ExportResult, 1) + // Test partial success - return HTTP 200 with partial success info + rCh <- otest.ExportResult{ + Response: &colmetricpb.ExportMetricsServiceResponse{ + PartialSuccess: &colmetricpb.ExportMetricsPartialSuccess{ + RejectedDataPoints: n, + ErrorMessage: msg, + }, + }, + } + + coll, err := otest.NewHTTPCollector("", rCh) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, coll.Shutdown(t.Context())) }) + t.Cleanup(func() { close(rCh) }) + + addr := coll.Addr().String() + opts := []Option{WithEndpoint(addr), WithInsecure()} + ctx := t.Context() + exp, err := New(ctx, opts...) + require.NoError(t, err) + + // Export some test data + err = exp.Export(ctx, &metricdata.ResourceMetrics{ + Resource: resource.NewWithAttributes(semconv.SchemaURL, attribute.String("service.name", "test")), + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Scope: instrumentation.Scope{Name: "test"}, + Metrics: []metricdata.Metrics{ + { + Name: "test-metric", + Data: metricdata.Gauge[int64]{DataPoints: []metricdata.DataPoint[int64]{{Value: 42}}}, + }, + }, + }, + }, + }) + + // Should get partial success error + wantErr := internal.MetricPartialSuccessError(n, msg) + require.ErrorIs(t, err, wantErr, "Expected partial success error") + + require.NoError(t, exp.Shutdown(ctx)) + var got metricdata.ResourceMetrics + require.NoError(t, reader.Collect(ctx, &got)) + + attrs := observ.BaseAttrs(id, addr) + + want := metricdata.ScopeMetrics{ + Scope: instrumentation.Scope{ + Name: observ.ScopeName, + Version: observ.Version, + SchemaURL: observ.SchemaURL, + }, + Metrics: []metricdata.Metrics{ + { + Name: otelconv.SDKExporterMetricDataPointInflight{}.Name(), + Description: otelconv.SDKExporterMetricDataPointInflight{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: attribute.NewSet(attrs...)}, + }, + Temporality: metricdata.CumulativeTemporality, + }, + }, + { + Name: otelconv.SDKExporterMetricDataPointExported{}.Name(), + Description: otelconv.SDKExporterMetricDataPointExported{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointExported{}.Unit(), + Data: metricdata.Sum[int64]{ + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: attribute.NewSet(attrs...)}, + {Attributes: attribute.NewSet(append( + attrs, + otelconv.SDKExporterMetricDataPointExported{}.AttrErrorType("*errors.joinError"), + )...)}, + }, + Temporality: 0x1, + IsMonotonic: true, + }, + }, + { + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + DataPoints: []metricdata.HistogramDataPoint[float64]{ + {Attributes: attribute.NewSet(append( + attrs, + otelconv.SDKExporterOperationDuration{}.AttrErrorType("*errors.joinError"), + otelconv.SDKExporterOperationDuration{}.AttrHTTPResponseStatusCode(200), + )...)}, + }, + Temporality: 0x1, + }, + }, + }, + } + require.Len(t, got.ScopeMetrics, 1) + opt := []metricdatatest.Option{ + metricdatatest.IgnoreTimestamp(), + metricdatatest.IgnoreExemplars(), + metricdatatest.IgnoreValue(), + } + metricdatatest.AssertEqual(t, want, got.ScopeMetrics[0], opt...) +} + +func BenchmarkExporterExportMetrics(b *testing.B) { + const n = 10 + + run := func(b *testing.B) { + coll, err := otest.NewHTTPCollector("", nil) + require.NoError(b, err) + b.Cleanup(func() { require.NoError(b, coll.Shutdown(b.Context())) }) + + opts := []Option{WithEndpoint(coll.Addr().String()), WithInsecure()} + ctx := b.Context() + exp, err := New(ctx, opts...) + require.NoError(b, err) + b.Cleanup(func() { + assert.NoError(b, exp.Shutdown(b.Context())) + }) + + // Generate realistic test metric data with multiple metrics. + now := time.Now() + rm := &metricdata.ResourceMetrics{ + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Scope: instrumentation.Scope{ + Name: "test", + Version: "v1.0.0", + }, + Metrics: make([]metricdata.Metrics, n), + }, + }, + } + + for i := range rm.ScopeMetrics[0].Metrics { + rm.ScopeMetrics[0].Metrics[i] = metricdata.Metrics{ + Name: fmt.Sprintf("test_counter_%d", i), + Description: fmt.Sprintf("A test counter %d", i), + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("test", "value"), + attribute.Int("counter", i), + ), + StartTime: now, + Time: now, + Value: int64(i * 10), + }, + }, + }, + } + } + + b.ReportAllocs() + b.ResetTimer() + + for b.Loop() { + err = exp.Export(b.Context(), rm) + } + _ = err + } + + b.Run("Observability", func(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + run(b) + }) + + b.Run("NoObservability", func(b *testing.B) { + b.Setenv("OTEL_GO_X_OBSERVABILITY", "false") + run(b) + }) +} From 00dcf9d7f2f98c8c1482d65659810451762453a6 Mon Sep 17 00:00:00 2001 From: Robert Wu Date: Sat, 11 Oct 2025 16:36:02 -0400 Subject: [PATCH 4/8] update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9903d131a56..a29a4d48087 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. (#7353) - Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#7459) - Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#7486) +- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#7493) ### Fixed From da9fe4df47d600bba5b768a298000bfc02efbcb4 Mon Sep 17 00:00:00 2001 From: Robert Wu Date: Sat, 11 Oct 2025 16:49:21 -0400 Subject: [PATCH 5/8] add tests for instrumentation.go --- .../otlp/otlpmetric/otlpmetrichttp/go.mod | 2 +- .../internal/observ/instrumentation_test.go | 394 ++++++++++++++++++ 2 files changed, 395 insertions(+), 1 deletion(-) create mode 100644 exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ/instrumentation_test.go diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod index 5e4e87a69f4..c9216e5276e 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod @@ -6,6 +6,7 @@ retract v0.32.2 // Contains unresolvable dependencies. require ( github.com/cenkalti/backoff/v5 v5.0.3 + github.com/go-logr/logr v1.4.3 github.com/google/go-cmp v0.7.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/otel v1.38.0 @@ -19,7 +20,6 @@ require ( require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ/instrumentation_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ/instrumentation_test.go new file mode 100644 index 00000000000..c274d01f046 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ/instrumentation_test.go @@ -0,0 +1,394 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package observ_test + +import ( + "errors" + "net/http" + "strconv" + "testing" + + "github.com/go-logr/logr" + "github.com/go-logr/logr/testr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ" + "go.opentelemetry.io/otel/internal/global" + mapi "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/sdk/instrumentation" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" + "go.opentelemetry.io/otel/semconv/v1.37.0/otelconv" +) + +const ( + ID = 0 + ServerAddr = "localhost" + ServerPort = 4318 +) + +var Endpoint = ServerAddr + ":" + strconv.Itoa(ServerPort) + +var Scope = instrumentation.Scope{ + Name: observ.ScopeName, + Version: observ.Version, + SchemaURL: observ.SchemaURL, +} + +type errMeterProvider struct { + mapi.MeterProvider + + err error +} + +func (m *errMeterProvider) Meter(string, ...mapi.MeterOption) mapi.Meter { + return &errMeter{err: m.err} +} + +type errMeter struct { + mapi.Meter + + err error +} + +func (m *errMeter) Int64UpDownCounter(string, ...mapi.Int64UpDownCounterOption) (mapi.Int64UpDownCounter, error) { + return nil, m.err +} + +func (m *errMeter) Int64Counter(string, ...mapi.Int64CounterOption) (mapi.Int64Counter, error) { + return nil, m.err +} + +func (m *errMeter) Float64Histogram(string, ...mapi.Float64HistogramOption) (mapi.Float64Histogram, error) { + return nil, m.err +} + +func TestNewInstrumentationObservabilityErrors(t *testing.T) { + orig := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(orig) }) + mp := &errMeterProvider{err: assert.AnError} + otel.SetMeterProvider(mp) + + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + _, err := observ.NewInstrumentation(ID, Endpoint) + require.ErrorIs(t, err, assert.AnError, "new instrument errors") + + assert.ErrorContains(t, err, "inflight metric") + assert.ErrorContains(t, err, "exported metric") + assert.ErrorContains(t, err, "operation duration metric") +} + +func TestNewInstrumentationObservabilityDisabled(t *testing.T) { + // Do not set OTEL_GO_X_OBSERVABILITY. + got, err := observ.NewInstrumentation(ID, Endpoint) + assert.NoError(t, err) + assert.Nil(t, got) +} + +func setup(t *testing.T) (*observ.Instrumentation, func() metricdata.ScopeMetrics) { + t.Helper() + + t.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + original := otel.GetMeterProvider() + t.Cleanup(func() { otel.SetMeterProvider(original) }) + + r := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(r)) + otel.SetMeterProvider(mp) + + inst, err := observ.NewInstrumentation(ID, Endpoint) + require.NoError(t, err) + require.NotNil(t, inst) + + return inst, func() metricdata.ScopeMetrics { + var rm metricdata.ResourceMetrics + require.NoError(t, r.Collect(t.Context(), &rm)) + + require.Len(t, rm.ScopeMetrics, 1) + return rm.ScopeMetrics[0] + } +} + +func baseAttrs(err error) []attribute.KeyValue { + attrs := []attribute.KeyValue{ + semconv.OTelComponentName(observ.ComponentName(ID)), + semconv.OTelComponentTypeOtlpHTTPMetricExporter, + semconv.ServerAddress(ServerAddr), + semconv.ServerPort(ServerPort), + } + if err != nil { + attrs = append(attrs, semconv.ErrorType(err)) + } + return attrs +} + +func set(err error) attribute.Set { + return attribute.NewSet(baseAttrs(err)...) +} + +func metricInflight() metricdata.Metrics { + return metricdata.Metrics{ + Name: otelconv.SDKExporterMetricDataPointInflight{}.Name(), + Description: otelconv.SDKExporterMetricDataPointInflight{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointInflight{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.DataPoint[int64]{ + {Attributes: set(nil), Value: 0}, + }, + }, + } +} + +func metricExported(success, total int64, err error) metricdata.Metrics { + dp := []metricdata.DataPoint[int64]{ + {Attributes: set(nil), Value: success}, + } + if err != nil { + dp = append(dp, metricdata.DataPoint[int64]{ + Attributes: set(err), + Value: total - success, + }) + } + return metricdata.Metrics{ + Name: otelconv.SDKExporterMetricDataPointExported{}.Name(), + Description: otelconv.SDKExporterMetricDataPointExported{}.Description(), + Unit: otelconv.SDKExporterMetricDataPointExported{}.Unit(), + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dp, + }, + } +} + +func operationDuration(err error, statusCode int) metricdata.Metrics { + httpSet := func(err error, statusCode int) attribute.Set { + attrs := baseAttrs(err) + attrs = append(attrs, semconv.HTTPResponseStatusCode(statusCode)) + return attribute.NewSet(attrs...) + } + return metricdata.Metrics{ + Name: otelconv.SDKExporterOperationDuration{}.Name(), + Description: otelconv.SDKExporterOperationDuration{}.Description(), + Unit: otelconv.SDKExporterOperationDuration{}.Unit(), + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: []metricdata.HistogramDataPoint[float64]{ + {Attributes: httpSet(err, statusCode)}, + }, + }, + } +} + +func assertMetrics(t *testing.T, got metricdata.ScopeMetrics, metrics, success int64, err error, statusCode int) { + t.Helper() + + assert.Equal(t, Scope, got.Scope, "unexpected scope") + + m := got.Metrics + require.Len(t, m, 3, "expected 3 metrics") + + o := metricdatatest.IgnoreTimestamp() + want := metricInflight() + metricdatatest.AssertEqual(t, want, m[0], o) + + want = metricExported(success, metrics, err) + metricdatatest.AssertEqual(t, want, m[1], o) + + want = operationDuration(err, statusCode) + metricdatatest.AssertEqual(t, want, m[2], o, metricdatatest.IgnoreValue()) +} + +func TestInstrumentationExportMetrics(t *testing.T) { + inst, collect := setup(t) + + const n = 10 + inst.ExportMetrics(t.Context(), n).End(nil, http.StatusOK) + + assertMetrics(t, collect(), n, n, nil, http.StatusOK) +} + +func TestInstrumentationExportMetricsAllErrored(t *testing.T) { + inst, collect := setup(t) + + const n = 10 + err := errors.New("http error") + inst.ExportMetrics(t.Context(), n).End(err, http.StatusInternalServerError) + + const success = 0 + assertMetrics(t, collect(), n, success, err, http.StatusInternalServerError) +} + +func TestInstrumentationExportMetricsPartialErrored(t *testing.T) { + inst, collect := setup(t) + + const n = 10 + const success = n - 5 + + err := errors.New("partial failure") + err = errors.Join(err, &internal.PartialSuccess{RejectedItems: 5}) + inst.ExportMetrics(t.Context(), n).End(err, http.StatusOK) + + assertMetrics(t, collect(), n, success, err, http.StatusOK) +} + +func TestInstrumentationExportMetricsInvalidPartialErrored(t *testing.T) { + inst, collect := setup(t) + + const n = 10 + pErr := &internal.PartialSuccess{RejectedItems: -5} + err := errors.Join(errors.New("temporary"), pErr) + inst.ExportMetrics(t.Context(), n).End(err, http.StatusServiceUnavailable) + + // Round -5 to 0. + success := int64(n) // (n - 0) + assertMetrics(t, collect(), n, success, err, http.StatusServiceUnavailable) + + // Note: the metrics are cumulative, so account for the previous + // ExportMetrics call. + pErr.RejectedItems = n + 5 + inst.ExportMetrics(t.Context(), n).End(err, http.StatusServiceUnavailable) + + // Round n+5 to n. + success += 0 // success + (n - n) + assertMetrics(t, collect(), n+n, success, err, http.StatusServiceUnavailable) +} + +func TestBaseAttrs(t *testing.T) { + tests := []struct { + endpoint string + host string + port int + }{ + // Empty. + {endpoint: "", host: "", port: -1}, + + // Only a port. + {endpoint: ":4318", host: "", port: 4318}, + + // Hostname. + {endpoint: "localhost:4318", host: "localhost", port: 4318}, + {endpoint: "localhost", host: "localhost", port: -1}, + + // IPv4 address. + {endpoint: "127.0.0.1:4318", host: "127.0.0.1", port: 4318}, + {endpoint: "127.0.0.1", host: "127.0.0.1", port: -1}, + + // IPv6 address. + {endpoint: "2001:0db8:85a3:0000:0000:8a2e:0370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1}, + {endpoint: "2001:db8:85a3:0:0:8a2e:370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1}, + {endpoint: "2001:db8:85a3::8a2e:370:7334", host: "2001:db8:85a3::8a2e:370:7334", port: -1}, + {endpoint: "[2001:db8:85a3::8a2e:370:7334]", host: "2001:db8:85a3::8a2e:370:7334", port: -1}, + {endpoint: "[::1]:9090", host: "::1", port: 9090}, + + // Port edge cases. + {endpoint: "example.com:0", host: "example.com", port: 0}, + {endpoint: "example.com:65535", host: "example.com", port: 65535}, + + // Case insensitive. + {endpoint: "ExAmPlE.COM:8080", host: "ExAmPlE.COM", port: 8080}, + } + for _, tt := range tests { + got := observ.BaseAttrs(ID, tt.endpoint) + want := []attribute.KeyValue{ + semconv.OTelComponentName(observ.ComponentName(ID)), + semconv.OTelComponentTypeOtlpHTTPMetricExporter, + } + + if tt.host != "" { + want = append(want, semconv.ServerAddress(tt.host)) + } + if tt.port != -1 { + want = append(want, semconv.ServerPort(tt.port)) + } + assert.Equal(t, want, got) + } +} + +type logSink struct { + logr.LogSink + + level int + msg string + keysAndValues []any +} + +func (*logSink) Enabled(int) bool { return true } + +func (l *logSink) Info(level int, msg string, keysAndValues ...any) { + l.level, l.msg, l.keysAndValues = level, msg, keysAndValues + l.LogSink.Info(level, msg, keysAndValues...) +} + +func TestBaseAttrsError(t *testing.T) { + endpoints := []string{ + "example.com:invalid", // Non-numeric port. + "example.com:8080:9090", // Multiple colons in port. + "example.com:99999", // Port out of range. + "example.com:-1", // Port out of range. + } + for _, endpoint := range endpoints { + l := &logSink{LogSink: testr.New(t).GetSink()} + t.Cleanup(func(orig logr.Logger) func() { + global.SetLogger(logr.New(l)) + return func() { global.SetLogger(orig) } + }(global.GetLogger())) + + // Set the logger as global so BaseAttrs can log the error. + got := observ.BaseAttrs(ID, endpoint) + want := []attribute.KeyValue{ + semconv.OTelComponentName(observ.ComponentName(ID)), + semconv.OTelComponentTypeOtlpHTTPMetricExporter, + } + assert.Equal(t, want, got) + + assert.Equal(t, 8, l.level, "expected Debug log level") + assert.Equal(t, "failed to parse endpoint", l.msg) + } +} + +func BenchmarkInstrumentationExportMetrics(b *testing.B) { + setup := func(b *testing.B) *observ.Instrumentation { + b.Helper() + b.Setenv("OTEL_GO_X_OBSERVABILITY", "true") + + original := otel.GetMeterProvider() + b.Cleanup(func() { otel.SetMeterProvider(original) }) + + r := metric.NewManualReader() + mp := metric.NewMeterProvider(metric.WithReader(r)) + otel.SetMeterProvider(mp) + + inst, err := observ.NewInstrumentation(ID, Endpoint) + if err != nil { + b.Fatalf("failed to create instrumentation: %v", err) + } + return inst + } + + run := func(err error, statusCode int) func(*testing.B) { + return func(b *testing.B) { + inst := setup(b) + b.ReportAllocs() + b.ResetTimer() + for b.Loop() { + inst.ExportMetrics(b.Context(), 10).End(err, statusCode) + } + } + } + + b.Run("NoError", run(nil, http.StatusOK)) + err := &internal.PartialSuccess{RejectedItems: 6} + b.Run("PartialError", run(err, http.StatusOK)) + b.Run("FullError", run(assert.AnError, http.StatusInternalServerError)) +} From 47a0cf084fa2eaf19e5265ae522172b8fa2a5159 Mon Sep 17 00:00:00 2001 From: Robert Wu Date: Sat, 11 Oct 2025 17:42:26 -0400 Subject: [PATCH 6/8] fix benchmark testing with nolint --- .../otlpmetric/otlpmetrichttp/client_test.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go index 3975ff35743..7dfa4cc1c5e 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go @@ -357,7 +357,10 @@ func TestClientInstrumentation(t *testing.T) { coll, err := otest.NewHTTPCollector("", rCh) require.NoError(t, err) - t.Cleanup(func() { require.NoError(t, coll.Shutdown(t.Context())) }) + t.Cleanup(func() { + //nolint:usetesting // required to avoid getting a canceled context at cleanup. + require.NoError(t, coll.Shutdown(context.Background())) + }) t.Cleanup(func() { close(rCh) }) addr := coll.Addr().String() @@ -386,7 +389,8 @@ func TestClientInstrumentation(t *testing.T) { wantErr := internal.MetricPartialSuccessError(n, msg) require.ErrorIs(t, err, wantErr, "Expected partial success error") - require.NoError(t, exp.Shutdown(ctx)) + //nolint:usetesting // required to avoid getting a canceled context at cleanup. + require.NoError(t, exp.Shutdown(context.Background())) var got metricdata.ResourceMetrics require.NoError(t, reader.Collect(ctx, &got)) @@ -458,14 +462,18 @@ func BenchmarkExporterExportMetrics(b *testing.B) { run := func(b *testing.B) { coll, err := otest.NewHTTPCollector("", nil) require.NoError(b, err) - b.Cleanup(func() { require.NoError(b, coll.Shutdown(b.Context())) }) + b.Cleanup(func() { + //nolint:usetesting // required to avoid getting a canceled context at cleanup. + require.NoError(b, coll.Shutdown(context.Background())) + }) opts := []Option{WithEndpoint(coll.Addr().String()), WithInsecure()} ctx := b.Context() exp, err := New(ctx, opts...) require.NoError(b, err) b.Cleanup(func() { - assert.NoError(b, exp.Shutdown(b.Context())) + //nolint:usetesting // required to avoid getting a canceled context at cleanup. + assert.NoError(b, exp.Shutdown(context.Background())) }) // Generate realistic test metric data with multiple metrics. From f3057bc16c977af1b26d44339d2c91118b3756fa Mon Sep 17 00:00:00 2001 From: Robert Wu Date: Sat, 11 Oct 2025 22:55:24 -0400 Subject: [PATCH 7/8] address pr comments --- .../otlp/otlpmetric/otlpmetrichttp/client.go | 9 ++--- .../otlpmetrichttp/internal/x/README.md | 36 +++++++++++++++++++ versions.yaml | 3 ++ 3 files changed, 42 insertions(+), 6 deletions(-) create mode 100644 exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/README.md diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go index 71dff79c16d..2b98d687f4e 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client.go @@ -36,8 +36,7 @@ type client struct { requestFunc retry.RequestFunc httpClient *http.Client - instID int64 - inst *observ.Instrumentation + inst *observ.Instrumentation } // Keep it in sync with golang's DefaultTransport from net/http! We @@ -103,16 +102,14 @@ func newClient(cfg oconf.Config) (*client, error) { } req.Header.Set("Content-Type", "application/x-protobuf") - // Initialize the instrumentation - instID := counter.NextExporterID() - inst, err := observ.NewInstrumentation(instID, cfg.Metrics.Endpoint) + // Initialize the instrumentation. + inst, err := observ.NewInstrumentation(counter.NextExporterID(), cfg.Metrics.Endpoint) return &client{ compression: Compression(cfg.Metrics.Compression), req: req, requestFunc: cfg.RetryConfig.RequestFunc(evaluate), httpClient: httpClient, - instID: instID, inst: inst, }, err } diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/README.md b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/README.md new file mode 100644 index 00000000000..13360dcb919 --- /dev/null +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/README.md @@ -0,0 +1,36 @@ +# Experimental Features + +The `otlpmetrichttp` exporter contains features that have not yet stabilized in the OpenTelemetry specification. +These features are added to the `otlpmetrichttp` exporter prior to stabilization in the specification so that users can start experimenting with them and provide feedback. + +These features may change in backwards incompatible ways as feedback is applied. +See the [Compatibility and Stability](#compatibility-and-stability) section for more information. + +## Features + +- [Observability](#observability) + +### Observability + +The `otlpmetrichttp` exporter can be configured to provide observability about itself using OpenTelemetry metrics. + +To opt-in, set the environment variable `OTEL_GO_X_OBSERVABILITY` to `true`. + +When enabled, the SDK will create the following metrics using the global `MeterProvider`: + +- `otel.sdk.exporter.metric_data_point.inflight` +- `otel.sdk.exporter.metric_data_point.exported` +- `otel.sdk.exporter.operation.duration` + +Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentation for more details on these metrics. + +[Semantic conventions for OpenTelemetry SDK metrics]: https://github.com/open-telemetry/semantic-conventions/blob/v1.36.0/docs/otel/sdk-metrics.md + +## Compatibility and Stability + +Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../../VERSIONING.md). +These features may be removed or modified in successive version releases, including patch versions. + +When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release. +There is no guarantee that any environment variable feature flags that enabled the experimental feature will be supported by the stable version. +If they are supported, they may be accompanied with a deprecation notice stating a timeline for the removal of that support. diff --git a/versions.yaml b/versions.yaml index f0ec3c8a570..5dbca111d36 100644 --- a/versions.yaml +++ b/versions.yaml @@ -58,3 +58,6 @@ modules: go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp: version-refs: - ./exporters/otlp/otlptrace/otlptracehttp/internal/version.go + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp: + version-refs: + - ./exporters/otlp/otlpmetric/otlpmetrichttp/internal/version.go From 6eab2f3478b9d22424d2d9c854de7b3f19b8c5dd Mon Sep 17 00:00:00 2001 From: Robert Wu Date: Sat, 11 Oct 2025 23:21:41 -0400 Subject: [PATCH 8/8] fix link address in README.md --- exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/README.md b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/README.md index 13360dcb919..30ab7105900 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/README.md +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/internal/x/README.md @@ -28,7 +28,7 @@ Please see the [Semantic conventions for OpenTelemetry SDK metrics] documentatio ## Compatibility and Stability -Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../../VERSIONING.md). +Experimental features do not fall within the scope of the OpenTelemetry Go versioning and stability [policy](../../../../../../VERSIONING.md). These features may be removed or modified in successive version releases, including patch versions. When an experimental feature is promoted to a stable feature, a migration path will be included in the changelog entry of the release.