Skip to content

Commit bf75546

Browse files
authored
Merge pull request kubernetes#128432 from zhifei92/integrating-health-check
Integrate device plugin registration gRPC server health checks.
2 parents ce81cc7 + 1381e41 commit bf75546

13 files changed

+104
-12
lines changed

pkg/kubelet/cm/container_manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828

2929
// TODO: Migrate kubelet to either use its own internal objects or client library.
3030
v1 "k8s.io/api/core/v1"
31+
"k8s.io/apiserver/pkg/server/healthz"
3132
internalapi "k8s.io/cri-api/pkg/apis"
3233
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
3334
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
@@ -122,6 +123,10 @@ type ContainerManager interface {
122123
// registration.
123124
GetPluginRegistrationHandlers() map[string]cache.PluginHandler
124125

126+
// GetHealthCheckers returns a set of health checkers for all plugins.
127+
// These checkers are integrated into the systemd watchdog and the kubelet server's healthz handler to monitor the service's health.
128+
GetHealthCheckers() []healthz.HealthChecker
129+
125130
// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
126131
// due to node recreation.
127132
ShouldResetExtendedResourceCapacity() bool

pkg/kubelet/cm/container_manager_linux.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ import (
4141
utilerrors "k8s.io/apimachinery/pkg/util/errors"
4242
"k8s.io/apimachinery/pkg/util/sets"
4343
"k8s.io/apimachinery/pkg/util/wait"
44+
"k8s.io/apiserver/pkg/server/healthz"
4445
utilfeature "k8s.io/apiserver/pkg/util/feature"
4546
clientset "k8s.io/client-go/kubernetes"
4647
"k8s.io/client-go/tools/record"
@@ -661,6 +662,10 @@ func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache
661662
return res
662663
}
663664

665+
// GetHealthCheckers returns the health checkers exposed by the container manager.
// The list holds a single entry: the device manager's health checker.
func (cm *containerManagerImpl) GetHealthCheckers() []healthz.HealthChecker {
666+
return []healthz.HealthChecker{cm.deviceManager.GetHealthChecker()}
667+
}
668+
664669
// TODO: move the GetResources logic to PodContainerManager.
665670
func (cm *containerManagerImpl) GetResources(ctx context.Context, pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
666671
logger := klog.FromContext(ctx)

pkg/kubelet/cm/container_manager_stub.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525

2626
"k8s.io/apimachinery/pkg/api/resource"
2727
"k8s.io/apimachinery/pkg/types"
28+
"k8s.io/apiserver/pkg/server/healthz"
2829
internalapi "k8s.io/cri-api/pkg/apis"
2930
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
3031
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
@@ -95,6 +96,10 @@ func (cm *containerManagerStub) GetPluginRegistrationHandlers() map[string]cache
9596
return nil
9697
}
9798

99+
// GetHealthCheckers returns an empty, non-nil list: the stub exposes no health checkers.
func (cm *containerManagerStub) GetHealthCheckers() []healthz.HealthChecker {
100+
return []healthz.HealthChecker{}
101+
}
102+
98103
// GetDevicePluginResourceCapacity returns the stub's extendedPluginResources for
// both ResourceList results, together with an empty string slice.
func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
99104
return cm.extendedPluginResources, cm.extendedPluginResources, []string{}
100105
}

pkg/kubelet/cm/container_manager_windows.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
v1 "k8s.io/api/core/v1"
3737
"k8s.io/apimachinery/pkg/api/resource"
3838
"k8s.io/apimachinery/pkg/types"
39+
"k8s.io/apiserver/pkg/server/healthz"
3940
clientset "k8s.io/client-go/kubernetes"
4041
"k8s.io/client-go/tools/record"
4142
internalapi "k8s.io/cri-api/pkg/apis"
@@ -224,6 +225,10 @@ func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache
224225
return map[string]cache.PluginHandler{pluginwatcherapi.DevicePlugin: cm.deviceManager.GetWatcherHandler()}
225226
}
226227

