Skip to content

Commit 957c953

Browse files
authored
Merge pull request kubernetes#118148 from linxiulei/sched_readyz
Expose /readyz & /livez in kube-scheduler
2 parents 83c2db0 + 44c08fd commit 957c953

File tree

5 files changed

+313
-25
lines changed

5 files changed

+313
-25
lines changed

cmd/kube-scheduler/app/server.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -167,10 +167,12 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
167167
defer cc.EventBroadcaster.Shutdown()
168168

169169
// Setup healthz checks.
170-
var checks []healthz.HealthChecker
170+
var checks, readyzChecks []healthz.HealthChecker
171171
if cc.ComponentConfig.LeaderElection.LeaderElect {
172172
checks = append(checks, cc.LeaderElection.WatchDog)
173+
readyzChecks = append(readyzChecks, cc.LeaderElection.WatchDog)
173174
}
175+
readyzChecks = append(readyzChecks, healthz.NewShutdownHealthz(ctx.Done()))
174176

175177
waitingForLeader := make(chan struct{})
176178
isLeader := func() bool {
@@ -184,9 +186,20 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
184186
}
185187
}
186188

189+
handlerSyncReadyCh := make(chan struct{})
190+
handlerSyncCheck := healthz.NamedCheck("sched-handler-sync", func(_ *http.Request) error {
191+
select {
192+
case <-handlerSyncReadyCh:
193+
return nil
194+
default:
195+
}
196+
return fmt.Errorf("waiting for handlers to sync")
197+
})
198+
readyzChecks = append(readyzChecks, handlerSyncCheck)
199+
187200
// Start up the healthz server.
188201
if cc.SecureServing != nil {
189-
handler := buildHandlerChain(newHealthzAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
202+
handler := buildHandlerChain(newHealthEndpointsAndMetricsHandler(&cc.ComponentConfig, cc.InformerFactory, isLeader, checks, readyzChecks), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
190203
// TODO: handle stoppedCh and listenerStoppedCh returned by c.SecureServing.Serve
191204
if _, _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
192205
// fail early for secure handlers, removing the old error loop from above
@@ -214,6 +227,7 @@ func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *
214227
logger.Error(err, "waiting for handlers to sync")
215228
}
216229

230+
close(handlerSyncReadyCh)
217231
logger.V(3).Info("Handlers synced")
218232
}
219233
if !cc.ComponentConfig.DelayCacheUntilActive || cc.LeaderElection == nil {
@@ -288,11 +302,14 @@ func installMetricHandler(pathRecorderMux *mux.PathRecorderMux, informers inform
288302
})
289303
}
290304

