Skip to content

Commit 323f99e

Browse files
committed
startupProbe: Kubelet changes
1 parent e4d26f8 commit 323f99e

File tree

4 files changed

+125
-12
lines changed

4 files changed

+125
-12
lines changed

pkg/kubelet/prober/prober.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,23 +36,24 @@ import (
3636
"k8s.io/kubernetes/pkg/probe"
3737
execprobe "k8s.io/kubernetes/pkg/probe/exec"
3838
httpprobe "k8s.io/kubernetes/pkg/probe/http"
39-
tcprobe "k8s.io/kubernetes/pkg/probe/tcp"
39+
tcpprobe "k8s.io/kubernetes/pkg/probe/tcp"
4040
"k8s.io/utils/exec"
4141

4242
"k8s.io/klog"
4343
)
4444

4545
const maxProbeRetries = 3
4646

47-
// Prober helps to check the liveness/readiness of a container.
47+
// Prober helps to check the liveness/readiness/startup of a container.
4848
type prober struct {
4949
exec execprobe.Prober
5050
// probe types need different httprobe instances so they don't
5151
// share a connection pool which can cause collisions to the
5252
// same host:port and transient failures. See #49740.
5353
readinessHTTP httpprobe.Prober
5454
livenessHTTP httpprobe.Prober
55-
tcp tcprobe.Prober
55+
startupHTTP httpprobe.Prober
56+
tcp tcpprobe.Prober
5657
runner kubecontainer.ContainerCommandRunner
5758

5859
refManager *kubecontainer.RefManager
@@ -71,7 +72,8 @@ func newProber(
7172
exec: execprobe.New(),
7273
readinessHTTP: httpprobe.New(followNonLocalRedirects),
7374
livenessHTTP: httpprobe.New(followNonLocalRedirects),
74-
tcp: tcprobe.New(),
75+
startupHTTP: httpprobe.New(followNonLocalRedirects),
76+
tcp: tcpprobe.New(),
7577
runner: runner,
7678
refManager: refManager,
7779
recorder: recorder,
@@ -86,6 +88,8 @@ func (pb *prober) probe(probeType probeType, pod *v1.Pod, status v1.PodStatus, c
8688
probeSpec = container.ReadinessProbe
8789
case liveness:
8890
probeSpec = container.LivenessProbe
91+
case startup:
92+
probeSpec = container.StartupProbe
8993
default:
9094
return results.Failure, fmt.Errorf("unknown probe type: %q", probeType)
9195
}
@@ -174,11 +178,14 @@ func (pb *prober) runProbe(probeType probeType, p *v1.Probe, pod *v1.Pod, status
174178
url := formatURL(scheme, host, port, path)
175179
headers := buildHeader(p.HTTPGet.HTTPHeaders)
176180
klog.V(4).Infof("HTTP-Probe Headers: %v", headers)
177-
if probeType == liveness {
181+
switch probeType {
182+
case liveness:
178183
return pb.livenessHTTP.Probe(url, headers, timeout)
184+
case startup:
185+
return pb.startupHTTP.Probe(url, headers, timeout)
186+
default:
187+
return pb.readinessHTTP.Probe(url, headers, timeout)
179188
}
180-
// readiness
181-
return pb.readinessHTTP.Probe(url, headers, timeout)
182189
}
183190
if p.TCPSocket != nil {
184191
port, err := extractPort(p.TCPSocket.Port, container)

pkg/kubelet/prober/prober_manager.go

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ import (
2323
"k8s.io/apimachinery/pkg/types"
2424
"k8s.io/apimachinery/pkg/util/sets"
2525
"k8s.io/apimachinery/pkg/util/wait"
26+
utilfeature "k8s.io/apiserver/pkg/util/feature"
2627
"k8s.io/client-go/tools/record"
2728
"k8s.io/component-base/metrics"
2829
"k8s.io/klog"
30+
"k8s.io/kubernetes/pkg/features"
2931
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
3032
"k8s.io/kubernetes/pkg/kubelet/prober/results"
3133
"k8s.io/kubernetes/pkg/kubelet/status"
@@ -37,7 +39,7 @@ var ProberResults = metrics.NewCounterVec(
3739
&metrics.CounterOpts{
3840
Subsystem: "prober",
3941
Name: "probe_total",
40-
Help: "Cumulative number of a liveness or readiness probe for a container by result.",
42+
Help: "Cumulative number of a liveness, readiness or startup probe for a container by result.",
4143
StabilityLevel: metrics.ALPHA,
4244
},
4345
[]string{"probe_type",
@@ -89,6 +91,9 @@ type manager struct {
8991
// livenessManager manages the results of liveness probes
9092
livenessManager results.Manager
9193

94+
// startupManager manages the results of startup probes
95+
startupManager results.Manager
96+
9297
// prober executes the probe actions.
9398
prober *prober
9499
}
@@ -103,11 +108,13 @@ func NewManager(
103108

104109
prober := newProber(runner, refManager, recorder)
105110
readinessManager := results.NewManager()
111+
startupManager := results.NewManager()
106112
return &manager{
107113
statusManager: statusManager,
108114
prober: prober,
109115
readinessManager: readinessManager,
110116
livenessManager: livenessManager,
117+
startupManager: startupManager,
111118
workers: make(map[probeKey]*worker),
112119
}
113120
}
@@ -116,6 +123,8 @@ func NewManager(
116123
func (m *manager) Start() {
117124
// Start syncing readiness.
118125
go wait.Forever(m.updateReadiness, 0)
126+
// Start syncing startup.
127+
go wait.Forever(m.updateStartup, 0)
119128
}
120129

121130
// Key uniquely identifying container probes
@@ -125,12 +134,13 @@ type probeKey struct {
125134
probeType probeType
126135
}
127136

128-
// Type of probe (readiness or liveness)
137+
// Type of probe (liveness, readiness or startup)
129138
type probeType int
130139

131140
const (
132141
liveness probeType = iota
133142
readiness
143+
startup
134144

135145
probeResultSuccessful string = "successful"
136146
probeResultFailed string = "failed"
@@ -144,6 +154,8 @@ func (t probeType) String() string {
144154
return "Readiness"
145155
case liveness:
146156
return "Liveness"
157+
case startup:
158+
return "Startup"
147159
default:
148160
return "UNKNOWN"
149161
}
@@ -157,6 +169,18 @@ func (m *manager) AddPod(pod *v1.Pod) {
157169
for _, c := range pod.Spec.Containers {
158170
key.containerName = c.Name
159171

172+
if c.StartupProbe != nil && utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
173+
key.probeType = startup
174+
if _, ok := m.workers[key]; ok {
175+
klog.Errorf("Startup probe already exists! %v - %v",
176+
format.Pod(pod), c.Name)
177+
return
178+
}
179+
w := newWorker(m, startup, pod, c)
180+
m.workers[key] = w
181+
go w.run()
182+
}
183+
160184
if c.ReadinessProbe != nil {
161185
key.probeType = readiness
162186
if _, ok := m.workers[key]; ok {
@@ -190,7 +214,7 @@ func (m *manager) RemovePod(pod *v1.Pod) {
190214
key := probeKey{podUID: pod.UID}
191215
for _, c := range pod.Spec.Containers {
192216
key.containerName = c.Name
193-
for _, probeType := range [...]probeType{readiness, liveness} {
217+
for _, probeType := range [...]probeType{readiness, liveness, startup} {
194218
key.probeType = probeType
195219
if worker, ok := m.workers[key]; ok {
196220
worker.stop()
@@ -223,6 +247,21 @@ func (m *manager) UpdatePodStatus(podUID types.UID, podStatus *v1.PodStatus) {
223247
ready = !exists
224248
}
225249
podStatus.ContainerStatuses[i].Ready = ready
250+
251+
var started bool
252+
if c.State.Running == nil {
253+
started = false
254+
} else if !utilfeature.DefaultFeatureGate.Enabled(features.StartupProbe) {
255+
// the container is running, assume it is started if the StartupProbe feature is disabled
256+
started = true
257+
} else if result, ok := m.startupManager.Get(kubecontainer.ParseContainerID(c.ContainerID)); ok {
258+
started = result == results.Success
259+
} else {
260+
// Then check whether there is a probe which hasn't run yet.
261+
_, exists := m.getWorker(podUID, c.Name, startup)
262+
started = !exists
263+
}
264+
podStatus.ContainerStatuses[i].Started = &started
226265
}
227266
// init containers are ready if they have exited with success or if a readiness probe has
228267
// succeeded.
@@ -262,3 +301,10 @@ func (m *manager) updateReadiness() {
262301
ready := update.Result == results.Success
263302
m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
264303
}
304+
305+
func (m *manager) updateStartup() {
306+
update := <-m.startupManager.Updates()
307+
308+
started := update.Result == results.Success
309+
m.statusManager.SetContainerStartup(update.PodUID, update.ContainerID, started)
310+
}

pkg/kubelet/prober/worker.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,10 @@ func newWorker(
9898
w.spec = container.LivenessProbe
9999
w.resultsManager = m.livenessManager
100100
w.initialValue = results.Success
101+
case startup:
102+
w.spec = container.StartupProbe
103+
w.resultsManager = m.startupManager
104+
w.initialValue = results.Failure
101105
}
102106

103107
basicMetricLabels := prometheus.Labels{
@@ -218,10 +222,23 @@ func (w *worker) doProbe() (keepGoing bool) {
218222
w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
219223
}
220224

225+
// Probe disabled for InitialDelaySeconds.
221226
if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
222227
return true
223228
}
224229

230+
if c.Started != nil && *c.Started {
231+
// Stop probing for startup once container has started.
232+
if w.probeType == startup {
233+
return true
234+
}
235+
} else {
236+
// Disable other probes until container has started.
237+
if w.probeType != startup {
238+
return true
239+
}
240+
}
241+
225242
// TODO: in order for exec probes to correctly handle downward API env, we must be able to reconstruct
226243
// the full container environment here, OR we must make a call to the CRI in order to get those environment
227244
// values from the running container.
@@ -255,8 +272,8 @@ func (w *worker) doProbe() (keepGoing bool) {
255272

256273
w.resultsManager.Set(w.containerID, result, w.pod)
257274

258-
if w.probeType == liveness && result == results.Failure {
259-
// The container fails a liveness check, it will need to be restarted.
275+
if (w.probeType == liveness || w.probeType == startup) && result == results.Failure {
276+
// The container fails a liveness/startup check, it will need to be restarted.
260277
// Stop probing until we see a new container ID. This is to reduce the
261278
// chance of hitting #21751, where running `docker exec` when a
262279
// container is being stopped may lead to corrupted container state.

pkg/kubelet/status/status_manager.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ type Manager interface {
100100
// triggers a status update.
101101
SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)
102102

103+
// SetContainerStartup updates the cached container status with the given startup, and
104+
// triggers a status update.
105+
SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool)
106+
103107
// TerminatePod resets the container status for the provided pod to terminated and triggers
104108
// a status update.
105109
TerminatePod(pod *v1.Pod)
@@ -248,6 +252,45 @@ func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontai
248252
m.updateStatusInternal(pod, status, false)
249253
}
250254

255+
func (m *manager) SetContainerStartup(podUID types.UID, containerID kubecontainer.ContainerID, started bool) {
256+
m.podStatusesLock.Lock()
257+
defer m.podStatusesLock.Unlock()
258+
259+
pod, ok := m.podManager.GetPodByUID(podUID)
260+
if !ok {
261+
klog.V(4).Infof("Pod %q has been deleted, no need to update startup", string(podUID))
262+
return
263+
}
264+
265+
oldStatus, found := m.podStatuses[pod.UID]
266+
if !found {
267+
klog.Warningf("Container startup changed before pod has synced: %q - %q",
268+
format.Pod(pod), containerID.String())
269+
return
270+
}
271+
272+
// Find the container to update.
273+
containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
274+
if !ok {
275+
klog.Warningf("Container startup changed for unknown container: %q - %q",
276+
format.Pod(pod), containerID.String())
277+
return
278+
}
279+
280+
if containerStatus.Started != nil && *containerStatus.Started == started {
281+
klog.V(4).Infof("Container startup unchanged (%v): %q - %q", started,
282+
format.Pod(pod), containerID.String())
283+
return
284+
}
285+
286+
// Make sure we're not updating the cached version.
287+
status := *oldStatus.status.DeepCopy()
288+
containerStatus, _, _ = findContainerStatus(&status, containerID.String())
289+
containerStatus.Started = &started
290+
291+
m.updateStatusInternal(pod, status, false)
292+
}
293+
251294
func findContainerStatus(status *v1.PodStatus, containerID string) (containerStatus *v1.ContainerStatus, init bool, ok bool) {
252295
// Find the container to update.
253296
for i, c := range status.ContainerStatuses {

0 commit comments

Comments
 (0)