Skip to content

Commit 2bbc09b

Browse files
committed
Add e2e tests for serialized and parallel image pulls.
- Wait for each pod's image to be pulled: pod is Running, or Failed with no restarts.
1 parent a8fc7ae commit 2bbc09b

File tree

7 files changed

+322
-19
lines changed

7 files changed

+322
-19
lines changed

test/e2e/nodefeature/nodefeature.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ var (
7777
// TODO: document the feature (owning SIG, when to use this feature for a test)
7878
LSCIQuotaMonitoring = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("LSCIQuotaMonitoring"))
7979

80+
// TODO: document the feature (owning SIG, when to use this feature for a test)
81+
MaxParallelImagePull = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("MaxParallelImagePull"))
82+
8083
// TODO: document the feature (owning SIG, when to use this feature for a test)
8184
NodeAllocatable = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("NodeAllocatable"))
8285

test/e2e_node/e2e_node_suite_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ var _ = ginkgo.SynchronizedBeforeSuite(func(ctx context.Context) []byte {
236236
if framework.TestContext.PrepullImages {
237237
klog.Infof("Pre-pulling images so that they are cached for the tests.")
238238
updateImageAllowList(ctx)
239-
err := PrePullAllImages()
239+
err := PrePullAllImages(ctx)
240240
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
241241
}
242242

test/e2e_node/eviction_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -692,7 +692,8 @@ func runEvictionTest(f *framework.Framework, pressureTimeout time.Duration, expe
692692
if expectedNodeCondition == v1.NodeDiskPressure && framework.TestContext.PrepullImages {
693693
// The disk eviction test may cause the prepulled images to be evicted,
694694
// prepull those images again to ensure this test not affect following tests.
695-
PrePullAllImages()
695+
err := PrePullAllImages(ctx)
696+
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
696697
}
697698
}
698699
// Run prePull using a defer to make sure it is executed even when the assertions below fails

test/e2e_node/image_gc_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ var _ = SIGDescribe("ImageGarbageCollect", framework.WithSerial(), framework.Wit
5050
_, is, err = getCRIClient()
5151
framework.ExpectNoError(err)
5252
})
53-
ginkgo.AfterEach(func() {
54-
framework.ExpectNoError(PrePullAllImages())
53+
ginkgo.AfterEach(func(ctx context.Context) {
54+
framework.ExpectNoError(PrePullAllImages(ctx))
5555
})
5656
ginkgo.Context("when ImageMaximumGCAge is set", func() {
5757
tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {

test/e2e_node/image_list.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,9 @@ func getNodeProblemDetectorImage() string {
110110
// puller represents a generic image puller
111111
type puller interface {
112112
// Pull pulls an image by name
113-
Pull(image string) ([]byte, error)
113+
Pull(ctx context.Context, image string) ([]byte, error)
114+
// Remove removes an image by name
115+
Remove(ctx context.Context, image string) error
114116
// Name returns the name of the specific puller implementation
115117
Name() string
116118
}
@@ -123,15 +125,19 @@ func (rp *remotePuller) Name() string {
123125
return "CRI"
124126
}
125127

126-
func (rp *remotePuller) Pull(image string) ([]byte, error) {
127-
resp, err := rp.imageService.ImageStatus(context.Background(), &runtimeapi.ImageSpec{Image: image}, false)
128+
func (rp *remotePuller) Pull(ctx context.Context, image string) ([]byte, error) {
129+
resp, err := rp.imageService.ImageStatus(ctx, &runtimeapi.ImageSpec{Image: image}, false)
128130
if err == nil && resp.GetImage() != nil {
129131
return nil, nil
130132
}
131-
_, err = rp.imageService.PullImage(context.Background(), &runtimeapi.ImageSpec{Image: image}, nil, nil)
133+
_, err = rp.imageService.PullImage(ctx, &runtimeapi.ImageSpec{Image: image}, nil, nil)
132134
return nil, err
133135
}
134136

137+
func (rp *remotePuller) Remove(ctx context.Context, image string) error {
138+
return rp.imageService.RemoveImage(ctx, &runtimeapi.ImageSpec{Image: image})
139+
}
140+
135141
func getPuller() (puller, error) {
136142
_, is, err := getCRIClient()
137143
if err != nil {
@@ -143,7 +149,7 @@ func getPuller() (puller, error) {
143149
}
144150

145151
// PrePullAllImages pre-fetches all images tests depend on so that we don't fail in an actual test.
146-
func PrePullAllImages() error {
152+
func PrePullAllImages(ctx context.Context) error {
147153
puller, err := getPuller()
148154
if err != nil {
149155
return err
@@ -191,7 +197,7 @@ func PrePullAllImages() error {
191197
if retryCount > 0 {
192198
time.Sleep(imagePullRetryDelay)
193199
}
194-
if output, pullErr = puller.Pull(images[i]); pullErr == nil {
200+
if output, pullErr = puller.Pull(ctx, images[i]); pullErr == nil {
195201
break
196202
}
197203
klog.Warningf("Failed to pull %s as user %q, retrying in %s (%d of %d): %v",
@@ -211,6 +217,14 @@ func PrePullAllImages() error {
211217
return utilerrors.NewAggregate(pullErrs)
212218
}
213219

220+
func RemoveImage(ctx context.Context, image string) error {
221+
puller, err := getPuller()
222+
if err != nil {
223+
return err
224+
}
225+
return puller.Remove(ctx, image)
226+
}
227+
214228
func getContainerImageFromE2ETestDaemonset(dsYamlPath string) (string, error) {
215229
data, err := e2etestfiles.Read(dsYamlPath)
216230
if err != nil {

test/e2e_node/image_pull_test.go

Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
/*
2+
Copyright 2024 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package e2enode
18+
19+
import (
20+
"context"
21+
"strings"
22+
"time"
23+
24+
"github.com/onsi/ginkgo/v2"
25+
"github.com/onsi/gomega"
26+
"github.com/pkg/errors"
27+
v1 "k8s.io/api/core/v1"
28+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
30+
kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
31+
"k8s.io/kubernetes/test/e2e/framework"
32+
e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
33+
e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
34+
"k8s.io/kubernetes/test/e2e/nodefeature"
35+
imageutils "k8s.io/kubernetes/test/utils/image"
36+
admissionapi "k8s.io/pod-security-admission/api"
37+
"k8s.io/utils/ptr"
38+
)
39+
40+
// This test needs to run in serial to prevent caching of the images by other tests
41+
// and to prevent the wait time of image pulls to be increased by other images
42+
var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParallelImagePull, func() {
43+
44+
f := framework.NewDefaultFramework("parallel-pull-image-test")
45+
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
46+
var testpods []*v1.Pod
47+
48+
ginkgo.Context("parallel image pull with MaxParallelImagePulls=5", func() {
49+
tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
50+
initialConfig.SerializeImagePulls = false
51+
initialConfig.MaxParallelImagePulls = ptr.To[int32](5)
52+
})
53+
54+
ginkgo.BeforeEach(func(ctx context.Context) {
55+
testpods = prepareAndCleanup(ctx, f)
56+
})
57+
58+
ginkgo.AfterEach(func(ctx context.Context) {
59+
ginkgo.By("cleanup pods")
60+
for _, pod := range testpods {
61+
deletePodSyncByName(ctx, f, pod.Name)
62+
}
63+
})
64+
65+
ginkgo.It("should pull immediately if no more than 5 pods", func(ctx context.Context) {
66+
var pods []*v1.Pod
67+
for _, testpod := range testpods {
68+
pods = append(pods, e2epod.NewPodClient(f).Create(ctx, testpod))
69+
}
70+
for _, pod := range pods {
71+
err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Running", 10*time.Minute, func(pod *v1.Pod) (bool, error) {
72+
if pod.Status.Phase == v1.PodRunning {
73+
return true, nil
74+
}
75+
return false, nil
76+
})
77+
framework.ExpectNoError(err)
78+
}
79+
80+
events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{})
81+
framework.ExpectNoError(err)
82+
imagePulled := map[string]*pulledStruct{}
83+
// start from pulling event creationTimestamp
84+
// end from pulled event creationTimestamp
85+
podStartTime, podEndTime := map[string]metav1.Time{}, map[string]metav1.Time{}
86+
for _, event := range events.Items {
87+
if event.Reason == kubeletevents.PulledImage {
88+
podEndTime[event.InvolvedObject.Name] = event.CreationTimestamp
89+
for _, testpod := range testpods {
90+
if event.InvolvedObject.Name == testpod.Name {
91+
pulled, err := getDurationsFromPulledEventMsg(event.Message)
92+
imagePulled[testpod.Name] = pulled
93+
framework.ExpectNoError(err)
94+
break
95+
}
96+
}
97+
} else if event.Reason == kubeletevents.PullingImage {
98+
podStartTime[event.InvolvedObject.Name] = event.CreationTimestamp
99+
}
100+
}
101+
gomega.Expect(len(testpods)).To(gomega.BeComparableTo(len(imagePulled)))
102+
103+
// skip if pod1 pulling time and pod2 pulling time are not overlapped
104+
if podStartTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) {
105+
if podEndTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) {
106+
e2eskipper.Skipf("pod1 pulling time and pod2 pulling time are not overlapped")
107+
}
108+
} else {
109+
if podEndTime[testpods[1].Name].Time.Before(podStartTime[testpods[0].Name].Time) {
110+
e2eskipper.Skipf("pod1 pulling time and pod2 pulling time are not overlapped")
111+
}
112+
}
113+
114+
// as this is parallel image pulling, the waiting duration should be similar with the pulled duration.
115+
// use 1.2 as a common ratio
116+
for _, pulled := range imagePulled {
117+
if float32(pulled.pulledIncludeWaitingDuration/time.Millisecond)/float32(pulled.pulledDuration/time.Millisecond) > 1.2 {
118+
framework.Failf("the pull duration including waiting %v should be similar with the pulled duration %v",
119+
pulled.pulledIncludeWaitingDuration, pulled.pulledDuration)
120+
}
121+
}
122+
})
123+
124+
})
125+
})
126+
127+
var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParallelImagePull, func() {
128+
129+
f := framework.NewDefaultFramework("serialize-pull-image-test")
130+
f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
131+
132+
ginkgo.Context("serialize image pull", func() {
133+
// this is the default behavior now.
134+
tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
135+
initialConfig.SerializeImagePulls = true
136+
initialConfig.MaxParallelImagePulls = ptr.To[int32](1)
137+
})
138+
139+
var testpods []*v1.Pod
140+
141+
ginkgo.BeforeEach(func(ctx context.Context) {
142+
testpods = prepareAndCleanup(ctx, f)
143+
})
144+
145+
ginkgo.AfterEach(func(ctx context.Context) {
146+
ginkgo.By("cleanup pods")
147+
for _, pod := range testpods {
148+
deletePodSyncByName(ctx, f, pod.Name)
149+
}
150+
})
151+
152+
ginkgo.It("should be waiting more", func(ctx context.Context) {
153+
var pods []*v1.Pod
154+
for _, testpod := range testpods {
155+
pods = append(pods, e2epod.NewPodClient(f).Create(ctx, testpod))
156+
}
157+
for _, pod := range pods {
158+
err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Running", 10*time.Minute, func(pod *v1.Pod) (bool, error) {
159+
if pod.Status.Phase == v1.PodRunning {
160+
return true, nil
161+
}
162+
return false, nil
163+
})
164+
framework.ExpectNoError(err)
165+
}
166+
167+
events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{})
168+
framework.ExpectNoError(err)
169+
imagePulled := map[string]*pulledStruct{}
170+
// start from pulling event creationTimestamp
171+
// end from pulled event creationTimestamp
172+
podStartTime, podEndTime := map[string]metav1.Time{}, map[string]metav1.Time{}
173+
for _, event := range events.Items {
174+
if event.Reason == kubeletevents.PulledImage {
175+
podEndTime[event.InvolvedObject.Name] = event.CreationTimestamp
176+
for _, testpod := range testpods {
177+
if event.InvolvedObject.Name == testpod.Name {
178+
pulled, err := getDurationsFromPulledEventMsg(event.Message)
179+
imagePulled[testpod.Name] = pulled
180+
framework.ExpectNoError(err)
181+
break
182+
}
183+
}
184+
} else if event.Reason == kubeletevents.PullingImage {
185+
podStartTime[event.InvolvedObject.Name] = event.CreationTimestamp
186+
}
187+
}
188+
gomega.Expect(len(testpods)).To(gomega.BeComparableTo(len(imagePulled)))
189+
190+
// skip if pod1 pulling time and pod2 pulling time are not overlapped
191+
if podStartTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) {
192+
if podEndTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) {
193+
e2eskipper.Skipf("pod1 pulling time and pod2 pulling time are not overlapped")
194+
}
195+
} else {
196+
if podEndTime[testpods[1].Name].Time.Before(podStartTime[testpods[0].Name].Time) {
197+
e2eskipper.Skipf("pod1 pulling time and pod2 pulling time are not overlapped")
198+
}
199+
}
200+
201+
// as this is serialize image pulling, the waiting duration should be almost double the duration with the pulled duration.
202+
// use 1.5 as a common ratio to avoid some overlap during pod creation
203+
if float32(imagePulled[testpods[1].Name].pulledIncludeWaitingDuration/time.Millisecond)/float32(imagePulled[testpods[1].Name].pulledDuration/time.Millisecond) < 1.5 &&
204+
float32(imagePulled[testpods[0].Name].pulledIncludeWaitingDuration/time.Millisecond)/float32(imagePulled[testpods[0].Name].pulledDuration/time.Millisecond) < 1.5 {
205+
framework.Failf("At least, one of the pull duration including waiting %v/%v should be similar with the pulled duration %v/%v",
206+
imagePulled[testpods[1].Name].pulledIncludeWaitingDuration, imagePulled[testpods[0].Name].pulledIncludeWaitingDuration, imagePulled[testpods[1].Name].pulledDuration, imagePulled[testpods[0].Name].pulledDuration)
207+
}
208+
})
209+
210+
})
211+
})
212+
213+
func prepareAndCleanup(ctx context.Context, f *framework.Framework) (testpods []*v1.Pod) {
214+
// cuda images are > 2Gi and it will reduce the flaky rate
215+
image1 := imageutils.GetE2EImage(imageutils.CudaVectorAdd)
216+
image2 := imageutils.GetE2EImage(imageutils.CudaVectorAdd2)
217+
node := getNodeName(ctx, f)
218+
219+
testpod := &v1.Pod{
220+
ObjectMeta: metav1.ObjectMeta{
221+
Name: "testpod",
222+
Namespace: f.Namespace.Name,
223+
},
224+
Spec: v1.PodSpec{
225+
Containers: []v1.Container{{
226+
Name: "testpod",
227+
Image: image1,
228+
ImagePullPolicy: v1.PullAlways,
229+
}},
230+
NodeName: node,
231+
RestartPolicy: v1.RestartPolicyNever,
232+
},
233+
}
234+
testpod2 := &v1.Pod{
235+
ObjectMeta: metav1.ObjectMeta{
236+
Name: "testpod2",
237+
Namespace: f.Namespace.Name,
238+
},
239+
Spec: v1.PodSpec{
240+
Containers: []v1.Container{{
241+
Name: "testpod2",
242+
Image: image2,
243+
ImagePullPolicy: v1.PullAlways,
244+
}},
245+
NodeName: node,
246+
RestartPolicy: v1.RestartPolicyNever,
247+
},
248+
}
249+
testpods = []*v1.Pod{testpod, testpod2}
250+
251+
ginkgo.By("cleanup images")
252+
for _, pod := range testpods {
253+
_ = RemoveImage(ctx, pod.Spec.Containers[0].Image)
254+
}
255+
return testpods
256+
}
257+
258+
type pulledStruct struct {
259+
pulledDuration time.Duration
260+
pulledIncludeWaitingDuration time.Duration
261+
}
262+
263+
// getDurationsFromPulledEventMsg will parse two durations in the pulled message
264+
// Example msg: `Successfully pulled image \"busybox:1.28\" in 39.356s (49.356s including waiting). Image size: 41901587 bytes.`
265+
func getDurationsFromPulledEventMsg(msg string) (*pulledStruct, error) {
266+
splits := strings.Split(msg, " ")
267+
if len(splits) != 13 {
268+
return nil, errors.Errorf("pull event message should be spilted to 13: %d", len(splits))
269+
}
270+
pulledDuration, err := time.ParseDuration(splits[5])
271+
if err != nil {
272+
return nil, err
273+
}
274+
// to skip '('
275+
pulledIncludeWaitingDuration, err := time.ParseDuration(splits[6][1:])
276+
if err != nil {
277+
return nil, err
278+
}
279+
return &pulledStruct{
280+
pulledDuration: pulledDuration,
281+
pulledIncludeWaitingDuration: pulledIncludeWaitingDuration,
282+
}, nil
283+
}

0 commit comments

Comments
 (0)