291-
// newHealthzAndMetricsHandler creates a healthz server from the config, and will also
305+
// newHealthEndpointsAndMetricsHandler creates an API health server from the config, and will also
292306
// embed the metrics handler.
293-
func newHealthzAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, checks ...healthz.HealthChecker) http.Handler {
307+
// TODO: healthz check is deprecated, please use livez and readyz instead. Will be removed in the future.
308+
func newHealthEndpointsAndMetricsHandler(config *kubeschedulerconfig.KubeSchedulerConfiguration, informers informers.SharedInformerFactory, isLeader func() bool, healthzChecks, readyzChecks []healthz.HealthChecker) http.Handler {
294309
pathRecorderMux := mux.NewPathRecorderMux("kube-scheduler")
295-
healthz.InstallHandler(pathRecorderMux, checks...)
310+
healthz.InstallHandler(pathRecorderMux, healthzChecks...)
311+
healthz.InstallLivezHandler(pathRecorderMux)
312+
healthz.InstallReadyzHandler(pathRecorderMux, readyzChecks...)
296313
installMetricHandler(pathRecorderMux, informers, isLeader)
297314
slis.SLIMetricsWithReset{}.Install(pathRecorderMux)
298315

staging/src/k8s.io/apiserver/pkg/server/healthz.go

Lines changed: 1 addition & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (s *GenericAPIServer) AddLivezChecks(delay time.Duration, checks ...healthz
118118
// that we can register that the api-server is no longer ready while we attempt to gracefully
119119
// shutdown.
120120
func (s *GenericAPIServer) addReadyzShutdownCheck(stopCh <-chan struct{}) error {
121-
return s.AddReadyzChecks(shutdownCheck{stopCh})
121+
return s.AddReadyzChecks(healthz.NewShutdownHealthz(stopCh))
122122
}
123123

124124
// installHealthz creates the healthz endpoint for this server
@@ -139,25 +139,6 @@ func (s *GenericAPIServer) installLivez() {
139139
s.livezRegistry.installHandler(s.Handler.NonGoRestfulMux)
140140
}
141141

142-
// shutdownCheck fails if the embedded channel is closed. This is intended to allow for graceful shutdown sequences
143-
// for the apiserver.
144-
type shutdownCheck struct {
145-
StopCh <-chan struct{}
146-
}
147-
148-
func (shutdownCheck) Name() string {
149-
return "shutdown"
150-
}
151-
152-
func (c shutdownCheck) Check(req *http.Request) error {
153-
select {
154-
case <-c.StopCh:
155-
return fmt.Errorf("process is shutting down")
156-
default:
157-
}
158-
return nil
159-
}
160-
161142
// delayedHealthCheck wraps a health check which will not fail until the explicitly defined delay has elapsed. This
162143
// is intended for use primarily for livez health checks.
163144
func delayedHealthCheck(check healthz.HealthChecker, clock clock.Clock, delay time.Duration) healthz.HealthChecker {

staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,29 @@ func (i *informerSync) Name() string {
105105
return "informer-sync"
106106
}
107107

108+
type shutdown struct {
109+
stopCh <-chan struct{}
110+
}
111+
112+
// NewShutdownHealthz returns a new HealthChecker that will fail if the embedded channel is closed.
113+
// This is intended to allow for graceful shutdown sequences.
114+
func NewShutdownHealthz(stopCh <-chan struct{}) HealthChecker {
115+
return &shutdown{stopCh}
116+
}
117+
118+
func (s *shutdown) Name() string {
119+
return "shutdown"
120+
}
121+
122+
func (s *shutdown) Check(req *http.Request) error {
123+
select {
124+
case <-s.stopCh:
125+
return fmt.Errorf("process is shutting down")
126+
default:
127+
}
128+
return nil
129+
}
130+
108131
func (i *informerSync) Check(_ *http.Request) error {
109132
stopCh := make(chan struct{})
110133
// Close stopCh to force checking if informers are synced now.
Lines changed: 240 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,240 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package serving
18+
19+
import (
20+
"crypto/tls"
21+
"crypto/x509"
22+
"fmt"
23+
"io"
24+
"net/http"
25+
"os"
26+
"path"
27+
"strings"
28+
"testing"
29+
30+
"k8s.io/klog/v2/ktesting"
31+
kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
32+
kubeschedulertesting "k8s.io/kubernetes/cmd/kube-scheduler/app/testing"
33+
"k8s.io/kubernetes/test/integration/framework"
34+
)
35+
36+
func TestHealthEndpoints(t *testing.T) {
37+
server, configStr, _, err := startTestAPIServer(t)
38+
if err != nil {
39+
t.Fatalf("Failed to start kube-apiserver server: %v", err)
40+
}
41+
defer server.TearDownFn()
42+
43+
apiserverConfig, err := os.CreateTemp("", "kubeconfig")
44+
if err != nil {
45+
t.Fatalf("Failed to create config file: %v", err)
46+
}
47+
defer func() {
48+
_ = os.Remove(apiserverConfig.Name())
49+
}()
50+
if _, err = apiserverConfig.WriteString(configStr); err != nil {
51+
t.Fatalf("Failed to write config file: %v", err)
52+
}
53+
54+
brokenConfigStr := strings.ReplaceAll(configStr, "127.0.0.1", "127.0.0.2")
55+
brokenConfig, err := os.CreateTemp("", "kubeconfig")
56+
if err != nil {
57+
t.Fatalf("Failed to create config file: %v", err)
58+
}
59+
if _, err := brokenConfig.WriteString(brokenConfigStr); err != nil {
60+
t.Fatalf("Failed to write config file: %v", err)
61+
}
62+
defer func() {
63+
_ = os.Remove(brokenConfig.Name())
64+
}()
65+
66+
tests := []struct {
67+
name string
68+
path string
69+
useBrokenConfig bool
70+
wantResponseCode int
71+
}{
72+
{
73+
"/healthz",
74+
"/healthz",
75+
false,
76+
http.StatusOK,
77+
},
78+
{
79+
"/livez",
80+
"/livez",
81+
false,
82+
http.StatusOK,
83+
},
84+
{
85+
"/livez with ping check",
86+
"/livez/ping",
87+
false,
88+
http.StatusOK,
89+
},
90+
{
91+
"/readyz",
92+
"/readyz",
93+
false,
94+
http.StatusOK,
95+
},
96+
{
97+
"/readyz with sched-handler-sync",
98+
"/readyz/sched-handler-sync",
99+
false,
100+
http.StatusOK,
101+
},
102+
{
103+
"/readyz with shutdown",
104+
"/readyz/shutdown",
105+
false,
106+
http.StatusOK,
107+
},
108+
{
109+
"/readyz with broken apiserver",
110+
"/readyz",
111+
true,
112+
http.StatusInternalServerError,
113+
},
114+
}
115+
116+
for _, tt := range tests {
117+
t.Run(tt.name, func(t *testing.T) {
118+
tt := tt
119+
_, ctx := ktesting.NewTestContext(t)
120+
121+
configFile := apiserverConfig.Name()
122+
if tt.useBrokenConfig {
123+
configFile = brokenConfig.Name()
124+
}
125+
result, err := kubeschedulertesting.StartTestServer(
126+
ctx,
127+
[]string{"--kubeconfig", configFile, "--leader-elect=false", "--authorization-always-allow-paths", tt.path})
128+
129+
if err != nil {
130+
t.Fatalf("Failed to start kube-scheduler server: %v", err)
131+
}
132+
if result.TearDownFn != nil {
133+
defer result.TearDownFn()
134+
}
135+
136+
client, base, err := clientAndURLFromTestServer(result)
137+
if err != nil {
138+
t.Fatalf("Failed to get client from test server: %v", err)
139+
}
140+
req, err := http.NewRequest("GET", base+tt.path, nil)
141+
if err != nil {
142+
t.Fatalf("failed to request: %v", err)
143+
}
144+
r, err := client.Do(req)
145+
if err != nil {
146+
t.Fatalf("failed to GET %s from component: %v", tt.path, err)
147+
}
148+
149+
body, err := io.ReadAll(r.Body)
150+
if err != nil {
151+
t.Fatalf("failed to read response body: %v", err)
152+
}
153+
if err = r.Body.Close(); err != nil {
154+
t.Fatalf("failed to close response body: %v", err)
155+
}
156+
if got, expected := r.StatusCode, tt.wantResponseCode; got != expected {
157+
t.Fatalf("expected http %d at %s of component, got: %d %q", expected, tt.path, got, string(body))
158+
}
159+
})
160+
}
161+
}
162+
163+
// TODO: Make this a util function once there is a unified way to start a testing apiserver so that we can reuse it.
164+
func startTestAPIServer(t *testing.T) (server *kubeapiservertesting.TestServer, apiserverConfig, token string, err error) {
165+
// Insulate this test from picking up in-cluster config when run inside a pod
166+
// We can't assume we have permissions to write to /var/run/secrets/... from a unit test to mock in-cluster config for testing
167+
originalHost := os.Getenv("KUBERNETES_SERVICE_HOST")
168+
if len(originalHost) > 0 {
169+
if err = os.Setenv("KUBERNETES_SERVICE_HOST", ""); err != nil {
170+
return
171+
}
172+
defer func() {
173+
err = os.Setenv("KUBERNETES_SERVICE_HOST", originalHost)
174+
}()
175+
}
176+
177+
// authenticate to apiserver via bearer token
178+
token = "flwqkenfjasasdfmwerasd" // Fake token for testing.
179+
var tokenFile *os.File
180+
tokenFile, err = os.CreateTemp("", "kubeconfig")
181+
if err != nil {
182+
return
183+
}
184+
if _, err = tokenFile.WriteString(fmt.Sprintf(`%s,system:kube-scheduler,system:kube-scheduler,""`, token)); err != nil {
185+
return
186+
}
187+
if err = tokenFile.Close(); err != nil {
188+
return
189+
}
190+
191+
// start apiserver
192+
server = kubeapiservertesting.StartTestServerOrDie(t, nil, []string{
193+
"--token-auth-file", tokenFile.Name(),
194+
"--authorization-mode", "AlwaysAllow",
195+
}, framework.SharedEtcd())
196+
197+
apiserverConfig = fmt.Sprintf(`
198+
apiVersion: v1
199+
kind: Config
200+
clusters:
201+
- cluster:
202+
server: %s
203+
certificate-authority: %s
204+
name: integration
205+
contexts:
206+
- context:
207+
cluster: integration
208+
user: kube-scheduler
209+
name: default-context
210+
current-context: default-context
211+
users:
212+
- name: kube-scheduler
213+
user:
214+
token: %s
215+
`, server.ClientConfig.Host, server.ServerOpts.SecureServing.ServerCert.CertKey.CertFile, token)
216+
return server, apiserverConfig, token, nil
217+
}
218+
219+
func clientAndURLFromTestServer(s kubeschedulertesting.TestServer) (*http.Client, string, error) {
220+
secureInfo := s.Config.SecureServing
221+
secureOptions := s.Options.SecureServing
222+
url := fmt.Sprintf("https://%s", secureInfo.Listener.Addr().String())
223+
url = strings.ReplaceAll(url, "[::]", "127.0.0.1") // switch to IPv4 because the self-signed cert does not support [::]
224+
225+
// read self-signed server cert disk
226+
pool := x509.NewCertPool()
227+
serverCertPath := path.Join(secureOptions.ServerCert.CertDirectory, secureOptions.ServerCert.PairName+".crt")
228+
serverCert, err := os.ReadFile(serverCertPath)
229+
if err != nil {
230+
return nil, "", fmt.Errorf("Failed to read component server cert %q: %w", serverCertPath, err)
231+
}
232+
pool.AppendCertsFromPEM(serverCert)
233+
tr := &http.Transport{
234+
TLSClientConfig: &tls.Config{
235+
RootCAs: pool,
236+
},
237+
}
238+
client := &http.Client{Transport: tr}
239+
return client, url, nil
240+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package serving
18+
19+
import (
20+
"testing"
21+
22+
"k8s.io/kubernetes/test/integration/framework"
23+
)
24+
25+
func TestMain(m *testing.M) {
26+
framework.EtcdMain(m.Run)
27+
}

0 commit comments

Comments
 (0)