228+
// GetHealthCheckers returns the health checkers exposed by the container manager.
// The list holds a single entry: the device manager's health checker.
func (cm *containerManagerImpl) GetHealthCheckers() []healthz.HealthChecker {
229+
return []healthz.HealthChecker{cm.deviceManager.GetHealthChecker()}
230+
}
231+
227232
// GetDevicePluginResourceCapacity forwards to the device manager's GetCapacity
// and returns its three results unchanged.
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
228233
return cm.deviceManager.GetCapacity()
229234
}

pkg/kubelet/cm/devicemanager/manager.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"k8s.io/apimachinery/pkg/api/resource"
3535
errorsutil "k8s.io/apimachinery/pkg/util/errors"
3636
"k8s.io/apimachinery/pkg/util/sets"
37+
"k8s.io/apiserver/pkg/server/healthz"
3738
utilfeature "k8s.io/apiserver/pkg/util/feature"
3839
pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
3940
"k8s.io/kubernetes/pkg/features"
@@ -326,6 +327,11 @@ func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
326327
return m.server
327328
}
328329

330+
// GetHealthChecker returns the device plugin registration server as a health checker.
331+
func (m *ManagerImpl) GetHealthChecker() healthz.HealthChecker {
332+
return m.server
333+
}
334+
329335
// checkpointFile returns device plugin checkpoint file path.
330336
func (m *ManagerImpl) checkpointFile() string {
331337
return filepath.Join(m.checkpointdir, kubeletDeviceManagerCheckpoint)

pkg/kubelet/cm/devicemanager/plugin/v1beta1/server.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"context"
2121
"fmt"
2222
"net"
23+
"net/http"
2324
"os"
2425
"path/filepath"
2526
"sync"
@@ -28,6 +29,7 @@ import (
2829
"google.golang.org/grpc"
2930

3031
core "k8s.io/api/core/v1"
32+
"k8s.io/apiserver/pkg/server/healthz"
3133
"k8s.io/klog/v2"
3234
api "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1"
3335
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
@@ -39,6 +41,7 @@ import (
3941
// Server interface provides methods for Device plugin registration server.
4042
type Server interface {
4143
cache.PluginHandler
44+
healthz.HealthChecker
4245
Start() error
4346
Stop() error
4447
SocketPath() string
@@ -53,6 +56,9 @@ type server struct {
5356
rhandler RegistrationHandler
5457
chandler ClientHandler
5558
clients map[string]Client
59+
60+
// isStarted tracks the health of the registration gRPC server: it is set to true when serving begins and again on Stop, and to false if Serve returns an error.
61+
isStarted bool
5662
}
5763

5864
// NewServer returns an initialized device plugin registration server.
@@ -109,7 +115,9 @@ func (s *server) Start() error {
109115
api.RegisterRegistrationServer(s.grpc, s)
110116
go func() {
111117
defer s.wg.Done()
118+
s.setHealthy()
112119
if err = s.grpc.Serve(ln); err != nil {
120+
s.setUnhealthy()
113121
klog.ErrorS(err, "Error while serving device plugin registration grpc server")
114122
}
115123
}()
@@ -134,6 +142,9 @@ func (s *server) Stop() error {
134142
s.grpc.Stop()
135143
s.wg.Wait()
136144
s.grpc = nil
145+
// During kubelet termination, we do not need the registration server,
146+
// and we consider the kubelet to be healthy even when it is down.
147+
s.setHealthy()
137148

138149
return nil
139150
}
@@ -190,3 +201,24 @@ func (s *server) visitClients(visit func(r string, c Client)) {
190201
}
191202
s.mutex.Unlock()
192203
}
204+
205+
// Name returns the name of this health check, "device-plugin".
func (s *server) Name() string {
206+
return "device-plugin"
207+
}
208+
209+
// Check reports the health of the device plugin registration gRPC server.
// It returns nil while isStarted is true and an error otherwise; the request
// argument is ignored.
// NOTE(review): isStarted is read here without any locking, while the serving
// goroutine launched by Start writes it via setHealthy/setUnhealthy — confirm
// whether this access needs synchronization.
func (s *server) Check(_ *http.Request) error {
210+
if s.isStarted {
211+
return nil
212+
}
213+
return fmt.Errorf("device plugin registration gRPC server failed and no device plugins can register")
214+
}
215+
216+
// setHealthy marks the gRPC server as healthy by setting isStarted to true.
217+
func (s *server) setHealthy() {
218+
s.isStarted = true
219+
}
220+
221+
// setUnhealthy marks the gRPC server as unhealthy by setting isStarted to false.
222+
func (s *server) setUnhealthy() {
223+
s.isStarted = false
224+
}

pkg/kubelet/cm/devicemanager/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
v1 "k8s.io/api/core/v1"
2323
"k8s.io/apimachinery/pkg/util/sets"
24+
"k8s.io/apiserver/pkg/server/healthz"
2425
"k8s.io/kubernetes/pkg/kubelet/cm/containermap"
2526
"k8s.io/kubernetes/pkg/kubelet/cm/resourceupdates"
2627
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -62,6 +63,7 @@ type Manager interface {
6263

6364
// GetWatcherHandler returns the plugin handler for the device manager.
6465
GetWatcherHandler() cache.PluginHandler
66+
GetHealthChecker() healthz.HealthChecker
6567

6668
// GetDevices returns information about the devices assigned to pods and containers
6769
GetDevices(podUID, containerName string) ResourceDeviceInstances

pkg/kubelet/cm/fake_container_manager.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424

2525
"k8s.io/apimachinery/pkg/api/resource"
2626
"k8s.io/apimachinery/pkg/types"
27+
"k8s.io/apiserver/pkg/server/healthz"
2728
internalapi "k8s.io/cri-api/pkg/apis"
2829
podresourcesapi "k8s.io/kubelet/pkg/apis/podresources/v1"
2930
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
@@ -131,6 +132,13 @@ func (cm *FakeContainerManager) GetPluginRegistrationHandlers() map[string]cache
131132
return nil
132133
}
133134

135+
// GetHealthCheckers records the call in CalledFunctions under this method's
// own name and returns an empty, non-nil list of health checkers.
func (cm *FakeContainerManager) GetHealthCheckers() []healthz.HealthChecker {
136+
cm.Lock()
137+
defer cm.Unlock()
138+
cm.CalledFunctions = append(cm.CalledFunctions, "GetHealthCheckers")
139+
return []healthz.HealthChecker{}
140+
}
141+
134142
func (cm *FakeContainerManager) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
135143
cm.Lock()
136144
defer cm.Unlock()

pkg/kubelet/kubelet.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import (
3434
"time"
3535

3636
cadvisorapi "github.com/google/cadvisor/info/v1"
37+
inuserns "github.com/moby/sys/userns"
3738
"github.com/opencontainers/selinux/go-selinux"
3839
"go.opentelemetry.io/otel/attribute"
3940
"go.opentelemetry.io/otel/codes"
@@ -46,7 +47,6 @@ import (
4647
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
4748
netutils "k8s.io/utils/net"
4849

49-
inuserns "github.com/moby/sys/userns"
5050
v1 "k8s.io/api/core/v1"
5151
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
5252
"k8s.io/apimachinery/pkg/fields"
@@ -970,7 +970,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
970970
if utilfeature.DefaultFeatureGate.Enabled(features.SystemdWatchdog) {
971971
// NewHealthChecker returns an error indicating that the watchdog is configured but the configuration is incorrect,
972972
// the kubelet will not be started.
973-
klet.healthChecker, err = watchdog.NewHealthChecker(klet)
973+
checkers := klet.containerManager.GetHealthCheckers()
974+
klet.healthChecker, err = watchdog.NewHealthChecker(klet, watchdog.WithExtendedCheckers(checkers))
974975
if err != nil {
975976
return nil, fmt.Errorf("create health checker: %w", err)
976977
}
@@ -2913,12 +2914,12 @@ func (kl *Kubelet) BirthCry() {
29132914
// ListenAndServe runs the kubelet HTTP server.
29142915
func (kl *Kubelet) ListenAndServe(kubeCfg *kubeletconfiginternal.KubeletConfiguration, tlsOptions *server.TLSOptions,
29152916
auth server.AuthInterface, tp trace.TracerProvider) {
2916-
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kubeCfg, tlsOptions, auth, tp)
2917+
server.ListenAndServeKubeletServer(kl, kl.resourceAnalyzer, kl.containerManager.GetHealthCheckers(), kubeCfg, tlsOptions, auth, tp)
29172918
}
29182919

29192920
// ListenAndServeReadOnly runs the kubelet HTTP server in read-only mode.
29202921
func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint, tp trace.TracerProvider) {
2921-
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, address, port, tp)
2922+
server.ListenAndServeKubeletReadOnlyServer(kl, kl.resourceAnalyzer, kl.containerManager.GetHealthCheckers(), address, port, tp)
29222923
}
29232924

29242925
// ListenAndServePodResources runs the kubelet podresources grpc service

pkg/kubelet/server/server.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ type Server struct {
114114
metricsBuckets sets.Set[string]
115115
metricsMethodBuckets sets.Set[string]
116116
resourceAnalyzer stats.ResourceAnalyzer
117+
extendedCheckers []healthz.HealthChecker
117118
}
118119

119120
// TLSOptions holds the TLS options.
@@ -156,6 +157,7 @@ func (a *filteringContainer) RegisteredHandlePaths() []string {
156157
func ListenAndServeKubeletServer(
157158
host HostInterface,
158159
resourceAnalyzer stats.ResourceAnalyzer,
160+
checkers []healthz.HealthChecker,
159161
kubeCfg *kubeletconfiginternal.KubeletConfiguration,
160162
tlsOptions *TLSOptions,
161163
auth AuthInterface,
@@ -164,7 +166,7 @@ func ListenAndServeKubeletServer(
164166
address := netutils.ParseIPSloppy(kubeCfg.Address)
165167
port := uint(kubeCfg.Port)
166168
klog.InfoS("Starting to listen", "address", address, "port", port)
167-
handler := NewServer(host, resourceAnalyzer, auth, kubeCfg)
169+
handler := NewServer(host, resourceAnalyzer, checkers, auth, kubeCfg)
168170

169171
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
170172
handler.InstallTracingFilter(tp)
@@ -198,11 +200,12 @@ func ListenAndServeKubeletServer(
198200
func ListenAndServeKubeletReadOnlyServer(
199201
host HostInterface,
200202
resourceAnalyzer stats.ResourceAnalyzer,
203+
checkers []healthz.HealthChecker,
201204
address net.IP,
202205
port uint,
203206
tp oteltrace.TracerProvider) {
204207
klog.InfoS("Starting to listen read-only", "address", address, "port", port)
205-
s := NewServer(host, resourceAnalyzer, nil, nil)
208+
s := NewServer(host, resourceAnalyzer, checkers, nil, nil)
206209

207210
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
208211
s.InstallTracingFilter(tp, otelrestful.WithPublicEndpoint())
@@ -278,6 +281,7 @@ type HostInterface interface {
278281
func NewServer(
279282
host HostInterface,
280283
resourceAnalyzer stats.ResourceAnalyzer,
284+
checkers []healthz.HealthChecker,
281285
auth AuthInterface,
282286
kubeCfg *kubeletconfiginternal.KubeletConfiguration) Server {
283287

@@ -288,6 +292,7 @@ func NewServer(
288292
restfulCont: &filteringContainer{Container: restful.NewContainer()},
289293
metricsBuckets: sets.New[string](),
290294
metricsMethodBuckets: sets.New[string]("OPTIONS", "GET", "HEAD", "POST", "PUT", "DELETE", "TRACE", "CONNECT"),
295+
extendedCheckers: checkers,
291296
}
292297
if auth != nil {
293298
server.InstallAuthFilter()
@@ -392,11 +397,13 @@ func (s *Server) getMetricMethodBucket(method string) string {
392397
// patterns with the restful Container.
393398
func (s *Server) InstallDefaultHandlers() {
394399
s.addMetricsBucketMatcher("healthz")
395-
healthz.InstallHandler(s.restfulCont,
400+
checkers := []healthz.HealthChecker{
396401
healthz.PingHealthz,
397402
healthz.LogHealthz,
398403
healthz.NamedCheck("syncloop", s.host.SyncLoopHealthCheck),
399-
)
404+
}
405+
checkers = append(checkers, s.extendedCheckers...)
406+
healthz.InstallHandler(s.restfulCont, checkers...)
400407

401408
slis.SLIMetricsWithReset{}.Install(s.restfulCont)
402409

0 commit comments

Comments
 (0)