Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ require (
github.com/go-logr/logr v1.4.2
github.com/go-logr/zapr v1.3.0
github.com/google/go-cmp v0.6.0
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.19.1
go.uber.org/zap v1.27.0
google.golang.org/grpc v1.67.0
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1
Expand All @@ -35,9 +37,11 @@ require (
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/Microsoft/hcsshim v0.12.6 // indirect
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bufbuild/protocompile v0.14.1 // indirect
github.com/bufbuild/protoplugin v0.0.0-20240911180120-7bb73e41a54a // indirect
github.com/bufbuild/protovalidate-go v0.6.5 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/containerd/cgroups/v3 v3.0.3 // indirect
github.com/containerd/containerd v1.7.22 // indirect
github.com/containerd/continuity v0.4.3 // indirect
Expand Down Expand Up @@ -80,6 +84,7 @@ require (
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jdx/go-netrc v1.0.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand Down Expand Up @@ -109,6 +114,9 @@ require (
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/profile v1.7.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/quic-go/quic-go v0.48.2 // indirect
github.com/rs/cors v1.11.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAx
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0 h1:QGLs/O40yoNK9vmy4rhUGBVyMf1lISBGtXRpsu/Qu/o=
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0/go.mod h1:hM2alZsMUni80N33RBe6J0e423LB+odMj7d3EMP9l20=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI=
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
Expand Down
84 changes: 80 additions & 4 deletions sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@ import (
"crypto/tls"
"crypto/x509"
"net"
"net/http"
"os"
"path/filepath"
"time"

grpcprometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
ginsecure "google.golang.org/grpc/credentials/insecure"
Expand All @@ -43,6 +48,7 @@ const (
DefaultNetwork = "tcp"
DefaultAddress = ":9443"
DefaultMaxRecvMsgSize = 1024 * 1024 * 4
DefaultMetricsAddress = ":8080"
)

// ServeOptions configure how a Function is served.
Expand All @@ -52,6 +58,11 @@ type ServeOptions struct {
MaxRecvMsgSize int
Credentials credentials.TransportCredentials
HealthServer healthgrpc.HealthServer

// Metrics options
MetricsAddress string
MetricsRegistry *prometheus.Registry
UnaryInterceptors []grpc.UnaryServerInterceptor
}

// A ServeOption configures how a Function is served.
Expand Down Expand Up @@ -143,13 +154,35 @@ func WithHealthServer(srv healthgrpc.HealthServer) ServeOption {
}
}

// WithMetricsServer sets the address of the HTTP server that Serve starts to
// expose Prometheus metrics. Serve only collects and exposes metrics when the
// address is non-empty, so passing an empty string turns metrics off. When
// this option is not supplied Serve uses DefaultMetricsAddress.
func WithMetricsServer(address string) ServeOption {
	return func(opts *ServeOptions) error {
		opts.MetricsAddress = address
		return nil
	}
}

// WithMetricsRegistry sets the Prometheus registry that Serve registers its
// gRPC server metrics with and serves from the metrics endpoint. The registry
// is only used when the metrics address is non-empty; when this option is not
// supplied Serve falls back to the default Prometheus registry.
func WithMetricsRegistry(registry *prometheus.Registry) ServeOption {
	return func(opts *ServeOptions) error {
		opts.MetricsRegistry = registry
		return nil
	}
}

// Serve the supplied Function by creating a gRPC server and listening for
// RunFunctionRequests. Blocks until the server returns an error.
func Serve(fn v1.FunctionRunnerServiceServer, o ...ServeOption) error {
//nolint:forcetypeassert // prometheus.DefaultRegisterer is always *prometheus.Registry
so := &ServeOptions{
Network: DefaultNetwork,
Address: DefaultAddress,
MaxRecvMsgSize: DefaultMaxRecvMsgSize,
Network: DefaultNetwork,
Address: DefaultAddress,
MaxRecvMsgSize: DefaultMaxRecvMsgSize,
MetricsAddress: DefaultMetricsAddress,
MetricsRegistry: prometheus.DefaultRegisterer.(*prometheus.Registry), // Use default registry
}

for _, fn := range o {
Expand All @@ -167,7 +200,29 @@ func Serve(fn v1.FunctionRunnerServiceServer, o ...ServeOption) error {
return errors.Wrapf(err, "cannot listen for %s connections at address %q", so.Network, so.Address)
}

srv := grpc.NewServer(grpc.MaxRecvMsgSize(so.MaxRecvMsgSize), grpc.Creds(so.Credentials))
// Create server options
serverOpts := []grpc.ServerOption{
grpc.MaxRecvMsgSize(so.MaxRecvMsgSize),
grpc.Creds(so.Credentials),
}

// Build interceptors based on options
var interceptors []grpc.UnaryServerInterceptor
var metrics *grpcprometheus.ServerMetrics

// Add metrics interceptor if metrics address is provided
if so.MetricsAddress != "" {
// Use Prometheus metrics
metrics = grpcprometheus.NewServerMetrics()

// Apply metrics interceptor and custom interceptors
interceptors = append(interceptors, metrics.UnaryServerInterceptor())
interceptors = append(interceptors, so.UnaryInterceptors...)
serverOpts = append(serverOpts, grpc.ChainUnaryInterceptor(interceptors...))
// Register the metrics with the registry
so.MetricsRegistry.MustRegister(metrics)
}
srv := grpc.NewServer(serverOpts...)
reflection.Register(srv)
v1.RegisterFunctionRunnerServiceServer(srv, fn)
v1beta1.RegisterFunctionRunnerServiceServer(srv, ServeBeta(fn))
Expand All @@ -176,6 +231,27 @@ func Serve(fn v1.FunctionRunnerServiceServer, o ...ServeOption) error {
healthgrpc.RegisterHealthServer(srv, so.HealthServer)
}

// Start metrics server if address is provided
if so.MetricsAddress != "" {
// Initialize metrics for the gRPC server
if metrics != nil {
metrics.InitializeMetrics(srv)
}
// Use the registry for metrics handler
handler := promhttp.HandlerFor(so.MetricsRegistry, promhttp.HandlerOpts{})

metricsServer := &http.Server{
Addr: so.MetricsAddress,
Handler: handler,
ReadHeaderTimeout: 30 * time.Second,
}

// Start metrics server in a goroutine
go func() {
_ = metricsServer.ListenAndServe() // Ignore errors
}()
}

return errors.Wrap(srv.Serve(lis), "cannot serve mTLS gRPC connections")
}

Expand Down
182 changes: 182 additions & 0 deletions sdk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,18 @@ package function
import (
"context"
"fmt"
"io"
"net"
"net/http"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/testing/protocmp"

Expand Down Expand Up @@ -187,3 +195,177 @@ type MockFunctionServer struct {
// RunFunction ignores the supplied context and request and returns the canned
// response and error stored on the mock.
func (s *MockFunctionServer) RunFunction(_ context.Context, _ *v1.RunFunctionRequest) (*v1.RunFunctionResponse, error) {
	rsp, err := s.rsp, s.err
	return rsp, err
}

// TestMetricsServer_WithCustomRegistryAndCustomPort verifies that Serve exposes
// gRPC server metrics over HTTP when it is given a custom metrics address and
// a custom Prometheus registry.
func TestMetricsServer_WithCustomRegistryAndCustomPort(t *testing.T) {
	mockServer := &MockFunctionServer{
		rsp: &v1.RunFunctionResponse{
			Meta: &v1.ResponseMeta{Tag: "traffic-test"},
		},
	}

	grpcPort := getAvailablePort(t)
	metricsPort := getAvailablePort(t)

	// Serve blocks until the gRPC server returns an error, so run it in the
	// background. The channel is buffered so the goroutine can always deliver
	// its result even if nobody is receiving.
	serverDone := make(chan error, 1)
	go func() {
		serverDone <- Serve(mockServer,
			Listen("tcp", fmt.Sprintf(":%d", grpcPort)),
			Insecure(true),
			WithMetricsServer(fmt.Sprintf(":%d", metricsPort)),
			WithMetricsRegistry(prometheus.NewRegistry()),
		)
	}()

	// Poll until the port accepts connections instead of sleeping for a fixed
	// time: this is faster when startup is quick, tolerates slow machines, and
	// fails immediately with the real error if Serve exits early.
	waitForPort := func(port int) {
		t.Helper()

		addr := fmt.Sprintf("localhost:%d", port)
		deadline := time.Now().Add(10 * time.Second)
		for {
			select {
			case err := <-serverDone:
				t.Fatalf("Serve returned before %s became reachable: %v", addr, err)
			default:
			}

			c, err := net.DialTimeout("tcp", addr, 100*time.Millisecond)
			if err == nil {
				_ = c.Close() // Probe connection only; nothing to flush.
				return
			}
			if time.Now().After(deadline) {
				t.Fatalf("Timed out waiting for %s to accept connections: %v", addr, err)
			}
			time.Sleep(50 * time.Millisecond)
		}
	}
	waitForPort(grpcPort)
	waitForPort(metricsPort)

	t.Run("MetricsServerTest On CustomPort With CustomRegistry", func(t *testing.T) {
		conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", grpcPort),
			grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil {
			t.Fatalf("Failed to connect: %v", err)
		}
		defer conn.Close()

		client := v1.NewFunctionRunnerServiceClient(conn)

		// Drive one RPC through the instrumented server. The metrics
		// interceptor runs as part of the call, so no extra wait is needed
		// before scraping.
		req1 := &v1.RunFunctionRequest{
			Meta: &v1.RequestMeta{Tag: "request-test"},
		}
		if _, err := client.RunFunction(context.Background(), req1); err != nil {
			t.Errorf("request failed: %v", err)
		}

		// Scrape the metrics endpoint. Use a client with a timeout so a hung
		// metrics server fails the test instead of stalling it.
		metricsURL := fmt.Sprintf("http://localhost:%d/metrics", metricsPort)
		req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, metricsURL, nil)
		if err != nil {
			t.Fatalf("Failed to create request: %v", err)
		}
		httpClient := &http.Client{Timeout: 5 * time.Second}
		resp, err := httpClient.Do(req)
		if err != nil {
			t.Fatalf("Failed to get metrics: %v", err)
		}
		defer resp.Body.Close()

		if resp.StatusCode != http.StatusOK {
			t.Fatalf("Unexpected status from metrics endpoint: %s", resp.Status)
		}

		body, err := io.ReadAll(resp.Body)
		if err != nil {
			t.Fatalf("Failed to read metrics: %v", err)
		}
		metricsContent := string(body)

		// The Prometheus text exposition format emits "# HELP" lines.
		if !strings.Contains(metricsContent, "# HELP") {
			t.Error("Expected Prometheus format")
		}

		// The gRPC server metrics must be registered with the custom registry.
		if !strings.Contains(metricsContent, "grpc_server_started_total") {
			t.Error("Expected grpc_server_started_total metric to be present")
		}
	})
}

// TestMetricsServer_WithDefaultRegistryAndDefaultPort verifies that, with no
// metrics options supplied, Serve exposes gRPC server metrics on the default
// metrics port (8080) using the default Prometheus registry.
func TestMetricsServer_WithDefaultRegistryAndDefaultPort(t *testing.T) {
	mockServer := &MockFunctionServer{
		rsp: &v1.RunFunctionResponse{
			Meta: &v1.ResponseMeta{Tag: "default-metrics-test"},
		},
	}

	grpcPort := getAvailablePort(t)
	// Should use default metrics port 8080
	metricsPort := 8080

	// The default port is fixed, so it may already be held by an unrelated
	// process. Serve ignores metrics listener errors, which means the test
	// would otherwise scrape whatever else is listening there. Skip instead.
	probe, err := net.Listen("tcp", fmt.Sprintf(":%d", metricsPort))
	if err != nil {
		t.Skipf("Default metrics port %d is not available: %v", metricsPort, err)
	}
	if err := probe.Close(); err != nil {
		t.Fatalf("Failed to release default metrics port %d: %v", metricsPort, err)
	}

	// Serve blocks until the gRPC server returns an error, so run it in the
	// background. The channel is buffered so the goroutine can always deliver
	// its result even if nobody is receiving.
	serverDone := make(chan error, 1)
	go func() {
		serverDone <- Serve(mockServer,
			Listen("tcp", fmt.Sprintf(":%d", grpcPort)),
			Insecure(true),
		)
	}()

	// Poll until the port accepts connections instead of sleeping for a fixed
	// time: this is faster when startup is quick, tolerates slow machines, and
	// fails immediately with the real error if Serve exits early.
	waitForPort := func(port int) {
		t.Helper()

		addr := fmt.Sprintf("localhost:%d", port)
		deadline := time.Now().Add(10 * time.Second)
		for {
			select {
			case err := <-serverDone:
				t.Fatalf("Serve returned before %s became reachable: %v", addr, err)
			default:
			}

			c, err := net.DialTimeout("tcp", addr, 100*time.Millisecond)
			if err == nil {
				_ = c.Close() // Probe connection only; nothing to flush.
				return
			}
			if time.Now().After(deadline) {
				t.Fatalf("Timed out waiting for %s to accept connections: %v", addr, err)
			}
			time.Sleep(50 * time.Millisecond)
		}
	}
	waitForPort(grpcPort)
	waitForPort(metricsPort)

	t.Run("MetricsServerTest On DefaultPort With DefaultRegistry", func(t *testing.T) {
		conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", grpcPort),
			grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil {
			t.Fatalf("Failed to connect: %v", err)
		}
		defer conn.Close()

		client := v1.NewFunctionRunnerServiceClient(conn)

		// Drive one RPC through the instrumented server. The metrics
		// interceptor runs as part of the call, so no extra wait is needed
		// before scraping.
		req := &v1.RunFunctionRequest{
			Meta: &v1.RequestMeta{Tag: "default-metrics-test"},
		}
		if _, err := client.RunFunction(context.Background(), req); err != nil {
			t.Errorf("Request failed: %v", err)
		}

		// Scrape the metrics endpoint. Use a client with a timeout so a hung
		// metrics server fails the test instead of stalling it.
		metricsURL := fmt.Sprintf("http://localhost:%d/metrics", metricsPort)
		httpReq, err := http.NewRequestWithContext(context.Background(), http.MethodGet, metricsURL, nil)
		if err != nil {
			t.Fatalf("Failed to create request: %v", err)
		}
		httpClient := &http.Client{Timeout: 5 * time.Second}
		resp, err := httpClient.Do(httpReq)
		if err != nil {
			t.Fatalf("Failed to get metrics: %v", err)
		}
		defer resp.Body.Close()

		if resp.StatusCode != http.StatusOK {
			t.Fatalf("Unexpected status from metrics endpoint: %s", resp.Status)
		}

		body, err := io.ReadAll(resp.Body)
		if err != nil {
			t.Fatalf("Failed to read metrics: %v", err)
		}
		metricsContent := string(body)

		// The Prometheus text exposition format emits "# HELP" lines.
		if !strings.Contains(metricsContent, "# HELP") {
			t.Error("Expected Prometheus format")
		}

		// The gRPC server metrics must be registered with the default registry.
		if !strings.Contains(metricsContent, "grpc_server_started_total") {
			t.Error("Expected grpc_server_started_total metric to be present")
		}
	})
}

// getAvailablePort asks the operating system for an unused TCP port by
// listening on port 0, then releases the listener and returns the port number.
// The port is free at the time of return, but nothing reserves it: another
// process could claim it before the caller binds it.
func getAvailablePort(t *testing.T) int {
	t.Helper()

	listener, err := net.Listen("tcp", ":0")
	if err != nil {
		t.Fatalf("Failed to get available port: %v", err)
	}
	defer listener.Close()

	// Use the comma-ok form so an unexpected address type fails the test
	// cleanly instead of panicking.
	addr, ok := listener.Addr().(*net.TCPAddr)
	if !ok {
		t.Fatalf("Failed to get available port: unexpected listener address type %T", listener.Addr())
	}
	return addr.Port
}