Skip to content

Commit f1a23dc

Browse files
authored
Context-Based Telemetry
This commit enables Trident to gather unique metrics for incoming and outgoing API requests.
1 parent d990d70 commit f1a23dc

39 files changed

+2456
-412
lines changed

cli/k8s_client/client_factory.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1-
// Copyright 2021 NetApp, Inc. All Rights Reserved.
1+
// Copyright 2025 NetApp, Inc. All Rights Reserved.
22

33
package k8sclient
44

55
import (
66
"context"
77
"fmt"
8+
"net/http"
89
"os"
910
"os/exec"
1011
"strings"
@@ -82,6 +83,22 @@ func CreateK8SClients(masterURL, kubeConfigPath, overrideNamespace string) (*Cli
8283
return nil, err
8384
}
8485

86+
// Register client-go metrics adapters to feed Trident telemeters
87+
registerK8sClientGoMetricsAdapter()
88+
89+
// Wrap all interaction with K8s in a metrics transport http.RoundTripper.
90+
clients.RestConfig.Wrap(func(rt http.RoundTripper) http.RoundTripper {
91+
return NewMetricsTransport(
92+
rt,
93+
WithMetricsTransportTarget(ContextRequestTargetKubernetes),
94+
WithMetricsTransportTelemeters(
95+
// client-go/metrics is configured to track certain metrics, so we avoid duplicating those here.
96+
OutgoingAPIRequestDurationTelemeter,
97+
OutgoingAPIRequestInFlightTelemeter,
98+
),
99+
)
100+
})
101+
85102
// Create the Kubernetes client
86103
clients.KubeClient, err = kubernetes.NewForConfig(clients.RestConfig)
87104
if err != nil {

cli/k8s_client/metrics.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
// Copyright 2025 NetApp, Inc. All Rights Reserved.
2+
3+
package k8sclient
4+
5+
import (
6+
"context"
7+
"fmt"
8+
"net/http"
9+
"net/url"
10+
"strconv"
11+
"strings"
12+
"time"
13+
14+
"k8s.io/client-go/tools/metrics"
15+
16+
. "github.com/netapp/trident/logging"
17+
"github.com/netapp/trident/utils/errors"
18+
)
19+
20+
// registerK8sClientGoMetricsAdapter registers client-go metric adapters to feed Trident telemeters.
21+
// This is currently limited to rate limiter latency and request retries.
22+
func registerK8sClientGoMetricsAdapter() {
23+
metrics.Register(metrics.RegisterOpts{
24+
RateLimiterLatency: rateLimiterLatencyAdapter{},
25+
RequestRetry: requestRetryAdapter{},
26+
})
27+
}
28+
29+
// rateLimiterLatencyAdapter plugs Trident telemeters into client-go rate limiter latency metrics.
30+
// This reflects the amount of time the client had to wait for a token from the limiter.
31+
// This is directly indicates throttling due to QPS/burst limits.
32+
type rateLimiterLatencyAdapter struct{}
33+
34+
func (rateLimiterLatencyAdapter) Observe(ctx context.Context, verb string, u url.URL, latency time.Duration) {
35+
rec := NewContextBuilder(ctx).
36+
WithTarget(ContextRequestTargetKubernetes).
37+
WithAddress(u.Host).
38+
WithMethod(verb).
39+
WithDuration(latency).
40+
WithTelemetry(OutgoingAPIRequestLimitedDurationTelemeter).
41+
BuildTelemetry()
42+
43+
err := errors.TooManyRequestsError("client-go rate limiter wait %s", latency.String())
44+
rec(&err)
45+
}
46+
47+
// requestRetryAdapter plugs Trident telemeters into client-go request retry metrics.
48+
// This adapter only captures retries, not initial requests.
49+
// Each retry indicates a failure that triggered the retry.
50+
type requestRetryAdapter struct{}
51+
52+
func (requestRetryAdapter) IncrementRetry(ctx context.Context, code string, method string, host string) {
53+
rec := NewContextBuilder(ctx).
54+
WithTarget(ContextRequestTargetKubernetes).
55+
WithAddress(host).
56+
WithMethod(method).
57+
WithTelemetry(OutgoingAPIRequestRetryTotalTelemeter).
58+
BuildTelemetry()
59+
60+
var err error
61+
// Only retries triggered by errors are counted.
62+
if err = assertErrorForCode(code); err != nil {
63+
// Assign to the outer err so the deferred recorder observes the retry
64+
err = errors.WrapWithMustRetryError(err, "retry triggered after http status: %s", code)
65+
}
66+
rec(&err)
67+
}
68+
69+
// assertErrorForCode returns nil for 2xx/3xx HTTP status codes, error otherwise.
70+
func assertErrorForCode(code string) error {
71+
c := strings.TrimSpace(code)
72+
if c == "" {
73+
return fmt.Errorf("missing http status code")
74+
}
75+
76+
// Require a numeric HTTP status code.
77+
n, err := strconv.Atoi(c)
78+
if err != nil {
79+
return fmt.Errorf("invalid http status %s", c)
80+
}
81+
82+
// Validate range: HTTP status codes are 100–599.
83+
if n < 100 || n > 599 {
84+
return fmt.Errorf("invalid http status %d", n)
85+
}
86+
87+
// Return success for codes in the range: [200, 399], 200 <= n <= 399.
88+
if http.StatusOK <= n && n < http.StatusBadRequest {
89+
return nil
90+
}
91+
92+
return fmt.Errorf("http status %d", n)
93+
}

cli/k8s_client/metrics_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
// Copyright 2025 NetApp, Inc. All Rights Reserved.
2+
3+
package k8sclient
4+
5+
import (
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
)
10+
11+
func TestAssertErrorForCode(t *testing.T) {
12+
tests := map[string]struct {
13+
code string
14+
assertErr assert.ErrorAssertionFunc
15+
}{
16+
"with status 200": {code: "200", assertErr: assert.NoError},
17+
"with status 302": {code: "302", assertErr: assert.NoError},
18+
"with status 404": {code: "404", assertErr: assert.Error},
19+
"with status <empty>": {code: " ", assertErr: assert.Error},
20+
"with status xyz": {code: "xyz", assertErr: assert.Error},
21+
"with status 99": {code: "99", assertErr: assert.Error},
22+
"with status 600": {code: "600", assertErr: assert.Error},
23+
}
24+
25+
for name, tc := range tests {
26+
t.Run(name, func(t *testing.T) {
27+
err := assertErrorForCode(tc.code)
28+
tc.assertErr(t, err)
29+
})
30+
}
31+
}

core/concurrent_cache/concurrent_cache.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// Copyright 2025 NetApp, Inc. All Rights Reserved.
2+
13
// Package concurrent_cache provides similar functionality as concurrent_cache but with half the calories
24
// Resources/objects/whatever are always locked and unlocked in the same order, by list, then type, then id.
35
// One query may only contain list operations and one "tree" of resources.
@@ -12,6 +14,7 @@ import (
1214
"slices"
1315
"sync"
1416

17+
. "github.com/netapp/trident/logging"
1518
"github.com/netapp/trident/pkg/locks"
1619
"github.com/netapp/trident/storage"
1720
storageclass "github.com/netapp/trident/storage_class"
@@ -197,22 +200,24 @@ func Query(query ...Subquery) []Subquery {
197200
// 5. Merge all queries
198201
// Returns sorted list of subqueries
199202
// 6. Acquire locks and fill in Results
200-
func Lock(ctx context.Context, queries ...[]Subquery) ([]Result, func(), error) {
203+
func Lock(ctx context.Context, queries ...[]Subquery) (results []Result, unlocker func(), err error) {
204+
ctx = NewContextBuilder(ctx).WithLayer(LogLayerCoreCache).BuildContext()
205+
201206
roots := make([][]int, len(queries))
202207
cachesPresent := make(map[resource]struct{}, resourceCount)
203208
// phase 1, requires no locks, check errors, dedupe, and build trees
204-
if err := assembleQueries(queries, roots, cachesPresent); err != nil {
209+
if err = assembleQueries(queries, roots, cachesPresent); err != nil {
205210
return nil, func() {}, err
206211
}
207212

208213
// phase 2, requires locks, fill in IDs
209-
if err := lockCachesAndFillInIDs(queries, roots, cachesPresent); err != nil {
214+
if err = lockCachesAndFillInIDs(queries, roots, cachesPresent); err != nil {
210215
return nil, func() {}, err
211216
}
212217

213218
// phase 3, takes per-resource locks
214219
merged := mergeQueries(queries)
215-
results, unlocker, err := lockQuery(merged, len(queries))
220+
results, unlocker, err = lockQuery(merged, len(queries))
216221
if err == nil && ctx.Err() != nil {
217222
err = ctx.Err()
218223
}

core/concurrent_core.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1831,7 +1831,7 @@ func (o *ConcurrentTridentOrchestrator) RemoveBackendConfigRef(
18311831
func (o *ConcurrentTridentOrchestrator) AddVolume(
18321832
ctx context.Context, volumeConfig *storage.VolumeConfig,
18331833
) (volExternal *storage.VolumeExternal, err error) {
1834-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCore)
1834+
ctx = NewContextBuilder(ctx).WithLayer(LogLayerCore).BuildContext()
18351835

18361836
if o.bootstrapError != nil {
18371837
Logc(ctx).WithField("bootstrapError", o.bootstrapError).Warn("AddVolume error.")
@@ -2601,7 +2601,7 @@ func (o *ConcurrentTridentOrchestrator) DetachVolume(ctx context.Context, volume
26012601

26022602
// DeleteVolume removes a volume from storage, persistence, and cache.
26032603
func (o *ConcurrentTridentOrchestrator) DeleteVolume(ctx context.Context, volumeName string) (err error) {
2604-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCore)
2604+
ctx = NewContextBuilder(ctx).WithLayer(LogLayerCore).BuildContext()
26052605

26062606
if o.bootstrapError != nil {
26072607
return o.bootstrapError

core/orchestrator_core.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1907,7 +1907,7 @@ func (o *TridentOrchestrator) RemoveBackendConfigRef(ctx context.Context, backen
19071907
func (o *TridentOrchestrator) AddVolume(
19081908
ctx context.Context, volumeConfig *storage.VolumeConfig,
19091909
) (externalVol *storage.VolumeExternal, err error) {
1910-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCore)
1910+
ctx = NewContextBuilder(ctx).WithLayer(LogLayerCore).BuildContext()
19111911

19121912
if o.bootstrapError != nil {
19131913
return nil, o.bootstrapError

frontend/csi/controller_server.go

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"fmt"
88
"math"
9+
"net/http"
910
"reflect"
1011
"strconv"
1112
"strings"
@@ -32,9 +33,20 @@ import (
3233

3334
func (p *Plugin) CreateVolume(
3435
ctx context.Context, req *csi.CreateVolumeRequest,
35-
) (*csi.CreateVolumeResponse, error) {
36-
ctx = SetContextWorkflow(ctx, WorkflowVolumeCreate)
37-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
36+
) (res *csi.CreateVolumeResponse, err error) {
37+
ctx, rec := NewContextBuilder(ctx).
38+
WithWorkflow(WorkflowVolumeCreate).
39+
WithLayer(LogLayerCSIFrontend).
40+
WithSource(ContextSourceCSI).
41+
WithClient(ContextRequestClientCSIProvisioner).
42+
WithRoute(csi.Controller_CreateVolume_FullMethodName).
43+
WithMethod(http.MethodPost).
44+
WithTelemetry(
45+
IncomingAPIRequestInFlightTelemeter,
46+
IncomingAPIRequestDurationTelemeter,
47+
).
48+
BuildContextAndTelemetry()
49+
defer rec(&err)
3850

3951
fields := LogFields{"Method": "CreateVolume", "Type": "CSI_Controller", "name": req.Name}
4052
Logc(ctx).WithFields(fields).Debug(">>>> CreateVolume")
@@ -285,9 +297,20 @@ func (p *Plugin) CreateVolume(
285297

286298
func (p *Plugin) DeleteVolume(
287299
ctx context.Context, req *csi.DeleteVolumeRequest,
288-
) (*csi.DeleteVolumeResponse, error) {
289-
ctx = SetContextWorkflow(ctx, WorkflowVolumeDelete)
290-
ctx = GenerateRequestContextForLayer(ctx, LogLayerCSIFrontend)
300+
) (res *csi.DeleteVolumeResponse, err error) {
301+
ctx, rec := NewContextBuilder(ctx).
302+
WithWorkflow(WorkflowVolumeDelete).
303+
WithLayer(LogLayerCSIFrontend).
304+
WithSource(ContextSourceCSI).
305+
WithClient(ContextRequestClientCSIProvisioner).
306+
WithRoute(csi.Controller_DeleteVolume_FullMethodName).
307+
WithMethod(http.MethodDelete).
308+
WithTelemetry(
309+
IncomingAPIRequestInFlightTelemeter,
310+
IncomingAPIRequestDurationTelemeter,
311+
).
312+
BuildContextAndTelemetry()
313+
defer rec(&err)
291314

292315
fields := LogFields{"Method": "DeleteVolume", "Type": "CSI_Controller"}
293316
Logc(ctx).WithFields(fields).Debug(">>>> DeleteVolume")

frontend/csi/utils.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ func NewGroupControllerServiceCapability(
6969
}
7070

7171
// logGRPC is a unary interceptor that logs GRPC requests.
72-
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{},
73-
error,
74-
) {
72+
func logGRPC(
73+
ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
74+
) (interface{}, error) {
7575
ctx = GenerateRequestContext(ctx, "", ContextSourceCSI, WorkflowNone, LogLayerCSIFrontend)
7676
Audit().Logf(ctx, AuditGRPCAccess, LogFields{}, "GRPC call: %s", info.FullMethod)
7777
logFields := LogFields{
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// Copyright 2025 NetApp, Inc. All Rights Reserved.
2+
13
package logging
24

35
import (
@@ -8,6 +10,14 @@ const auditKey = "audit"
810

911
var auditor AuditLogger
1012

13+
type AuditEvent string
14+
15+
type AuditLogger interface {
16+
Log(ctx context.Context, event AuditEvent, fields LogFields, message string)
17+
Logln(ctx context.Context, event AuditEvent, fields LogFields, message string)
18+
Logf(ctx context.Context, event AuditEvent, fields LogFields, format string, args ...interface{})
19+
}
20+
1121
type auditLogger struct {
1222
enabled bool
1323
}

0 commit comments

Comments
 (0)