@@ -19,6 +19,7 @@ import (
1919 "context"
2020 "errors"
2121 "fmt"
22+ "reflect"
2223 "regexp"
2324 "sort"
2425 "strconv"
@@ -66,24 +67,8 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller
6667 // Update the CR Status
6768 defer updateCRStatus (ctx , client , cr , & err )
6869
69- // Bus config
70- busConfig := enterpriseApi.BusConfiguration {}
71- if cr .Spec .BusConfigurationRef .Name != "" {
72- ns := cr .GetNamespace ()
73- if cr .Spec .BusConfigurationRef .Namespace != "" {
74- ns = cr .Spec .BusConfigurationRef .Namespace
75- }
76- err = client .Get (context .Background (), types.NamespacedName {
77- Name : cr .Spec .BusConfigurationRef .Name ,
78- Namespace : ns ,
79- }, & busConfig )
80- if err != nil {
81- return result , err
82- }
83- }
84-
8570 // validate and updates defaults for CR
86- err = validateIndexerClusterSpec (ctx , client , cr , & busConfig )
71+ err = validateIndexerClusterSpec (ctx , client , cr )
8772 if err != nil {
8873 eventPublisher .Warning (ctx , "validateIndexerClusterSpec" , fmt .Sprintf ("validate indexercluster spec failed %s" , err .Error ()))
8974 scopedLog .Error (err , "Failed to validate indexercluster spec" )
@@ -92,6 +77,9 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller
9277
9378 // updates status after function completes
9479 cr .Status .ClusterManagerPhase = enterpriseApi .PhaseError
80+ if cr .Status .Replicas < cr .Spec .Replicas {
81+ cr .Status .BusConfiguration = enterpriseApi.BusConfigurationSpec {}
82+ }
9583 cr .Status .Replicas = cr .Spec .Replicas
9684 cr .Status .Selector = fmt .Sprintf ("app.kubernetes.io/instance=splunk-%s-indexer" , cr .GetName ())
9785 if cr .Status .Peers == nil {
@@ -257,14 +245,36 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller
257245
258246 // no need to requeue if everything is ready
259247 if cr .Status .Phase == enterpriseApi .PhaseReady {
248+ // Bus config
249+ busConfig := enterpriseApi.BusConfiguration {}
260250 if cr .Spec .BusConfigurationRef .Name != "" {
261- err = mgr .handlePullBusChange (ctx , cr , busConfig , client )
251+ ns := cr .GetNamespace ()
252+ if cr .Spec .BusConfigurationRef .Namespace != "" {
253+ ns = cr .Spec .BusConfigurationRef .Namespace
254+ }
255+ err = client .Get (context .Background (), types.NamespacedName {
256+ Name : cr .Spec .BusConfigurationRef .Name ,
257+ Namespace : ns ,
258+ }, & busConfig )
262259 if err != nil {
263- scopedLog .Error (err , "Failed to update conf file for Bus/Pipeline config change after pod creation" )
264260 return result , err
265261 }
266262 }
267- cr .Status .BusConfiguration = busConfig .Spec
263+
264+ // If bus config is updated
265+ if cr .Spec .BusConfigurationRef .Name != "" {
266+ if ! reflect .DeepEqual (cr .Status .BusConfiguration , busConfig .Spec ) {
267+ mgr := newIndexerClusterPodManager (scopedLog , cr , namespaceScopedSecret , splclient .NewSplunkClient )
268+
269+ err = mgr .handlePullBusChange (ctx , cr , busConfig , client )
270+ if err != nil {
271+ scopedLog .Error (err , "Failed to update conf file for Bus/Pipeline config change after pod creation" )
272+ return result , err
273+ }
274+
275+ cr .Status .BusConfiguration = busConfig .Spec
276+ }
277+ }
268278
269279 //update MC
270280 //Retrieve monitoring console ref from CM Spec
@@ -345,31 +355,18 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient,
345355 eventPublisher , _ := newK8EventPublisher (client , cr )
346356 cr .Kind = "IndexerCluster"
347357
348- // Bus config
349- busConfig := enterpriseApi.BusConfiguration {}
350- if cr .Spec .BusConfigurationRef .Name != "" {
351- ns := cr .GetNamespace ()
352- if cr .Spec .BusConfigurationRef .Namespace != "" {
353- ns = cr .Spec .BusConfigurationRef .Namespace
354- }
355- err := client .Get (context .Background (), types.NamespacedName {
356- Name : cr .Spec .BusConfigurationRef .Name ,
357- Namespace : ns ,
358- }, & busConfig )
359- if err != nil {
360- return result , err
361- }
362- }
363-
364358 // validate and updates defaults for CR
365- err := validateIndexerClusterSpec (ctx , client , cr , & busConfig )
359+ err := validateIndexerClusterSpec (ctx , client , cr )
366360 if err != nil {
367361 return result , err
368362 }
369363
370364 // updates status after function completes
371365 cr .Status .Phase = enterpriseApi .PhaseError
372366 cr .Status .ClusterMasterPhase = enterpriseApi .PhaseError
367+ if cr .Status .Replicas < cr .Spec .Replicas {
368+ cr .Status .BusConfiguration = enterpriseApi.BusConfigurationSpec {}
369+ }
373370 cr .Status .Replicas = cr .Spec .Replicas
374371 cr .Status .Selector = fmt .Sprintf ("app.kubernetes.io/instance=splunk-%s-indexer" , cr .GetName ())
375372 if cr .Status .Peers == nil {
@@ -538,29 +535,37 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient,
538535
539536 // no need to requeue if everything is ready
540537 if cr .Status .Phase == enterpriseApi .PhaseReady {
538+ // Bus config
539+ busConfig := enterpriseApi.BusConfiguration {}
541540 if cr .Spec .BusConfigurationRef .Name != "" {
542- busConfig := enterpriseApi.BusConfiguration {}
543541 ns := cr .GetNamespace ()
544542 if cr .Spec .BusConfigurationRef .Namespace != "" {
545543 ns = cr .Spec .BusConfigurationRef .Namespace
546544 }
547- err : = client .Get (context .Background (), types.NamespacedName {
545+ err = client .Get (context .Background (), types.NamespacedName {
548546 Name : cr .Spec .BusConfigurationRef .Name ,
549547 Namespace : ns ,
550548 }, & busConfig )
551549 if err != nil {
552550 return result , err
553551 }
552+ }
554553
555- err = mgr .handlePullBusChange (ctx , cr , busConfig , client )
556- if err != nil {
557- scopedLog .Error (err , "Failed to update conf file for Bus/Pipeline config change after pod creation" )
558- return result , err
554+ // If bus config is updated
555+ if cr .Spec .BusConfigurationRef .Name != "" {
556+ if ! reflect .DeepEqual (cr .Status .BusConfiguration , busConfig .Spec ) {
557+ mgr := newIndexerClusterPodManager (scopedLog , cr , namespaceScopedSecret , splclient .NewSplunkClient )
558+
559+ err = mgr .handlePullBusChange (ctx , cr , busConfig , client )
560+ if err != nil {
561+ scopedLog .Error (err , "Failed to update conf file for Bus/Pipeline config change after pod creation" )
562+ return result , err
563+ }
564+
565+ cr .Status .BusConfiguration = busConfig .Spec
559566 }
560567 }
561568
562- cr .Status .BusConfiguration = busConfig .Spec
563-
564569 //update MC
565570 //Retrieve monitoring console ref from CM Spec
566571 cmMonitoringConsoleConfigRef , err := RetrieveCMSpec (ctx , client , cr )
@@ -1126,7 +1131,7 @@ func getIndexerStatefulSet(ctx context.Context, client splcommon.ControllerClien
11261131}
11271132
11281133// validateIndexerClusterSpec checks validity and makes default updates to a IndexerClusterSpec, and returns error if something is wrong.
1129- func validateIndexerClusterSpec (ctx context.Context , c splcommon.ControllerClient , cr * enterpriseApi.IndexerCluster , busConfig * enterpriseApi. BusConfiguration ) error {
1134+ func validateIndexerClusterSpec (ctx context.Context , c splcommon.ControllerClient , cr * enterpriseApi.IndexerCluster ) error {
11301135 // We cannot have 0 replicas in IndexerCluster spec, since this refers to number of indexers in an indexer cluster
11311136 if cr .Spec .Replicas == 0 {
11321137 cr .Spec .Replicas = 1
@@ -1231,7 +1236,7 @@ func (mgr *indexerClusterPodManager) handlePullBusChange(ctx context.Context, ne
12311236 // Only update config for pods that exist
12321237 readyReplicas := newCR .Status .ReadyReplicas
12331238
1234- // List all pods for this IngestorCluster StatefulSet
1239+ // List all pods for this IndexerCluster StatefulSet
12351240 var updateErr error
12361241 for n := 0 ; n < int (readyReplicas ); n ++ {
12371242 memberName := GetSplunkStatefulsetPodName (SplunkIndexer , newCR .GetName (), int32 (n ))
@@ -1245,10 +1250,10 @@ func (mgr *indexerClusterPodManager) handlePullBusChange(ctx context.Context, ne
12451250 afterDelete := false
12461251 if (busConfig .Spec .SQS .QueueName != "" && newCR .Status .BusConfiguration .SQS .QueueName != "" && busConfig .Spec .SQS .QueueName != newCR .Status .BusConfiguration .SQS .QueueName ) ||
12471252 (busConfig .Spec .Type != "" && newCR .Status .BusConfiguration .Type != "" && busConfig .Spec .Type != newCR .Status .BusConfiguration .Type ) {
1248- if err := splunkClient .DeleteConfFileProperty ("outputs" , fmt .Sprintf ("remote_queue:%s" , busConfig . Spec .SQS .QueueName )); err != nil {
1253+ if err := splunkClient .DeleteConfFileProperty ("outputs" , fmt .Sprintf ("remote_queue:%s" , newCR . Status . BusConfiguration .SQS .QueueName )); err != nil {
12491254 updateErr = err
12501255 }
1251- if err := splunkClient .DeleteConfFileProperty ("inputs" , fmt .Sprintf ("remote_queue:%s" , busConfig . Spec .SQS .QueueName )); err != nil {
1256+ if err := splunkClient .DeleteConfFileProperty ("inputs" , fmt .Sprintf ("remote_queue:%s" , newCR . Status . BusConfiguration .SQS .QueueName )); err != nil {
12521257 updateErr = err
12531258 }
12541259 afterDelete = true
0 commit comments