Skip to content

Commit 7d65c91

Browse files
committed
feat(metrics): add cp metrics and OTEL tracing
Added metrics for copy operations following the exec metrics pattern: - hypeman_cp_sessions_total (labels: direction, status) - hypeman_cp_duration_seconds - hypeman_cp_bytes_total Also added OTEL span in CpHandler since WebSocket connections bypass the otelchi middleware for HTTP request tracing.
1 parent c0eea27 commit 7d65c91

File tree

2 files changed

+129
-24
lines changed

2 files changed

+129
-24
lines changed

cmd/api/api/cp.go

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ import (
1414
"github.com/onkernel/hypeman/lib/instances"
1515
"github.com/onkernel/hypeman/lib/logger"
1616
mw "github.com/onkernel/hypeman/lib/middleware"
17+
"go.opentelemetry.io/otel"
18+
"go.opentelemetry.io/otel/attribute"
19+
"go.opentelemetry.io/otel/codes"
20+
"go.opentelemetry.io/otel/trace"
1721
)
1822

1923
// cpErrorSent wraps an error that has already been sent to the client.
@@ -139,6 +143,18 @@ func (s *ApiService) CpHandler(w http.ResponseWriter, r *http.Request) {
139143
}
140144
}
141145

146+
// Start OTEL span for tracing (WebSocket bypasses otelchi middleware)
147+
tracer := otel.Tracer("hypeman/cp")
148+
ctx, span := tracer.Start(ctx, "cp.session",
149+
trace.WithAttributes(
150+
attribute.String("instance_id", inst.Id),
151+
attribute.String("direction", cpReq.Direction),
152+
attribute.String("guest_path", cpReq.GuestPath),
153+
attribute.String("subject", subject),
154+
),
155+
)
156+
defer span.End()
157+
142158
log.InfoContext(ctx, "cp session started",
143159
"instance_id", inst.Id,
144160
"subject", subject,
@@ -147,18 +163,33 @@ func (s *ApiService) CpHandler(w http.ResponseWriter, r *http.Request) {
147163
)
148164

149165
var cpErr error
166+
var bytesTransferred int64
150167
switch cpReq.Direction {
151168
case "to":
152-
cpErr = s.handleCopyTo(ctx, ws, inst, cpReq)
169+
bytesTransferred, cpErr = s.handleCopyTo(ctx, ws, inst, cpReq)
153170
case "from":
154-
cpErr = s.handleCopyFrom(ctx, ws, inst, cpReq)
171+
bytesTransferred, cpErr = s.handleCopyFrom(ctx, ws, inst, cpReq)
155172
default:
156173
cpErr = fmt.Errorf("invalid direction: %s (must be 'to' or 'from')", cpReq.Direction)
157174
}
158175

159176
duration := time.Since(startTime)
177+
success := cpErr == nil
178+
179+
// Record metrics
180+
if guest.GuestMetrics != nil {
181+
guest.GuestMetrics.RecordCpSession(ctx, startTime, cpReq.Direction, success, bytesTransferred)
182+
}
183+
184+
// Update span with result
185+
span.SetAttributes(
186+
attribute.Int64("bytes_transferred", bytesTransferred),
187+
attribute.Bool("success", success),
188+
)
160189

161190
if cpErr != nil {
191+
span.RecordError(cpErr)
192+
span.SetStatus(codes.Error, cpErr.Error())
162193
log.ErrorContext(ctx, "cp failed",
163194
"error", cpErr,
164195
"instance_id", inst.Id,
@@ -174,25 +205,28 @@ func (s *ApiService) CpHandler(w http.ResponseWriter, r *http.Request) {
174205
return
175206
}
176207

208+
span.SetStatus(codes.Ok, "")
177209
log.InfoContext(ctx, "cp session ended",
178210
"instance_id", inst.Id,
179211
"subject", subject,
180212
"direction", cpReq.Direction,
181213
"duration_ms", duration.Milliseconds(),
214+
"bytes_transferred", bytesTransferred,
182215
)
183216
}
184217

185218
// handleCopyTo handles copying files from client to guest
186-
func (s *ApiService) handleCopyTo(ctx context.Context, ws *websocket.Conn, inst *instances.Instance, req CpRequest) error {
219+
// Returns the number of bytes transferred and any error.
220+
func (s *ApiService) handleCopyTo(ctx context.Context, ws *websocket.Conn, inst *instances.Instance, req CpRequest) (int64, error) {
187221
grpcConn, err := guest.GetOrCreateConnPublic(ctx, inst.VsockSocket)
188222
if err != nil {
189-
return fmt.Errorf("get grpc connection: %w", err)
223+
return 0, fmt.Errorf("get grpc connection: %w", err)
190224
}
191225

192226
client := guest.NewGuestServiceClient(grpcConn)
193227
stream, err := client.CopyToGuest(ctx)
194228
if err != nil {
195-
return fmt.Errorf("start copy stream: %w", err)
229+
return 0, fmt.Errorf("start copy stream: %w", err)
196230
}
197231

198232
// Send start message
@@ -215,18 +249,19 @@ func (s *ApiService) handleCopyTo(ctx context.Context, ws *websocket.Conn, inst
215249
},
216250
},
217251
}); err != nil {
218-
return fmt.Errorf("send start: %w", err)
252+
return 0, fmt.Errorf("send start: %w", err)
219253
}
220254

221255
// Read data chunks from WebSocket and forward to guest
222256
var receivedEndMessage bool
257+
var bytesSent int64
223258
for {
224259
msgType, data, err := ws.ReadMessage()
225260
if err != nil {
226261
if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
227262
break
228263
}
229-
return fmt.Errorf("read websocket: %w", err)
264+
return bytesSent, fmt.Errorf("read websocket: %w", err)
230265
}
231266

232267
if msgType == websocket.TextMessage {
@@ -243,27 +278,28 @@ func (s *ApiService) handleCopyTo(ctx context.Context, ws *websocket.Conn, inst
243278
if err := stream.Send(&guest.CopyToGuestRequest{
244279
Request: &guest.CopyToGuestRequest_Data{Data: data},
245280
}); err != nil {
246-
return fmt.Errorf("send data: %w", err)
281+
return bytesSent, fmt.Errorf("send data: %w", err)
247282
}
283+
bytesSent += int64(len(data))
248284
}
249285
}
250286

251287
// If the WebSocket closed without receiving an end message, the transfer is incomplete
252288
if !receivedEndMessage {
253-
return fmt.Errorf("client disconnected before completing transfer")
289+
return bytesSent, fmt.Errorf("client disconnected before completing transfer")
254290
}
255291

256292
// Send end message to guest
257293
if err := stream.Send(&guest.CopyToGuestRequest{
258294
Request: &guest.CopyToGuestRequest_End{End: &guest.CopyToGuestEnd{}},
259295
}); err != nil {
260-
return fmt.Errorf("send end: %w", err)
296+
return bytesSent, fmt.Errorf("send end: %w", err)
261297
}
262298

263299
// Get response
264300
resp, err := stream.CloseAndRecv()
265301
if err != nil {
266-
return fmt.Errorf("close stream: %w", err)
302+
return bytesSent, fmt.Errorf("close stream: %w", err)
267303
}
268304

269305
// Send result to client
@@ -278,16 +314,17 @@ func (s *ApiService) handleCopyTo(ctx context.Context, ws *websocket.Conn, inst
278314

279315
if !resp.Success {
280316
// Return a wrapped error so the caller logs it correctly but doesn't send a duplicate
281-
return &cpErrorSent{err: fmt.Errorf("copy to guest failed: %s", resp.Error)}
317+
return resp.BytesWritten, &cpErrorSent{err: fmt.Errorf("copy to guest failed: %s", resp.Error)}
282318
}
283-
return nil
319+
return resp.BytesWritten, nil
284320
}
285321

286322
// handleCopyFrom handles copying files from guest to client
287-
func (s *ApiService) handleCopyFrom(ctx context.Context, ws *websocket.Conn, inst *instances.Instance, req CpRequest) error {
323+
// Returns the number of bytes transferred and any error.
324+
func (s *ApiService) handleCopyFrom(ctx context.Context, ws *websocket.Conn, inst *instances.Instance, req CpRequest) (int64, error) {
288325
grpcConn, err := guest.GetOrCreateConnPublic(ctx, inst.VsockSocket)
289326
if err != nil {
290-
return fmt.Errorf("get grpc connection: %w", err)
327+
return 0, fmt.Errorf("get grpc connection: %w", err)
291328
}
292329

293330
client := guest.NewGuestServiceClient(grpcConn)
@@ -296,10 +333,11 @@ func (s *ApiService) handleCopyFrom(ctx context.Context, ws *websocket.Conn, ins
296333
FollowLinks: req.FollowLinks,
297334
})
298335
if err != nil {
299-
return fmt.Errorf("start copy stream: %w", err)
336+
return 0, fmt.Errorf("start copy stream: %w", err)
300337
}
301338

302339
var receivedFinal bool
340+
var bytesReceived int64
303341

304342
// Stream responses to WebSocket client
305343
for {
@@ -308,7 +346,7 @@ func (s *ApiService) handleCopyFrom(ctx context.Context, ws *websocket.Conn, ins
308346
break
309347
}
310348
if err != nil {
311-
return fmt.Errorf("receive: %w", err)
349+
return bytesReceived, fmt.Errorf("receive: %w", err)
312350
}
313351

314352
switch r := resp.Response.(type) {
@@ -327,13 +365,14 @@ func (s *ApiService) handleCopyFrom(ctx context.Context, ws *websocket.Conn, ins
327365
}
328366
headerJSON, _ := json.Marshal(header)
329367
if err := ws.WriteMessage(websocket.TextMessage, headerJSON); err != nil {
330-
return fmt.Errorf("write header: %w", err)
368+
return bytesReceived, fmt.Errorf("write header: %w", err)
331369
}
332370

333371
case *guest.CopyFromGuestResponse_Data:
334372
if err := ws.WriteMessage(websocket.BinaryMessage, r.Data); err != nil {
335-
return fmt.Errorf("write data: %w", err)
373+
return bytesReceived, fmt.Errorf("write data: %w", err)
336374
}
375+
bytesReceived += int64(len(r.Data))
337376

338377
case *guest.CopyFromGuestResponse_End:
339378
endMarker := CpEndMarker{
@@ -342,11 +381,11 @@ func (s *ApiService) handleCopyFrom(ctx context.Context, ws *websocket.Conn, ins
342381
}
343382
endJSON, _ := json.Marshal(endMarker)
344383
if err := ws.WriteMessage(websocket.TextMessage, endJSON); err != nil {
345-
return fmt.Errorf("write end: %w", err)
384+
return bytesReceived, fmt.Errorf("write end: %w", err)
346385
}
347386
if r.End.Final {
348387
receivedFinal = true
349-
return nil
388+
return bytesReceived, nil
350389
}
351390

352391
case *guest.CopyFromGuestResponse_Error:
@@ -358,13 +397,13 @@ func (s *ApiService) handleCopyFrom(ctx context.Context, ws *websocket.Conn, ins
358397
errJSON, _ := json.Marshal(cpErr)
359398
ws.WriteMessage(websocket.TextMessage, errJSON)
360399
// Return a wrapped error so the caller logs it correctly but doesn't send a duplicate
361-
return &cpErrorSent{err: fmt.Errorf("copy from guest failed: %s", r.Error.Message)}
400+
return bytesReceived, &cpErrorSent{err: fmt.Errorf("copy from guest failed: %s", r.Error.Message)}
362401
}
363402
}
364403

365404
if !receivedFinal {
366-
return fmt.Errorf("copy stream ended without completion marker")
405+
return bytesReceived, fmt.Errorf("copy stream ended without completion marker")
367406
}
368-
return nil
407+
return bytesReceived, nil
369408
}
370409

lib/guest/metrics.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ type Metrics struct {
1414
execDuration metric.Float64Histogram
1515
execBytesSentTotal metric.Int64Counter
1616
execBytesReceivedTotal metric.Int64Counter
17+
18+
cpSessionsTotal metric.Int64Counter
19+
cpDuration metric.Float64Histogram
20+
cpBytesTotal metric.Int64Counter
1721
}
1822

1923
// GuestMetrics is the global metrics instance for the guest package.
@@ -67,11 +71,40 @@ func NewMetrics(meter metric.Meter) (*Metrics, error) {
6771
return nil, err
6872
}
6973

74+
cpSessionsTotal, err := meter.Int64Counter(
75+
"hypeman_cp_sessions_total",
76+
metric.WithDescription("Total number of cp (copy) sessions"),
77+
)
78+
if err != nil {
79+
return nil, err
80+
}
81+
82+
cpDuration, err := meter.Float64Histogram(
83+
"hypeman_cp_duration_seconds",
84+
metric.WithDescription("Copy operation duration"),
85+
metric.WithUnit("s"),
86+
)
87+
if err != nil {
88+
return nil, err
89+
}
90+
91+
cpBytesTotal, err := meter.Int64Counter(
92+
"hypeman_cp_bytes_total",
93+
metric.WithDescription("Total bytes transferred during copy operations"),
94+
metric.WithUnit("By"),
95+
)
96+
if err != nil {
97+
return nil, err
98+
}
99+
70100
return &Metrics{
71101
execSessionsTotal: execSessionsTotal,
72102
execDuration: execDuration,
73103
execBytesSentTotal: execBytesSentTotal,
74104
execBytesReceivedTotal: execBytesReceivedTotal,
105+
cpSessionsTotal: cpSessionsTotal,
106+
cpDuration: cpDuration,
107+
cpBytesTotal: cpBytesTotal,
75108
}, nil
76109
}
77110

@@ -104,3 +137,36 @@ func (m *Metrics) RecordExecSession(ctx context.Context, start time.Time, exitCo
104137
}
105138
}
106139

140+
// RecordCpSession records metrics for a completed cp (copy) session.
141+
// direction should be "to" (copy to instance) or "from" (copy from instance).
142+
func (m *Metrics) RecordCpSession(ctx context.Context, start time.Time, direction string, success bool, bytesTransferred int64) {
143+
if m == nil {
144+
return
145+
}
146+
147+
duration := time.Since(start).Seconds()
148+
status := "success"
149+
if !success {
150+
status = "error"
151+
}
152+
153+
m.cpSessionsTotal.Add(ctx, 1,
154+
metric.WithAttributes(
155+
attribute.String("direction", direction),
156+
attribute.String("status", status),
157+
))
158+
159+
m.cpDuration.Record(ctx, duration,
160+
metric.WithAttributes(
161+
attribute.String("direction", direction),
162+
attribute.String("status", status),
163+
))
164+
165+
if bytesTransferred > 0 {
166+
m.cpBytesTotal.Add(ctx, bytesTransferred,
167+
metric.WithAttributes(
168+
attribute.String("direction", direction),
169+
))
170+
}
171+
}
172+

0 commit comments

Comments
 (0)