Skip to content

Commit a12a32c

Browse files
authored
Merge pull request kubernetes#127146 from bart0sh/PR156-DRA-Kubelet-latency
Kubelet: add DRA latency metrics
2 parents c3980f6 + c1cd849 commit a12a32c

File tree

6 files changed

+83
-12
lines changed

6 files changed

+83
-12
lines changed

pkg/kubelet/cm/dra/manager.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package dra
1919
import (
2020
"context"
2121
"fmt"
22+
"strconv"
2223
"time"
2324

2425
v1 "k8s.io/api/core/v1"
@@ -35,6 +36,7 @@ import (
3536
"k8s.io/kubernetes/pkg/kubelet/cm/dra/state"
3637
"k8s.io/kubernetes/pkg/kubelet/config"
3738
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
39+
"k8s.io/kubernetes/pkg/kubelet/metrics"
3840
)
3941

4042
// draManagerStateFileName is the file name where dra manager stores its state
@@ -150,6 +152,13 @@ func (m *ManagerImpl) reconcileLoop(ctx context.Context) {
150152
// for each new resource requirement, process their responses and update the cached
151153
// containerResources on success.
152154
func (m *ManagerImpl) PrepareResources(ctx context.Context, pod *v1.Pod) error {
155+
startTime := time.Now()
156+
err := m.prepareResources(ctx, pod)
157+
metrics.DRAOperationsDuration.WithLabelValues("PrepareResources", strconv.FormatBool(err == nil)).Observe(time.Since(startTime).Seconds())
158+
return err
159+
}
160+
161+
func (m *ManagerImpl) prepareResources(ctx context.Context, pod *v1.Pod) error {
153162
logger := klog.FromContext(ctx)
154163
batches := make(map[string][]*drapb.Claim)
155164
resourceClaims := make(map[types.UID]*resourceapi.ResourceClaim)
@@ -369,6 +378,10 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
369378
// As such, calls to the underlying NodeUnprepareResource API are skipped for claims that have
370379
// already been successfully unprepared.
371380
func (m *ManagerImpl) UnprepareResources(ctx context.Context, pod *v1.Pod) error {
381+
var err error = nil
382+
defer func(startTime time.Time) {
383+
metrics.DRAOperationsDuration.WithLabelValues("UnprepareResources", strconv.FormatBool(err != nil)).Observe(time.Since(startTime).Seconds())
384+
}(time.Now())
372385
var claimNames []string
373386
for i := range pod.Spec.ResourceClaims {
374387
claimName, _, err := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
@@ -383,7 +396,8 @@ func (m *ManagerImpl) UnprepareResources(ctx context.Context, pod *v1.Pod) error
383396
}
384397
claimNames = append(claimNames, *claimName)
385398
}
386-
return m.unprepareResources(ctx, pod.UID, pod.Namespace, claimNames)
399+
err = m.unprepareResources(ctx, pod.UID, pod.Namespace, claimNames)
400+
return err
387401
}
388402

389403
func (m *ManagerImpl) unprepareResources(ctx context.Context, podUID types.UID, namespace string, claimNames []string) error {

pkg/kubelet/cm/dra/plugin/plugin.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ import (
2727
"google.golang.org/grpc"
2828
"google.golang.org/grpc/connectivity"
2929
"google.golang.org/grpc/credentials/insecure"
30+
"google.golang.org/grpc/status"
3031

3132
utilversion "k8s.io/apimachinery/pkg/util/version"
3233
"k8s.io/klog/v2"
3334
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha4"
35+
"k8s.io/kubernetes/pkg/kubelet/metrics"
3436
)
3537

3638
// NewDRAPluginClient returns a wrapper around those gRPC methods of a DRA
@@ -51,6 +53,7 @@ func NewDRAPluginClient(pluginName string) (*Plugin, error) {
5153
}
5254

5355
type Plugin struct {
56+
name string
5457
backgroundCtx context.Context
5558
cancel func(cause error)
5659

@@ -85,6 +88,7 @@ func (p *Plugin) getOrCreateGRPCConn() (*grpc.ClientConn, error) {
8588
grpc.WithContextDialer(func(ctx context.Context, target string) (net.Conn, error) {
8689
return (&net.Dialer{}).DialContext(ctx, network, target)
8790
}),
91+
grpc.WithChainUnaryInterceptor(newMetricsInterceptor(p.name)),
8892
)
8993
if err != nil {
9094
return nil, err
@@ -144,3 +148,12 @@ func (p *Plugin) NodeUnprepareResources(
144148
logger.V(4).Info("Done calling NodeUnprepareResources rpc", "response", response, "err", err)
145149
return response, err
146150
}
151+
152+
func newMetricsInterceptor(pluginName string) grpc.UnaryClientInterceptor {
153+
return func(ctx context.Context, method string, req, reply any, conn *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
154+
start := time.Now()
155+
err := invoker(ctx, method, req, reply, conn, opts...)
156+
metrics.DRAGRPCOperationsDuration.WithLabelValues(pluginName, method, status.Code(err).String()).Observe(time.Since(start).Seconds())
157+
return err
158+
}
159+
}

pkg/kubelet/cm/dra/plugin/plugin_test.go

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,9 @@ func TestGRPCConnIsReused(t *testing.T) {
114114
wg := sync.WaitGroup{}
115115
m := sync.Mutex{}
116116

117+
pluginName := "dummy-plugin"
117118
p := &Plugin{
119+
name: pluginName,
118120
backgroundCtx: tCtx,
119121
endpoint: addr,
120122
clientCallTimeout: defaultClientCallTimeout,
@@ -132,15 +134,15 @@ func TestGRPCConnIsReused(t *testing.T) {
132134
}
133135

134136
// ensure the plugin we are using is registered
135-
draPlugins.add("dummy-plugin", p)
136-
defer draPlugins.delete("dummy-plugin")
137+
draPlugins.add(p)
138+
defer draPlugins.delete(pluginName)
137139

138140
// we call `NodePrepareResource` 2 times and check whether a new connection is created or the same is reused
139141
for i := 0; i < 2; i++ {
140142
wg.Add(1)
141143
go func() {
142144
defer wg.Done()
143-
client, err := NewDRAPluginClient("dummy-plugin")
145+
client, err := NewDRAPluginClient(pluginName)
144146
if err != nil {
145147
t.Error(err)
146148
return
@@ -205,7 +207,7 @@ func TestNewDRAPluginClient(t *testing.T) {
205207
{
206208
description: "plugin exists",
207209
setup: func(name string) tearDown {
208-
draPlugins.add(name, &Plugin{})
210+
draPlugins.add(&Plugin{name: name})
209211
return func() {
210212
draPlugins.delete(name)
211213
}
@@ -251,7 +253,9 @@ func TestNodeUnprepareResources(t *testing.T) {
251253
}
252254
defer teardown()
253255

256+
pluginName := "dummy-plugin"
254257
p := &Plugin{
258+
name: pluginName,
255259
backgroundCtx: tCtx,
256260
endpoint: addr,
257261
clientCallTimeout: defaultClientCallTimeout,
@@ -268,10 +272,10 @@ func TestNodeUnprepareResources(t *testing.T) {
268272
t.Fatal(err)
269273
}
270274

271-
draPlugins.add("dummy-plugin", p)
272-
defer draPlugins.delete("dummy-plugin")
275+
draPlugins.add(p)
276+
defer draPlugins.delete(pluginName)
273277

274-
client, err := NewDRAPluginClient("dummy-plugin")
278+
client, err := NewDRAPluginClient(pluginName)
275279
if err != nil {
276280
t.Fatal(err)
277281
}

pkg/kubelet/cm/dra/plugin/plugins_store.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,16 @@ func (s *pluginsStore) get(pluginName string) *Plugin {
4242

4343
// add saves a DRA Plugin in the store under the plugin's own name and reports whether an existing entry was replaced.
4444
// This method is protected by a mutex.
45-
func (s *pluginsStore) add(pluginName string, p *Plugin) (replaced bool) {
45+
func (s *pluginsStore) add(p *Plugin) (replaced bool) {
4646
s.Lock()
4747
defer s.Unlock()
4848

4949
if s.store == nil {
5050
s.store = make(map[string]*Plugin)
5151
}
5252

53-
_, exists := s.store[pluginName]
54-
s.store[pluginName] = p
53+
_, exists := s.store[p.name]
54+
s.store[p.name] = p
5555
return exists
5656
}
5757

pkg/kubelet/cm/dra/plugin/registration.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
160160
ctx, cancel := context.WithCancelCause(ctx)
161161

162162
pluginInstance := &Plugin{
163+
name: pluginName,
163164
backgroundCtx: ctx,
164165
cancel: cancel,
165166
conn: nil,
@@ -170,7 +171,7 @@ func (h *RegistrationHandler) RegisterPlugin(pluginName string, endpoint string,
170171

171172
// Storing endpoint of newly registered DRA Plugin into the map, where plugin name will be the key
172173
// all other DRA components will be able to get the actual socket of DRA plugins by its name.
173-
if draPlugins.add(pluginName, pluginInstance) {
174+
if draPlugins.add(pluginInstance) {
174175
logger.V(1).Info("Already registered, previous plugin was replaced")
175176
}
176177

pkg/kubelet/metrics/metrics.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
const (
3333
FirstNetworkPodStartSLIDurationKey = "first_network_pod_start_sli_duration_seconds"
3434
KubeletSubsystem = "kubelet"
35+
DRASubsystem = "dra"
3536
NodeNameKey = "node_name"
3637
NodeLabelKey = "node"
3738
NodeStartupPreKubeletKey = "node_startup_pre_kubelet_duration_seconds"
@@ -132,6 +133,10 @@ const (
132133
ContainerAlignedComputeResourcesScopeLabelKey = "scope"
133134
ContainerAlignedComputeResourcesBoundaryLabelKey = "boundary"
134135

136+
// Metric keys for DRA operations
137+
DRAOperationsDurationKey = "operations_duration_seconds"
138+
DRAGRPCOperationsDurationKey = "grpc_operations_duration_seconds"
139+
135140
// Values used in metric labels
136141
Container = "container"
137142
InitContainer = "init_container"
@@ -168,6 +173,11 @@ var (
168173
{60 * 1024 * 1024 * 1024, "60GB-100GB"},
169174
{100 * 1024 * 1024 * 1024, "GT100GB"},
170175
}
176+
// DRADurationBuckets is the bucket boundaries for DRA operation duration metrics
177+
// DRAOperationsDuration and DRAGRPCOperationsDuration defined below in this file.
178+
// The buckets max value 40 is based on the 45sec max gRPC timeout value defined
179+
// for the DRA gRPC calls in the pkg/kubelet/cm/dra/plugin/registration.go
180+
DRADurationBuckets = metrics.ExponentialBucketsRange(.1, 40, 15)
171181
)
172182

173183
var (
@@ -938,6 +948,30 @@ var (
938948
StabilityLevel: metrics.ALPHA,
939949
},
940950
)
951+
952+
// DRAOperationsDuration tracks the duration of the DRA PrepareResources and UnprepareResources requests.
953+
DRAOperationsDuration = metrics.NewHistogramVec(
954+
&metrics.HistogramOpts{
955+
Subsystem: DRASubsystem,
956+
Name: DRAOperationsDurationKey,
957+
Help: "Latency histogram in seconds for the duration of handling all ResourceClaims referenced by a pod when the pod starts or stops. Identified by the name of the operation (PrepareResources or UnprepareResources) and separated by the success of the operation. The number of failed operations is provided through the histogram's overall count.",
958+
Buckets: DRADurationBuckets,
959+
StabilityLevel: metrics.ALPHA,
960+
},
961+
[]string{"operation_name", "is_error"},
962+
)
963+
964+
// DRAGRPCOperationsDuration tracks the duration of the DRA GRPC operations.
965+
DRAGRPCOperationsDuration = metrics.NewHistogramVec(
966+
&metrics.HistogramOpts{
967+
Subsystem: DRASubsystem,
968+
Name: DRAGRPCOperationsDurationKey,
969+
Help: "Duration in seconds of the DRA gRPC operations",
970+
Buckets: DRADurationBuckets,
971+
StabilityLevel: metrics.ALPHA,
972+
},
973+
[]string{"driver_name", "method_name", "grpc_status_code"},
974+
)
941975
)
942976

943977
var registerMetrics sync.Once
@@ -1030,6 +1064,11 @@ func Register(collectors ...metrics.StableCollector) {
10301064
legacyregistry.MustRegister(LifecycleHandlerHTTPFallbacks)
10311065
legacyregistry.MustRegister(LifecycleHandlerSleepTerminated)
10321066
legacyregistry.MustRegister(CgroupVersion)
1067+
1068+
if utilfeature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation) {
1069+
legacyregistry.MustRegister(DRAOperationsDuration)
1070+
legacyregistry.MustRegister(DRAGRPCOperationsDuration)
1071+
}
10331072
})
10341073
}
10351074

0 commit comments

Comments
 (0)