diff --git a/sdk.go b/sdk.go index 984fe27..816bc95 100644 --- a/sdk.go +++ b/sdk.go @@ -63,6 +63,7 @@ type ServeOptions struct { MetricsAddress string MetricsRegistry *prometheus.Registry UnaryInterceptors []grpc.UnaryServerInterceptor + MetricsServerOpts []grpcprometheus.ServerMetricsOption } // A ServeOption configures how a Function is served. @@ -173,6 +174,15 @@ func WithMetricsRegistry(registry *prometheus.Registry) ServeOption { } } +// WithMetricsServerOpts configures the options for the Metrics Server. +// Note: Metrics collection is enabled only when MetricsAddress is non-empty. +func WithMetricsServerOpts(opts ...grpcprometheus.ServerMetricsOption) ServeOption { + return func(o *ServeOptions) error { + o.MetricsServerOpts = append(o.MetricsServerOpts, opts...) + return nil + } +} + // Serve the supplied Function by creating a gRPC server and listening for // RunFunctionRequests. Blocks until the server returns an error. func Serve(fn v1.FunctionRunnerServiceServer, o ...ServeOption) error { @@ -214,7 +224,7 @@ func Serve(fn v1.FunctionRunnerServiceServer, o ...ServeOption) error { // Add metrics interceptor if metrics address is provided if so.MetricsAddress != "" { // Use Prometheus metrics - metrics = grpcprometheus.NewServerMetrics() + metrics = grpcprometheus.NewServerMetrics(so.MetricsServerOpts...) // Apply metrics interceptor and custom interceptors interceptors = append(interceptors, metrics.UnaryServerInterceptor()) diff --git a/sdk_test.go b/sdk_test.go index fbef8ed..33ebffa 100644 --- a/sdk_test.go +++ b/sdk_test.go @@ -28,6 +28,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + grpcprometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus" "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -303,7 +304,7 @@ func TestMetricsServer_WithDefaultRegistryAndDefaultPort(t *testing.T) { // Wait for server to start time.Sleep(3 * time.Second) - t.Run("MetricsServerTest On DefaultPort With DefaultRegisrty", func(t *testing.T) { + t.Run("MetricsServerTest On DefaultPort With DefaultRegistry", func(t *testing.T) { // Test gRPC connection conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", grpcPort), grpc.WithTransportCredentials(insecure.NewCredentials())) @@ -358,6 +359,96 @@ func TestMetricsServer_WithDefaultRegistryAndDefaultPort(t *testing.T) { }) } +// TestMetricsServer_WithCustomMetricsServerOpts verifies that metrics server uses custom metrics server opts. +func TestMetricsServer_WithCustomMetricsServerOpts(t *testing.T) { + // Create mock server + mockServer := &MockFunctionServer{ + rsp: &v1.RunFunctionResponse{ + Meta: &v1.ResponseMeta{Tag: "default-metrics-test"}, + }, + } + + // Get ports + grpcPort := getAvailablePort(t) + metricsPort := getAvailablePort(t) + + serverDone := make(chan error, 1) + go func() { + err := Serve(mockServer, + Listen("tcp", fmt.Sprintf(":%d", grpcPort)), + Insecure(true), + WithMetricsServer(fmt.Sprintf(":%d", metricsPort)), + WithMetricsRegistry(prometheus.NewRegistry()), + WithMetricsServerOpts( + grpcprometheus.WithServerHandlingTimeHistogram(), + ), + ) + serverDone <- err + }() + + // Wait for server to start + time.Sleep(3 * time.Second) + + t.Run("MetricsServerTest with custom metrics server opts", func(t *testing.T) { + // Test gRPC connection + conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", grpcPort), + grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + client := v1.NewFunctionRunnerServiceClient(conn) + + // Make the request + req := &v1.RunFunctionRequest{ + Meta: &v1.RequestMeta{Tag: "default-metrics-test"}, + } + + _, err = client.RunFunction(context.Background(), req) + if err != nil { + t.Errorf("Request failed: %v", err) + } + + // Wait for metrics to be collected + time.Sleep(2 * time.Second) + + // Verify metrics endpoint is accessible + metricsURL := fmt.Sprintf("http://localhost:%d/metrics", metricsPort) + httpReq, err := http.NewRequestWithContext(context.Background(), http.MethodGet, metricsURL, nil) + if err != nil { + t.Fatalf("Failed to create request: %v", err) + } + resp, err := http.DefaultClient.Do(httpReq) + if err != nil { + t.Fatalf("Failed to get metrics: %v", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + t.Fatalf("Failed to read metrics: %v", err) + } + + metricsContent := string(body) + + // Verify metrics are present + if !strings.Contains(metricsContent, "# HELP") { + t.Error("Expected Prometheus format") + } + + // Verify gRPC metrics are present + if !strings.Contains(metricsContent, "grpc_server_started_total") { + t.Error("Expected grpc_server_started_total metric to be present") + } + + // Verify gRPC Histogram metrics are present + if !strings.Contains(metricsContent, "grpc_server_handling_seconds_bucket") { + t.Error("Expected grpc_server_handling_seconds_bucket metric to be present") + } + }) +} + // Helper function to get an available port. func getAvailablePort(t *testing.T) int { t.Helper()