Skip to content

Commit 9cf515b

Browse files
Set Stricter Predicates For Compaction Controller (#1158)
* reconcile only compaction jobs, not jobs of any type * compaction controller predicate to allow reconciles only if necessary job.Status params change
1 parent ce7d0cd commit 9cf515b

File tree

4 files changed

+142
-20
lines changed

4 files changed

+142
-20
lines changed

internal/controller/compaction/reconciler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ func (r *Reconciler) createCompactionJob(ctx context.Context, logger logr.Logger
297297
APIVersion: druidv1alpha1.SchemeGroupVersion.String(),
298298
BlockOwnerDeletion: ptr.To(true),
299299
Controller: ptr.To(true),
300-
Kind: "Etcd",
300+
Kind: druidv1alpha1.SchemeGroupVersion.WithKind("Etcd").Kind,
301301
Name: etcd.Name,
302302
UID: etcd.UID,
303303
},

internal/controller/compaction/register.go

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212

1313
batchv1 "k8s.io/api/batch/v1"
1414
coordinationv1 "k8s.io/api/coordination/v1"
15-
apiequality "k8s.io/apimachinery/pkg/api/equality"
15+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1616
ctrl "sigs.k8s.io/controller-runtime"
1717
"sigs.k8s.io/controller-runtime/pkg/client"
1818
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -32,7 +32,7 @@ func (r *Reconciler) RegisterWithManager(mgr ctrl.Manager) error {
3232
}).
3333
For(&druidv1alpha1.Etcd{}).
3434
WithEventFilter(predicate.
35-
Or(snapshotRevisionChanged(), jobStatusChanged())).
35+
Or(snapshotRevisionChanged(), compactionJobStatusChanged())).
3636
Owns(&coordinationv1.Lease{}).
3737
Owns(&batchv1.Job{}).
3838
Complete(r)
@@ -79,8 +79,42 @@ func snapshotRevisionChanged() predicate.Predicate {
7979
}
8080
}
8181

