Skip to content

Commit 3793bec

Browse files
committed
kubelet: integrate the image pull manager
1 parent b3befff commit 3793bec

File tree

7 files changed

+418
-62
lines changed

7 files changed

+418
-62
lines changed

pkg/kubelet/images/image_manager.go

Lines changed: 135 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
credentialproviderplugin "k8s.io/kubernetes/pkg/credentialprovider/plugin"
3737
credentialprovidersecrets "k8s.io/kubernetes/pkg/credentialprovider/secrets"
3838
"k8s.io/kubernetes/pkg/features"
39+
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
3940
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
4041
"k8s.io/kubernetes/pkg/kubelet/events"
4142
"k8s.io/kubernetes/pkg/kubelet/metrics"
@@ -49,10 +50,11 @@ type ImagePodPullingTimeRecorder interface {
4950

5051
// imageManager provides the functionalities for image pulling.
5152
type imageManager struct {
52-
recorder record.EventRecorder
53-
imageService kubecontainer.ImageService
54-
backOff *flowcontrol.Backoff
55-
prevPullErrMsg sync.Map
53+
recorder record.EventRecorder
54+
imageService kubecontainer.ImageService
55+
imagePullManager ImagePullManager
56+
backOff *flowcontrol.Backoff
57+
prevPullErrMsg sync.Map
5658

5759
// It will check the presence of the image, and report the 'image pulling', image pulled' events correspondingly.
5860
puller imagePuller
@@ -64,7 +66,19 @@ type imageManager struct {
6466
var _ ImageManager = &imageManager{}
6567

6668
// NewImageManager instantiates a new ImageManager object.
67-
func NewImageManager(recorder record.EventRecorder, nodeKeyring credentialprovider.DockerKeyring, imageService kubecontainer.ImageService, imageBackOff *flowcontrol.Backoff, serialized bool, maxParallelImagePulls *int32, qps float32, burst int, podPullingTimeRecorder ImagePodPullingTimeRecorder) ImageManager {
69+
func NewImageManager(
70+
recorder record.EventRecorder,
71+
nodeKeyring credentialprovider.DockerKeyring,
72+
imageService kubecontainer.ImageService,
73+
imagePullManager ImagePullManager,
74+
imageBackOff *flowcontrol.Backoff,
75+
serialized bool,
76+
maxParallelImagePulls *int32,
77+
qps float32,
78+
burst int,
79+
podPullingTimeRecorder ImagePodPullingTimeRecorder,
80+
) ImageManager {
81+
6882
imageService = throttleImagePulling(imageService, qps, burst)
6983

7084
var puller imagePuller
@@ -76,6 +90,7 @@ func NewImageManager(recorder record.EventRecorder, nodeKeyring credentialprovid
7690
return &imageManager{
7791
recorder: recorder,
7892
imageService: imageService,
93+
imagePullManager: imagePullManager,
7994
nodeKeyring: nodeKeyring,
8095
backOff: imageBackOff,
8196
puller: puller,
@@ -85,33 +100,25 @@ func NewImageManager(recorder record.EventRecorder, nodeKeyring credentialprovid
85100

86101
// imagePullPrecheck inspects the pull policy and checks for image presence accordingly,
87102
// returning (imageRef, error msg, err) and logging any errors.
88-
func (m *imageManager) imagePullPrecheck(ctx context.Context, objRef *v1.ObjectReference, logPrefix string, pullPolicy v1.PullPolicy, spec *kubecontainer.ImageSpec, imgRef string) (imageRef string, msg string, err error) {
103+
func (m *imageManager) imagePullPrecheck(ctx context.Context, objRef *v1.ObjectReference, logPrefix string, pullPolicy v1.PullPolicy, spec *kubecontainer.ImageSpec, requestedImage string) (imageRef string, msg string, err error) {
89104
switch pullPolicy {
90105
case v1.PullAlways:
91106
return "", msg, nil
92-
case v1.PullIfNotPresent:
93-
imageRef, err = m.imageService.GetImageRef(ctx, *spec)
94-
if err != nil {
95-
msg = fmt.Sprintf("Failed to inspect image %q: %v", imageRef, err)
96-
m.logIt(objRef, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning)
97-
return "", msg, ErrImageInspect
98-
}
99-
return imageRef, msg, nil
100-
case v1.PullNever:
107+
case v1.PullIfNotPresent, v1.PullNever:
101108
imageRef, err = m.imageService.GetImageRef(ctx, *spec)
102109
if err != nil {
103110
msg = fmt.Sprintf("Failed to inspect image %q: %v", imageRef, err)
104111
m.logIt(objRef, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning)
105112
return "", msg, ErrImageInspect
106113
}
107-
if imageRef == "" {
108-
msg = fmt.Sprintf("Container image %q is not present with pull policy of Never", imgRef)
109-
m.logIt(objRef, v1.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, klog.Warning)
110-
return "", msg, ErrImageNeverPull
111-
}
112-
return imageRef, msg, nil
113114
}
114-
return
115+
116+
if len(imageRef) == 0 && pullPolicy == v1.PullNever {
117+
msg, err = m.imageNotPresentOnNeverPolicyError(logPrefix, objRef, requestedImage)
118+
return "", msg, err
119+
}
120+
121+
return imageRef, msg, nil
115122
}
116123

117124
// records an event using ref, event msg. log to glog using prefix, msg, logFn
@@ -123,15 +130,30 @@ func (m *imageManager) logIt(objRef *v1.ObjectReference, eventtype, event, prefi
123130
}
124131
}
125132

126-
// EnsureImageExists pulls the image for the specified pod and imgRef, and returns
133+
// imageNotPresentOnNeverPolicy error is a utility function that emits an event about
134+
// an image not being present and returns the appropriate error to be passed on.
135+
//
136+
// Called in 2 scenarios:
137+
// 1. image is not present with `imagePullPolicy: Never“
138+
// 2. image is present but cannot be accessed with the presented set of credentials
139+
//
140+
// We don't want to reveal the presence of an image if it cannot be accessed, hence we
141+
// want the same behavior in both the above scenarios.
142+
func (m *imageManager) imageNotPresentOnNeverPolicyError(logPrefix string, objRef *v1.ObjectReference, requestedImage string) (string, error) {
143+
msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", requestedImage)
144+
m.logIt(objRef, v1.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, klog.Warning)
145+
return msg, ErrImageNeverPull
146+
}
147+
148+
// EnsureImageExists pulls the image for the specified pod and requestedImage, and returns
127149
// (imageRef, error message, error).
128-
func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectReference, pod *v1.Pod, imgRef string, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, podRuntimeHandler string, pullPolicy v1.PullPolicy) (imageRef, message string, err error) {
129-
logPrefix := fmt.Sprintf("%s/%s/%s", pod.Namespace, pod.Name, imgRef)
150+
func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectReference, pod *v1.Pod, requestedImage string, pullSecrets []v1.Secret, podSandboxConfig *runtimeapi.PodSandboxConfig, podRuntimeHandler string, pullPolicy v1.PullPolicy) (imageRef, message string, err error) {
151+
logPrefix := fmt.Sprintf("%s/%s/%s", pod.Namespace, pod.Name, requestedImage)
130152

131153
// If the image contains no tag or digest, a default tag should be applied.
132-
image, err := applyDefaultImageTag(imgRef)
154+
image, err := applyDefaultImageTag(requestedImage)
133155
if err != nil {
134-
msg := fmt.Sprintf("Failed to apply default image tag %q: %v", imgRef, err)
156+
msg := fmt.Sprintf("Failed to apply default image tag %q: %v", requestedImage, err)
135157
m.logIt(objRef, v1.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, klog.Warning)
136158
return "", msg, ErrInvalidImageName
137159
}
@@ -150,18 +172,12 @@ func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectR
150172
RuntimeHandler: podRuntimeHandler,
151173
}
152174

153-
imageRef, message, err = m.imagePullPrecheck(ctx, objRef, logPrefix, pullPolicy, &spec, imgRef)
175+
imageRef, message, err = m.imagePullPrecheck(ctx, objRef, logPrefix, pullPolicy, &spec, requestedImage)
154176
if err != nil {
155177
return "", message, err
156178
}
157-
if imageRef != "" {
158-
msg := fmt.Sprintf("Container image %q already present on machine", imgRef)
159-
m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info)
160-
return imageRef, msg, nil
161-
}
162179

163-
img := spec.Image
164-
repoToPull, _, _, err := parsers.ParseImageName(img)
180+
repoToPull, _, _, err := parsers.ParseImageName(spec.Image)
165181
if err != nil {
166182
return "", err.Error(), err
167183
}
@@ -188,13 +204,67 @@ func (m *imageManager) EnsureImageExists(ctx context.Context, objRef *v1.ObjectR
188204
}
189205

190206
pullCredentials, _ := keyring.Lookup(repoToPull)
191-
return m.pullImage(ctx, logPrefix, objRef, pod.UID, imgRef, spec, pullCredentials, podSandboxConfig)
207+
208+
if imageRef != "" {
209+
if !utilfeature.DefaultFeatureGate.Enabled(features.KubeletEnsureSecretPulledImages) {
210+
msg := fmt.Sprintf("Container image %q already present on machine", requestedImage)
211+
m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info)
212+
return imageRef, msg, nil
213+
}
214+
215+
var imagePullSecrets []kubeletconfiginternal.ImagePullSecret
216+
for _, s := range pullCredentials {
217+
if s.Source == nil {
218+
// we're only interested in creds that are not node accessible
219+
continue
220+
}
221+
imagePullSecrets = append(imagePullSecrets, kubeletconfiginternal.ImagePullSecret{
222+
UID: string(s.Source.Secret.UID),
223+
Name: s.Source.Secret.Name,
224+
Namespace: s.Source.Secret.Namespace,
225+
CredentialHash: s.AuthConfigHash,
226+
})
227+
}
228+
229+
pullRequired := m.imagePullManager.MustAttemptImagePull(requestedImage, imageRef, imagePullSecrets)
230+
if !pullRequired {
231+
msg := fmt.Sprintf("Container image %q already present on machine and can be accessed by the pod", requestedImage)
232+
m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, msg, klog.Info)
233+
return imageRef, msg, nil
234+
}
235+
}
236+
237+
if pullPolicy == v1.PullNever {
238+
// The image is present as confirmed by imagePullPrecheck but it apparently
239+
// wasn't accessible given the credentials check by the imagePullManager.
240+
msg, err := m.imageNotPresentOnNeverPolicyError(logPrefix, objRef, requestedImage)
241+
return "", msg, err
242+
}
243+
244+
return m.pullImage(ctx, logPrefix, objRef, pod.UID, requestedImage, spec, pullCredentials, podSandboxConfig)
192245
}
193246

194-
func (m *imageManager) pullImage(ctx context.Context, logPrefix string, objRef *v1.ObjectReference, podUID types.UID, imgRef string, imgSpec kubecontainer.ImageSpec, pullCredentials []credentialprovider.TrackedAuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (imageRef, message string, err error) {
195-
backOffKey := fmt.Sprintf("%s_%s", podUID, imgRef)
247+
func (m *imageManager) pullImage(ctx context.Context, logPrefix string, objRef *v1.ObjectReference, podUID types.UID, image string, imgSpec kubecontainer.ImageSpec, pullCredentials []credentialprovider.TrackedAuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (imageRef, message string, err error) {
248+
var pullSucceeded bool
249+
var finalPullCredentials *credentialprovider.TrackedAuthConfig
250+
251+
if utilfeature.DefaultFeatureGate.Enabled(features.KubeletEnsureSecretPulledImages) {
252+
if err := m.imagePullManager.RecordPullIntent(image); err != nil {
253+
return "", fmt.Sprintf("Failed to record image pull intent for container image %q: %v", image, err), err
254+
}
255+
256+
defer func() {
257+
if pullSucceeded {
258+
m.imagePullManager.RecordImagePulled(image, imageRef, trackedToImagePullCreds(finalPullCredentials))
259+
} else {
260+
m.imagePullManager.RecordImagePullFailed(image)
261+
}
262+
}()
263+
}
264+
265+
backOffKey := fmt.Sprintf("%s_%s", podUID, image)
196266
if m.backOff.IsInBackOffSinceUpdate(backOffKey, m.backOff.Clock.Now()) {
197-
msg := fmt.Sprintf("Back-off pulling image %q", imgRef)
267+
msg := fmt.Sprintf("Back-off pulling image %q", image)
198268
m.logIt(objRef, v1.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, klog.Info)
199269

200270
// Wrap the error from the actual pull if available.
@@ -211,16 +281,16 @@ func (m *imageManager) pullImage(ctx context.Context, logPrefix string, objRef *
211281
m.prevPullErrMsg.Delete(backOffKey)
212282

213283
m.podPullingTimeRecorder.RecordImageStartedPulling(podUID)
214-
m.logIt(objRef, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("Pulling image %q", imgRef), klog.Info)
284+
m.logIt(objRef, v1.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("Pulling image %q", image), klog.Info)
215285
startTime := time.Now()
216286

217287
pullChan := make(chan pullResult)
218288
m.puller.pullImage(ctx, imgSpec, pullCredentials, pullChan, podSandboxConfig)
219289
imagePullResult := <-pullChan
220290
if imagePullResult.err != nil {
221-
m.logIt(objRef, v1.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", imgRef, imagePullResult.err), klog.Warning)
291+
m.logIt(objRef, v1.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", image, imagePullResult.err), klog.Warning)
222292
m.backOff.Next(backOffKey, m.backOff.Clock.Now())
223-
msg, err := evalCRIPullErr(imgRef, imagePullResult.err)
293+
msg, err := evalCRIPullErr(image, imagePullResult.err)
224294

225295
// Store the actual pull error for providing that information during
226296
// the image pull back-off.
@@ -231,9 +301,11 @@ func (m *imageManager) pullImage(ctx context.Context, logPrefix string, objRef *
231301
m.podPullingTimeRecorder.RecordImageFinishedPulling(podUID)
232302
imagePullDuration := time.Since(startTime).Truncate(time.Millisecond)
233303
m.logIt(objRef, v1.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q in %v (%v including waiting). Image size: %v bytes.",
234-
imgRef, imagePullResult.pullDuration.Truncate(time.Millisecond), imagePullDuration, imagePullResult.imageSize), klog.Info)
304+
image, imagePullResult.pullDuration.Truncate(time.Millisecond), imagePullDuration, imagePullResult.imageSize), klog.Info)
235305
metrics.ImagePullDuration.WithLabelValues(metrics.GetImageSizeBucket(imagePullResult.imageSize)).Observe(imagePullDuration.Seconds())
236306
m.backOff.GC()
307+
finalPullCredentials = imagePullResult.credentialsUsed
308+
pullSucceeded = true
237309

238310
return imagePullResult.imageRef, "", nil
239311
}
@@ -287,3 +359,23 @@ func applyDefaultImageTag(image string) (string, error) {
287359
}
288360
return image, nil
289361
}
362+
363+
func trackedToImagePullCreds(trackedCreds *credentialprovider.TrackedAuthConfig) *kubeletconfiginternal.ImagePullCredentials {
364+
ret := &kubeletconfiginternal.ImagePullCredentials{}
365+
switch {
366+
case trackedCreds == nil, trackedCreds.Source == nil:
367+
ret.NodePodsAccessible = true
368+
default:
369+
sourceSecret := trackedCreds.Source.Secret
370+
ret.KubernetesSecrets = []kubeletconfiginternal.ImagePullSecret{
371+
{
372+
UID: sourceSecret.UID,
373+
Name: sourceSecret.Name,
374+
Namespace: sourceSecret.Namespace,
375+
CredentialHash: trackedCreds.AuthConfigHash,
376+
},
377+
}
378+
}
379+
380+
return ret
381+
}

0 commit comments

Comments
 (0)