
Commit 9111c33

wking authored and committed
WIP: pkg/cvo/updatepayload: Drop the Job controller for release-manifests downloads
Previously, the CVO launched a Job and waited for it to complete in order to get manifests for an incoming release payload. But the Job controller doesn't bubble up details about why the underlying Pod is having trouble (e.g. Init:SignatureValidationFailed), so to get those details, we need direct access to the Pod. The Job controller doesn't seem like it's adding much value here, so we're dropping it and monitoring the Pod ourselves.
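
For context, the kind of detail the Job controller hides is visible directly on the Pod's init-container statuses. Below is a minimal sketch, not part of this commit, of how such a waiting reason (e.g. SignatureValidationFailed) could be surfaced once the retriever has the Pod in hand; the helper name initContainerProblems and the message wording are illustrative assumptions.

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// initContainerProblems is a hypothetical helper: it collects human-readable
// reasons for init containers that are stuck waiting or that terminated with
// a non-zero exit code, so callers can report them instead of a bare timeout.
func initContainerProblems(pod *corev1.Pod) []string {
	var problems []string
	for _, status := range pod.Status.InitContainerStatuses {
		// A waiting reason like SignatureValidationFailed shows up here.
		if w := status.State.Waiting; w != nil && w.Reason != "" {
			problems = append(problems, fmt.Sprintf("init container %s is waiting: %s: %s", status.Name, w.Reason, w.Message))
		}
		// Failed init containers carry their exit reason and message.
		if t := status.State.Terminated; t != nil && t.ExitCode != 0 {
			problems = append(problems, fmt.Sprintf("init container %s failed: %s: %s", status.Name, t.Reason, t.Message))
		}
	}
	return problems
}

The same walk over Status.ContainerStatuses would cover the main containers.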
1 parent cecc63f commit 9111c33

File tree: 2 files changed (+99, -112 lines)


lib/resourcebuilder/batch.go

Lines changed: 0 additions & 22 deletions

@@ -7,32 +7,10 @@ import (
 	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
 	batchclientv1 "k8s.io/client-go/kubernetes/typed/batch/v1"
 	"k8s.io/klog/v2"
 )
 
-// WaitForJobCompletion waits for job to complete.
-func WaitForJobCompletion(ctx context.Context, client batchclientv1.JobsGetter, job *batchv1.Job) error {
-	return wait.PollUntilContextCancel(ctx, defaultObjectPollInterval, true, func(localCtx context.Context) (bool, error) {
-		j, err := client.Jobs(job.Namespace).Get(localCtx, job.Name, metav1.GetOptions{})
-		if err != nil {
-			return false, fmt.Errorf("error getting Job %s: %v", job.Name, err)
-		}
-
-		if done, err := checkJobHealth(localCtx, j); err != nil && done {
-			return false, err
-		} else if err != nil {
-			klog.Error(err)
-			return false, nil
-		} else if !done {
-			klog.V(2).Infof("Job %s in namespace %s is not ready, continuing to wait.", job.ObjectMeta.Name, job.ObjectMeta.Namespace)
-			return false, nil
-		}
-		return true, nil
-	})
-}
-
 func (b *builder) checkJobHealth(ctx context.Context, job *batchv1.Job) error {
 	if b.mode == InitializingMode {
 		return nil

pkg/cvo/updatepayload.go

Lines changed: 99 additions & 90 deletions

@@ -13,12 +13,12 @@ import (
 
 	"github.com/pkg/errors"
 
-	batchv1 "k8s.io/api/batch/v1"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	randutil "k8s.io/apimachinery/pkg/util/rand"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
 	"k8s.io/utils/ptr"
@@ -162,9 +162,9 @@ func (r *payloadRetriever) targetUpdatePayloadDir(ctx context.Context, update co
 	payloadHash := base64.RawURLEncoding.EncodeToString(hash.Sum(nil))
 	tdir := filepath.Join(r.workingDir, payloadHash)
 
-	// Prune older jobs and directories while gracefully handling errors.
-	if err := r.pruneJobs(ctx, 0); err != nil {
-		klog.Warningf("failed to prune jobs: %v", err)
+	// Prune older pods and directories while gracefully handling errors.
+	if err := r.prunePods(ctx); err != nil {
+		klog.Warningf("failed to prune pods: %v", err)
 	}
 
 	if err := payload.ValidateDirectory(tdir); os.IsNotExist(err) {
@@ -217,123 +217,132 @@ func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir stri
 		return container
 	}
 
-	job := &batchv1.Job{
+	pod := &corev1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: name,
 			Namespace: namespace,
+			Labels: map[string]string{
+				"k8s-app": "retrieve-openshift-release",
+			},
 		},
-		Spec: batchv1.JobSpec{
+		Spec: corev1.PodSpec{
 			ActiveDeadlineSeconds: deadline,
-			Template: corev1.PodTemplateSpec{
-				Spec: corev1.PodSpec{
-					InitContainers: []corev1.Container{
-						setContainerDefaults(corev1.Container{
-							Name: "cleanup",
-							Command: []string{"sh", "-c", "rm -fR ./*"},
-							WorkingDir: baseDir,
-						}),
-						setContainerDefaults(corev1.Container{
-							Name: "make-temporary-directory",
-							Command: []string{"mkdir", tmpDir},
-						}),
-						setContainerDefaults(corev1.Container{
-							Name: "move-operator-manifests-to-temporary-directory",
-							Command: []string{
-								"mv",
-								filepath.Join(payload.DefaultPayloadDir, payload.CVOManifestDir),
-								filepath.Join(tmpDir, payload.CVOManifestDir),
-							},
-						}),
-						setContainerDefaults(corev1.Container{
-							Name: "move-release-manifests-to-temporary-directory",
-							Command: []string{
-								"mv",
-								filepath.Join(payload.DefaultPayloadDir, payload.ReleaseManifestDir),
-								filepath.Join(tmpDir, payload.ReleaseManifestDir),
-							},
-						}),
+			InitContainers: []corev1.Container{
+				setContainerDefaults(corev1.Container{
+					Name: "cleanup",
+					Command: []string{"sh", "-c", "rm -fR ./*"},
+					WorkingDir: baseDir,
+				}),
+				setContainerDefaults(corev1.Container{
+					Name: "make-temporary-directory",
+					Command: []string{"mkdir", tmpDir},
+				}),
+				setContainerDefaults(corev1.Container{
+					Name: "move-operator-manifests-to-temporary-directory",
+					Command: []string{
+						"mv",
+						filepath.Join(payload.DefaultPayloadDir, payload.CVOManifestDir),
+						filepath.Join(tmpDir, payload.CVOManifestDir),
 					},
-					Containers: []corev1.Container{
-						setContainerDefaults(corev1.Container{
-							Name: "rename-to-final-location",
-							Command: []string{"mv", tmpDir, dir},
-						}),
+				}),
+				setContainerDefaults(corev1.Container{
+					Name: "move-release-manifests-to-temporary-directory",
+					Command: []string{
+						"mv",
+						filepath.Join(payload.DefaultPayloadDir, payload.ReleaseManifestDir),
+						filepath.Join(tmpDir, payload.ReleaseManifestDir),
 					},
-					Volumes: []corev1.Volume{{
-						Name: "payloads",
-						VolumeSource: corev1.VolumeSource{
-							HostPath: &corev1.HostPathVolumeSource{
-								Path: targetUpdatePayloadsDir,
-							},
-						},
-					}},
-					NodeName: nodename,
-					NodeSelector: map[string]string{
-						nodeSelectorKey: "",
+				}),
+			},
+			Containers: []corev1.Container{
+				setContainerDefaults(corev1.Container{
+					Name: "rename-to-final-location",
+					Command: []string{"mv", tmpDir, dir},
+				}),
+			},
+			Volumes: []corev1.Volume{{
+				Name: "payloads",
+				VolumeSource: corev1.VolumeSource{
+					HostPath: &corev1.HostPathVolumeSource{
+						Path: targetUpdatePayloadsDir,
					},
-					PriorityClassName: "openshift-user-critical",
-					Tolerations: []corev1.Toleration{{
-						Key: nodeSelectorKey,
-					}},
-					RestartPolicy: corev1.RestartPolicyOnFailure,
 				},
+			}},
+			NodeName: nodename,
+			NodeSelector: map[string]string{
+				nodeSelectorKey: "",
 			},
+			PriorityClassName: "openshift-user-critical",
+			Tolerations: []corev1.Toleration{{
+				Key: nodeSelectorKey,
+			}},
+			RestartPolicy: corev1.RestartPolicyOnFailure,
 		},
 	}
 
-	if _, err := r.kubeClient.BatchV1().Jobs(job.Namespace).Create(ctx, job, metav1.CreateOptions{}); err != nil {
+	if _, err := r.kubeClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
 		return err
 	}
-	return resourcebuilder.WaitForJobCompletion(ctx, r.kubeClient.BatchV1(), job)
+
+	return wait.PollUntilContextCancel(ctx, 3*time.Second, true, func(localCtx context.Context) (bool, error) {
+		p, err := r.kubeClient.CoreV1().Pods(pod.Namespace).Get(localCtx, pod.Name, metav1.GetOptions{})
+		if err != nil {
+			klog.Warningf("unable to get OpenShift release retrieval pod: %v", err)
+			return false, nil
+		}
+
+		switch p.Status.Phase {
+		case corev1.PodSucceeded:
+			return true, nil
+		case corev1.PodFailed:
+			return false, fmt.Errorf("pod %s in namespace %s failed: %s: %s", p.Name, p.Namespace, p.Status.Reason, p.Status.Message)
+		default:
+			klog.V(2).Infof("Pod %s in namespace %s is not complete, continuing to wait.", p.Name, p.Namespace)
+			return false, nil
+		}
+	})
 }
 
-// pruneJobs deletes the older, finished jobs in the namespace.
-// retain - the number of newest jobs to keep.
-func (r *payloadRetriever) pruneJobs(ctx context.Context, retain int) error {
+// prunePods deletes the older, finished pods in the namespace.
+func (r *payloadRetriever) prunePods(ctx context.Context) error {
+	var errs []error
+
+	// begin transitional job pruning, in case any dangled from earlier versions
 	jobs, err := r.kubeClient.BatchV1().Jobs(r.namespace).List(ctx, metav1.ListOptions{})
 	if err != nil {
-		return err
-	}
-	if len(jobs.Items) <= retain {
-		return nil
+		errs = append(errs, err)
 	}
 
-	// Select jobs to be deleted
-	var deleteJobs []batchv1.Job
 	for _, job := range jobs.Items {
-		switch {
-		// Ignore jobs not beginning with operatorName
-		case !strings.HasPrefix(job.Name, r.operatorName+"-"):
-			break
-		default:
-			deleteJobs = append(deleteJobs, job)
+		if !strings.HasPrefix(job.Name, r.operatorName+"-") {
+			// Ignore jobs not beginning with operatorName
+			continue
+		}
+		err := r.kubeClient.BatchV1().Jobs(r.namespace).Delete(ctx, job.Name, metav1.DeleteOptions{})
+		if err != nil {
+			errs = append(errs, errors.Wrapf(err, "failed to delete job %v", job.Name))
 		}
 	}
-	if len(deleteJobs) <= retain {
-		return nil
-	}
+	// end transitional job pruning
 
-	// Sort jobs by StartTime to determine the newest. nil StartTime is assumed newest.
-	sort.Slice(deleteJobs, func(i, j int) bool {
-		if deleteJobs[i].Status.StartTime == nil {
-			return false
-		}
-		if deleteJobs[j].Status.StartTime == nil {
-			return true
-		}
-		return deleteJobs[i].Status.StartTime.Before(deleteJobs[j].Status.StartTime)
+	pods, err := r.kubeClient.CoreV1().Pods(r.namespace).List(ctx, metav1.ListOptions{
+		LabelSelector: "k8s-app=retrieve-openshift-release",
 	})
+	if err != nil {
+		errs = append(errs, err)
+	}
 
-	var errs []error
-	for _, job := range deleteJobs[:len(deleteJobs)-retain] {
-		err := r.kubeClient.BatchV1().Jobs(r.namespace).Delete(ctx, job.Name, metav1.DeleteOptions{})
+	for _, pod := range pods.Items {
+		err := r.kubeClient.CoreV1().Pods(r.namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
 		if err != nil {
-			errs = append(errs, errors.Wrapf(err, "failed to delete job %v", job.Name))
+			errs = append(errs, errors.Wrapf(err, "failed to delete pod %v", pod.Name))
 		}
 	}
+
 	agg := utilerrors.NewAggregate(errs)
 	if agg != nil {
-		return fmt.Errorf("error deleting jobs: %v", agg.Error())
+		return fmt.Errorf("error deleting pods: %v", agg.Error())
 	}
 	return nil
 }
