@@ -24,6 +24,8 @@ import (
2424 "sigs.k8s.io/controller-runtime/pkg/client"
2525 logf "sigs.k8s.io/controller-runtime/pkg/log"
2626 "sigs.k8s.io/controller-runtime/pkg/reconcile"
27+
28+ shardingv1alpha1 "github.com/timebertt/kubernetes-controller-sharding/pkg/apis/sharding/v1alpha1"
2729)
2830
2931// Reconciler wraps another reconciler to ensure that the controller correctly handles the shard and drain labels.
@@ -35,12 +37,10 @@ type Reconciler struct {
3537 Object client.Object
3638 // Client is used to read and patch the controller's objects.
3739 Client client.Client
40+ // ControllerRingName is the name of the manager's ControllerRing.
41+ ControllerRingName string
3842 // ShardName is the shard ID of the manager.
3943 ShardName string
40- // LabelShard is the shard label specific to the manager's ControllerRing.
41- LabelShard string
42- // LabelDrain is the drain label specific to the manager's ControllerRing.
43- LabelDrain string
4444 // Do is the actual Reconciler.
4545 Do reconcile.Reconciler
4646}
@@ -62,24 +62,28 @@ func (r *Reconciler) Reconcile(ctx context.Context, request reconcile.Request) (
6262 return reconcile.Result {}, fmt .Errorf ("error retrieving object from store for determining responsibility: %w" , err )
6363 }
6464
65- labels := obj .GetLabels ()
65+ var (
66+ labels = obj .GetLabels ()
67+ labelShard = shardingv1alpha1 .LabelShard (r .ControllerRingName )
68+ labelDrain = shardingv1alpha1 .LabelDrain (r .ControllerRingName )
69+ )
6670
6771 // check if we are responsible for this object
6872 // Note that objects should already be filtered by the cache and the predicate for being assigned to this shard.
6973 // However, we still need to do a final check before reconciling here. The controller might requeue the object with
7074 // a delay or exponential. This might trigger another reconciliation even after observing a label change.
71- if shard , ok := labels [r . LabelShard ]; ! ok || shard != r .ShardName {
75+ if shard , ok := labels [labelShard ]; ! ok || shard != r .ShardName {
7276 log .V (1 ).Info ("Ignoring object as it is assigned to different shard" , "shard" , shard )
7377 return reconcile.Result {}, nil
7478 }
7579
76- if _ , drain := labels [r . LabelDrain ]; drain {
80+ if _ , drain := labels [labelDrain ]; drain {
7781 log .V (1 ).Info ("Draining object" )
7882
7983 // acknowledge drain operation
8084 patch := client .MergeFromWithOptions (obj .DeepCopyObject ().(client.Object ), client.MergeFromWithOptimisticLock {})
81- delete (labels , r . LabelShard )
82- delete (labels , r . LabelDrain )
85+ delete (labels , labelShard )
86+ delete (labels , labelDrain )
8387
8488 if err := r .Client .Patch (ctx , obj , patch ); err != nil {
8589 return reconcile.Result {}, fmt .Errorf ("error draining object: %w" , err )
0 commit comments