
Commit 82df7a7

use cri proxy injector for parallel pulling image tests
1 parent 2bbc09b commit 82df7a7

File tree

5 files changed: +171 −106 lines changed


test/e2e/nodefeature/nodefeature.go

Lines changed: 0 additions & 3 deletions
@@ -77,9 +77,6 @@ var (
     // TODO: document the feature (owning SIG, when to use this feature for a test)
     LSCIQuotaMonitoring = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("LSCIQuotaMonitoring"))

-    // TODO: document the feature (owning SIG, when to use this feature for a test)
-    MaxParallelImagePull = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("MaxParallelImagePull"))
-
     // TODO: document the feature (owning SIG, when to use this feature for a test)
     NodeAllocatable = framework.WithNodeFeature(framework.ValidNodeFeatures.Add("NodeAllocatable"))

test/e2e_node/criproxy_test.go

Lines changed: 7 additions & 1 deletion
@@ -32,6 +32,7 @@ import (
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/util/uuid"
     kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
+    "k8s.io/kubernetes/pkg/kubelet/images"
     "k8s.io/kubernetes/test/e2e/feature"
     "k8s.io/kubernetes/test/e2e/framework"
     e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
@@ -68,7 +69,12 @@ var _ = SIGDescribe(feature.CriProxy, framework.WithSerial(), func() {
         framework.ExpectNoError(err)

         pod := e2epod.NewPodClient(f).Create(ctx, newPullImageAlwaysPod())
-        podErr := e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod)
+        podErr := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "ImagePullBackOff", 1*time.Minute, func(pod *v1.Pod) (bool, error) {
+            if len(pod.Status.ContainerStatuses) > 0 && pod.Status.Reason == images.ErrImagePullBackOff.Error() {
+                return true, nil
+            }
+            return false, nil
+        })
         gomega.Expect(podErr).To(gomega.HaveOccurred())

         eventMsg, err := getFailedToPullImageMsg(ctx, f, pod.Name)
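
Note on the change above: with a CRI proxy injector forcing PullImage to fail, the pod can never reach Running, so the test now waits for the ImagePullBackOff reason instead. A minimal sketch of how such an injector is typically registered with the helpers this commit uses elsewhere (the exact failure message and surrounding wiring are assumptions, not shown in this hunk):

    // Sketch only: force every PullImage call through the CRI proxy to fail,
    // which drives the pod created above into ImagePullBackOff.
    err := addCRIProxyInjector(func(apiName string) error {
        if apiName == criproxy.PullImage {
            // hypothetical error text; the real test's message may differ
            return errors.New("injected PullImage failure")
        }
        return nil
    })
    framework.ExpectNoError(err)
    // restore normal CRI behavior once the test finishes
    ginkgo.DeferCleanup(func() {
        framework.ExpectNoError(resetCRIProxyInjector())
    })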

test/e2e_node/image_pull_test.go

Lines changed: 152 additions & 90 deletions
@@ -1,3 +1,6 @@
+//go:build linux
+// +build linux
+
 /*
 Copyright 2024 The Kubernetes Authors.

@@ -18,28 +21,32 @@ package e2enode

 import (
     "context"
+    "fmt"
     "strings"
+    "sync"
     "time"

     "github.com/onsi/ginkgo/v2"
     "github.com/onsi/gomega"
     "github.com/pkg/errors"
+
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
     kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
+    "k8s.io/kubernetes/test/e2e/feature"
     "k8s.io/kubernetes/test/e2e/framework"
     e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
-    e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
-    "k8s.io/kubernetes/test/e2e/nodefeature"
+    "k8s.io/kubernetes/test/e2e_node/criproxy"
     imageutils "k8s.io/kubernetes/test/utils/image"
     admissionapi "k8s.io/pod-security-admission/api"
     "k8s.io/utils/ptr"
 )

-// This test needs to run in serial to prevent caching of the images by other tests
-// and to prevent the wait time of image pulls to be increased by other images
-var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParallelImagePull, func() {
+// CriProxy injector is used to simulate and verify the image pull behavior.
+// These tests need to run in serial to prevent caching of the images by other tests
+// and to prevent the wait time of image pulls to be increased by other images.
+var _ = SIGDescribe("Pull Image", feature.CriProxy, framework.WithSerial(), func() {

     f := framework.NewDefaultFramework("parallel-pull-image-test")
     f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
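
The two suites in this file ("parallel-pull-image-test" and "serialize-pull-image-test") are meant to differ only in how the kubelet schedules pulls; that kubelet tuning sits outside the hunks shown here. As a rough sketch of the kind of configuration involved (an assumption, using the standard e2e_node tempSetCurrentKubeletConfig helper and the KubeletConfiguration knobs for pull parallelism):

    // Sketch only: the parallel suite would allow up to 5 concurrent pulls,
    // while the serialize suite would set SerializeImagePulls = true instead.
    tempSetCurrentKubeletConfig(f, func(ctx context.Context, initialConfig *kubeletconfig.KubeletConfiguration) {
        initialConfig.SerializeImagePulls = false
        initialConfig.MaxParallelImagePulls = ptr.To[int32](5)
    })

This would also explain the BeforeEach check below that caps the number of test pods at 5.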
@@ -52,79 +59,75 @@ var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParallelImagePull, func() {
     })

     ginkgo.BeforeEach(func(ctx context.Context) {
+        if err := resetCRIProxyInjector(); err != nil {
+            ginkgo.Skip("Skip the test since the CRI Proxy is undefined.")
+        }
+
         testpods = prepareAndCleanup(ctx, f)
+        gomega.Expect(len(testpods)).To(gomega.BeNumerically("<=", 5))
     })

     ginkgo.AfterEach(func(ctx context.Context) {
+        err := resetCRIProxyInjector()
+        framework.ExpectNoError(err)
+
         ginkgo.By("cleanup pods")
         for _, pod := range testpods {
             deletePodSyncByName(ctx, f, pod.Name)
         }
     })

     ginkgo.It("should pull immediately if no more than 5 pods", func(ctx context.Context) {
-        var pods []*v1.Pod
-        for _, testpod := range testpods {
-            pods = append(pods, e2epod.NewPodClient(f).Create(ctx, testpod))
-        }
-        for _, pod := range pods {
-            err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Running", 10*time.Minute, func(pod *v1.Pod) (bool, error) {
-                if pod.Status.Phase == v1.PodRunning {
-                    return true, nil
-                }
-                return false, nil
-            })
-            framework.ExpectNoError(err)
-        }
-
-        events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{})
-        framework.ExpectNoError(err)
-        imagePulled := map[string]*pulledStruct{}
-        // start from pulling event creationTimestamp
-        // end from pulled event creationTimestamp
-        podStartTime, podEndTime := map[string]metav1.Time{}, map[string]metav1.Time{}
-        for _, event := range events.Items {
-            if event.Reason == kubeletevents.PulledImage {
-                podEndTime[event.InvolvedObject.Name] = event.CreationTimestamp
-                for _, testpod := range testpods {
-                    if event.InvolvedObject.Name == testpod.Name {
-                        pulled, err := getDurationsFromPulledEventMsg(event.Message)
-                        imagePulled[testpod.Name] = pulled
-                        framework.ExpectNoError(err)
-                        break
+        var mu sync.Mutex
+        timeout := 20 * time.Second
+        callCh := make(chan struct{})
+        callStatus := make(map[int]chan struct{})
+        err := addCRIProxyInjector(func(apiName string) error {
+            if apiName == criproxy.PullImage {
+                mu.Lock()
+                callID := len(callStatus)
+                callStatus[callID] = callCh
+                mu.Unlock()
+                if callID == 0 {
+                    // wait for next call
+                    select {
+                    case <-callCh:
+                        return nil
+                    case <-time.After(timeout):
+                        return fmt.Errorf("no parallel image pull after %s", timeout)
                     }
+                } else {
+                    // send a signal to the first call
+                    callCh <- struct{}{}
                 }
-            } else if event.Reason == kubeletevents.PullingImage {
-                podStartTime[event.InvolvedObject.Name] = event.CreationTimestamp
             }
-        }
-        gomega.Expect(len(testpods)).To(gomega.BeComparableTo(len(imagePulled)))
+            return nil
+        })
+        framework.ExpectNoError(err)

-        // skip if pod1 pulling time and pod2 pulling time are not overlapped
-        if podStartTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) {
-            if podEndTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) {
-                e2eskipper.Skipf("pod1 pulling time and pod2 pulling time are not overlapped")
-            }
-        } else {
-            if podEndTime[testpods[1].Name].Time.Before(podStartTime[testpods[0].Name].Time) {
-                e2eskipper.Skipf("pod1 pulling time and pod2 pulling time are not overlapped")
-            }
+        for _, testpod := range testpods {
+            _ = e2epod.NewPodClient(f).Create(ctx, testpod)
         }

-        // as this is parallel image pulling, the waiting duration should be similar with the pulled duration.
-        // use 1.2 as a common ratio
-        for _, pulled := range imagePulled {
-            if float32(pulled.pulledIncludeWaitingDuration/time.Millisecond)/float32(pulled.pulledDuration/time.Millisecond) > 1.2 {
-                framework.Failf("the pull duration including waiting %v should be similar with the pulled duration %v",
-                    pulled.pulledIncludeWaitingDuration, pulled.pulledDuration)
+        imagePulled, podStartTime, podEndTime, err := getPodImagePullDurations(ctx, f, testpods)
+        framework.ExpectNoError(err)
+
+        checkPodPullingOverlap(podStartTime, podEndTime, testpods)
+
+        for _, img := range imagePulled {
+            framework.Logf("Pod pull duration including waiting is %v, and the pulled duration is %v", img.pulledIncludeWaitingDuration, img.pulledDuration)
+            // if a pod image pull hanged for more than 50%, it is a delayed pull.
+            if float32(img.pulledIncludeWaitingDuration.Milliseconds())/float32(img.pulledDuration.Milliseconds()) > 1.5 {
+                // as this is parallel image pulling, the waiting duration should be similar with the pulled duration.
+                framework.Failf("There is a delayed image pulling, which is not expected for parallel image pulling.")
             }
         }
     })

     })
 })

-var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParallelImagePull, func() {
+var _ = SIGDescribe("Pull Image", feature.CriProxy, framework.WithSerial(), func() {

     f := framework.NewDefaultFramework("serialize-pull-image-test")
     f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
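
The injector added in the hunk above works as a rendezvous between the first two PullImage calls: call 0 blocks until another pull arrives, any later call unblocks it, and if no second pull starts within the timeout the injector returns an error, which fails the test. A self-contained sketch of that synchronization idea outside the test framework (illustrative only, not part of the commit):

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    func main() {
        var (
            mu     sync.Mutex
            calls  int
            rendez = make(chan struct{})
        )
        // pull simulates the injector's behaviour for a single PullImage call.
        pull := func(name string) error {
            mu.Lock()
            id := calls
            calls++
            mu.Unlock()
            if id == 0 {
                // first pull: wait for an overlapping pull or give up
                select {
                case <-rendez:
                    return nil
                case <-time.After(2 * time.Second):
                    return fmt.Errorf("%s: no overlapping pull", name)
                }
            }
            // later pulls: unblock the first one (or give up if it already returned)
            select {
            case rendez <- struct{}{}:
                return nil
            case <-time.After(2 * time.Second):
                return fmt.Errorf("%s: first pull never waited", name)
            }
        }

        var wg sync.WaitGroup
        for _, n := range []string{"image-a", "image-b"} {
            wg.Add(1)
            go func(n string) {
                defer wg.Done()
                fmt.Println(n, pull(n))
            }(n)
        }
        wg.Wait()
    }

When the two calls overlap (parallel pulling) both return nil almost immediately; if they run strictly one after another, the first call ends up in the timeout branch. That is exactly the condition the parallel test treats as a failure, and the serialized test in the next hunk inverts: there, a successful rendezvous is the error and the timeout is the expected outcome.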
@@ -139,23 +142,64 @@ var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParallelImagePull, func() {
     var testpods []*v1.Pod

     ginkgo.BeforeEach(func(ctx context.Context) {
+        if err := resetCRIProxyInjector(); err != nil {
+            ginkgo.Skip("Skip the test since the CRI Proxy is undefined.")
+        }
+
         testpods = prepareAndCleanup(ctx, f)
+        gomega.Expect(len(testpods)).To(gomega.BeNumerically("<=", 5))
     })

     ginkgo.AfterEach(func(ctx context.Context) {
+        err := resetCRIProxyInjector()
+        framework.ExpectNoError(err)
+
         ginkgo.By("cleanup pods")
         for _, pod := range testpods {
             deletePodSyncByName(ctx, f, pod.Name)
         }
     })

     ginkgo.It("should be waiting more", func(ctx context.Context) {
+        // all serialize image pulls should timeout
+        timeout := 20 * time.Second
+        var mu sync.Mutex
+        callCh := make(chan struct{})
+        callStatus := make(map[int]chan struct{})
+        err := addCRIProxyInjector(func(apiName string) error {
+            if apiName == criproxy.PullImage {
+                mu.Lock()
+                callID := len(callStatus)
+                callStatus[callID] = callCh
+                mu.Unlock()
+                if callID == 0 {
+                    // wait for next call
+                    select {
+                    case <-callCh:
+                        return errors.New("parallel image pull detected")
+                    case <-time.After(timeout):
+                        return nil
+                    }
+                } else {
+                    // send a signal to the first call
+                    select {
+                    case callCh <- struct{}{}:
+                        return errors.New("parallel image pull detected")
+                    case <-time.After(timeout):
+                        return nil
+                    }
+                }
+            }
+            return nil
+        })
+        framework.ExpectNoError(err)
+
         var pods []*v1.Pod
         for _, testpod := range testpods {
             pods = append(pods, e2epod.NewPodClient(f).Create(ctx, testpod))
         }
         for _, pod := range pods {
-            err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Running", 10*time.Minute, func(pod *v1.Pod) (bool, error) {
+            err := e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, pod.Name, "Running", 2*time.Minute, func(pod *v1.Pod) (bool, error) {
                 if pod.Status.Phase == v1.PodRunning {
                     return true, nil
                 }
@@ -164,56 +208,74 @@ var _ = SIGDescribe("Pull Image", framework.WithSerial(), nodefeature.MaxParallelImagePull, func() {
             framework.ExpectNoError(err)
         }

-        events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{})
+        imagePulled, podStartTime, podEndTime, err := getPodImagePullDurations(ctx, f, testpods)
         framework.ExpectNoError(err)
-        imagePulled := map[string]*pulledStruct{}
-        // start from pulling event creationTimestamp
-        // end from pulled event creationTimestamp
-        podStartTime, podEndTime := map[string]metav1.Time{}, map[string]metav1.Time{}
-        for _, event := range events.Items {
-            if event.Reason == kubeletevents.PulledImage {
-                podEndTime[event.InvolvedObject.Name] = event.CreationTimestamp
-                for _, testpod := range testpods {
-                    if event.InvolvedObject.Name == testpod.Name {
-                        pulled, err := getDurationsFromPulledEventMsg(event.Message)
-                        imagePulled[testpod.Name] = pulled
-                        framework.ExpectNoError(err)
-                        break
-                    }
-                }
-            } else if event.Reason == kubeletevents.PullingImage {
-                podStartTime[event.InvolvedObject.Name] = event.CreationTimestamp
-            }
-        }
         gomega.Expect(len(testpods)).To(gomega.BeComparableTo(len(imagePulled)))

-        // skip if pod1 pulling time and pod2 pulling time are not overlapped
-        if podStartTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) {
-            if podEndTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) {
-                e2eskipper.Skipf("pod1 pulling time and pod2 pulling time are not overlapped")
-            }
-        } else {
-            if podEndTime[testpods[1].Name].Time.Before(podStartTime[testpods[0].Name].Time) {
-                e2eskipper.Skipf("pod1 pulling time and pod2 pulling time are not overlapped")
+        checkPodPullingOverlap(podStartTime, podEndTime, testpods)
+
+        // if a pod image pull hanged for more than 50%, it is a delayed pull.
+        var anyDelayedPull bool
+        for _, img := range imagePulled {
+            framework.Logf("Pod pull duration including waiting is %v, and the pulled duration is %v", img.pulledIncludeWaitingDuration, img.pulledDuration)
+            if float32(img.pulledIncludeWaitingDuration.Milliseconds())/float32(img.pulledDuration.Milliseconds()) > 1.5 {
+                anyDelayedPull = true
             }
         }
-
         // as this is serialize image pulling, the waiting duration should be almost double the duration with the pulled duration.
         // use 1.5 as a common ratio to avoid some overlap during pod creation
-        if float32(imagePulled[testpods[1].Name].pulledIncludeWaitingDuration/time.Millisecond)/float32(imagePulled[testpods[1].Name].pulledDuration/time.Millisecond) < 1.5 &&
-            float32(imagePulled[testpods[0].Name].pulledIncludeWaitingDuration/time.Millisecond)/float32(imagePulled[testpods[0].Name].pulledDuration/time.Millisecond) < 1.5 {
-            framework.Failf("At least, one of the pull duration including waiting %v/%v should be similar with the pulled duration %v/%v",
-                imagePulled[testpods[1].Name].pulledIncludeWaitingDuration, imagePulled[testpods[0].Name].pulledIncludeWaitingDuration, imagePulled[testpods[1].Name].pulledDuration, imagePulled[testpods[0].Name].pulledDuration)
+        if !anyDelayedPull {
+            framework.Failf("All image pullings are not delayed, which is not expected for serilized image pull")
         }
     })

     })
 })

+func getPodImagePullDurations(ctx context.Context, f *framework.Framework, testpods []*v1.Pod) (map[string]*pulledStruct, map[string]metav1.Time, map[string]metav1.Time, error) {
+    events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{})
+    if err != nil {
+        return nil, nil, nil, err
+    }
+
+    imagePulled := map[string]*pulledStruct{}
+    podStartTime := map[string]metav1.Time{}
+    podEndTime := map[string]metav1.Time{}
+
+    for _, event := range events.Items {
+        if event.Reason == kubeletevents.PulledImage {
+            podEndTime[event.InvolvedObject.Name] = event.CreationTimestamp
+            for _, testpod := range testpods {
+                if event.InvolvedObject.Name == testpod.Name {
+                    pulled, err := getDurationsFromPulledEventMsg(event.Message)
+                    if err != nil {
+                        return nil, nil, nil, err
+                    }
+                    imagePulled[testpod.Name] = pulled
+                    break
+                }
+            }
+        } else if event.Reason == kubeletevents.PullingImage {
+            podStartTime[event.InvolvedObject.Name] = event.CreationTimestamp
+        }
+    }
+
+    return imagePulled, podStartTime, podEndTime, nil
+}
+
+// as pods are created at the same time and image pull will delay 15s, the image pull time should be overlapped
+func checkPodPullingOverlap(podStartTime map[string]metav1.Time, podEndTime map[string]metav1.Time, testpods []*v1.Pod) {
+    if podStartTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) && podEndTime[testpods[0].Name].Time.Before(podStartTime[testpods[1].Name].Time) {
+        framework.Failf("%v pulling time and %v pulling time are not overlapped", testpods[0].Name, testpods[1].Name)
+    } else if podStartTime[testpods[0].Name].Time.After(podStartTime[testpods[1].Name].Time) && podStartTime[testpods[0].Name].Time.After(podEndTime[testpods[1].Name].Time) {
+        framework.Failf("%v pulling time and %v pulling time are not overlapped", testpods[0].Name, testpods[1].Name)
+    }
+}
+
 func prepareAndCleanup(ctx context.Context, f *framework.Framework) (testpods []*v1.Pod) {
     // cuda images are > 2Gi and it will reduce the flaky rate
-    image1 := imageutils.GetE2EImage(imageutils.CudaVectorAdd)
-    image2 := imageutils.GetE2EImage(imageutils.CudaVectorAdd2)
+    image1 := imageutils.GetE2EImage(imageutils.Httpd)
+    image2 := imageutils.GetE2EImage(imageutils.HttpdNew)
     node := getNodeName(ctx, f)

     testpod := &v1.Pod{
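
Both tests classify a pull as "delayed" with the same ratio over the pulledStruct fields logged above: total pull time including waiting versus the bare pull time, with 1.5 as the threshold. Expressed as a standalone predicate (hypothetical helper, not part of the commit; pulledStruct is the type already used in this file):

    // isDelayedPull reports whether a pull spent noticeably longer waiting in the
    // kubelet's pull queue than actually pulling (more than 50% overhead).
    func isDelayedPull(p *pulledStruct) bool {
        if p == nil || p.pulledDuration <= 0 {
            return false
        }
        ratio := float32(p.pulledIncludeWaitingDuration.Milliseconds()) / float32(p.pulledDuration.Milliseconds())
        return ratio > 1.5
    }

The parallel suite fails if any pod satisfies this predicate, while the serialize suite fails only if no pod does, since serialized pulls are expected to queue behind each other.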
