Skip to content

Commit 0659bfb

Browse files
Merge pull request #1105 from hongkailiu/drop-version-job-controller
OTA-1307: pkg/cvo/updatepayload: Drop the Job controller for release-manifests downloads
2 parents b6b7345 + 4386a7b commit 0659bfb

File tree

7 files changed

+788
-118
lines changed

7 files changed

+788
-118
lines changed

lib/resourcebuilder/batch.go

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,33 +6,9 @@ import (
66

77
batchv1 "k8s.io/api/batch/v1"
88
corev1 "k8s.io/api/core/v1"
9-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10-
"k8s.io/apimachinery/pkg/util/wait"
11-
batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
129
"k8s.io/klog/v2"
1310
)
1411

15-
// WaitForJobCompletion waits for job to complete.
16-
func WaitForJobCompletion(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) error {
17-
return wait.PollUntilContextCancel(ctx, defaultObjectPollInterval, true, func(localCtx context.Context) (bool, error) {
18-
j, err := client.Jobs(job.Namespace).Get(localCtx, job.Name, metav1.GetOptions{})
19-
if err != nil {
20-
return false, fmt.Errorf("error getting Job %s: %v", job.Name, err)
21-
}
22-
23-
if done, err := checkJobHealth(localCtx, j); err != nil && done {
24-
return false, err
25-
} else if err != nil {
26-
klog.Error(err)
27-
return false, nil
28-
} else if !done {
29-
klog.V(2).Infof("Job %s in namespace %s is not ready, continuing to wait.", job.ObjectMeta.Name, job.ObjectMeta.Namespace)
30-
return false, nil
31-
}
32-
return true, nil
33-
})
34-
}
35-
3612
func (b *builder) checkJobHealth(ctx context.Context, job *batchv1.Job) error {
3713
if b.mode == InitializingMode {
3814
return nil

lib/resourcebuilder/interface.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"fmt"
66
"sync"
7-
"time"
87

98
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
109
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -88,7 +87,3 @@ func New(mapper *ResourceMapper, rest *rest.Config, m manifest.Manifest) (Interf
8887
}
8988
return f(rest, m), nil
9089
}
91-
92-
// defaultObjectPollInterval is the default interval to poll the API to determine whether an object
93-
// is ready. Use this when a more specific interval is not necessary.
94-
const defaultObjectPollInterval = 3 * time.Second

pkg/cvo/updatepayload.go

Lines changed: 174 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -7,24 +7,26 @@ import (
77
"fmt"
88
"os"
99
"path/filepath"
10-
"sort"
1110
"strings"
1211
"time"
1312

1413
"github.com/pkg/errors"
1514

16-
batchv1 "k8s.io/api/batch/v1"
1715
corev1 "k8s.io/api/core/v1"
1816
"k8s.io/apimachinery/pkg/api/resource"
1917
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18+
"k8s.io/apimachinery/pkg/fields"
19+
"k8s.io/apimachinery/pkg/runtime"
2020
utilerrors "k8s.io/apimachinery/pkg/util/errors"
2121
randutil "k8s.io/apimachinery/pkg/util/rand"
22+
"k8s.io/apimachinery/pkg/watch"
2223
"k8s.io/client-go/kubernetes"
24+
"k8s.io/client-go/tools/cache"
25+
toolswatch "k8s.io/client-go/tools/watch"
2326
"k8s.io/klog/v2"
2427
"k8s.io/utils/ptr"
2528

2629
configv1 "github.com/openshift/api/config/v1"
27-
"github.com/openshift/cluster-version-operator/lib/resourcebuilder"
2830
"github.com/openshift/cluster-version-operator/pkg/payload"
2931
"github.com/openshift/library-go/pkg/verify"
3032
)
@@ -162,9 +164,10 @@ func (r *payloadRetriever) targetUpdatePayloadDir(ctx context.Context, update co
162164
payloadHash := base64.RawURLEncoding.EncodeToString(hash.Sum(nil))
163165
tdir := filepath.Join(r.workingDir, payloadHash)
164166

165-
// Prune older jobs and directories while gracefully handling errors.
166-
if err := r.pruneJobs(ctx, 0); err != nil {
167-
klog.Warningf("failed to prune jobs: %v", err)
167+
// Prune older pods and directories.
168+
if err := r.prunePods(ctx); err != nil {
169+
klog.Errorf("failed to prune pods: %v", err)
170+
return "", fmt.Errorf("failed to prune pods: %w", err)
168171
}
169172

170173
if err := payload.ValidateDirectory(tdir); os.IsNotExist(err) {
@@ -217,123 +220,205 @@ func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir stri
217220
return container
218221
}
219222

220-
job := &batchv1.Job{
223+
pod := &corev1.Pod{
221224
ObjectMeta: metav1.ObjectMeta{
222225
Name: name,
223226
Namespace: namespace,
227+
Labels: map[string]string{
228+
"k8s-app": "retrieve-openshift-release",
229+
},
224230
},
225-
Spec: batchv1.JobSpec{
231+
Spec: corev1.PodSpec{
226232
ActiveDeadlineSeconds: deadline,
227-
Template: corev1.PodTemplateSpec{
228-
Spec: corev1.PodSpec{
229-
InitContainers: []corev1.Container{
230-
setContainerDefaults(corev1.Container{
231-
Name: "cleanup",
232-
Command: []string{"sh", "-c", "rm -fR ./*"},
233-
WorkingDir: baseDir,
234-
}),
235-
setContainerDefaults(corev1.Container{
236-
Name: "make-temporary-directory",
237-
Command: []string{"mkdir", tmpDir},
238-
}),
239-
setContainerDefaults(corev1.Container{
240-
Name: "move-operator-manifests-to-temporary-directory",
241-
Command: []string{
242-
"mv",
243-
filepath.Join(payload.DefaultPayloadDir, payload.CVOManifestDir),
244-
filepath.Join(tmpDir, payload.CVOManifestDir),
245-
},
246-
}),
247-
setContainerDefaults(corev1.Container{
248-
Name: "move-release-manifests-to-temporary-directory",
249-
Command: []string{
250-
"mv",
251-
filepath.Join(payload.DefaultPayloadDir, payload.ReleaseManifestDir),
252-
filepath.Join(tmpDir, payload.ReleaseManifestDir),
253-
},
254-
}),
233+
InitContainers: []corev1.Container{
234+
setContainerDefaults(corev1.Container{
235+
Name: "cleanup",
236+
Command: []string{"sh", "-c", "rm -fR ./*"},
237+
WorkingDir: baseDir,
238+
}),
239+
setContainerDefaults(corev1.Container{
240+
Name: "make-temporary-directory",
241+
Command: []string{"mkdir", tmpDir},
242+
}),
243+
setContainerDefaults(corev1.Container{
244+
Name: "move-operator-manifests-to-temporary-directory",
245+
Command: []string{
246+
"mv",
247+
filepath.Join(payload.DefaultPayloadDir, payload.CVOManifestDir),
248+
filepath.Join(tmpDir, payload.CVOManifestDir),
255249
},
256-
Containers: []corev1.Container{
257-
setContainerDefaults(corev1.Container{
258-
Name: "rename-to-final-location",
259-
Command: []string{"mv", tmpDir, dir},
260-
}),
250+
}),
251+
setContainerDefaults(corev1.Container{
252+
Name: "move-release-manifests-to-temporary-directory",
253+
Command: []string{
254+
"mv",
255+
filepath.Join(payload.DefaultPayloadDir, payload.ReleaseManifestDir),
256+
filepath.Join(tmpDir, payload.ReleaseManifestDir),
261257
},
262-
Volumes: []corev1.Volume{{
263-
Name: "payloads",
264-
VolumeSource: corev1.VolumeSource{
265-
HostPath: &corev1.HostPathVolumeSource{
266-
Path: targetUpdatePayloadsDir,
267-
},
268-
},
269-
}},
270-
NodeName: nodename,
271-
NodeSelector: map[string]string{
272-
nodeSelectorKey: "",
258+
}),
259+
},
260+
Containers: []corev1.Container{
261+
setContainerDefaults(corev1.Container{
262+
Name: "rename-to-final-location",
263+
Command: []string{"mv", tmpDir, dir},
264+
}),
265+
},
266+
Volumes: []corev1.Volume{{
267+
Name: "payloads",
268+
VolumeSource: corev1.VolumeSource{
269+
HostPath: &corev1.HostPathVolumeSource{
270+
Path: targetUpdatePayloadsDir,
273271
},
274-
PriorityClassName: "openshift-user-critical",
275-
Tolerations: []corev1.Toleration{{
276-
Key: nodeSelectorKey,
277-
}},
278-
RestartPolicy: corev1.RestartPolicyOnFailure,
279272
},
273+
}},
274+
NodeName: nodename,
275+
NodeSelector: map[string]string{
276+
nodeSelectorKey: "",
280277
},
278+
PriorityClassName: "openshift-user-critical",
279+
Tolerations: []corev1.Toleration{{
280+
Key: nodeSelectorKey,
281+
}},
282+
RestartPolicy: corev1.RestartPolicyOnFailure,
281283
},
282284
}
283285

284-
if _, err := r.kubeClient.BatchV1().Jobs(job.Namespace).Create(ctx, job, metav1.CreateOptions{}); err != nil {
286+
klog.Infof("Spawning Pod %s ...", name)
287+
if _, err := r.kubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
285288
return err
286289
}
287-
return resourcebuilder.WaitForJobCompletion(ctx, r.kubeClient.BatchV1(), job)
290+
291+
return waitForPodCompletion(ctx, r.kubeClient.CoreV1().Pods(pod.Namespace), pod.Name)
288292
}
289293

290-
// pruneJobs deletes the older, finished jobs in the namespace.
291-
// retain - the number of newest jobs to keep.
292-
func (r *payloadRetriever) pruneJobs(ctx context.Context, retain int) error {
293-
jobs, err := r.kubeClient.BatchV1().Jobs(r.namespace).List(ctx, metav1.ListOptions{})
294-
if err != nil {
295-
return err
294+
type PodListerWatcher interface {
295+
List(ctx context.Context, opts metav1.ListOptions) (*corev1.PodList, error)
296+
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
297+
}
298+
299+
func collectStatuses(status corev1.PodStatus, waitingConditionFunc func(reason, message string) bool) []string {
300+
var statuses []string
301+
for _, cs := range status.ContainerStatuses {
302+
if cs.State.Waiting != nil && waitingConditionFunc(cs.State.Waiting.Reason, cs.State.Waiting.Message) {
303+
statuses = append(statuses, fmt.Sprintf("container %s is waiting with reason %q and message %q", cs.Name, cs.State.Waiting.Reason, cs.State.Waiting.Message))
304+
}
305+
if cs.State.Terminated != nil && cs.State.Terminated.Message != "" {
306+
statuses = append(statuses, fmt.Sprintf("container %s is terminated with reason %q and message %q", cs.Name, cs.State.Terminated.Reason, cs.State.Terminated.Message))
307+
}
296308
}
297-
if len(jobs.Items) <= retain {
298-
return nil
309+
for _, ics := range status.InitContainerStatuses {
310+
if ics.State.Waiting != nil && waitingConditionFunc(ics.State.Waiting.Reason, ics.State.Waiting.Message) {
311+
statuses = append(statuses, fmt.Sprintf("initcontainer %s is waiting with reason %q and message %q", ics.Name, ics.State.Waiting.Reason, ics.State.Waiting.Message))
312+
}
313+
if ics.State.Terminated != nil && ics.State.Terminated.Message != "" {
314+
statuses = append(statuses, fmt.Sprintf("initcontainer %s is terminated with reason %q and message %q", ics.Name, ics.State.Terminated.Reason, ics.State.Terminated.Message))
315+
}
299316
}
317+
return statuses
318+
}
300319

301-
// Select jobs to be deleted
302-
var deleteJobs []batchv1.Job
303-
for _, job := range jobs.Items {
304-
switch {
305-
// Ignore jobs not beginning with operatorName
306-
case !strings.HasPrefix(job.Name, r.operatorName+"-"):
307-
break
320+
func podCompletionCheckFn(name string) toolswatch.ConditionFunc {
321+
return func(event watch.Event) (bool, error) {
322+
p, ok := event.Object.(*corev1.Pod)
323+
if !ok {
324+
klog.Errorf("expecting Pod but received event with kind: %s", event.Object.GetObjectKind())
325+
return false, fmt.Errorf("expecting Pod but received event with kind: %s", event.Object.GetObjectKind())
326+
}
327+
switch phase := p.Status.Phase; phase {
328+
case corev1.PodPending:
329+
klog.V(4).Infof("Pod %s is pending", name)
330+
// There are two cases at the moment we want to bottle up the waiting message where
331+
// the details would be lost if we waited until the pod failed.
332+
// Case 1: "reason: SignatureValidationFailed".
333+
// The message looks like 'image pull failed for quay.io/openshift-release-dev/ocp-release@sha256:digest because the signature validation failed: Source image rejected: A signature was required, but no signature exists'
334+
// We do not need Case 1 if https://github.com/kubernetes/kubernetes/pull/127918 lands into OCP.
335+
// Case 2: "reason: ErrImagePull".
336+
// The message looks like '...: reading manifest sha256:... in quay.io/openshift-release-dev/ocp-release: manifest unknown'
337+
// In case those keywords are changed in the future Kubernetes implementation, we will have to follow up accordingly.
338+
// Otherwise, we will lose these details in the waiting message. It brings no other harms.
339+
if statuses := collectStatuses(p.Status, func(reason, message string) bool {
340+
return reason == "SignatureValidationFailed" ||
341+
(reason == "ErrImagePull" && strings.Contains(message, "manifest unknown"))
342+
}); len(statuses) > 0 {
343+
klog.Errorf("Pod %s failed at pending with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
344+
return false, fmt.Errorf("pod %s failed at pending with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
345+
}
346+
return false, nil
347+
case corev1.PodRunning:
348+
klog.V(4).Infof("Pod %s is running, waiting for its completion ...", name)
349+
return false, nil
350+
case corev1.PodSucceeded:
351+
klog.Infof("Pod %s succeeded", name)
352+
return true, nil
353+
case corev1.PodFailed:
354+
statuses := collectStatuses(p.Status, func(reason, message string) bool { return message != "" })
355+
klog.Errorf("Pod %s failed with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
356+
return false, fmt.Errorf("pod %s failed with reason %q and message %q and status %s", name, p.Status.Reason, p.Status.Message, strings.Join(statuses, ","))
308357
default:
309-
deleteJobs = append(deleteJobs, job)
358+
klog.Errorf("Pod %s is with unexpected phase %s", name, phase)
359+
return false, fmt.Errorf("pod %s is with unexpected phase %s", name, phase)
310360
}
311361
}
312-
if len(deleteJobs) <= retain {
313-
return nil
362+
}
363+
364+
func waitForPodCompletion(ctx context.Context, podListerWatcher PodListerWatcher, name string) error {
365+
fieldSelector := fields.OneTermEqualSelector("metadata.name", name).String()
366+
_, err := toolswatch.UntilWithSync(
367+
ctx,
368+
&cache.ListWatch{
369+
ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
370+
return podListerWatcher.List(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
371+
},
372+
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
373+
return podListerWatcher.Watch(ctx, metav1.ListOptions{FieldSelector: fieldSelector})
374+
},
375+
},
376+
&corev1.Pod{},
377+
nil,
378+
podCompletionCheckFn(name),
379+
)
380+
return err
381+
}
382+
383+
// prunePods deletes the older, finished pods in the namespace.
384+
func (r *payloadRetriever) prunePods(ctx context.Context) error {
385+
var errs []error
386+
387+
// begin transitional job pruning, in case any dangled from earlier versions
388+
jobs, err := r.kubeClient.BatchV1().Jobs(r.namespace).List(ctx, metav1.ListOptions{})
389+
if err != nil {
390+
errs = append(errs, err)
314391
}
315392

316-
// Sort jobs by StartTime to determine the newest. nil StartTime is assumed newest.
317-
sort.Slice(deleteJobs, func(i, j int) bool {
318-
if deleteJobs[i].Status.StartTime == nil {
319-
return false
393+
for _, job := range jobs.Items {
394+
if !strings.HasPrefix(job.Name, r.operatorName+"-") {
395+
// Ignore jobs not beginning with operatorName
396+
continue
320397
}
321-
if deleteJobs[j].Status.StartTime == nil {
322-
return true
398+
err := r.kubeClient.BatchV1().Jobs(r.namespace).Delete(ctx, job.Name, metav1.DeleteOptions{})
399+
if err != nil {
400+
errs = append(errs, errors.Wrapf(err, "failed to delete job %v", job.Name))
323401
}
324-
return deleteJobs[i].Status.StartTime.Before(deleteJobs[j].Status.StartTime)
402+
}
403+
// end transitional job pruning
404+
405+
pods, err := r.kubeClient.CoreV1().Pods(r.namespace).List(ctx, metav1.ListOptions{
406+
LabelSelector: "k8s-app=retrieve-openshift-release",
325407
})
408+
if err != nil {
409+
errs = append(errs, err)
410+
}
326411

327-
var errs []error
328-
for _, job := range deleteJobs[:len(deleteJobs)-retain] {
329-
err := r.kubeClient.BatchV1().Jobs(r.namespace).Delete(ctx, job.Name, metav1.DeleteOptions{})
412+
for _, pod := range pods.Items {
413+
err := r.kubeClient.CoreV1().Pods(r.namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
330414
if err != nil {
331-
errs = append(errs, errors.Wrapf(err, "failed to delete job %v", job.Name))
415+
errs = append(errs, errors.Wrapf(err, "failed to delete pod %v", pod.Name))
332416
}
333417
}
418+
334419
agg := utilerrors.NewAggregate(errs)
335420
if agg != nil {
336-
return fmt.Errorf("error deleting jobs: %v", agg.Error())
421+
return fmt.Errorf("error deleting pods: %v", agg.Error())
337422
}
338423
return nil
339424
}

0 commit comments

Comments (0)