82-
// jobStatusChanged is a predicate that is `true` if the status of a job changes.
83-
func jobStatusChanged() predicate.Predicate {
82+
// compactionJobStatusChanged is a predicate that is `true` if the status of a compaction job changes.
83+
func compactionJobStatusChanged() predicate.Predicate {
84+
equalInt32Ptr := func(a, b *int32) bool {
85+
if a == nil && b == nil {
86+
return true
87+
}
88+
if a == nil || b == nil {
89+
return false
90+
}
91+
return *a == *b
92+
}
93+
94+
isCompactionJob := func(obj client.Object) bool {
95+
job, ok := obj.(*batchv1.Job)
96+
if !ok {
97+
return false
98+
}
99+
100+
etcdKind := druidv1alpha1.SchemeGroupVersion.WithKind("Etcd").Kind
101+
// Extract etcd name from the job's owner reference
102+
for _, ownerRef := range job.OwnerReferences {
103+
if ownerRef.Kind == etcdKind && ownerRef.APIVersion == druidv1alpha1.SchemeGroupVersion.String() {
104+
etcdObjMeta := metav1.ObjectMeta{
105+
Name: ownerRef.Name,
106+
Namespace: job.Namespace,
107+
}
108+
expectedCompactionJobName := druidv1alpha1.GetCompactionJobName(etcdObjMeta)
109+
return job.Name == expectedCompactionJobName
110+
}
111+
}
112+
return false
113+
}
114+
115+
// statusChange compares only the critical JobStatus fields that should trigger reconciliation.
116+
// It checks Active, Succeeded, Failed, Terminating, and Ready fields.
117+
// Returns false if the new job has active pods (Status.Active > 0) to prevent reconciliation during active compaction job execution.
84118
statusChange := func(objOld, objNew client.Object) bool {
85119
jobOld, ok := objOld.(*batchv1.Job)
86120
if !ok {
@@ -90,15 +124,29 @@ func jobStatusChanged() predicate.Predicate {
90124
if !ok {
91125
return false
92126
}
93-
return !apiequality.Semantic.DeepEqual(jobOld.Status, jobNew.Status)
127+
128+
oldStatus := jobOld.Status
129+
newStatus := jobNew.Status
130+
131+
// Prevent reconciliation when the job has active pods
132+
if newStatus.Active > 0 {
133+
return false
134+
}
135+
136+
// Compare only the critical status fields
137+
return oldStatus.Active != newStatus.Active ||
138+
oldStatus.Succeeded != newStatus.Succeeded ||
139+
oldStatus.Failed != newStatus.Failed ||
140+
!equalInt32Ptr(oldStatus.Terminating, newStatus.Terminating) ||
141+
!equalInt32Ptr(oldStatus.Ready, newStatus.Ready)
94142
}
95143

96144
return predicate.Funcs{
97145
CreateFunc: func(_ event.CreateEvent) bool {
98146
return false
99147
},
100148
UpdateFunc: func(event event.UpdateEvent) bool {
101-
return statusChange(event.ObjectOld, event.ObjectNew)
149+
return isCompactionJob(event.ObjectNew) && statusChange(event.ObjectOld, event.ObjectNew)
102150
},
103151
GenericFunc: func(_ event.GenericEvent) bool {
104152
return false

internal/controller/compaction/register_test.go

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ import (
1111
"testing"
1212
"time"
1313

14+
druidv1alpha1 "github.com/gardener/etcd-druid/api/core/v1alpha1"
15+
"github.com/gardener/etcd-druid/test/utils"
16+
1417
batchv1 "k8s.io/api/batch/v1"
1518
coordinationv1 "k8s.io/api/coordination/v1"
1619
corev1 "k8s.io/api/core/v1"
@@ -166,50 +169,94 @@ func TestJobStatusChangedForUpdateEvents(t *testing.T) {
166169
tests := []struct {
167170
name string
168171
isObjectJob bool
172+
isObjectCompactionJob bool
169173
isStatusChanged bool
170174
shouldAllowUpdateEvent bool
171175
}{
172176
{
173177
name: "object is not a job",
174178
isObjectJob: false,
179+
isObjectCompactionJob: false,
180+
shouldAllowUpdateEvent: false,
181+
},
182+
{
183+
name: "object is a non-compaction job, and status is not changed",
184+
isObjectJob: true,
185+
isObjectCompactionJob: false,
186+
isStatusChanged: false,
187+
shouldAllowUpdateEvent: false,
188+
},
189+
{
190+
name: "object is a non-compaction job, and status is changed",
191+
isObjectJob: true,
192+
isObjectCompactionJob: false,
193+
isStatusChanged: true,
175194
shouldAllowUpdateEvent: false,
176195
},
177196
{
178-
name: "object is a job, but status is not changed",
197+
name: "object is a compaction job, but status is not changed",
179198
isObjectJob: true,
199+
isObjectCompactionJob: true,
180200
isStatusChanged: false,
181201
shouldAllowUpdateEvent: false,
182202
},
183203
{
184-
name: "object is a job, and status is changed",
204+
name: "object is a compaction job, and status is changed",
185205
isObjectJob: true,
206+
isObjectCompactionJob: true,
186207
isStatusChanged: true,
187208
shouldAllowUpdateEvent: true,
188209
},
189210
}
190211

191212
g := NewWithT(t)
192213
t.Parallel()
193-
predicate := jobStatusChanged()
214+
predicate := compactionJobStatusChanged()
194215
for _, test := range tests {
195216
t.Run(test.name, func(t *testing.T) {
196217
t.Parallel()
197-
obj, oldObj := createObjectsForJobStatusChangedPredicate(g, "etcd-test-compaction-job", test.isObjectJob, test.isStatusChanged)
218+
obj, oldObj := createObjectsForJobStatusChangedPredicate(g, druidv1alpha1.GetCompactionJobName(metav1.ObjectMeta{Name: utils.TestEtcdName}), test.isObjectJob, test.isObjectCompactionJob, test.isStatusChanged)
198219
g.Expect(predicate.Update(event.UpdateEvent{ObjectOld: oldObj, ObjectNew: obj})).To(Equal(test.shouldAllowUpdateEvent))
199220
})
200221
}
201222
}
202223

203-
func createObjectsForJobStatusChangedPredicate(g *WithT, name string, isJobObj, isStatusChanged bool) (obj client.Object, oldObj client.Object) {
224+
func createObjectsForJobStatusChangedPredicate(g *WithT, name string, isJobObj, isCompactionJob, isStatusChanged bool) (obj client.Object, oldObj client.Object) {
204225
// if the object is not a job object, create a config map (random type chosen, could have been anything else as well).
205226
if !isJobObj {
206227
obj = createConfigMap(g, name)
207228
oldObj = createConfigMap(g, name)
208229
return
209230
}
231+
// If the object is a job but not a compaction job, create a regular job
232+
if !isCompactionJob {
233+
obj = createNonCompactionJob(g, name)
234+
oldObj = createNonCompactionJob(g, name)
235+
return
236+
}
237+
210238
now := time.Now()
239+
240+
etcdName := utils.TestEtcdName
241+
etcdKind := druidv1alpha1.SchemeGroupVersion.WithKind("Etcd").Kind
242+
243+
// Create proper owner reference for compaction job
244+
ownerRef := metav1.OwnerReference{
245+
APIVersion: druidv1alpha1.SchemeGroupVersion.String(),
246+
Kind: etcdKind,
247+
Name: etcdName,
248+
UID: "test-etcd-uid-12345",
249+
Controller: ptr.To(true),
250+
BlockOwnerDeletion: ptr.To(true),
251+
}
252+
211253
// create job objects
212254
oldObj = &batchv1.Job{
255+
ObjectMeta: metav1.ObjectMeta{
256+
Name: name,
257+
Namespace: utils.TestNamespace,
258+
OwnerReferences: []metav1.OwnerReference{ownerRef},
259+
},
213260
Status: batchv1.JobStatus{
214261
Active: 1,
215262
StartTime: &metav1.Time{
@@ -219,6 +266,11 @@ func createObjectsForJobStatusChangedPredicate(g *WithT, name string, isJobObj,
219266
}
220267
if isStatusChanged {
221268
obj = &batchv1.Job{
269+
ObjectMeta: metav1.ObjectMeta{
270+
Name: name,
271+
Namespace: utils.TestNamespace,
272+
OwnerReferences: []metav1.OwnerReference{ownerRef},
273+
},
222274
Status: batchv1.JobStatus{
223275
Succeeded: 1,
224276
StartTime: &metav1.Time{
@@ -290,6 +342,22 @@ func createConfigMap(g *WithT, name string) *corev1.ConfigMap {
290342
}
291343
}
292344

345+
func createNonCompactionJob(g *WithT, name string) *batchv1.Job {
346+
randInt := generateRandomInt(g)
347+
return &batchv1.Job{
348+
ObjectMeta: metav1.ObjectMeta{
349+
Name: name,
350+
Namespace: utils.TestNamespace,
351+
},
352+
Spec: batchv1.JobSpec{
353+
ActiveDeadlineSeconds: ptr.To(int64(randInt)),
354+
},
355+
Status: batchv1.JobStatus{
356+
Active: 1,
357+
},
358+
}
359+
}
360+
293361
func generateRandomInt(g *WithT) int {
294362
randInt, err := rand.Int(rand.Reader, big.NewInt(1000))
295363
g.Expect(err).NotTo(HaveOccurred())

test/integration/controllers/compaction/reconciler_test.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -128,10 +128,13 @@ var _ = Describe("Compaction Controller", func() {
128128
}, timeout, pollingInterval).Should(BeNil())
129129

130130
// Update job status as failed
131-
j.Status.Conditions = []batchv1.JobCondition{
132-
{
133-
Type: batchv1.JobFailed,
134-
Status: corev1.ConditionTrue,
131+
j.Status = batchv1.JobStatus{
132+
Failed: 1,
133+
Conditions: []batchv1.JobCondition{
134+
{
135+
Type: batchv1.JobFailed,
136+
Status: corev1.ConditionTrue,
137+
},
135138
},
136139
}
137140
j.Status.StartTime = &metav1.Time{Time: time.Now()}
@@ -182,10 +185,13 @@ var _ = Describe("Compaction Controller", func() {
182185
}, timeout, pollingInterval).Should(BeNil())
183186

184187
// Update job status as succeeded
185-
j.Status.Conditions = []batchv1.JobCondition{
186-
{
187-
Type: batchv1.JobComplete,
188-
Status: corev1.ConditionTrue,
188+
j.Status = batchv1.JobStatus{
189+
Succeeded: 1,
190+
Conditions: []batchv1.JobCondition{
191+
{
192+
Type: batchv1.JobComplete,
193+
Status: corev1.ConditionTrue,
194+
},
189195
},
190196
}
191197
Expect(k8sClient.Status().Update(context.TODO(), j)).To(Succeed())

0 commit comments

Comments
 (0)