Skip to content

Commit 2b29315

Browse files
authored
chore: extend the default otelgrpc metric with resume (#2293)
* chore: extend the default otelgrpc metric with resume * fix: lint * fix: make is resume metric optional
1 parent 33bb4c3 commit 2b29315

File tree

4 files changed

+70
-13
lines changed

4 files changed

+70
-13
lines changed

packages/api/internal/orchestrator/nodemanager/metadata.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ package nodemanager
22

33
import (
44
"context"
5+
"strconv"
56

67
"google.golang.org/grpc/metadata"
78

89
"github.com/e2b-dev/infra/packages/api/internal/clusters"
910
"github.com/e2b-dev/infra/packages/shared/pkg/edge"
11+
grpcshared "github.com/e2b-dev/infra/packages/shared/pkg/grpc"
1012
"github.com/e2b-dev/infra/packages/shared/pkg/grpc/orchestrator"
1113
)
1214

@@ -49,6 +51,10 @@ func (n *Node) GetSandboxCreateCtx(ctx context.Context, req *orchestrator.Sandbo
4951
)
5052
}
5153

54+
// Pass snapshot (is_resume) via metadata so the server-side stats handler
55+
// can include it in otelgrpc metric attributes during TagRPC.
56+
md.Set(grpcshared.IsResumeMetadataKey, strconv.FormatBool(req.GetSandbox().GetSnapshot()))
57+
5258
// Merge medata from client (auth, routing with service instance id) and event metadata.
5359
return n.client, appendMetadataCtx(ctx, md)
5460
}

packages/orchestrator/pkg/factories/run.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -602,7 +602,7 @@ func run(config cfg.Config, opts Options) (success bool) {
602602
})
603603
closers = append(closers, closer{"hyperloop server", hyperloopSrv.Shutdown})
604604

605-
grpcServer := e2bgrpc.NewGRPCServer(tel)
605+
grpcServer := e2bgrpc.NewGRPCServer(tel, e2bgrpc.WithSandboxResumeMetrics())
606606
orchestrator.RegisterSandboxServiceServer(grpcServer, orchestratorService)
607607
orchestrator.RegisterVolumeServiceServer(grpcServer, volumeService)
608608
orchestrator.RegisterChunkServiceServer(grpcServer, orchestratorService)

packages/shared/pkg/grpc/filter.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@ import (
66
"google.golang.org/grpc/stats"
77
)
88

9+
// IsResumeMetadataKey is the gRPC metadata key used to pass the is_resume/snapshot
10+
// value from the API client to the orchestrator server. This allows the server-side
11+
// otelgrpc stats handler to include sandbox.resume in metric attributes during TagRPC,
12+
// before the request payload is available.
13+
const IsResumeMetadataKey = "x-sandbox-resume"
14+
915
type noTraceKey struct{}
1016

1117
var noTrace = struct{}{}

packages/shared/pkg/grpc/server.go

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,42 @@
11
package grpc
22

33
import (
4+
"context"
45
"time"
56

67
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
78
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
89
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/selector"
910
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
11+
"go.opentelemetry.io/otel/attribute"
1012
"google.golang.org/grpc"
1113
"google.golang.org/grpc/keepalive"
14+
"google.golang.org/grpc/metadata"
1215

1316
"github.com/e2b-dev/infra/packages/shared/pkg/logger"
1417
"github.com/e2b-dev/infra/packages/shared/pkg/telemetry"
1518
)
1619

