@@ -22,7 +22,6 @@ import (
2222 "fmt"
2323 "sort"
2424 "strconv"
25- "strings"
2625 "sync"
2726 "time"
2827
@@ -33,6 +32,7 @@ import (
3332 apierrors "k8s.io/apimachinery/pkg/api/errors"
3433 "k8s.io/apimachinery/pkg/api/meta"
3534 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
3636 "k8s.io/apimachinery/pkg/runtime"
3737 "k8s.io/apimachinery/pkg/runtime/schema"
3838 "k8s.io/apimachinery/pkg/types"
@@ -155,11 +155,13 @@ func (r *ShardConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.R
155155 if shardCount <= 0 {
156156 return ctrl.Result {RequeueAfter : time .Second * 2 }, nil
157157 }
158- members := make ([]consistent.Member , 0 , shardCount )
159- for i := 0 ; i < shardCount ; i ++ {
160- members = append (members , Member {ID : i })
158+
159+ cc := newConsistentConfig (shardCount )
160+ resourceLists , err := r .d .ServerPreferredResources ()
161+ if err != nil && ! discovery .IsGroupDiscoveryFailedError (err ) {
162+ return ctrl.Result {}, err
161163 }
162- cc := newConsistentConfig ( members , shardCount )
164+
163165 for _ , resource := range cfg .Spec .Resources {
164166 if resource .Kind != "" {
165167 mapping , err := r .mapper .RESTMapping (schema.GroupKind {
@@ -175,14 +177,10 @@ func (r *ShardConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.R
175177 return ctrl.Result {}, err
176178 }
177179
178- if err := r .UpdateShardLabel (ctx , cc , gvk , & cfg ); err != nil {
180+ if err := r .UpdateShardLabel (ctx , cc , gvk , & cfg , resource ); err != nil {
179181 return ctrl.Result {}, err
180182 }
181183 } else {
182- resourceLists , err := r .d .ServerPreferredResources ()
183- if err != nil && ! discovery .IsGroupDiscoveryFailedError (err ) {
184- return ctrl.Result {}, err
185- }
186184 for _ , resourceList := range resourceLists {
187185 if gv , err := schema .ParseGroupVersion (resourceList .GroupVersion ); err == nil && gv .Group == resource .APIGroup {
188186 for _ , apiResource := range resourceList .APIResources {
@@ -191,7 +189,7 @@ func (r *ShardConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.R
191189 if err := r .RegisterResourceWatcher (gvk ); err != nil {
192190 return ctrl.Result {}, err
193191 }
194- if err := r .UpdateShardLabel (ctx , cc , gvk , & cfg ); err != nil {
192+ if err := r .UpdateShardLabel (ctx , cc , gvk , & cfg , resource ); err != nil {
195193 return ctrl.Result {}, err
196194 }
197195 }
@@ -204,30 +202,71 @@ func (r *ShardConfigurationReconciler) Reconcile(ctx context.Context, req ctrl.R
204202 return ctrl.Result {}, nil
205203}
206204
207- func (r * ShardConfigurationReconciler ) UpdateShardLabel (ctx context.Context , cc * consistent.Consistent , gvk schema.GroupVersionKind , cfg * shardapi.ShardConfiguration ) error {
208- log := log .FromContext (ctx )
205+ func (r * ShardConfigurationReconciler ) UpdateShardLabel (ctx context.Context , cc * consistent.Consistent , gvk schema.GroupVersionKind , cfg * shardapi.ShardConfiguration , ri shardapi. ResourceInfo ) error {
206+ logger := log .FromContext (ctx )
209207 shardKey := fmt .Sprintf ("shard.%s/%s" , shardapi .SchemeGroupVersion .Group , cfg .Name )
210- var list metav1.PartialObjectMetadataList
208+ nextShardKey := fmt .Sprintf ("next.%s/%s" , shardapi .SchemeGroupVersion .Group , cfg .Name )
209+
210+ var list unstructured.UnstructuredList
211211 list .SetGroupVersionKind (gvk )
212212 err := r .List (ctx , & list )
213213 if err != nil {
214214 return err
215215 }
216+
217+ ifShardKeyLabelNeedsToBeChanged := func (labels map [string ]string , shardKey string , member consistent.Member ) bool {
218+ return labels [shardKey ] != "" && labels [shardKey ] != member .String ()
219+ }
220+
216221 for _ , obj := range list .Items {
217- m := cc .LocateKey ([]byte (fmt .Sprintf ("%s/%s" , obj .GetNamespace (), obj .GetName ())))
222+ var key []byte
223+ if ri .ShardKey != nil {
224+ val , found := EvaluateJSONPath (obj .Object , * ri .ShardKey )
225+ if ! found {
226+ return fmt .Errorf ("failed to extract shard key from %s/%s %s/%s using jsonPath %s" , obj .GroupVersionKind ().Group , obj .GroupVersionKind ().Kind , obj .GetNamespace (), obj .GetName (), * ri .ShardKey )
227+ }
228+ key = []byte (fmt .Sprintf ("%s/%s" , obj .GetNamespace (), val ))
229+ } else {
230+ key = []byte (fmt .Sprintf ("%s/%s" , obj .GetNamespace (), obj .GetName ()))
231+ }
232+ m := cc .LocateKey (key )
233+ changed := false
234+ labels := obj .GetLabels ()
235+ if labels == nil {
236+ labels = make (map [string ]string )
237+ }
238+ if labels [shardKey ] == "" {
239+ labels [shardKey ] = m .String ()
240+ changed = true
241+ } else if ifShardKeyLabelNeedsToBeChanged (labels , shardKey , m ) {
242+ switch ri .UseCooperativeShardMigration {
243+ case true :
244+ if len (cfg .Status .Controllers ) > 0 && len (cfg .Status .Controllers [0 ].Pods ) > 0 {
245+ id , err := strconv .Atoi (labels [shardKey ])
246+ if err != nil {
247+ return err
248+ }
249+ if id >= len (cfg .Status .Controllers [0 ].Pods ) {
250+ labels [shardKey ] = m .String ()
251+ } else {
252+ labels [nextShardKey ] = m .String ()
253+ }
254+ }
255+ case false :
256+ labels [shardKey ] = m .String ()
257+ }
258+ changed = true
259+ }
218260
219- if obj . Labels [ shardKey ] != m . String () {
261+ if changed {
220262 opr , err := controllerutil .CreateOrPatch (ctx , r .Client , & obj , func () error {
221- if obj .Labels == nil {
222- obj .Labels = map [string ]string {}
223- }
224- obj .Labels [shardKey ] = m .String ()
263+ obj .SetLabels (labels )
225264 return nil
226265 })
227266 if err != nil {
228- log .Error (err , fmt .Sprintf ("failed to update labels for %s/%s %s/%s" , obj .GroupVersionKind ().Group , obj .GroupVersionKind ().Kind , obj .GetNamespace (), obj .GetName ()))
267+ logger .Error (err , fmt .Sprintf ("failed to update labels for %s/%s %s/%s" , obj .GroupVersionKind ().Group , obj .GroupVersionKind ().Kind , obj .GetNamespace (), obj .GetName ()))
229268 } else {
230- log .Info (fmt .Sprintf ("%s/%s %s/%s %s" , obj .GroupVersionKind ().Group , obj .GroupVersionKind ().Kind , obj .GetNamespace (), obj .GetName (), opr ))
269+ logger .Info (fmt .Sprintf ("%s/%s %s/%s %s" , obj .GroupVersionKind ().Group , obj .GroupVersionKind ().Kind , obj .GetNamespace (), obj .GetName (), opr ))
231270 }
232271 }
233272 }
@@ -360,40 +399,8 @@ func ListPods(ctx context.Context, kc client.Client, ref kmapi.TypedObjectRefere
360399 if err != nil {
361400 return nil , err
362401 }
363- sel , err := metav1 .LabelSelectorAsSelector (obj .Spec .Selector )
364- if err != nil {
365- return nil , err
366- }
367- var list metav1.PartialObjectMetadataList
368- list .SetGroupVersionKind (schema.GroupVersionKind {
369- Group : "" ,
370- Kind : "Pod" ,
371- Version : "v1" ,
372- })
373- err = kc .List (ctx , & list , client.MatchingLabelsSelector {Selector : sel })
374- if err != nil {
375- return nil , err
376- }
377- pods := make ([]string , 0 , len (list .Items ))
378- for _ , pod := range list .Items {
379- if metav1 .IsControlledBy (& pod , & obj ) {
380- pods = append (pods , pod .Name )
381- }
382- }
383- sort .Slice (pods , func (i , j int ) bool {
384- idx_i := strings .LastIndexByte (pods [i ], '-' )
385- idx_j := strings .LastIndexByte (pods [j ], '-' )
386- if idx_i == - 1 || idx_j == - 1 {
387- return pods [i ] < pods [j ]
388- }
389- oi , err_i := strconv .Atoi (pods [i ][idx_i + 1 :])
390- oj , err_j := strconv .Atoi (pods [j ][idx_j + 1 :])
391- if err_i != nil || err_j != nil {
392- return pods [i ] < pods [j ]
393- }
394- return oi < oj
395- })
396- return pods , nil
402+ // Important: Resharding feature is not available for stateful sets
403+ return buildPodList (obj .GetName (), * obj .Spec .Replicas ), nil
397404 case "DaemonSet" :
398405 var obj apps.DaemonSet
399406 err := kc .Get (ctx , client.ObjectKey {Name : ref .Name , Namespace : ref .Namespace }, & obj )
0 commit comments