@@ -7,18 +7,21 @@ import (
7
7
"fmt"
8
8
"os"
9
9
"path/filepath"
10
+ "sort"
10
11
"strings"
11
12
13
+ "github.com/pkg/errors"
14
+
12
15
batchv1 "k8s.io/api/batch/v1"
13
16
corev1 "k8s.io/api/core/v1"
14
17
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18
+ utilerrors "k8s.io/apimachinery/pkg/util/errors"
15
19
randutil "k8s.io/apimachinery/pkg/util/rand"
16
20
"k8s.io/client-go/kubernetes"
21
+ "k8s.io/klog"
17
22
"k8s.io/utils/pointer"
18
23
19
- "k8s.io/klog"
20
24
configv1 "github.com/openshift/api/config/v1"
21
-
22
25
"github.com/openshift/cluster-version-operator/lib/resourcebuilder"
23
26
"github.com/openshift/cluster-version-operator/pkg/payload"
24
27
"github.com/openshift/cluster-version-operator/pkg/verify"
@@ -189,13 +192,76 @@ func (r *payloadRetriever) fetchUpdatePayloadToDir(ctx context.Context, dir stri
189
192
},
190
193
}
191
194
192
- _ , err := r .kubeClient .BatchV1 ().Jobs (job .Namespace ).Create (job )
195
+ // Prune older jobs while gracefully handling errors.
196
+ err := r .pruneJobs (2 )
197
+ if err != nil {
198
+ klog .Warningf ("failed to prune jobs: %v" , err )
199
+ }
200
+
201
+ _ , err = r .kubeClient .BatchV1 ().Jobs (job .Namespace ).Create (job )
193
202
if err != nil {
194
203
return err
195
204
}
196
205
return resourcebuilder .WaitForJobCompletion (ctx , r .kubeClient .BatchV1 (), job )
197
206
}
198
207
208
+ // pruneJobs deletes the older, finished jobs in the namespace.
209
+ // retain - the number of newest jobs to keep.
210
+ func (r * payloadRetriever ) pruneJobs (retain int ) error {
211
+ jobs , err := r .kubeClient .BatchV1 ().Jobs (r .namespace ).List (metav1.ListOptions {})
212
+ if err != nil {
213
+ return err
214
+ }
215
+ if len (jobs .Items ) <= retain {
216
+ return nil
217
+ }
218
+
219
+ // Select jobs to be deleted
220
+ var deleteJobs []batchv1.Job
221
+ for _ , job := range jobs .Items {
222
+ switch {
223
+ // Ignore jobs not begining with operatorName
224
+ case ! strings .HasPrefix (job .Name , r .operatorName + "-" ):
225
+ break
226
+ // Ignore jobs that have not yet started
227
+ case job .Status .StartTime == nil :
228
+ break
229
+ // Ignore jobs that are still active
230
+ case job .Status .Active == 1 :
231
+ break
232
+ default :
233
+ deleteJobs = append (deleteJobs , job )
234
+ }
235
+ }
236
+ if len (deleteJobs ) <= retain {
237
+ return nil
238
+ }
239
+
240
+ // Sort jobs by StartTime to determine the newest. nil StartTime is assumed newest.
241
+ sort .Slice (deleteJobs , func (i , j int ) bool {
242
+ if deleteJobs [i ].Status .StartTime == nil {
243
+ return false
244
+ }
245
+ if deleteJobs [j ].Status .StartTime == nil {
246
+ return true
247
+ }
248
+ return deleteJobs [i ].Status .StartTime .Before (deleteJobs [j ].Status .StartTime )
249
+ })
250
+
251
+ var errs []error
252
+ for _ , job := range deleteJobs [:len (deleteJobs )- retain ] {
253
+ err := r .kubeClient .BatchV1 ().Jobs (r .namespace ).Delete (job .Name , & metav1.DeleteOptions {})
254
+ if err != nil {
255
+ errs = append (errs , errors .Wrapf (err , "failed to delete job %v" , job .Name ))
256
+ }
257
+ }
258
+ agg := utilerrors .NewAggregate (errs )
259
+ if agg != nil {
260
+ return fmt .Errorf ("error deleting jobs: %v" , agg .Error ())
261
+ }
262
+ return nil
263
+ }
264
+
199
265
// copyPayloadCmd returns command that copies cvo and release manifests from deafult location
200
266
// to the target dir.
201
267
// It is made up of 2 commands:
0 commit comments