17-
func NewGRPCServer(tel *telemetry.Client) *grpc.Server {
18-
opts := []logging.Option{
20+
// ServerOption configures NewGRPCServer.
21+
type ServerOption func(*serverOptions)
22+
23+
type serverOptions struct {
24+
withSandboxResumeMetrics bool
25+
}
26+
27+
// WithSandboxResumeMetrics adds sandbox.resume attribute to otelgrpc metrics,
28+
// read from incoming gRPC metadata.
29+
func WithSandboxResumeMetrics() ServerOption {
30+
return func(o *serverOptions) { o.withSandboxResumeMetrics = true }
31+
}
32+
33+
func NewGRPCServer(tel *telemetry.Client, opts ...ServerOption) *grpc.Server {
34+
var cfg serverOptions
35+
for _, o := range opts {
36+
o(&cfg)
37+
}
38+
39+
logOpts := []logging.Option{
1940
logging.WithLogOnEvents(logging.StartCall, logging.PayloadReceived, logging.PayloadSent, logging.FinishCall),
2041
logging.WithLevels(logging.DefaultServerCodeToLevel),
2142
logging.WithFieldsFromContext(logging.ExtractFields),
@@ -28,33 +49,57 @@ func NewGRPCServer(tel *telemetry.Client) *grpc.Server {
2849
"/InfoService/ServiceInfo",
2950
)
3051

52+
otelOpts := []otelgrpc.Option{
53+
otelgrpc.WithTracerProvider(tel.TracerProvider),
54+
otelgrpc.WithMeterProvider(tel.MeterProvider),
55+
}
56+
if cfg.withSandboxResumeMetrics {
57+
otelOpts = append(otelOpts, otelgrpc.WithMetricAttributesFn(extractSandboxResumeAttrs))
58+
}
59+
3160
return grpc.NewServer(
3261
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
33-
MinTime: 5 * time.Second, // Minimum time between pings from client
34-
PermitWithoutStream: true, // Allow pings even when no active streams
62+
MinTime: 5 * time.Second,
63+
PermitWithoutStream: true,
3564
}),
3665
grpc.KeepaliveParams(keepalive.ServerParameters{
37-
Time: 15 * time.Second, // Server sends keepalive pings every 15s
38-
Timeout: 5 * time.Second, // Wait 5s for response before considering dead
66+
Time: 15 * time.Second,
67+
Timeout: 5 * time.Second,
3968
}),
4069
grpc.StatsHandler(
4170
NewStatsWrapper(
42-
otelgrpc.NewServerHandler(
43-
otelgrpc.WithTracerProvider(tel.TracerProvider),
44-
otelgrpc.WithMeterProvider(tel.MeterProvider),
45-
))),
71+
otelgrpc.NewServerHandler(otelOpts...))),
4672
grpc.ChainUnaryInterceptor(
4773
recovery.UnaryServerInterceptor(),
4874
selector.UnaryServerInterceptor(
49-
logging.UnaryServerInterceptor(logger.GRPCLogger(logger.L()), opts...),
75+
logging.UnaryServerInterceptor(logger.GRPCLogger(logger.L()), logOpts...),
5076
ignoredLoggingRoutes,
5177
),
5278
),
5379
grpc.ChainStreamInterceptor(
5480
selector.StreamServerInterceptor(
55-
logging.StreamServerInterceptor(logger.GRPCLogger(logger.L()), opts...),
81+
logging.StreamServerInterceptor(logger.GRPCLogger(logger.L()), logOpts...),
5682
ignoredLoggingRoutes,
5783
),
5884
),
5985
)
6086
}
87+
88+
// extractSandboxResumeAttrs reads sandbox.resume from gRPC metadata set by the
89+
// API client. Called by otelgrpc during TagRPC — before the request payload is
90+
// deserialized — so we use metadata instead of the payload.
91+
func extractSandboxResumeAttrs(ctx context.Context) []attribute.KeyValue {
92+
md, ok := metadata.FromIncomingContext(ctx)
93+
if !ok {
94+
return nil
95+
}
96+
97+
values := md.Get(IsResumeMetadataKey)
98+
if len(values) == 0 {
99+
return nil
100+
}
101+
102+
return []attribute.KeyValue{
103+
attribute.Bool("sandbox.resume", values[0] == "true"),
104+
}
105+
}

0 commit comments

Comments
 (0)