@@ -19,6 +19,7 @@ package sharder
 import (
	"context"
	"fmt"
+	"sync"
	"time"
 
	"github.com/go-logr/logr"
@@ -28,6 +29,7 @@ import (
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/utils/clock"
	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -117,6 +119,7 @@ func (r *Reconciler) NewOperation(ctx context.Context, controllerRing *shardingv
 		Namespaces:  namespaces,
 		HashRing:    hashRing,
 		Shards:      shards,
+		Concurrency: int(*r.Config.Controller.Sharder.ConcurrentMoves),
 	}, nil
 }
 
@@ -152,38 +155,73 @@ type Operation struct {
 	Namespaces sets.Set[string]
 	HashRing   *consistenthash.Ring
 	Shards     leases.Shards
+
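+	// Concurrency bounds how many drain and move operations run in parallel during a resync.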
+	Concurrency int
+}
+
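+// workItem captures a single object that must be drained from its current shard or moved to a new one.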
+type workItem struct {
+	drain           bool
+	gr              metav1.GroupResource
+	gvk             schema.GroupVersionKind
+	key             client.ObjectKey
+	resourceVersion string
+	currentShard    string
 }
 
 func (o *Operation) ResyncControllerRing(ctx context.Context, log logr.Logger) error {
-	allErrs := &multierror.Error{
-		ErrorFormat: utilerrors.FormatErrors,
+	var (
+		wg   sync.WaitGroup
+		errs = make(chan error)
+		work = make(chan *workItem, o.Concurrency)
+	)
+
+	// Compile all objects that need to be moved or drained, and add them to the queue.
+	// The buffer limit of the queue applies backpressure on the work generator (throttling list paging as needed).
+	wg.Go(func() {
+		o.compileWorkItemsForRing(ctx, work, errs)
+		close(work)
+	})
+
+	// read work items from the queue and perform drains/movements with the configured concurrency
+	for i := 0; i < o.Concurrency; i++ {
+		wg.Go(func() {
+			for o.processNextWorkItem(ctx, log, work, errs) {
+			}
+		})
 	}
 
-	// resync all ring resources
+	// wait for all processors, then stop collecting errors
+	go func() {
+		wg.Wait()
+		close(errs)
+	}()
+
+	// collect all errors and return a combined error if any occurred
+	allErrs := &multierror.Error{ErrorFormat: utilerrors.FormatErrors}
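+	// Note: multierror.Append skips nil errors, so successful operations reported on errs add no entries.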
+	for err := range errs {
+		allErrs = multierror.Append(allErrs, err)
+	}
+
+	return allErrs.ErrorOrNil()
+}
+
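+// compileWorkItemsForRing lists all objects of the ring's resources and enqueues a work item for every object that
+// needs to be drained or moved. All errors (including nil results) are sent to errs.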
+func (o *Operation) compileWorkItemsForRing(ctx context.Context, work chan<- *workItem, errs chan<- error) {
+	// check all ring resources
 	for _, ringResource := range o.ControllerRing.Spec.Resources {
-		allErrs = multierror.Append(allErrs,
-			o.resyncResource(ctx, log, ringResource.GroupResource, false),
-		)
+		errs <- o.compileWorkItemsForResource(ctx, ringResource.GroupResource, false, work)
 
 		for _, controlledResource := range ringResource.ControlledResources {
-			allErrs = multierror.Append(allErrs,
-				o.resyncResource(ctx, log, controlledResource, true),
-			)
+			errs <- o.compileWorkItemsForResource(ctx, controlledResource, true, work)
 		}
 	}
-
-	// collect all errors and return a combined error if any occurred
-	return allErrs.ErrorOrNil()
 }
 
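+// compileWorkItemsForResource lists the given resource (metadata only) and enqueues a work item for every object
+// that is not assigned to its desired shard.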
-func (o *Operation) resyncResource(
+func (o *Operation) compileWorkItemsForResource(
 	ctx context.Context,
-	log logr.Logger,
 	gr metav1.GroupResource,
 	controlled bool,
+	work chan<- *workItem,
 ) error {
-	log = log.WithValues("resource", gr)
-
 	gvks, err := o.Client.RESTMapper().KindsFor(schema.GroupVersionResource{Group: gr.Group, Resource: gr.Resource})
 	if err != nil {
 		return fmt.Errorf("error determining kinds for resource %q: %w", gr.String(), err)
@@ -202,7 +240,11 @@ func (o *Operation) resyncResource(
 			return nil
 		}
 
-		allErrs = multierror.Append(allErrs, o.resyncObject(ctx, log, gr, controlled, obj.(*metav1.PartialObjectMetadata)))
+		if w, err := o.workItemForObject(gr, controlled, obj.(*metav1.PartialObjectMetadata)); err != nil {
+			allErrs = multierror.Append(allErrs, err)
+		} else if w != nil {
+			work <- w
+		}
 		return nil
 	},
 	// List a recent version from the API server's watch cache by setting resourceVersion=0. This reduces the load on etcd
@@ -226,27 +268,23 @@ var (
 	KeyForController = key.ForController
 )
 
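+// workItemForObject calculates the desired shard for the given object and returns a work item if the object needs to
+// be drained or moved. It returns nil if there is nothing to do for this object.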
-func (o *Operation) resyncObject(
-	ctx context.Context,
-	log logr.Logger,
+func (o *Operation) workItemForObject(
 	gr metav1.GroupResource,
 	controlled bool,
 	obj *metav1.PartialObjectMetadata,
-) error {
-	log = log.WithValues("object", client.ObjectKeyFromObject(obj))
-
+) (*workItem, error) {
 	keyFunc := KeyForObject
 	if controlled {
 		keyFunc = KeyForController
 	}
 
 	hashKey, err := keyFunc(obj)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	if hashKey == "" {
 		// object should not be assigned
-		return nil
+		return nil, nil
 	}
 
 	var (
@@ -256,45 +294,109 @@ func (o *Operation) resyncObject(
 
 	if desiredShard == "" {
 		// if no shard is available, there's nothing we can do
-		return nil
+		return nil, nil
 	}
 
 	if desiredShard == currentShard {
 		// object is correctly assigned, nothing to do here
-		return nil
+		return nil, nil
+	}
+
+	w := &workItem{
+		gr:              gr,
+		gvk:             obj.GroupVersionKind(),
+		key:             client.ObjectKeyFromObject(obj),
+		resourceVersion: obj.ResourceVersion,
+		currentShard:    currentShard,
 	}
 
 	if currentShard != "" && o.Shards.ByID(currentShard).State.IsAvailable() && !controlled {
 		// If the object should be moved and the current shard is still available, we need to drain it.
 		// We only drain non-controlled objects, the controller's main object is used as a synchronization point for
 		// preventing concurrent reconciliations.
-		log.V(1).Info("Draining object from shard", "currentShard", currentShard)
+		w.drain = true
+	}
+
+	// If the object doesn't need to be drained first, it is either unassigned or the current shard is not available.
+	// In this case, we send a (potentially empty) patch to trigger an assignment by the sharder webhook.
+	return w, nil
+}
+
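+// processNextWorkItem reads a single work item from the queue and drains or moves the object accordingly. It returns
+// false when the queue has been exhausted or the context is cancelled.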
+func (o *Operation) processNextWorkItem(
+	ctx context.Context,
+	log logr.Logger,
+	work <-chan *workItem,
+	errs chan<- error,
+) bool {
+	select {
+	case <-ctx.Done():
+		// stop when context is cancelled
+		return false
+	case w, ok := <-work:
+		if !ok {
+			// stop when work queue is closed (all items have been processed)
+			return false
+		}
 
-		patch := client.MergeFromWithOptions(obj.DeepCopy(), client.MergeFromWithOptimisticLock{})
-		metav1.SetMetaDataLabel(&obj.ObjectMeta, o.ControllerRing.LabelDrain(), "true")
-		if err := o.Client.Patch(ctx, obj, patch); err != nil {
-			return fmt.Errorf("error draining %s %q: %w", gr.String(), client.ObjectKeyFromObject(obj), err)
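+		// Reconstruct a metadata-only object from the work item so that the patch carries the resourceVersion
+		// observed when the item was compiled (optimistic locking).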
+		obj := &metav1.PartialObjectMetadata{}
+		obj.SetGroupVersionKind(w.gvk)
+		obj.SetName(w.key.Name)
+		obj.SetNamespace(w.key.Namespace)
+		obj.SetResourceVersion(w.resourceVersion)
+
+		log = log.WithValues("resource", w.gr, "object", w.key)
+		if w.drain {
+			log.V(1).Info("Draining object from shard", "currentShard", w.currentShard)
+			errs <- o.drainObject(ctx, obj, w.gr)
+		} else {
+			log.V(1).Info("Moving object")
+			errs <- o.moveObject(ctx, obj, w.gr)
 		}
+	}
 
-		shardingmetrics.DrainsTotal.WithLabelValues(
-			o.ControllerRing.Name, gr.Group, gr.Resource,
-		).Inc()
+	return true
+}
+
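+// drainObject adds the ring's drain label to the object, asking the current shard to hand the object over before it
+// is reassigned.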
+func (o *Operation) drainObject(
+	ctx context.Context,
+	obj *metav1.PartialObjectMetadata,
+	gr metav1.GroupResource,
+) error {
+	patch := fmt.Sprintf(
+		// - use optimistic locking by including the object's current resourceVersion
+		// - add drain label; object will go through the sharder webhook when shard removes the drain label, which will
+		//   perform the assignment
+		`{"metadata":{"resourceVersion":"%s","labels":{"%s":"true"}}}`,
+		obj.ResourceVersion, o.ControllerRing.LabelDrain(),
+	)
 
-	// object will go through the sharder webhook when shard removes the drain label, which will perform the assignment
-	return nil
+	if err := o.Client.Patch(ctx, obj, client.RawPatch(types.MergePatchType, []byte(patch))); err != nil {
+		return fmt.Errorf("error draining %s %q: %w", gr.String(), client.ObjectKeyFromObject(obj), err)
 	}
 
-	// At this point, the object is either unassigned or the current shard is not available.
-	// We send a (potentially empty) patch to trigger an assignment by the sharder webhook.
-	log.V(1).Info("Moving object")
-
-	patch := client.MergeFromWithOptions(obj.DeepCopy(), client.MergeFromWithOptimisticLock{})
-	// remove drain label if it is still present, this might happen when trying to drain an object from a shard that
-	// just got unavailable
-	delete(obj.Labels, o.ControllerRing.LabelShard())
-	delete(obj.Labels, o.ControllerRing.LabelDrain())
-	if err := o.Client.Patch(ctx, obj, patch); err != nil {
-		return fmt.Errorf("error triggering assignment for %s %q: %w", gr.String(), client.ObjectKeyFromObject(obj), err)
+	shardingmetrics.DrainsTotal.WithLabelValues(
+		o.ControllerRing.Name, gr.Group, gr.Resource,
+	).Inc()
+
+	return nil
+}
+
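+// moveObject clears the ring's shard and drain labels so that the sharder webhook assigns the object to an available
+// shard.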
+func (o *Operation) moveObject(
+	ctx context.Context,
+	obj *metav1.PartialObjectMetadata,
+	gr metav1.GroupResource,
+) error {
+	patch := fmt.Sprintf(
+		// - use optimistic locking by including the object's current resourceVersion
+		// - remove shard label
+		// - remove drain label if it is still present, this might happen when trying to drain an object from a shard
+		//   that just got unavailable
+		`{"metadata":{"resourceVersion":"%s","labels":{"%s":null,"%s":null}}}`,
+		obj.ResourceVersion, o.ControllerRing.LabelShard(), o.ControllerRing.LabelDrain(),
+	)
+
+	if err := o.Client.Patch(ctx, obj, client.RawPatch(types.MergePatchType, []byte(patch))); err != nil {
+		return fmt.Errorf("error moving %s %q: %w", gr.String(), client.ObjectKeyFromObject(obj), err)
 	}
 
 	shardingmetrics.MovementsTotal.WithLabelValues(