diff --git a/.chloggen/otlpreceiver-profile-metrics.yaml b/.chloggen/otlpreceiver-profile-metrics.yaml new file mode 100644 index 00000000000..f429ff57311 --- /dev/null +++ b/.chloggen/otlpreceiver-profile-metrics.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/otlp) +component: receiver/otlp + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add metrics tracking the number of receiver, refused and failed profile samples + +# One or more tracking issues or pull requests related to the change +issues: [14226] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/receiverhelper-profile-metrics.yaml b/.chloggen/receiverhelper-profile-metrics.yaml new file mode 100644 index 00000000000..b6ee683bf7f --- /dev/null +++ b/.chloggen/receiverhelper-profile-metrics.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/otlp) +component: pkg/receiverhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for profile samples metrics + +# One or more tracking issues or pull requests related to the change +issues: [14226] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/receiver/otlpreceiver/go.mod b/receiver/otlpreceiver/go.mod index 696c872aff9..3941ecadf98 100644 --- a/receiver/otlpreceiver/go.mod +++ b/receiver/otlpreceiver/go.mod @@ -74,6 +74,7 @@ require ( go.opentelemetry.io/collector/extension/extensionmiddleware v0.140.0 // indirect go.opentelemetry.io/collector/featuregate v1.46.0 // indirect go.opentelemetry.io/collector/pipeline v1.46.0 // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.140.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.63.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.63.0 // indirect go.opentelemetry.io/otel/metric v1.38.0 // indirect @@ -166,3 +167,5 @@ replace go.opentelemetry.io/collector/config/configmiddleware => ../../config/co replace go.opentelemetry.io/collector/extension/extensionmiddleware/extensionmiddlewaretest => ../../extension/extensionmiddleware/extensionmiddlewaretest replace go.opentelemetry.io/collector/internal/testutil => ../../internal/testutil + +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline diff --git a/receiver/otlpreceiver/internal/profiles/otlp.go b/receiver/otlpreceiver/internal/profiles/otlp.go index 269769b6824..d111618e6ad 100644 --- a/receiver/otlpreceiver/internal/profiles/otlp.go +++ b/receiver/otlpreceiver/internal/profiles/otlp.go @@ -9,18 +9,23 @@ import ( "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pdata/pprofile/pprofileotlp" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" + "go.opentelemetry.io/collector/receiver/receiverhelper" ) +const dataFormatProtobuf = "protobuf" + // Receiver is the type used to handle spans from OpenTelemetry exporters. type Receiver struct { pprofileotlp.UnimplementedGRPCServer nextConsumer xconsumer.Profiles + obsreport *receiverhelper.ObsReport } // New creates a new Receiver reference. -func New(nextConsumer xconsumer.Profiles) *Receiver { +func New(nextConsumer xconsumer.Profiles, obsreport *receiverhelper.ObsReport) *Receiver { return &Receiver{ nextConsumer: nextConsumer, + obsreport: obsreport, } } @@ -28,12 +33,15 @@ func New(nextConsumer xconsumer.Profiles) *Receiver { func (r *Receiver) Export(ctx context.Context, req pprofileotlp.ExportRequest) (pprofileotlp.ExportResponse, error) { td := req.Profiles() // We need to ensure that it propagates the receiver name as a tag - numProfiles := td.SampleCount() - if numProfiles == 0 { + numSamples := td.SampleCount() + if numSamples == 0 { return pprofileotlp.NewExportResponse(), nil } + ctx = r.obsreport.StartTracesOp(ctx) err := r.nextConsumer.ConsumeProfiles(ctx, td) + r.obsreport.EndTracesOp(ctx, dataFormatProtobuf, numSamples, err) + // Use appropriate status codes for permanent/non-permanent errors // If we return the error straightaway, then the grpc implementation will set status code to Unknown // Refer: https://github.com/grpc/grpc-go/blob/v1.59.0/server.go#L1345 diff --git a/receiver/otlpreceiver/internal/profiles/otlp_test.go b/receiver/otlpreceiver/internal/profiles/otlp_test.go index 1a70049944d..204c3cd981d 100644 --- a/receiver/otlpreceiver/internal/profiles/otlp_test.go +++ b/receiver/otlpreceiver/internal/profiles/otlp_test.go @@ -16,11 +16,15 @@ import ( "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/pdata/pprofile/pprofileotlp" "go.opentelemetry.io/collector/pdata/testdata" + "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metadata" + "go.opentelemetry.io/collector/receiver/receiverhelper" + "go.opentelemetry.io/collector/receiver/receivertest" ) func TestExport(t *testing.T) { @@ -86,7 +90,16 @@ func otlpReceiverOnGRPCServer(t *testing.T, tc xconsumer.Profiles) net.Addr { require.NoError(t, ln.Close()) }) - r := New(tc) + set := receivertest.NewNopSettings(metadata.Type) + set.ID = component.MustNewIDWithName("otlp", "profiles") + obsreport, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: set.ID, + Transport: "grpc", + ReceiverCreateSettings: set, + }) + require.NoError(t, err) + + r := New(tc, obsreport) // Now run it as a gRPC server srv := grpc.NewServer() pprofileotlp.RegisterGRPCServer(srv, r) diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index b18d5323d4c..7481b5ec002 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -110,7 +110,7 @@ func (r *otlpReceiver) startGRPCServer(ctx context.Context, host component.Host) } if r.nextProfiles != nil { - pprofileotlp.RegisterGRPCServer(r.serverGRPC, profiles.New(r.nextProfiles)) + pprofileotlp.RegisterGRPCServer(r.serverGRPC, profiles.New(r.nextProfiles, r.obsrepGRPC)) } var gln net.Listener @@ -160,7 +160,7 @@ func (r *otlpReceiver) startHTTPServer(ctx context.Context, host component.Host) } if r.nextProfiles != nil { - httpProfilesReceiver := profiles.New(r.nextProfiles) + httpProfilesReceiver := profiles.New(r.nextProfiles, r.obsrepHTTP) httpMux.HandleFunc(defaultProfilesURLPath, func(resp http.ResponseWriter, req *http.Request) { handleProfiles(resp, req, httpProfilesReceiver) }) diff --git a/receiver/receiverhelper/documentation.md b/receiver/receiverhelper/documentation.md index e25eb5b9219..0a839bf28cd 100644 --- a/receiver/receiverhelper/documentation.md +++ b/receiver/receiverhelper/documentation.md @@ -22,6 +22,14 @@ Number of metric points successfully pushed into the pipeline. [Alpha] | ---- | ----------- | ---------- | --------- | --------- | | {datapoints} | Sum | Int | true | Alpha | +### otelcol_receiver_accepted_profile_samples + +Number of profile samples successfully pushed into the pipeline. [Alpha] + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {samples} | Sum | Int | true | Alpha | + ### otelcol_receiver_accepted_spans Number of spans successfully pushed into the pipeline. [Alpha] @@ -46,6 +54,14 @@ The number of metric points that failed to be processed by the receiver due to i | ---- | ----------- | ---------- | --------- | --------- | | {datapoints} | Sum | Int | true | Alpha | +### otelcol_receiver_failed_profile_samples + +The number of profile samples that failed to be processed by the receiver due to internal errors. [Alpha] + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {samples} | Sum | Int | true | Alpha | + ### otelcol_receiver_failed_spans The number of spans that failed to be processed by the receiver due to internal errors. [Alpha] @@ -70,6 +86,14 @@ Number of metric points that could not be pushed into the pipeline. [Alpha] | ---- | ----------- | ---------- | --------- | --------- | | {datapoints} | Sum | Int | true | Alpha | +### otelcol_receiver_refused_profile_samples + +Number of profile samples that could not be pushed into the pipeline. [Alpha] + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {samples} | Sum | Int | true | Alpha | + ### otelcol_receiver_refused_spans Number of spans that could not be pushed into the pipeline. [Alpha] diff --git a/receiver/receiverhelper/go.mod b/receiver/receiverhelper/go.mod index 9beaa74d63b..3ec873fda2d 100644 --- a/receiver/receiverhelper/go.mod +++ b/receiver/receiverhelper/go.mod @@ -9,6 +9,7 @@ require ( go.opentelemetry.io/collector/consumer/consumererror v0.140.0 go.opentelemetry.io/collector/featuregate v1.46.0 go.opentelemetry.io/collector/pipeline v1.46.0 + go.opentelemetry.io/collector/pipeline/xpipeline v0.140.0 go.opentelemetry.io/collector/receiver v1.46.0 go.opentelemetry.io/otel v1.38.0 go.opentelemetry.io/otel/metric v1.38.0 @@ -66,3 +67,5 @@ replace go.opentelemetry.io/collector/featuregate => ../../featuregate replace go.opentelemetry.io/collector/consumer/consumererror => ../../consumer/consumererror replace go.opentelemetry.io/collector/internal/testutil => ../../internal/testutil + +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline diff --git a/receiver/receiverhelper/internal/metadata/generated_telemetry.go b/receiver/receiverhelper/internal/metadata/generated_telemetry.go index 4409c14891f..1feff12bc62 100644 --- a/receiver/receiverhelper/internal/metadata/generated_telemetry.go +++ b/receiver/receiverhelper/internal/metadata/generated_telemetry.go @@ -23,19 +23,22 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer { // TelemetryBuilder provides an interface for components to report telemetry // as defined in metadata and user config. type TelemetryBuilder struct { - meter metric.Meter - mu sync.Mutex - registrations []metric.Registration - ReceiverAcceptedLogRecords metric.Int64Counter - ReceiverAcceptedMetricPoints metric.Int64Counter - ReceiverAcceptedSpans metric.Int64Counter - ReceiverFailedLogRecords metric.Int64Counter - ReceiverFailedMetricPoints metric.Int64Counter - ReceiverFailedSpans metric.Int64Counter - ReceiverRefusedLogRecords metric.Int64Counter - ReceiverRefusedMetricPoints metric.Int64Counter - ReceiverRefusedSpans metric.Int64Counter - ReceiverRequests metric.Int64Counter + meter metric.Meter + mu sync.Mutex + registrations []metric.Registration + ReceiverAcceptedLogRecords metric.Int64Counter + ReceiverAcceptedMetricPoints metric.Int64Counter + ReceiverAcceptedProfileSamples metric.Int64Counter + ReceiverAcceptedSpans metric.Int64Counter + ReceiverFailedLogRecords metric.Int64Counter + ReceiverFailedMetricPoints metric.Int64Counter + ReceiverFailedProfileSamples metric.Int64Counter + ReceiverFailedSpans metric.Int64Counter + ReceiverRefusedLogRecords metric.Int64Counter + ReceiverRefusedMetricPoints metric.Int64Counter + ReceiverRefusedProfileSamples metric.Int64Counter + ReceiverRefusedSpans metric.Int64Counter + ReceiverRequests metric.Int64Counter } // TelemetryBuilderOption applies changes to default builder. @@ -79,6 +82,12 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{datapoints}"), ) errs = errors.Join(errs, err) + builder.ReceiverAcceptedProfileSamples, err = builder.meter.Int64Counter( + "otelcol_receiver_accepted_profile_samples", + metric.WithDescription("Number of profile samples successfully pushed into the pipeline. [Alpha]"), + metric.WithUnit("{samples}"), + ) + errs = errors.Join(errs, err) builder.ReceiverAcceptedSpans, err = builder.meter.Int64Counter( "otelcol_receiver_accepted_spans", metric.WithDescription("Number of spans successfully pushed into the pipeline. [Alpha]"), @@ -97,6 +106,12 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{datapoints}"), ) errs = errors.Join(errs, err) + builder.ReceiverFailedProfileSamples, err = builder.meter.Int64Counter( + "otelcol_receiver_failed_profile_samples", + metric.WithDescription("The number of profile samples that failed to be processed by the receiver due to internal errors. [Alpha]"), + metric.WithUnit("{samples}"), + ) + errs = errors.Join(errs, err) builder.ReceiverFailedSpans, err = builder.meter.Int64Counter( "otelcol_receiver_failed_spans", metric.WithDescription("The number of spans that failed to be processed by the receiver due to internal errors. [Alpha]"), @@ -115,6 +130,12 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{datapoints}"), ) errs = errors.Join(errs, err) + builder.ReceiverRefusedProfileSamples, err = builder.meter.Int64Counter( + "otelcol_receiver_refused_profile_samples", + metric.WithDescription("Number of profile samples that could not be pushed into the pipeline. [Alpha]"), + metric.WithUnit("{samples}"), + ) + errs = errors.Join(errs, err) builder.ReceiverRefusedSpans, err = builder.meter.Int64Counter( "otelcol_receiver_refused_spans", metric.WithDescription("Number of spans that could not be pushed into the pipeline. [Alpha]"), diff --git a/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest.go b/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest.go index d2ad658b53a..00818ffa016 100644 --- a/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest.go +++ b/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest.go @@ -44,6 +44,22 @@ func AssertEqualReceiverAcceptedMetricPoints(t *testing.T, tt *componenttest.Tel metricdatatest.AssertEqual(t, want, got, opts...) } +func AssertEqualReceiverAcceptedProfileSamples(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_receiver_accepted_profile_samples", + Description: "Number of profile samples successfully pushed into the pipeline. [Alpha]", + Unit: "{samples}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_receiver_accepted_profile_samples") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + func AssertEqualReceiverAcceptedSpans(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_receiver_accepted_spans", @@ -92,6 +108,22 @@ func AssertEqualReceiverFailedMetricPoints(t *testing.T, tt *componenttest.Telem metricdatatest.AssertEqual(t, want, got, opts...) } +func AssertEqualReceiverFailedProfileSamples(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_receiver_failed_profile_samples", + Description: "The number of profile samples that failed to be processed by the receiver due to internal errors. [Alpha]", + Unit: "{samples}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_receiver_failed_profile_samples") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + func AssertEqualReceiverFailedSpans(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_receiver_failed_spans", @@ -140,6 +172,22 @@ func AssertEqualReceiverRefusedMetricPoints(t *testing.T, tt *componenttest.Tele metricdatatest.AssertEqual(t, want, got, opts...) } +func AssertEqualReceiverRefusedProfileSamples(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_receiver_refused_profile_samples", + Description: "Number of profile samples that could not be pushed into the pipeline. [Alpha]", + Unit: "{samples}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_receiver_refused_profile_samples") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + func AssertEqualReceiverRefusedSpans(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_receiver_refused_spans", diff --git a/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest_test.go b/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest_test.go index e34c4eceb60..1726f12f62e 100644 --- a/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest_test.go +++ b/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest_test.go @@ -21,12 +21,15 @@ func TestSetupTelemetry(t *testing.T) { defer tb.Shutdown() tb.ReceiverAcceptedLogRecords.Add(context.Background(), 1) tb.ReceiverAcceptedMetricPoints.Add(context.Background(), 1) + tb.ReceiverAcceptedProfileSamples.Add(context.Background(), 1) tb.ReceiverAcceptedSpans.Add(context.Background(), 1) tb.ReceiverFailedLogRecords.Add(context.Background(), 1) tb.ReceiverFailedMetricPoints.Add(context.Background(), 1) + tb.ReceiverFailedProfileSamples.Add(context.Background(), 1) tb.ReceiverFailedSpans.Add(context.Background(), 1) tb.ReceiverRefusedLogRecords.Add(context.Background(), 1) tb.ReceiverRefusedMetricPoints.Add(context.Background(), 1) + tb.ReceiverRefusedProfileSamples.Add(context.Background(), 1) tb.ReceiverRefusedSpans.Add(context.Background(), 1) tb.ReceiverRequests.Add(context.Background(), 1) AssertEqualReceiverAcceptedLogRecords(t, testTel, @@ -35,6 +38,9 @@ func TestSetupTelemetry(t *testing.T) { AssertEqualReceiverAcceptedMetricPoints(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) + AssertEqualReceiverAcceptedProfileSamples(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) AssertEqualReceiverAcceptedSpans(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) @@ -44,6 +50,9 @@ func TestSetupTelemetry(t *testing.T) { AssertEqualReceiverFailedMetricPoints(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) + AssertEqualReceiverFailedProfileSamples(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) AssertEqualReceiverFailedSpans(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) @@ -53,6 +62,9 @@ func TestSetupTelemetry(t *testing.T) { AssertEqualReceiverRefusedMetricPoints(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) + AssertEqualReceiverRefusedProfileSamples(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) AssertEqualReceiverRefusedSpans(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) diff --git a/receiver/receiverhelper/internal/obsmetrics.go b/receiver/receiverhelper/internal/obsmetrics.go index c44aa2df47a..79d335d5cdc 100644 --- a/receiver/receiverhelper/internal/obsmetrics.go +++ b/receiver/receiverhelper/internal/obsmetrics.go @@ -38,7 +38,15 @@ const ( // FailedLogRecordsKey used to identify log records failed to be processed by the Collector. FailedLogRecordsKey = "failed_log_records" + // AcceptedProfileSamplesKey used to identify profile samples accepted by the Collector. + AcceptedProfileSamplesKey = "accepted_profile_samples" + // RefusedProfileSamplesKey used to identify profile samples refused (ie.: not ingested) by the Collector. + RefusedProfileSamplesKey = "refused_profile_samples" + // FailedProfileSamplesKey used to identify profile samples failed to be processed by the Collector. + FailedProfileSamplesKey = "failed_profile_samples" + ReceiveTraceDataOperationSuffix = SpanNameSep + "TraceDataReceived" ReceiverMetricsOperationSuffix = SpanNameSep + "MetricsReceived" ReceiverLogsOperationSuffix = SpanNameSep + "LogsReceived" + ReceiverProfilesOperationSuffix = SpanNameSep + "ProfilesReceived" ) diff --git a/receiver/receiverhelper/metadata.yaml b/receiver/receiverhelper/metadata.yaml index e5854f996b5..6696d5b68a0 100644 --- a/receiver/receiverhelper/metadata.yaml +++ b/receiver/receiverhelper/metadata.yaml @@ -25,6 +25,15 @@ telemetry: sum: value_type: int monotonic: true + receiver_accepted_profile_samples: + enabled: true + stability: + level: alpha + description: Number of profile samples successfully pushed into the pipeline. + unit: "{samples}" + sum: + value_type: int + monotonic: true receiver_accepted_spans: enabled: true stability: @@ -52,6 +61,15 @@ telemetry: sum: value_type: int monotonic: true + receiver_failed_profile_samples: + enabled: true + stability: + level: alpha + description: The number of profile samples that failed to be processed by the receiver due to internal errors. + unit: "{samples}" + sum: + value_type: int + monotonic: true receiver_failed_spans: enabled: true stability: @@ -79,6 +97,15 @@ telemetry: sum: value_type: int monotonic: true + receiver_refused_profile_samples: + enabled: true + stability: + level: alpha + description: Number of profile samples that could not be pushed into the pipeline. + unit: "{samples}" + sum: + value_type: int + monotonic: true receiver_refused_spans: enabled: true stability: diff --git a/receiver/receiverhelper/obsreport.go b/receiver/receiverhelper/obsreport.go index eb7fa68043a..ece4d7efac0 100644 --- a/receiver/receiverhelper/obsreport.go +++ b/receiver/receiverhelper/obsreport.go @@ -16,6 +16,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pipeline" + "go.opentelemetry.io/collector/pipeline/xpipeline" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper/internal" "go.opentelemetry.io/collector/receiver/receiverhelper/internal/metadata" @@ -126,6 +127,24 @@ func (rec *ObsReport) EndMetricsOp( rec.endOp(receiverCtx, format, numReceivedPoints, err, pipeline.SignalMetrics) } +// StartProfilesOp is called when a request is received from a client. +// The returned context should be used in other calls to the obsreport functions +// dealing with the same receive operation. +func (rec *ObsReport) StartProfilesOp(operationCtx context.Context) context.Context { + return rec.startOp(operationCtx, internal.ReceiverProfilesOperationSuffix) +} + +// EndProfilesOp completes the receive operation that was started with +// StartProfilesOp. +func (rec *ObsReport) EndProfilesOp( + receiverCtx context.Context, + format string, + numReceivedProfileSamples int, + err error, +) { + rec.endOp(receiverCtx, format, numReceivedProfileSamples, err, xpipeline.SignalProfiles) +} + // startOp creates the span used to trace the operation. Returning // the updated context with the created span. func (rec *ObsReport) startOp(receiverCtx context.Context, operationSuffix string) context.Context { @@ -211,6 +230,10 @@ func (rec *ObsReport) endOp( acceptedItemsKey = internal.AcceptedLogRecordsKey refusedItemsKey = internal.RefusedLogRecordsKey failedItemsKey = internal.FailedLogRecordsKey + case xpipeline.SignalProfiles: + acceptedItemsKey = internal.AcceptedProfileSamplesKey + refusedItemsKey = internal.RefusedProfileSamplesKey + failedItemsKey = internal.FailedProfileSamplesKey } span.SetAttributes( @@ -241,6 +264,10 @@ func (rec *ObsReport) recordMetrics(receiverCtx context.Context, signal pipeline acceptedMeasure = rec.telemetryBuilder.ReceiverAcceptedLogRecords refusedMeasure = rec.telemetryBuilder.ReceiverRefusedLogRecords failedMeasure = rec.telemetryBuilder.ReceiverFailedLogRecords + case xpipeline.SignalProfiles: + acceptedMeasure = rec.telemetryBuilder.ReceiverAcceptedProfileSamples + refusedMeasure = rec.telemetryBuilder.ReceiverRefusedProfileSamples + failedMeasure = rec.telemetryBuilder.ReceiverFailedProfileSamples } acceptedMeasure.Add(receiverCtx, int64(numAccepted), rec.otelAttrs) diff --git a/receiver/receiverhelper/obsreport_test.go b/receiver/receiverhelper/obsreport_test.go index 47a99981c98..95b2e3e3678 100644 --- a/receiver/receiverhelper/obsreport_test.go +++ b/receiver/receiverhelper/obsreport_test.go @@ -409,6 +409,128 @@ func TestReceiveMetricsOp(t *testing.T) { } } +func TestReceiveProfilesOp(t *testing.T) { + originalState := NewReceiverMetricsGate.IsEnabled() + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(NewReceiverMetricsGate.ID(), originalState)) + }) + + for _, tc := range []struct { + name string + enabled bool + }{{"gate_enabled", true}, {"gate_disabled", false}} { + t.Run(tc.name, func(t *testing.T) { + require.NoError(t, featuregate.GlobalRegistry().Set(NewReceiverMetricsGate.ID(), tc.enabled)) + testTelemetry(t, func(t *testing.T, tt *componenttest.Telemetry) { + parentCtx, parentSpan := tt.NewTelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() + + params := []testParams{ + {items: 13, err: consumererror.NewDownstream(errFake)}, + {items: 42, err: nil}, + {items: 7, err: errors.New("non-downstream error")}, + } + for i, param := range params { + rec, err := newReceiver(ObsReportSettings{ + ReceiverID: receiverID, + Transport: transport, + ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + }) + require.NoError(t, err) + ctx := rec.StartProfilesOp(parentCtx) + assert.NotNil(t, ctx) + rec.EndProfilesOp(ctx, format, params[i].items, param.err) + } + + spans := tt.SpanRecorder.Ended() + require.Len(t, spans, len(params)) + + var acceptedSamples, refusedSamples, failedSamples int + for i, span := range spans { + assert.Equal(t, "receiver/"+receiverID.String()+"/ProfilesReceived", span.Name()) + err := params[i].err + if err == nil { + acceptedSamples += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedProfileSamplesKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedProfileSamplesKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedProfileSamplesKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Unset, span.Status().Code) + } else { + isDownstream := consumererror.IsDownstream(err) + if !tc.enabled || (tc.enabled && isDownstream) { + refusedSamples += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedProfileSamplesKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedProfileSamplesKey, Value: attribute.Int64Value(0)}) + } else { + failedSamples += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedProfileSamplesKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedProfileSamplesKey, Value: attribute.Int64Value(int64(params[i].items))}) + } + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedProfileSamplesKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, err.Error(), span.Status().Description) + } + } + metadatatest.AssertEqualReceiverAcceptedProfileSamples(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(acceptedSamples), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverRefusedProfileSamples(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(refusedSamples), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverFailedProfileSamples(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(failedSamples), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // Assert otelcol_receiver_requests metric with outcome attribute + if tc.enabled { + outcomes := make(map[string]int64) + for _, param := range params { + var outcome string + switch { + case param.err == nil: + outcome = "success" + case consumererror.IsDownstream(param.err): + outcome = "refused" + default: + outcome = "failure" + } + outcomes[outcome]++ + } + var expectedRequests []metricdata.DataPoint[int64] + for outcome, count := range outcomes { + expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{ + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport), + attribute.String("outcome", outcome)), + Value: count, + }) + } + metadatatest.AssertEqualReceiverRequests(t, tt, expectedRequests, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + } + }) + }) + } +} + func TestReceiveWithLongLivedCtx(t *testing.T) { originalState := NewReceiverMetricsGate.IsEnabled() t.Cleanup(func() { @@ -607,6 +729,49 @@ func TestCheckReceiverLogsViews(t *testing.T) { }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) } +func TestCheckReceiverProfilesViews(t *testing.T) { + tt := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + rec, err := NewObsReport(ObsReportSettings{ + ReceiverID: receiverID, + Transport: transport, + ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + }) + require.NoError(t, err) + ctx := rec.StartProfilesOp(context.Background()) + require.NotNil(t, ctx) + rec.EndProfilesOp(ctx, format, 7, nil) + + metadatatest.AssertEqualReceiverAcceptedProfileSamples(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(7), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverRefusedProfileSamples(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(0), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverFailedProfileSamples(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(0), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) +} + func testTelemetry(t *testing.T, testFunc func(t *testing.T, tt *componenttest.Telemetry)) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) diff --git a/scraper/scraperhelper/go.mod b/scraper/scraperhelper/go.mod index 83f878cd51b..9f997becd16 100644 --- a/scraper/scraperhelper/go.mod +++ b/scraper/scraperhelper/go.mod @@ -40,6 +40,7 @@ require ( go.opentelemetry.io/collector/consumer/xconsumer v0.140.0 // indirect go.opentelemetry.io/collector/featuregate v1.46.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.140.0 // indirect + go.opentelemetry.io/collector/pipeline/xpipeline v0.140.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.140.0 // indirect golang.org/x/sys v0.37.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 // indirect @@ -81,3 +82,5 @@ replace go.opentelemetry.io/collector/consumer/consumertest => ../../consumer/co replace go.opentelemetry.io/collector/featuregate => ../../featuregate replace go.opentelemetry.io/collector/internal/testutil => ../../internal/testutil + +replace go.opentelemetry.io/collector/pipeline/xpipeline => ../../pipeline/xpipeline