Skip to content

Commit ae69ce6

Browse files
authored
Merge pull request #229 from sagarwal16-atlassian/sa_function-sdk-go-issue_20
add support for exposing prometheus metrics for function runs
2 parents 0d86067 + 32739b3 commit ae69ce6

File tree

4 files changed

+274
-4
lines changed

4 files changed

+274
-4
lines changed

go.mod

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@ require (
99
github.com/go-logr/logr v1.4.2
1010
github.com/go-logr/zapr v1.3.0
1111
github.com/google/go-cmp v0.6.0
12+
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0
1213
github.com/pkg/errors v0.9.1
14+
github.com/prometheus/client_golang v1.19.1
1315
go.uber.org/zap v1.27.0
1416
google.golang.org/grpc v1.67.0
1517
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.5.1
@@ -35,9 +37,11 @@ require (
3537
github.com/Microsoft/go-winio v0.6.2 // indirect
3638
github.com/Microsoft/hcsshim v0.12.6 // indirect
3739
github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
40+
github.com/beorn7/perks v1.0.1 // indirect
3841
github.com/bufbuild/protocompile v0.14.1 // indirect
3942
github.com/bufbuild/protoplugin v0.0.0-20240911180120-7bb73e41a54a // indirect
4043
github.com/bufbuild/protovalidate-go v0.6.5 // indirect
44+
github.com/cespare/xxhash/v2 v2.3.0 // indirect
4145
github.com/containerd/cgroups/v3 v3.0.3 // indirect
4246
github.com/containerd/containerd v1.7.22 // indirect
4347
github.com/containerd/continuity v0.4.3 // indirect
@@ -80,6 +84,7 @@ require (
8084
github.com/google/gofuzz v1.2.0 // indirect
8185
github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134 // indirect
8286
github.com/google/uuid v1.6.0 // indirect
87+
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 // indirect
8388
github.com/inconshreveable/mousetrap v1.1.0 // indirect
8489
github.com/jdx/go-netrc v1.0.0 // indirect
8590
github.com/josharian/intern v1.0.0 // indirect
@@ -109,6 +114,9 @@ require (
109114
github.com/opencontainers/runtime-spec v1.2.0 // indirect
110115
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
111116
github.com/pkg/profile v1.7.0 // indirect
117+
github.com/prometheus/client_model v0.6.1 // indirect
118+
github.com/prometheus/common v0.55.0 // indirect
119+
github.com/prometheus/procfs v0.15.1 // indirect
112120
github.com/quic-go/qpack v0.5.1 // indirect
113121
github.com/quic-go/quic-go v0.48.2 // indirect
114122
github.com/rs/cors v1.11.1 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,10 @@ github.com/google/pprof v0.0.0-20240910150728-a0b0bb1d4134/go.mod h1:vavhavw2zAx
192192
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
193193
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
194194
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
195+
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0 h1:QGLs/O40yoNK9vmy4rhUGBVyMf1lISBGtXRpsu/Qu/o=
196+
github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.1.0/go.mod h1:hM2alZsMUni80N33RBe6J0e423LB+odMj7d3EMP9l20=
197+
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk=
198+
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI=
195199
github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo=
196200
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1YCS1PXdKYWi8FsN0=
197201
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=

sdk.go

Lines changed: 80 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,15 @@ import (
2222
"crypto/tls"
2323
"crypto/x509"
2424
"net"
25+
"net/http"
2526
"os"
2627
"path/filepath"
28+
"time"
2729

30+
grpcprometheus "github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
2831
"github.com/pkg/errors"
32+
"github.com/prometheus/client_golang/prometheus"
33+
"github.com/prometheus/client_golang/prometheus/promhttp"
2934
"google.golang.org/grpc"
3035
"google.golang.org/grpc/credentials"
3136
ginsecure "google.golang.org/grpc/credentials/insecure"
@@ -43,6 +48,7 @@ const (
4348
DefaultNetwork = "tcp"
4449
DefaultAddress = ":9443"
4550
DefaultMaxRecvMsgSize = 1024 * 1024 * 4
51+
DefaultMetricsAddress = ":8080"
4652
)
4753

4854
// ServeOptions configure how a Function is served.
@@ -52,6 +58,11 @@ type ServeOptions struct {
5258
MaxRecvMsgSize int
5359
Credentials credentials.TransportCredentials
5460
HealthServer healthgrpc.HealthServer
61+
62+
// Metrics options
63+
MetricsAddress string
64+
MetricsRegistry *prometheus.Registry
65+
UnaryInterceptors []grpc.UnaryServerInterceptor
5566
}
5667

5768
// A ServeOption configures how a Function is served.
@@ -143,13 +154,35 @@ func WithHealthServer(srv healthgrpc.HealthServer) ServeOption {
143154
}
144155
}
145156

157+
// WithMetricsServer configures the metrics server address and starts an HTTP server
158+
// to expose Prometheus metrics on /metrics endpoint. If address is non-empty,
159+
// metrics collection is automatically enabled.
160+
func WithMetricsServer(address string) ServeOption {
161+
return func(o *ServeOptions) error {
162+
o.MetricsAddress = address
163+
return nil
164+
}
165+
}
166+
167+
// WithMetricsRegistry configures a custom Prometheus registry for metrics.
168+
// Note: Metrics collection is enabled only when MetricsAddress is non-empty.
169+
func WithMetricsRegistry(registry *prometheus.Registry) ServeOption {
170+
return func(o *ServeOptions) error {
171+
o.MetricsRegistry = registry
172+
return nil
173+
}
174+
}
175+
146176
// Serve the supplied Function by creating a gRPC server and listening for
147177
// RunFunctionRequests. Blocks until the server returns an error.
148178
func Serve(fn v1.FunctionRunnerServiceServer, o ...ServeOption) error {
179+
//nolint:forcetypeassert // prometheus.DefaultRegisterer is always *prometheus.Registry
149180
so := &ServeOptions{
150-
Network: DefaultNetwork,
151-
Address: DefaultAddress,
152-
MaxRecvMsgSize: DefaultMaxRecvMsgSize,
181+
Network: DefaultNetwork,
182+
Address: DefaultAddress,
183+
MaxRecvMsgSize: DefaultMaxRecvMsgSize,
184+
MetricsAddress: DefaultMetricsAddress,
185+
MetricsRegistry: prometheus.DefaultRegisterer.(*prometheus.Registry), // Use default registry
153186
}
154187

155188
for _, fn := range o {
@@ -167,7 +200,29 @@ func Serve(fn v1.FunctionRunnerServiceServer, o ...ServeOption) error {
167200
return errors.Wrapf(err, "cannot listen for %s connections at address %q", so.Network, so.Address)
168201
}
169202

170-
srv := grpc.NewServer(grpc.MaxRecvMsgSize(so.MaxRecvMsgSize), grpc.Creds(so.Credentials))
203+
// Create server options
204+
serverOpts := []grpc.ServerOption{
205+
grpc.MaxRecvMsgSize(so.MaxRecvMsgSize),
206+
grpc.Creds(so.Credentials),
207+
}
208+
209+
// Build interceptors based on options
210+
var interceptors []grpc.UnaryServerInterceptor
211+
var metrics *grpcprometheus.ServerMetrics
212+
213+
// Add metrics interceptor if metrics address is provided
214+
if so.MetricsAddress != "" {
215+
// Use Prometheus metrics
216+
metrics = grpcprometheus.NewServerMetrics()
217+
218+
// Apply metrics interceptor and custom interceptors
219+
interceptors = append(interceptors, metrics.UnaryServerInterceptor())
220+
interceptors = append(interceptors, so.UnaryInterceptors...)
221+
serverOpts = append(serverOpts, grpc.ChainUnaryInterceptor(interceptors...))
222+
// Register the metrics with the registry
223+
so.MetricsRegistry.MustRegister(metrics)
224+
}
225+
srv := grpc.NewServer(serverOpts...)
171226
reflection.Register(srv)
172227
v1.RegisterFunctionRunnerServiceServer(srv, fn)
173228
v1beta1.RegisterFunctionRunnerServiceServer(srv, ServeBeta(fn))
@@ -176,6 +231,27 @@ func Serve(fn v1.FunctionRunnerServiceServer, o ...ServeOption) error {
176231
healthgrpc.RegisterHealthServer(srv, so.HealthServer)
177232
}
178233

234+
// Start metrics server if address is provided
235+
if so.MetricsAddress != "" {
236+
// Initialize metrics for the gRPC server
237+
if metrics != nil {
238+
metrics.InitializeMetrics(srv)
239+
}
240+
// Use the registry for metrics handler
241+
handler := promhttp.HandlerFor(so.MetricsRegistry, promhttp.HandlerOpts{})
242+
243+
metricsServer := &http.Server{
244+
Addr: so.MetricsAddress,
245+
Handler: handler,
246+
ReadHeaderTimeout: 30 * time.Second,
247+
}
248+
249+
// Start metrics server in a goroutine
250+
go func() {
251+
_ = metricsServer.ListenAndServe() // Ignore errors
252+
}()
253+
}
254+
179255
return errors.Wrap(srv.Serve(lis), "cannot serve mTLS gRPC connections")
180256
}
181257

sdk_test.go

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,18 @@ package function
1919
import (
2020
"context"
2121
"fmt"
22+
"io"
23+
"net"
24+
"net/http"
25+
"strings"
2226
"testing"
27+
"time"
2328

2429
"github.com/google/go-cmp/cmp"
2530
"github.com/google/go-cmp/cmp/cmpopts"
31+
"github.com/prometheus/client_golang/prometheus"
32+
"google.golang.org/grpc"
33+
"google.golang.org/grpc/credentials/insecure"
2634
"google.golang.org/protobuf/encoding/protojson"
2735
"google.golang.org/protobuf/testing/protocmp"
2836

@@ -187,3 +195,177 @@ type MockFunctionServer struct {
187195
func (s *MockFunctionServer) RunFunction(context.Context, *v1.RunFunctionRequest) (*v1.RunFunctionResponse, error) {
188196
return s.rsp, s.err
189197
}
198+
199+
// TestMetricsServer_WithCustomRegistryAndCustomPort verifies that metrics server starts on custom port with custom registry as input.
200+
func TestMetricsServer_WithCustomRegistryAndCustomPort(t *testing.T) {
201+
// Create mock server
202+
mockServer := &MockFunctionServer{
203+
rsp: &v1.RunFunctionResponse{
204+
Meta: &v1.ResponseMeta{Tag: "traffic-test"},
205+
},
206+
}
207+
208+
// Get ports
209+
grpcPort := getAvailablePort(t)
210+
metricsPort := getAvailablePort(t)
211+
212+
// Start server
213+
serverDone := make(chan error, 1)
214+
go func() {
215+
err := Serve(mockServer,
216+
Listen("tcp", fmt.Sprintf(":%d", grpcPort)),
217+
Insecure(true),
218+
WithMetricsServer(fmt.Sprintf(":%d", metricsPort)),
219+
WithMetricsRegistry(prometheus.NewRegistry()),
220+
)
221+
serverDone <- err
222+
}()
223+
224+
// Wait for server to start
225+
time.Sleep(3 * time.Second)
226+
227+
t.Run("MetricsServerTest On CustomPort With CustomRegistry", func(t *testing.T) {
228+
conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", grpcPort),
229+
grpc.WithTransportCredentials(insecure.NewCredentials()))
230+
if err != nil {
231+
t.Fatalf("Failed to connect: %v", err)
232+
}
233+
defer conn.Close()
234+
235+
client := v1.NewFunctionRunnerServiceClient(conn)
236+
237+
// Make the request
238+
req1 := &v1.RunFunctionRequest{
239+
Meta: &v1.RequestMeta{Tag: "request-test"},
240+
}
241+
242+
_, err = client.RunFunction(context.Background(), req1)
243+
if err != nil {
244+
t.Errorf("request failed: %v", err)
245+
}
246+
// Wait for metrics to be collected
247+
time.Sleep(2 * time.Second)
248+
249+
// Verify metrics endpoint has our custom metrics
250+
metricsURL := fmt.Sprintf("http://localhost:%d/metrics", metricsPort)
251+
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, metricsURL, nil)
252+
if err != nil {
253+
t.Fatalf("Failed to create request: %v", err)
254+
}
255+
resp, err := http.DefaultClient.Do(req)
256+
if err != nil {
257+
t.Fatalf("Failed to get metrics: %v", err)
258+
}
259+
defer resp.Body.Close()
260+
261+
body, err := io.ReadAll(resp.Body)
262+
if err != nil {
263+
t.Fatalf("Failed to read metrics: %v", err)
264+
}
265+
266+
metricsContent := string(body)
267+
268+
// Verify Prometheus format is working
269+
if !strings.Contains(metricsContent, "# HELP") {
270+
t.Error("Expected Prometheus format")
271+
}
272+
273+
// Verify gRPC metrics are present
274+
if !strings.Contains(metricsContent, "grpc_server_started_total") {
275+
t.Error("Expected grpc_server_started_total metric to be present")
276+
}
277+
})
278+
}
279+
280+
// TestMetricsServer_WithDefaultRegistryAndDefaultPort verifies that metrics server starts by default on :8080 with default registry with no input.
281+
func TestMetricsServer_WithDefaultRegistryAndDefaultPort(t *testing.T) {
282+
// Create mock server
283+
mockServer := &MockFunctionServer{
284+
rsp: &v1.RunFunctionResponse{
285+
Meta: &v1.ResponseMeta{Tag: "default-metrics-test"},
286+
},
287+
}
288+
289+
// Get ports
290+
grpcPort := getAvailablePort(t)
291+
// Should use default metrics port 8080
292+
metricsPort := 8080
293+
294+
serverDone := make(chan error, 1)
295+
go func() {
296+
err := Serve(mockServer,
297+
Listen("tcp", fmt.Sprintf(":%d", grpcPort)),
298+
Insecure(true),
299+
)
300+
serverDone <- err
301+
}()
302+
303+
// Wait for server to start
304+
time.Sleep(3 * time.Second)
305+
306+
t.Run("MetricsServerTest On DefaultPort With DefaultRegisrty", func(t *testing.T) {
307+
// Test gRPC connection
308+
conn, err := grpc.NewClient(fmt.Sprintf("localhost:%d", grpcPort),
309+
grpc.WithTransportCredentials(insecure.NewCredentials()))
310+
if err != nil {
311+
t.Fatalf("Failed to connect: %v", err)
312+
}
313+
defer conn.Close()
314+
315+
client := v1.NewFunctionRunnerServiceClient(conn)
316+
317+
// Make the request
318+
req := &v1.RunFunctionRequest{
319+
Meta: &v1.RequestMeta{Tag: "default-metrics-test"},
320+
}
321+
322+
_, err = client.RunFunction(context.Background(), req)
323+
if err != nil {
324+
t.Errorf("Request failed: %v", err)
325+
}
326+
327+
// Wait for metrics to be collected
328+
time.Sleep(2 * time.Second)
329+
330+
// Verify metrics endpoint is accessible
331+
metricsURL := fmt.Sprintf("http://localhost:%d/metrics", metricsPort)
332+
httpReq, err := http.NewRequestWithContext(context.Background(), http.MethodGet, metricsURL, nil)
333+
if err != nil {
334+
t.Fatalf("Failed to create request: %v", err)
335+
}
336+
resp, err := http.DefaultClient.Do(httpReq)
337+
if err != nil {
338+
t.Fatalf("Failed to get metrics: %v", err)
339+
}
340+
defer resp.Body.Close()
341+
342+
body, err := io.ReadAll(resp.Body)
343+
if err != nil {
344+
t.Fatalf("Failed to read metrics: %v", err)
345+
}
346+
347+
metricsContent := string(body)
348+
349+
// Verify metrics are present
350+
if !strings.Contains(metricsContent, "# HELP") {
351+
t.Error("Expected Prometheus format")
352+
}
353+
354+
// Verify gRPC metrics are present
355+
if !strings.Contains(metricsContent, "grpc_server_started_total") {
356+
t.Error("Expected grpc_server_started_total metric to be present")
357+
}
358+
})
359+
}
360+
361+
// Helper function to get an available port.
362+
func getAvailablePort(t *testing.T) int {
363+
t.Helper()
364+
365+
listener, err := net.Listen("tcp", ":0")
366+
if err != nil {
367+
t.Fatalf("Failed to get available port: %v", err)
368+
}
369+
defer listener.Close()
370+
return listener.Addr().(*net.TCPAddr).Port
371+
}

0 commit comments

Comments
 (0)