diff --git a/go.mod b/go.mod index 6226191..e6e8bed 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,9 @@ require ( github.com/go-logr/logr v1.4.2 github.com/go-logr/zapr v1.3.0 github.com/google/go-cmp v0.6.0 + github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.19.1 go.uber.org/zap v1.27.0 google.golang.org/grpc v1.67.0 google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1 @@ -35,9 +37,11 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/Microsoft/hcsshim v0.12.6 // indirect github.com/antlr4-go/antlr/v4 v4.13.1 // indirect + github.com/beorn7/perks v1.0.1 // indirect github.com/bufbuild/protocompile v0.14.1 // indirect github.com/bufbuild/protoplugin v0.0.0-20240911180120-7bb73e41a54a // indirect github.com/bufbuild/protovalidate-go v0.6.5 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/containerd/cgroups/v3 v3.0.3 // indirect github.com/containerd/containerd v1.7.22 // indirect github.com/containerd/continuity v0.4.3 // indirect @@ -80,6 +84,7 @@ require ( github.com/google/gofuzz v1.2.0 // indirect github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jdx/go-netrc v1.0.0 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -109,6 +114,9 @@ require ( github.com/opencontainers/runtime-spec v1.2.0 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pkg/profile v1.7.0 // indirect + github.com/prometheus/client_model v0.6.1 // indirect + github.com/prometheus/common v0.55.0 // indirect + github.com/prometheus/procfs v0.15.1 // indirect github.com/quic-go/qpack v0.5.1 // indirect github.com/quic-go/quic-go v0.48.2 // indirect github.com/rs/cors v1.11.1 // indirect diff --git a/go.sum b/go.sum index 0c3f359..4dfb3dc 100644 --- a/go.sum +++ b/go.sum @@ -192,6 +192,10 @@ github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAx github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0 h1:QGLs/O40yoNK9vmy4rhUGBVyMf1lISBGtXRpsu/Qu/o= +github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0/go.mod h1:hM2alZsMUni80N33RBe6J0e423LB+odMj7d3EMP9l20= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk= +github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI= github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k= diff --git a/sdk.go b/sdk.go index c7b5078..d122f22 100644 --- a/sdk.go +++ b/sdk.go @@ -22,10 +22,15 @@ import ( "crypto/tls" "crypto/x509" "net" + "net/http" "os" "path/filepath" + "time" + grpcprometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" "google.golang.org/grpc" "google.golang.org/grpc/credentials" ginsecure "google.golang.org/grpc/credentials/insecure" @@ -43,6 +48,7 @@ const ( DefaultNetwork = "tcp" DefaultAddress = ":9443" DefaultMaxRecvMsgSize = 1024 * 1024 * 4 + DefaultMetricsAddress = ":8080" ) // ServeOptions configure how a Function is served. @@ -52,6 +58,11 @@ type ServeOptions struct { MaxRecvMsgSize int Credentials credentials.TransportCredentials HealthServer healthgrpc.HealthServer + + // Metrics options + MetricsAddress string + MetricsRegistry *prometheus.Registry + UnaryInterceptors []grpc.UnaryServerInterceptor } // A ServeOption configures how a Function is served. @@ -143,13 +154,35 @@ func WithHealthServer(srv healthgrpc.HealthServer) ServeOption { } } +// WithMetricsServer configures the metrics server address and starts an HTTP server +// to expose Prometheus metrics on /metrics endpoint. If address is non-empty, +// metrics collection is automatically enabled. +func WithMetricsServer(address string) ServeOption { + return func(o *ServeOptions) error { + o.MetricsAddress = address + return nil + } +} + +// WithMetricsRegistry configures a custom Prometheus registry for metrics. +// Note: Metrics collection is enabled only when MetricsAddress is non-empty. +func WithMetricsRegistry(registry *prometheus.Registry) ServeOption { + return func(o *ServeOptions) error { + o.MetricsRegistry = registry + return nil + } +} + // Serve the supplied Function by creating a gRPC server and listening for // RunFunctionRequests. Blocks until the server returns an error. func Serve(fn v1.FunctionRunnerServiceServer, o ...ServeOption) error { + //nolint:forcetypeassert // prometheus.DefaultRegisterer is always *prometheus.Registry so := &ServeOptions{ - Network: DefaultNetwork, - Address: DefaultAddress, - MaxRecvMsgSize: DefaultMaxRecvMsgSize, + Network: DefaultNetwork, + Address: DefaultAddress, + MaxRecvMsgSize: DefaultMaxRecvMsgSize, + MetricsAddress: DefaultMetricsAddress, + MetricsRegistry: prometheus.DefaultRegisterer.(*prometheus.Registry), // Use default registry } for _, fn := range o { @@ -167,7 +200,29 @@ func Serve(fn v1.FunctionRunnerServiceServer, o ...ServeOption) error { return errors.Wrapf(err, "cannot listen for %s connections at address %q", so.Network, so.Address) } - srv := grpc.NewServer(grpc.MaxRecvMsgSize(so.MaxRecvMsgSize), grpc.Creds(so.Credentials)) + // Create server options + serverOpts := []grpc.ServerOption{ + grpc.MaxRecvMsgSize(so.MaxRecvMsgSize), + grpc.Creds(so.Credentials), + } + + // Build interceptors based on options + var interceptors []grpc.UnaryServerInterceptor + var metrics *grpcprometheus.ServerMetrics + + // Add metrics interceptor if metrics address is provided + if so.MetricsAddress != "" { + // Use Prometheus metrics + metrics = grpcprometheus.NewServerMetrics() + + // Apply metrics interceptor and custom interceptors + interceptors = append(interceptors, metrics.UnaryServerInterceptor()) + interceptors = append(interceptors, so.UnaryInterceptors...) + serverOpts = append(serverOpts, grpc.ChainUnaryInterceptor(interceptors...)) + // Register the metrics with the registry + so.MetricsRegistry.MustRegister(metrics) + } + srv := grpc.NewServer(serverOpts...) reflection.Register(srv) v1.RegisterFunctionRunnerServiceServer(srv, fn) v1beta1.RegisterFunctionRunnerServiceServer(srv, ServeBeta(fn)) @@ -176,6 +231,27 @@ func Serve(fn v1.FunctionRunnerServiceServer, o ...ServeOption) error { healthgrpc.RegisterHealthServer(srv, so.HealthServer) } + // Start metrics server if address is provided + if so.MetricsAddress != "" { + // Initialize metrics for the gRPC server + if metrics != nil { + metrics.InitializeMetrics(srv) + } + // Use the registry for metrics handler + handler := promhttp.HandlerFor(so.MetricsRegistry, promhttp.HandlerOpts{}) + + metricsServer := &http.Server{ + Addr: so.MetricsAddress, + Handler: handler, + ReadHeaderTimeout: 30 * time.Second, + } + + // Start metrics server in a goroutine + go func() { + _ = metricsServer.ListenAndServe() // Ignore errors + }() + } + return errors.Wrap(srv.Serve(lis), "cannot serve mTLS gRPC connections") } diff --git a/sdk_test.go b/sdk_test.go index 3bc900b..4c351bc 100644 --- a/sdk_test.go +++ b/sdk_test.go @@ -19,10 +19,18 @@ package function import ( "context" "fmt" + "io" + "net" + "net/http" + "strings" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/prometheus/client_golang/prometheus" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/testing/protocmp" @@ -187,3 +195,177 @@ type MockFunctionServer struct { func (s *MockFunctionServer) RunFunction(context.Context, *v1.RunFunctionRequest) (*v1.RunFunctionResponse, error) { return s.rsp, s.err } + +// TestMetricsServer_WithCustomRegistryAndCustomPort verifies that metrics server starts on custom port with custom registry as input. +func TestMetricsServer_WithCustomRegistryAndCustomPort(t *testing.T) { + // Create mock server + mockServer := &MockFunctionServer{ + rsp: &v1.RunFunctionResponse{ + Meta: &v1.ResponseMeta{Tag: "traffic-test"}, + }, + } + + // Get ports + grpcPort := getAvailablePort(t) + metricsPort := getAvailablePort(t) + + // Start server + serverDone := make(chan error, 1) + go func() { + err := Serve(mockServer, + Listen("tcp", fmt.Sprintf(":%d", grpcPort)), + Insecure(true), + WithMetricsServer(fmt.Sprintf(":%d", metricsPort)), + WithMetricsRegistry(prometheus.NewRegistry()), + ) + serverDone <- err + }() + + // Wait for server to start + time.Sleep(3 * time.Second) + + t.Run("MetricsServerTest On CustomPort With CustomRegistry", func(t *testing.T) { + conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", grpcPort), + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + client := v1.NewFunctionRunnerServiceClient(conn) + + // Make the request + req1 := &v1.RunFunctionRequest{ + Meta: &v1.RequestMeta{Tag: "request-test"}, + } + + _, err = client.RunFunction(context.Background(), req1) + if err != nil { + t.Errorf("request failed: %v", err) + } + // Wait for metrics to be collected + time.Sleep(2 * time.Second) + + // Verify metrics endpoint has our custom metrics + metricsURL := fmt.Sprintf("http://localhost:%d/metrics", metricsPort) + req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, metricsURL, nil) + if err != nil { + t.Fatalf("Failed to create request: %v", err) + } + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Fatalf("Failed to get metrics: %v", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read metrics: %v", err) + } + + metricsContent := string(body) + + // Verify Prometheus format is working + if !strings.Contains(metricsContent, "# HELP") { + t.Error("Expected Prometheus format") + } + + // Verify gRPC metrics are present + if !strings.Contains(metricsContent, "grpc_server_started_total") { + t.Error("Expected grpc_server_started_total metric to be present") + } + }) +} + +// TestMetricsServer_WithDefaultRegistryAndDefaultPort verifies that metrics server starts by default on :8080 with default registry with no input. +func TestMetricsServer_WithDefaultRegistryAndDefaultPort(t *testing.T) { + // Create mock server + mockServer := &MockFunctionServer{ + rsp: &v1.RunFunctionResponse{ + Meta: &v1.ResponseMeta{Tag: "default-metrics-test"}, + }, + } + + // Get ports + grpcPort := getAvailablePort(t) + // Should use default metrics port 8080 + metricsPort := 8080 + + serverDone := make(chan error, 1) + go func() { + err := Serve(mockServer, + Listen("tcp", fmt.Sprintf(":%d", grpcPort)), + Insecure(true), + ) + serverDone <- err + }() + + // Wait for server to start + time.Sleep(3 * time.Second) + + t.Run("MetricsServerTest On DefaultPort With DefaultRegisrty", func(t *testing.T) { + // Test gRPC connection + conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", grpcPort), + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + client := v1.NewFunctionRunnerServiceClient(conn) + + // Make the request + req := &v1.RunFunctionRequest{ + Meta: &v1.RequestMeta{Tag: "default-metrics-test"}, + } + + _, err = client.RunFunction(context.Background(), req) + if err != nil { + t.Errorf("Request failed: %v", err) + } + + // Wait for metrics to be collected + time.Sleep(2 * time.Second) + + // Verify metrics endpoint is accessible + metricsURL := fmt.Sprintf("http://localhost:%d/metrics", metricsPort) + httpReq, err := http.NewRequestWithContext(context.Background(), http.MethodGet, metricsURL, nil) + if err != nil { + t.Fatalf("Failed to create request: %v", err) + } + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + t.Fatalf("Failed to get metrics: %v", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read metrics: %v", err) + } + + metricsContent := string(body) + + // Verify metrics are present + if !strings.Contains(metricsContent, "# HELP") { + t.Error("Expected Prometheus format") + } + + // Verify gRPC metrics are present + if !strings.Contains(metricsContent, "grpc_server_started_total") { + t.Error("Expected grpc_server_started_total metric to be present") + } + }) +} + +// Helper function to get an available port. +func getAvailablePort(t *testing.T) int { + t.Helper() + + listener, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatalf("Failed to get available port: %v", err) + } + defer listener.Close() + return listener.Addr().(*net.TCPAddr).Port +}