5555)
5656
const (
	// ownerKey is presumably the field-index key used to look up child objects
	// by their owning controller — confirm against the index registration.
	ownerKey = ".metadata.controller"
	// ownerKind is the Kind name of the custom resource this controller manages.
	ownerKind = "RabbitmqCluster"
	// deletionFinalizer is the finalizer string used for RabbitmqCluster deletion
	// handling (usage is outside this section — confirm).
	deletionFinalizer = "deletion.finalizers.rabbitmqclusters.rabbitmq.com"
	// pluginsUpdateAnnotation is written on the plugins ConfigMap with an RFC 3339
	// timestamp when that ConfigMap is updated, and removed again once
	// 'rabbitmq-plugins set' has been run on every node.
	pluginsUpdateAnnotation = "rabbitmq.com/pluginsUpdatedAt"
)
6263
6364// RabbitmqClusterReconciler reconciles a RabbitmqCluster object
@@ -183,7 +184,6 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
183184 operationResult , apiError = controllerutil .CreateOrUpdate (ctx , r , resource , func () error {
184185 return builder .Update (resource )
185186 })
186-
187187 return apiError
188188 })
189189 r .logAndRecordOperationResult (rabbitmqCluster , resource , operationResult , err )
@@ -194,11 +194,13 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
194194 "namespace" , rabbitmqCluster .Namespace ,
195195 "name" , rabbitmqCluster .Name )
196196 }
197-
198197 return ctrl.Result {}, err
199198 }
200199
201- r .restartStatefulSetIfNeeded (ctx , builder , operationResult , rabbitmqCluster )
200+ r .annotatePluginsConfigMapIfUpdated (ctx , builder , operationResult , rabbitmqCluster )
201+ if restarted := r .restartStatefulSetIfNeeded (ctx , builder , operationResult , rabbitmqCluster ); restarted {
202+ return ctrl.Result {RequeueAfter : time .Second * 10 }, nil
203+ }
202204 }
203205
204206 // Set ReconcileSuccess to true here because all CRUD operations to Kube API related
@@ -214,18 +216,13 @@ func (r *RabbitmqClusterReconciler) Reconcile(req ctrl.Request) (ctrl.Result, er
214216 return ctrl.Result {}, err
215217 }
216218
217- if ok , err := r .allReplicasReady (ctx , rabbitmqCluster ); ! ok {
218- // only enable plugins when all pods of the StatefulSet become ready
219- // requeue request after 10 seconds without error
220- logger .Info ("Not all replicas ready yet; requeuing request to enable plugins on RabbitmqCluster" ,
221- "namespace" , rabbitmqCluster .Namespace ,
222- "name" , rabbitmqCluster .Name )
223- return ctrl.Result {RequeueAfter : time .Second * 10 }, err
224- }
225-
226- if err := r .enablePlugins (rabbitmqCluster ); err != nil {
219+ requeueAfter , err := r .setPluginsIfNeeded (ctx , rabbitmqCluster )
220+ if err != nil {
227221 return ctrl.Result {}, err
228222 }
223+ if requeueAfter > 0 {
224+ return ctrl.Result {RequeueAfter : requeueAfter }, nil
225+ }
229226
230227 logger .Info ("Finished reconciling RabbitmqCluster" ,
231228 "namespace" , rabbitmqCluster .Namespace ,
@@ -312,70 +309,154 @@ func (r *RabbitmqClusterReconciler) setAdminStatus(ctx context.Context, rmq *rab
312309 return nil
313310}
314311
315- // restartStatefulSetIfNeeded - helper function that annotates the StatefulSet PodTemplate with current timestamp
316- // to trigger a restart of the all pods in the StatefulSet when builder requires StatefulSet to be updated
317- func (r * RabbitmqClusterReconciler ) restartStatefulSetIfNeeded (ctx context.Context , builder resource.ResourceBuilder , operationResult controllerutil.OperationResult , rmq * rabbitmqv1beta1.RabbitmqCluster ) {
318- if builder .UpdateRequiresStsRestart () && operationResult == controllerutil .OperationResultUpdated {
319- if err := clientretry .RetryOnConflict (clientretry .DefaultRetry , func () error {
320- sts := & appsv1.StatefulSet {ObjectMeta : metav1.ObjectMeta {Name : rmq .ChildResourceName ("server" ), Namespace : rmq .Namespace }}
321- if err := r .Get (ctx , types.NamespacedName {Name : sts .Name , Namespace : sts .Namespace }, sts ); err != nil {
322- return err
323- }
324- if sts .Spec .Template .ObjectMeta .Annotations == nil {
325- sts .Spec .Template .ObjectMeta .Annotations = make (map [string ]string )
326- }
327- sts .Spec .Template .ObjectMeta .Annotations ["rabbitmq.com/restartAt" ] = time .Now ().Format (time .RFC3339 )
328- return r .Update (ctx , sts )
329- }); err != nil {
330- msg := fmt .Sprintf ("Failed to restart StatefulSet %s of Namespace %s; rabbitmq.conf configuration may be outdated" , rmq .ChildResourceName ("server" ), rmq .Namespace )
331- r .Log .Error (err , msg )
332- r .Recorder .Event (rmq , corev1 .EventTypeWarning , "FailedUpdate" , msg )
312+ // Adds an arbitrary annotation (rabbitmq.com/lastRestartAt) to the StatefulSet PodTemplate to trigger a StatefulSet restart
313+ // if builder requires StatefulSet to be updated.
314+ func (r * RabbitmqClusterReconciler ) restartStatefulSetIfNeeded (
315+ ctx context.Context ,
316+ builder resource.ResourceBuilder ,
317+ operationResult controllerutil.OperationResult ,
318+ rmq * rabbitmqv1beta1.RabbitmqCluster ) (restarted bool ) {
319+
320+ if ! (builder .UpdateRequiresStsRestart () && operationResult == controllerutil .OperationResultUpdated ) {
321+ return false
322+ }
323+
324+ if err := clientretry .RetryOnConflict (clientretry .DefaultRetry , func () error {
325+ sts := & appsv1.StatefulSet {ObjectMeta : metav1.ObjectMeta {Name : rmq .ChildResourceName ("server" ), Namespace : rmq .Namespace }}
326+ if err := r .Get (ctx , types.NamespacedName {Name : sts .Name , Namespace : sts .Namespace }, sts ); err != nil {
327+ return err
333328 }
334- msg := fmt .Sprintf ("Restarted StatefulSet %s of Namespace %s" , rmq .ChildResourceName ("server" ), rmq .Namespace )
335- r .Log .Info (msg )
336- r .Recorder .Event (rmq , corev1 .EventTypeNormal , "SuccessfulUpdate" , msg )
329+ if sts .Spec .Template .ObjectMeta .Annotations == nil {
330+ sts .Spec .Template .ObjectMeta .Annotations = make (map [string ]string )
331+ }
332+ sts .Spec .Template .ObjectMeta .Annotations ["rabbitmq.com/lastRestartAt" ] = time .Now ().Format (time .RFC3339 )
333+ return r .Update (ctx , sts )
334+ }); err != nil {
335+ msg := fmt .Sprintf ("failed to restart StatefulSet %s of Namespace %s; rabbitmq.conf configuration may be outdated" , rmq .ChildResourceName ("server" ), rmq .Namespace )
336+ r .Log .Error (err , msg )
337+ r .Recorder .Event (rmq , corev1 .EventTypeWarning , "FailedUpdate" , msg )
338+ return false
337339 }
340+
341+ msg := fmt .Sprintf ("restarted StatefulSet %s of Namespace %s" , rmq .ChildResourceName ("server" ), rmq .Namespace )
342+ r .Log .Info (msg )
343+ r .Recorder .Event (rmq , corev1 .EventTypeNormal , "SuccessfulUpdate" , msg )
344+ return true
338345}
339346
340- // allReplicasReady - helper function that checks if StatefulSet replicas are all ready
341- func (r * RabbitmqClusterReconciler ) allReplicasReady (ctx context.Context , rmq * rabbitmqv1beta1.RabbitmqCluster ) (bool , error ) {
342- sts := & appsv1.StatefulSet {}
347+ // There are 2 paths how plugins are set:
348+ // 1. When SatefulSet is (re)started, the up-to-date plugins list (ConfigMap copied by the init container) is read by RabbitMQ nodes during node start up.
349+ // 2. When the plugins ConfigMap is changed, 'rabbitmq-plugins set' updates the plugins on every node (without the need to re-start the nodes).
350+ // This method implements the 2nd path.
351+ func (r * RabbitmqClusterReconciler ) setPluginsIfNeeded (ctx context.Context , rmq * rabbitmqv1beta1.RabbitmqCluster ) (requeueAfter time.Duration , err error ) {
352+ configMap := corev1.ConfigMap {}
353+ if err := r .Get (ctx , types.NamespacedName {Namespace : rmq .Namespace , Name : rmq .ChildResourceName (resource .PluginsConfig )}, & configMap ); err != nil {
354+ return 0 , client .IgnoreNotFound (err )
355+ }
343356
344- if err := r .Get (ctx , types.NamespacedName {Name : rmq .ChildResourceName ("server" ), Namespace : rmq .Namespace }, sts ); err != nil {
345- return false , client .IgnoreNotFound (err )
357+ pluginsUpdatedAt , ok := configMap .Annotations [pluginsUpdateAnnotation ]
358+ if ! ok {
359+ return 0 , nil // plugins configMap was not updated
346360 }
347361
348- if sts .Status .ReadyReplicas < * sts .Spec .Replicas {
349- return false , nil
362+ annotationTime , err := time .Parse (time .RFC3339 , pluginsUpdatedAt )
363+ if err != nil {
364+ return 0 , err
365+ }
366+ if time .Since (annotationTime ).Seconds () < 2 {
367+ // plugins configMap was updated very recently
368+ // give StatefulSet controller some time to trigger restart of StatefulSet if necessary
369+ // otherwise, there would be race conditions where we exec into containers losing the connection due to pods being terminated
370+ r .Log .Info ("requeuing request to set plugins on RabbitmqCluster" ,
371+ "namespace" , rmq .Namespace ,
372+ "name" , rmq .Name )
373+ return 2 * time .Second , nil
350374 }
351375
352- return true , nil
353- }
376+ ready , err := r .allReplicasReadyAndUpdated (ctx , rmq )
377+ if err != nil {
378+ return 0 , err
379+ }
380+ if ! ready {
381+ r .Log .Info ("not all replicas ready yet; requeuing request to set plugins on RabbitmqCluster" ,
382+ "namespace" , rmq .Namespace ,
383+ "name" , rmq .Name )
384+ return 15 * time .Second , err
385+ }
354386
355- // enablePlugins - helper function to set the list of enabled plugins in a given RabbitmqCluster pods
356- // `rabbitmq-plugins set` disables plugins that are not in the provided list
357- func (r * RabbitmqClusterReconciler ) enablePlugins (rmq * rabbitmqv1beta1.RabbitmqCluster ) error {
358387 plugins := resource .NewRabbitmqPlugins (rmq .Spec .Rabbitmq .AdditionalPlugins )
359388 for i := int32 (0 ); i < * rmq .Spec .Replicas ; i ++ {
360389 podName := fmt .Sprintf ("%s-%d" , rmq .ChildResourceName ("server" ), i )
361390 rabbitCommand := fmt .Sprintf ("rabbitmq-plugins set %s" , plugins .AsString (" " ))
362-
363391 stdout , stderr , err := r .exec (rmq .Namespace , podName , "rabbitmq" , "sh" , "-c" , rabbitCommand )
364-
365392 if err != nil {
366-
367- r .Log .Error (err , fmt .Sprintf (
368- "Failed to enable plugins on pod %s in namespace %s, running command %s with output: %s %s" ,
369- podName , rmq .Namespace , rabbitCommand , stdout , stderr ))
370-
371- return err
393+ r .Log .Error (err , "failed to set plugins" ,
394+ "namespace" , rmq .Namespace ,
395+ "name" , rmq .Name ,
396+ "pod" , podName ,
397+ "command" , rabbitCommand ,
398+ "stdout" , stdout ,
399+ "stderr" , stderr )
400+ return 0 , err
372401 }
373402 }
374-
375- r .Log .Info ("Successfully enabled plugins on RabbitmqCluster" ,
403+ r .Log .Info ("successfully set plugins on RabbitmqCluster" ,
376404 "namespace" , rmq .Namespace ,
377405 "name" , rmq .Name )
378- return nil
406+
407+ delete (configMap .Annotations , pluginsUpdateAnnotation )
408+ if err := r .Update (ctx , & configMap ); err != nil {
409+ return 0 , client .IgnoreNotFound (err )
410+ }
411+
412+ return 0 , nil
413+ }
414+
415+ func (r * RabbitmqClusterReconciler ) allReplicasReadyAndUpdated (ctx context.Context , rmq * rabbitmqv1beta1.RabbitmqCluster ) (bool , error ) {
416+ sts := & appsv1.StatefulSet {}
417+
418+ if err := r .Get (ctx , types.NamespacedName {Name : rmq .ChildResourceName ("server" ), Namespace : rmq .Namespace }, sts ); err != nil {
419+ return false , client .IgnoreNotFound (err )
420+ }
421+
422+ desiredReplicas := * sts .Spec .Replicas
423+ if sts .Status .ReadyReplicas < desiredReplicas ||
424+ sts .Status .UpdatedReplicas < desiredReplicas { // StatefulSet rolling update is ongoing
425+ return false , nil
426+ }
427+
428+ return true , nil
429+ }
430+
431+ // Annotates the plugins ConfigMap if it was updated such that 'rabbitmq-plugins set' will be called on the RabbitMQ nodes at a later point in time
432+ func (r * RabbitmqClusterReconciler ) annotatePluginsConfigMapIfUpdated (
433+ ctx context.Context ,
434+ builder resource.ResourceBuilder ,
435+ operationResult controllerutil.OperationResult ,
436+ rmq * rabbitmqv1beta1.RabbitmqCluster ) {
437+
438+ if _ , ok := builder .(* resource.RabbitmqPluginsConfigMapBuilder ); ! ok {
439+ return
440+ }
441+ if operationResult != controllerutil .OperationResultUpdated {
442+ return
443+ }
444+
445+ if retryOnConflictErr := clientretry .RetryOnConflict (clientretry .DefaultRetry , func () error {
446+ configMap := corev1.ConfigMap {}
447+ if err := r .Get (ctx , types.NamespacedName {Namespace : rmq .Namespace , Name : rmq .ChildResourceName (resource .PluginsConfig )}, & configMap ); err != nil {
448+ return client .IgnoreNotFound (err )
449+ }
450+ if configMap .Annotations == nil {
451+ configMap .Annotations = make (map [string ]string )
452+ }
453+ configMap .Annotations [pluginsUpdateAnnotation ] = time .Now ().Format (time .RFC3339 )
454+ return r .Update (ctx , & configMap )
455+ }); retryOnConflictErr != nil {
456+ msg := fmt .Sprintf ("Failed to annotate ConfigMap %s of Namespace %s; enabled_plugins may be outdated" , rmq .ChildResourceName (resource .PluginsConfig ), rmq .Namespace )
457+ r .Log .Error (retryOnConflictErr , msg )
458+ r .Recorder .Event (rmq , corev1 .EventTypeWarning , "FailedUpdate" , msg )
459+ }
379460}
380461
381462func (r * RabbitmqClusterReconciler ) exec (namespace , podName , containerName string , command ... string ) (string , string , error ) {
@@ -407,12 +488,9 @@ func (r *RabbitmqClusterReconciler) exec(namespace, podName, containerName strin
407488 Stdin : nil ,
408489 Tty : false ,
409490 })
410-
411491 if err != nil {
412-
413492 return stdOut .String (), stdErr .String (), err
414493 }
415-
416494 if stdErr .Len () > 0 {
417495 return stdOut .String (), stdErr .String (), fmt .Errorf ("%v" , stdErr )
418496 }
0 commit comments