
Commit 4386a7b

Use Watch API in fetchUpdatePayloadToDir
1 parent 9111c33 commit 4386a7b
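
The commit replaces interval polling of the release-retrieval pod with the client-go watch machinery (toolswatch.UntilWithSync over a list-watch scoped to a single pod). Below is a minimal, self-contained sketch of that pattern; it is not code from this commit, and the function name waitForPodSucceeded, the timeout, and the namespace/name parameters are illustrative only.

package example

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	toolswatch "k8s.io/client-go/tools/watch"
)

// waitForPodSucceeded blocks until the named pod reaches PodSucceeded, fails
// fast when it reaches PodFailed, and gives up when the context expires.
func waitForPodSucceeded(ctx context.Context, client kubernetes.Interface, namespace, name string) error {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) // illustrative bound
	defer cancel()

	// Scope both the initial List and the Watch to the single pod of interest.
	fieldSelector := fields.OneTermEqualSelector("metadata.name", name).String()
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return client.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return client.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
		},
	}

	// UntilWithSync runs an informer over lw and invokes the condition for every
	// event, returning as soon as the condition reports done or an error.
	_, err := toolswatch.UntilWithSync(ctx, lw, &corev1.Pod{}, nil, func(event watch.Event) (bool, error) {
		pod, ok := event.Object.(*corev1.Pod)
		if !ok {
			return false, fmt.Errorf("unexpected object of type %T", event.Object)
		}
		switch pod.Status.Phase {
		case corev1.PodSucceeded:
			return true, nil
		case corev1.PodFailed:
			return false, fmt.Errorf("pod %s/%s failed", namespace, name)
		default:
			return false, nil // Pending or Running: keep waiting
		}
	})
	return err
}

The diff below applies this pattern in pkg/cvo/updatepayload.go, with a richer condition (podCompletionCheckFn) that also surfaces container status details for signature-validation and image-pull failures while the pod is still pending.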

7 files changed: 715 additions & 32 deletions


lib/resourcebuilder/batch.go

Lines changed: 0 additions & 2 deletions
@@ -6,8 +6,6 @@ import (
 
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
 	"k8s.io/klog/v2"
 )

lib/resourcebuilder/interface.go

Lines changed: 0 additions & 5 deletions
@@ -4,7 +4,6 @@ import (
 	"context"
 	"fmt"
 	"sync"
-	"time"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
@@ -88,7 +87,3 @@ func New(mapper *ResourceMapper, rest *rest.Config, m manifest.Manifest) (Interf
 	}
 	return f(rest, m), nil
 }
-
-// defaultObjectPollInterval is the default interval to poll the API to determine whether an object
-// is ready. Use this when a more specific interval is not necessary.
-const defaultObjectPollInterval = 3 * time.Second

pkg/cvo/updatepayload.go

Lines changed: 101 additions & 25 deletions
@@ -7,7 +7,6 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
-	"sort"
 	"strings"
 	"time"
 
@@ -16,15 +15,18 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/runtime"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	randutil "k8s.io/apimachinery/pkg/util/rand"
-	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/cache"
+	toolswatch "k8s.io/client-go/tools/watch"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/ptr"
 
 	configv1 "github.com/openshift/api/config/v1"
-	"github.com/openshift/cluster-version-operator/lib/resourcebuilder"
 	"github.com/openshift/cluster-version-operator/pkg/payload"
 	"github.com/openshift/library-go/pkg/verify"
 )
@@ -162,9 +164,10 @@ func (r *payloadRetriever) targetUpdatePayloadDir(ctx context.Context, update co
 	payloadHash := base64.RawURLEncoding.EncodeToString(hash.Sum(nil))
 	tdir := filepath.Join(r.workingDir, payloadHash)
 
-	// Prune older pods and directories while gracefully handling errors.
+	// Prune older pods and directories.
 	if err := r.prunePods(ctx); err != nil {
-		klog.Warningf("failed to prune pods: %v", err)
+		klog.Errorf("failed to prune pods: %v", err)
+		return "", fmt.Errorf("failed to prune pods: %w", err)
 	}
 
 	if err := payload.ValidateDirectory(tdir); os.IsNotExist(err) {
@@ -217,7 +220,7 @@ func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir stri
 		return container
 	}
 
-	pod := &v1.Pod{
+	pod := &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      name,
 			Namespace: namespace,
@@ -278,30 +281,103 @@ func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir stri
 			}},
 			RestartPolicy: corev1.RestartPolicyOnFailure,
 		},
-	},
+	}
 
-	if _, err := r.kubeClient.V1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
+	klog.Infof("Spawning Pod %s ...", name)
+	if _, err := r.kubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
 		return err
 	}
 
-	return wait.PollUntilContextCancel(ctx, 3 * time.Second, true, func(localCtx context.Context) (bool, error) {
-		p, err := r.kubeClient.V1().Pods(pod.Namespace).Get(localCtx, pod.Name, metav1.GetOptions{})
-		if err != nil {
-			klog.Warningf("unable to get OpenShift release retrieval pod: %v", err)
-			return false, nil
+	return waitForPodCompletion(ctx, r.kubeClient.CoreV1().Pods(pod.Namespace), pod.Name)
+}
+
+type PodListerWatcher interface {
+	List(ctx context.Context, opts metav1.ListOptions) (*corev1.PodList, error)
+	Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
+}
+
+func collectStatuses(status corev1.PodStatus, waitingConditionFunc func(reason, message string) bool) []string {
+	var statuses []string
+	for _, cs := range status.ContainerStatuses {
+		if cs.State.Waiting != nil && waitingConditionFunc(cs.State.Waiting.Reason, cs.State.Waiting.Message) {
+			statuses = append(statuses, fmt.Sprintf("container %s is waiting with reason %q and message %q", cs.Name, cs.State.Waiting.Reason, cs.State.Waiting.Message))
+		}
+		if cs.State.Terminated != nil && cs.State.Terminated.Message != "" {
+			statuses = append(statuses, fmt.Sprintf("container %s is terminated with reason %q and message %q", cs.Name, cs.State.Terminated.Reason, cs.State.Terminated.Message))
+		}
+	}
+	for _, ics := range status.InitContainerStatuses {
+		if ics.State.Waiting != nil && waitingConditionFunc(ics.State.Waiting.Reason, ics.State.Waiting.Message) {
+			statuses = append(statuses, fmt.Sprintf("initcontainer %s is waiting with reason %q and message %q", ics.Name, ics.State.Waiting.Reason, ics.State.Waiting.Message))
+		}
+		if ics.State.Terminated != nil && ics.State.Terminated.Message != "" {
+			statuses = append(statuses, fmt.Sprintf("initcontainer %s is terminated with reason %q and message %q", ics.Name, ics.State.Terminated.Reason, ics.State.Terminated.Message))
 		}
-
-		if p.Status.Phase != "", {
-			return false, err
-		} else if err != nil {
-			klog.Error(err)
+	}
+	return statuses
+}
+
+func podCompletionCheckFn(name string) toolswatch.ConditionFunc {
+	return func(event watch.Event) (bool, error) {
+		p, ok := event.Object.(*corev1.Pod)
+		if !ok {
+			klog.Errorf("expecting Pod but received event with kind: %s", event.Object.GetObjectKind())
+			return false, fmt.Errorf("expecting Pod but received event with kind: %s", event.Object.GetObjectKind())
+		}
+		switch phase := p.Status.Phase; phase {
+		case corev1.PodPending:
+			klog.V(4).Infof("Pod %s is pending", name)
+			// There are two cases at the moment where we want to bottle up the waiting message because
+			// the details would be lost if we waited until the pod failed.
+			// Case 1: "reason: SignatureValidationFailed".
+			// The message looks like 'image pull failed for quay.io/openshift-release-dev/ocp-release@sha256:digest because the signature validation failed: Source image rejected: A signature was required, but no signature exists'
+			// We do not need Case 1 if https://github.com/kubernetes/kubernetes/pull/127918 lands in OCP.
+			// Case 2: "reason: ErrImagePull".
+			// The message looks like '...: reading manifest sha256:... in quay.io/openshift-release-dev/ocp-release: manifest unknown'
+			// If those keywords change in a future Kubernetes implementation, we will have to follow up accordingly.
+			// Otherwise, we will lose these details in the waiting message. It brings no other harm.
+			if statuses := collectStatuses(p.Status, func(reason, message string) bool {
+				return reason == "SignatureValidationFailed" ||
+					(reason == "ErrImagePull" && strings.Contains(message, "manifest unknown"))
+			}); len(statuses) > 0 {
+				klog.Errorf("Pod %s failed at pending with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
+				return false, fmt.Errorf("pod %s failed at pending with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
+			}
 			return false, nil
-		} else if !done {
-			klog.V(2).Infof("Job %s in namespace %s is not ready, continuing to wait.", job.ObjectMeta.Name, job.ObjectMeta.Namespace)
+		case corev1.PodRunning:
+			klog.V(4).Infof("Pod %s is running, waiting for its completion ...", name)
 			return false, nil
+		case corev1.PodSucceeded:
+			klog.Infof("Pod %s succeeded", name)
+			return true, nil
+		case corev1.PodFailed:
+			statuses := collectStatuses(p.Status, func(reason, message string) bool { return message != "" })
+			klog.Errorf("Pod %s failed with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
+			return false, fmt.Errorf("pod %s failed with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
+		default:
+			klog.Errorf("Pod %s is with unexpected phase %s", name, phase)
+			return false, fmt.Errorf("pod %s is with unexpected phase %s", name, phase)
 		}
-		return true, nil
-	})
+	}
+}
+
+func waitForPodCompletion(ctx context.Context, podListerWatcher PodListerWatcher, name string) error {
+	fieldSelector := fields.OneTermEqualSelector("metadata.name", name).String()
+	_, err := toolswatch.UntilWithSync(
+		ctx,
+		&cache.ListWatch{
+			ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
+				return podListerWatcher.List(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
+			},
+			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+				return podListerWatcher.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
			},
+		},
+		&corev1.Pod{},
+		nil,
+		podCompletionCheckFn(name),
+	)
+	return err
 }
 
 // prunePods deletes the older, finished pods in the namespace.
@@ -326,15 +402,15 @@ func (r *payloadRetriever) prunePods(ctx context.Context) error {
 	}
 	// end transitional job pruning
 
-	pods, err := r.kubeClient.BatchV1().Pods(r.namespace).List(ctx, metav1.ListOptions{
+	pods, err := r.kubeClient.CoreV1().Pods(r.namespace).List(ctx, metav1.ListOptions{
 		LabelSelector: "k8s-app=retrieve-openshift-release",
 	})
 	if err != nil {
 		errs = append(errs, err)
 	}
 
-	for _, pod := pods.Items {
-		err := r.kubeClient.V1().Pods(r.namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
+	for _, pod := range pods.Items {
+		err := r.kubeClient.CoreV1().Pods(r.namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
 		if err != nil {
 			errs = append(errs, errors.Wrapf(err, "failed to delete pod %v", pod.Name))
 		}
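
The new waitForPodCompletion helper takes a narrow PodListerWatcher rather than a full clientset, so it can be exercised against the fake clientset from k8s.io/client-go/kubernetes/fake. The following is a minimal test sketch, not part of this commit; the pod name, namespace, and test name are hypothetical, and the pod is seeded already in the Succeeded phase so the watch condition is satisfied on the initial sync.

package cvo

import (
	"context"
	"testing"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

func TestWaitForPodCompletionSucceeded(t *testing.T) {
	// Hypothetical pod that has already finished successfully.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "retrieve-release", Namespace: "openshift-cluster-version"},
		Status:     corev1.PodStatus{Phase: corev1.PodSucceeded},
	}
	client := fake.NewSimpleClientset(pod)

	// Bound the wait so a regression cannot hang the test.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// The typed Pods client satisfies PodListerWatcher: it provides List and Watch.
	if err := waitForPodCompletion(ctx, client.CoreV1().Pods(pod.Namespace), pod.Name); err != nil {
		t.Fatalf("expected the pod to be reported as completed, got: %v", err)
	}
}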

vendor/k8s.io/client-go/tools/watch/informerwatcher.go

Lines changed: 150 additions & 0 deletions
Diff not rendered by default (vendored file).
