Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ require (
github.com/influxdata/influxdb-client-go/v2 v2.14.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/otlptranslator v1.0.0
github.com/tsenart/vegeta/v12 v12.13.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0
go.opentelemetry.io/contrib/instrumentation/runtime v0.64.0
go.opentelemetry.io/otel v1.39.0
go.opentelemetry.io/otel/exporters/prometheus v0.61.0
go.opentelemetry.io/otel/metric v1.39.0
go.opentelemetry.io/otel/sdk v1.39.0
go.opentelemetry.io/otel/sdk/metric v1.39.0
Expand Down Expand Up @@ -125,7 +127,6 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.4 // indirect
github.com/prometheus/otlptranslator v1.0.0 // indirect
github.com/prometheus/procfs v0.19.2 // indirect
github.com/rs/dnscache v0.0.0-20230804202142-fc85eb664529 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
Expand All @@ -141,7 +142,6 @@ require (
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.39.0 // indirect
go.opentelemetry.io/otel/exporters/prometheus v0.61.0 // indirect
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.39.0 // indirect
go.opentelemetry.io/proto/otlp v1.9.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
112 changes: 92 additions & 20 deletions pkg/queue/sharedmain/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,54 +19,65 @@ package sharedmain
import (
"cmp"
"context"
"errors"
"fmt"
"net"
"os"

"github.com/prometheus/otlptranslator"
"go.opentelemetry.io/contrib/instrumentation/runtime"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/metric"
otelprom "go.opentelemetry.io/otel/exporters/prometheus"
otelmetric "go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/zap"

"knative.dev/pkg/changeset"
"knative.dev/pkg/observability/metrics"
promserver "knative.dev/pkg/observability/metrics/prometheus"
"knative.dev/pkg/observability/semconv"
"knative.dev/pkg/observability/tracing"
"knative.dev/pkg/system"
servingmetrics "knative.dev/serving/pkg/metrics"
"knative.dev/serving/pkg/networking"
)

// observabilityMeterProvider combines the OTel metric provider interface with a Shutdown method.
//
// It is the meter-provider type returned by SetupObservabilityOrDie, which
// stores in it either the provider returned by metrics.NewMeterProvider or the
// *prometheusProvider produced by buildPrometheusProvider.
type observabilityMeterProvider interface {
// The embedded MeterProvider lets the value be passed to
// otel.SetMeterProvider and runtime.WithMeterProvider.
otelmetric.MeterProvider
// Shutdown stops the provider. For prometheusProvider it runs every
// registered shutdown function and joins their errors.
Shutdown(context.Context) error
}

func SetupObservabilityOrDie(
ctx context.Context,
cfg *config,
logger *zap.SugaredLogger,
) (*metrics.MeterProvider, *tracing.TracerProvider) {
) (observabilityMeterProvider, *tracing.TracerProvider) {
r := res(logger, cfg)

// Force the port to be the default queue user metrics port if it's not overridden
// by the operator
if cfg.Observability.RequestMetrics.Protocol == metrics.ProtocolPrometheus &&
cfg.Observability.RequestMetrics.Endpoint == "" {
cfg.Observability.RequestMetrics.Endpoint = fmt.Sprintf(":%d", networking.UserQueueMetricsPort)
}

meterProvider, err := metrics.NewMeterProvider(
ctx,
cfg.Observability.RequestMetrics,
metric.WithResource(r),
)
if err != nil {
logger.Fatalw("Failed to setup meter provider", zap.Error(err))
var mp observabilityMeterProvider
if cfg.Observability.RequestMetrics.Protocol == metrics.ProtocolPrometheus {
mp = buildPrometheusProvider(cfg, r, logger)
} else {
meterProvider, err := metrics.NewMeterProvider(
ctx,
cfg.Observability.RequestMetrics,
sdkmetric.WithResource(r),
)
if err != nil {
logger.Fatalw("Failed to setup meter provider", zap.Error(err))
}
mp = meterProvider
}

otel.SetMeterProvider(meterProvider)
otel.SetMeterProvider(mp)

err = runtime.Start(
err := runtime.Start(
runtime.WithMinimumReadMemStatsInterval(cfg.Observability.Runtime.ExportInterval),
runtime.WithMeterProvider(meterProvider),
runtime.WithMeterProvider(mp),
)
if err != nil {
logger.Fatalw("Failed to start runtime metrics", zap.Error(err))
Expand All @@ -84,7 +95,68 @@ func SetupObservabilityOrDie(
otel.SetTextMapPropagator(tracing.DefaultTextMapPropagator())
otel.SetTracerProvider(tracerProvider)

return meterProvider, tracerProvider
return mp, tracerProvider
}

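// buildPrometheusProvider creates a Prometheus-backed meter provider that exposes
// selected resource attributes (the serving service, configuration and revision
// names, plus the Kubernetes namespace and pod names) as constant Prometheus
// labels on every metric. It also starts the Prometheus server on the configured
// endpoint and terminates the process via logger.Fatalw if any setup step fails.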
func buildPrometheusProvider(cfg *config, r *resource.Resource, logger *zap.SugaredLogger) observabilityMeterProvider {
endpoint := cfg.Observability.RequestMetrics.Endpoint
if endpoint == "" {
endpoint = fmt.Sprintf(":%d", networking.UserQueueMetricsPort)
}

reader, err := otelprom.New(
otelprom.WithTranslationStrategy(otlptranslator.UnderscoreEscapingWithSuffixes),
otelprom.WithResourceAsConstantLabels(attribute.NewAllowKeysFilter(
attribute.Key(servingmetrics.ServiceNameKey),
attribute.Key(servingmetrics.ConfigurationNameKey),
attribute.Key(servingmetrics.RevisionNameKey),
semconv.K8SNamespaceNameKey,
semconv.K8SPodNameKey,
)),
)
if err != nil {
logger.Fatalw("Failed to create prometheus exporter", zap.Error(err))
}

host, port, err := net.SplitHostPort(endpoint)
if err != nil {
logger.Fatalw("Invalid prometheus endpoint", zap.String("endpoint", endpoint), zap.Error(err))
}

server, err := promserver.NewServer(
promserver.WithHost(host),
promserver.WithPort(port),
)
if err != nil {
logger.Fatalw("Failed to create prometheus server", zap.Error(err))
}
go server.ListenAndServe()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude reported that we should call Serve() here, see https://pkg.go.dev/knative.dev/pkg/observability/metrics/prometheus#Server.Serve

Looks good to me besides that

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code is based on https://github.com/openshift-knative/serving/blob/release-v1.21/vendor/knative.dev/pkg/observability/metrics/prometheus_enabled.go (basically that with just the WithResourceAsConstantLabels config added), so we should probably try to keep the behavior as similar as possible.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


sdkProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithReader(reader),
sdkmetric.WithResource(r),
)

return &prometheusProvider{
MeterProvider: sdkProvider,
shutdownFns: []func(context.Context) error{sdkProvider.Shutdown, server.Shutdown},
}
}

// prometheusProvider is the observabilityMeterProvider returned by
// buildPrometheusProvider. It delegates metric creation to the embedded
// MeterProvider and adds a Shutdown method that runs shutdownFns.
type prometheusProvider struct {
// MeterProvider is the SDK meter provider that buildPrometheusProvider
// creates with the Prometheus reader and the resource.
otelmetric.MeterProvider
// shutdownFns are called in slice order by Shutdown. buildPrometheusProvider
// registers the SDK provider's Shutdown followed by the server's Shutdown.
shutdownFns []func(context.Context) error
}

// Shutdown calls every registered shutdown function, in registration order,
// with ctx. A failing function does not prevent the remaining ones from
// running. The non-nil errors are combined with errors.Join, so the result is
// nil when every function succeeds.
func (p *prometheusProvider) Shutdown(ctx context.Context) error {
	var failures []error
	for i := range p.shutdownFns {
		// errors.Join discards nil values, so collecting only the failures
		// produces the same joined error as passing every result through.
		if err := p.shutdownFns[i](ctx); err != nil {
			failures = append(failures, err)
		}
	}
	return errors.Join(failures...)
}

func res(logger *zap.SugaredLogger, cfg *config) *resource.Resource {
Expand Down
Loading