Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploggrpc`. (#7353)
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc`. (#7459)
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp`. (#7486)
- Add experimental observability metrics in `go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp`. (#7493)

### Fixed

Expand Down
24 changes: 21 additions & 3 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/retry"
)
Expand All @@ -33,6 +35,9 @@ type client struct {
compression Compression
requestFunc retry.RequestFunc
httpClient *http.Client

instID int64
inst *observ.Instrumentation
}

// Keep it in sync with golang's DefaultTransport from net/http! We
Expand Down Expand Up @@ -98,12 +103,18 @@ func newClient(cfg oconf.Config) (*client, error) {
}
req.Header.Set("Content-Type", "application/x-protobuf")

// Initialize the instrumentation
instID := counter.NextExporterID()
inst, err := observ.NewInstrumentation(instID, cfg.Metrics.Endpoint)

return &client{
compression: Compression(cfg.Metrics.Compression),
req: req,
requestFunc: cfg.RetryConfig.RequestFunc(evaluate),
httpClient: httpClient,
}, nil
instID: instID,
inst: inst,
}, err
}

// Shutdown shuts down the client, freeing all resources.
Expand Down Expand Up @@ -138,6 +149,12 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
return err
}

var statusCode int
if c.inst != nil {
op := c.inst.ExportMetrics(ctx, len(pbRequest.ResourceMetrics))
defer func() { op.End(uploadErr, statusCode) }()
}

return errors.Join(uploadErr, c.requestFunc(ctx, func(iCtx context.Context) error {
select {
case <-iCtx.Done():
Expand All @@ -162,7 +179,8 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
}()
}

if sc := resp.StatusCode; sc >= 200 && sc <= 299 {
statusCode = resp.StatusCode
if statusCode >= 200 && statusCode <= 299 {
// Success, do not retry.

// Read the partial success message, if any.
Expand Down Expand Up @@ -207,7 +225,7 @@ func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.Resou
}
bodyErr := fmt.Errorf("body: %s", respStr)

switch resp.StatusCode {
switch statusCode {
case http.StatusTooManyRequests,
http.StatusBadGateway,
http.StatusServiceUnavailable,
Expand Down
218 changes: 218 additions & 0 deletions exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,23 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
colmetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/counter"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/observ"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/oconf"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp/internal/otest"
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/otelconv"
)

type clientShim struct {
Expand Down Expand Up @@ -314,3 +325,210 @@ func TestConfig(t *testing.T) {
assert.NoError(t, exCtx.Err())
})
}

func TestClientInstrumentation(t *testing.T) {
// Enable instrumentation for this test.
t.Setenv("OTEL_GO_X_OBSERVABILITY", "true")

// Reset client ID to be deterministic.
const id = 0
counter.SetExporterID(id)

// Save original meter provider and restore at end of test.
orig := otel.GetMeterProvider()
t.Cleanup(func() { otel.SetMeterProvider(orig) })

// Create a new meter provider to capture metrics.
reader := metric.NewManualReader()
mp := metric.NewMeterProvider(metric.WithReader(reader))
otel.SetMeterProvider(mp)

const n, msg = 2, "partially successful"
rCh := make(chan otest.ExportResult, 1)
// Test partial success - return HTTP 200 with partial success info
rCh <- otest.ExportResult{
Response: &colmetricpb.ExportMetricsServiceResponse{
PartialSuccess: &colmetricpb.ExportMetricsPartialSuccess{
RejectedDataPoints: n,
ErrorMessage: msg,
},
},
}

coll, err := otest.NewHTTPCollector("", rCh)
require.NoError(t, err)
t.Cleanup(func() {
//nolint:usetesting // required to avoid getting a canceled context at cleanup.
require.NoError(t, coll.Shutdown(context.Background()))
})
t.Cleanup(func() { close(rCh) })

addr := coll.Addr().String()
opts := []Option{WithEndpoint(addr), WithInsecure()}
ctx := t.Context()
exp, err := New(ctx, opts...)
require.NoError(t, err)

// Export some test data
err = exp.Export(ctx, &metricdata.ResourceMetrics{
Resource: resource.NewWithAttributes(semconv.SchemaURL, attribute.String("service.name", "test")),
ScopeMetrics: []metricdata.ScopeMetrics{
{
Scope: instrumentation.Scope{Name: "test"},
Metrics: []metricdata.Metrics{
{
Name: "test-metric",
Data: metricdata.Gauge[int64]{DataPoints: []metricdata.DataPoint[int64]{{Value: 42}}},
},
},
},
},
})

// Should get partial success error
wantErr := internal.MetricPartialSuccessError(n, msg)
require.ErrorIs(t, err, wantErr, "Expected partial success error")

//nolint:usetesting // required to avoid getting a canceled context at cleanup.
require.NoError(t, exp.Shutdown(context.Background()))
var got metricdata.ResourceMetrics
require.NoError(t, reader.Collect(ctx, &got))

attrs := observ.BaseAttrs(id, addr)

want := metricdata.ScopeMetrics{
Scope: instrumentation.Scope{
Name: observ.ScopeName,
Version: observ.Version,
SchemaURL: observ.SchemaURL,
},
Metrics: []metricdata.Metrics{
{
Name: otelconv.SDKExporterMetricDataPointInflight{}.Name(),
Description: otelconv.SDKExporterMetricDataPointInflight{}.Description(),
Unit: otelconv.SDKExporterMetricDataPointInflight{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{Attributes: attribute.NewSet(attrs...)},
},
Temporality: metricdata.CumulativeTemporality,
},
},
{
Name: otelconv.SDKExporterMetricDataPointExported{}.Name(),
Description: otelconv.SDKExporterMetricDataPointExported{}.Description(),
Unit: otelconv.SDKExporterMetricDataPointExported{}.Unit(),
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{Attributes: attribute.NewSet(attrs...)},
{Attributes: attribute.NewSet(append(
attrs,
otelconv.SDKExporterMetricDataPointExported{}.AttrErrorType("*errors.joinError"),
)...)},
},
Temporality: 0x1,
IsMonotonic: true,
},
},
{
Name: otelconv.SDKExporterOperationDuration{}.Name(),
Description: otelconv.SDKExporterOperationDuration{}.Description(),
Unit: otelconv.SDKExporterOperationDuration{}.Unit(),
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{Attributes: attribute.NewSet(append(
attrs,
otelconv.SDKExporterOperationDuration{}.AttrErrorType("*errors.joinError"),
otelconv.SDKExporterOperationDuration{}.AttrHTTPResponseStatusCode(200),
)...)},
},
Temporality: 0x1,
},
},
},
}
require.Len(t, got.ScopeMetrics, 1)
opt := []metricdatatest.Option{
metricdatatest.IgnoreTimestamp(),
metricdatatest.IgnoreExemplars(),
metricdatatest.IgnoreValue(),
}
metricdatatest.AssertEqual(t, want, got.ScopeMetrics[0], opt...)
}

func BenchmarkExporterExportMetrics(b *testing.B) {
const n = 10

run := func(b *testing.B) {
coll, err := otest.NewHTTPCollector("", nil)
require.NoError(b, err)
b.Cleanup(func() {
//nolint:usetesting // required to avoid getting a canceled context at cleanup.
require.NoError(b, coll.Shutdown(context.Background()))
})

opts := []Option{WithEndpoint(coll.Addr().String()), WithInsecure()}
ctx := b.Context()
exp, err := New(ctx, opts...)
require.NoError(b, err)
b.Cleanup(func() {
//nolint:usetesting // required to avoid getting a canceled context at cleanup.
assert.NoError(b, exp.Shutdown(context.Background()))
})

// Generate realistic test metric data with multiple metrics.
now := time.Now()
rm := &metricdata.ResourceMetrics{
ScopeMetrics: []metricdata.ScopeMetrics{
{
Scope: instrumentation.Scope{
Name: "test",
Version: "v1.0.0",
},
Metrics: make([]metricdata.Metrics, n),
},
},
}

for i := range rm.ScopeMetrics[0].Metrics {
rm.ScopeMetrics[0].Metrics[i] = metricdata.Metrics{
Name: fmt.Sprintf("test_counter_%d", i),
Description: fmt.Sprintf("A test counter %d", i),
Unit: "1",
Data: metricdata.Sum[int64]{
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String("test", "value"),
attribute.Int("counter", i),
),
StartTime: now,
Time: now,
Value: int64(i * 10),
},
},
},
}
}

b.ReportAllocs()
b.ResetTimer()

for b.Loop() {
err = exp.Export(b.Context(), rm)
}
_ = err
}

b.Run("Observability", func(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "true")
run(b)
})

b.Run("NoObservability", func(b *testing.B) {
b.Setenv("OTEL_GO_X_OBSERVABILITY", "false")
run(b)
})
}
4 changes: 2 additions & 2 deletions exporters/otlp/otlpmetric/otlpmetrichttp/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ retract v0.32.2 // Contains unresolvable dependencies.

require (
github.com/cenkalti/backoff/v5 v5.0.3
github.com/go-logr/logr v1.4.3
github.com/google/go-cmp v0.7.0
github.com/stretchr/testify v1.11.1
go.opentelemetry.io/otel v1.38.0
go.opentelemetry.io/otel/metric v1.38.0
go.opentelemetry.io/otel/sdk v1.38.0
go.opentelemetry.io/otel/sdk/metric v1.38.0
go.opentelemetry.io/proto/otlp v1.8.0
Expand All @@ -18,13 +20,11 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.2.1 // indirect
go.opentelemetry.io/otel/metric v1.38.0 // indirect
go.opentelemetry.io/otel/trace v1.38.0 // indirect
golang.org/x/net v0.45.0 // indirect
golang.org/x/sys v0.37.0 // indirect
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading