Skip to content

Commit 3125779

Browse files
committed
CSPL-4022 Fix update behaviour
1 parent 10a4fc0 commit 3125779

File tree

6 files changed

+144
-94
lines changed

6 files changed

+144
-94
lines changed

internal/controller/indexercluster_controller.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
corev1 "k8s.io/api/core/v1"
3232
k8serrors "k8s.io/apimachinery/pkg/api/errors"
3333
"k8s.io/apimachinery/pkg/runtime"
34+
"k8s.io/apimachinery/pkg/types"
3435
ctrl "sigs.k8s.io/controller-runtime"
3536
"sigs.k8s.io/controller-runtime/pkg/client"
3637
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -171,6 +172,34 @@ func (r *IndexerClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
171172
mgr.GetRESTMapper(),
172173
&enterpriseApi.IndexerCluster{},
173174
)).
175+
Watches(&enterpriseApi.BusConfiguration{},
176+
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
177+
bc, ok := obj.(*enterpriseApi.BusConfiguration)
178+
if !ok {
179+
return nil
180+
}
181+
var list enterpriseApi.IndexerClusterList
182+
if err := r.Client.List(ctx, &list); err != nil {
183+
return nil
184+
}
185+
var reqs []reconcile.Request
186+
for _, ic := range list.Items {
187+
ns := ic.Spec.BusConfigurationRef.Namespace
188+
if ns == "" {
189+
ns = ic.Namespace
190+
}
191+
if ic.Spec.BusConfigurationRef.Name == bc.Name && ns == bc.Namespace {
192+
reqs = append(reqs, reconcile.Request{
193+
NamespacedName: types.NamespacedName{
194+
Name: ic.Name,
195+
Namespace: ic.Namespace,
196+
},
197+
})
198+
}
199+
}
200+
return reqs
201+
}),
202+
).
174203
WithOptions(controller.Options{
175204
MaxConcurrentReconciles: enterpriseApi.TotalWorker,
176205
}).

internal/controller/ingestorcluster_controller.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
corev1 "k8s.io/api/core/v1"
2525
k8serrors "k8s.io/apimachinery/pkg/api/errors"
2626
"k8s.io/apimachinery/pkg/runtime"
27+
"k8s.io/apimachinery/pkg/types"
2728
ctrl "sigs.k8s.io/controller-runtime"
2829
"sigs.k8s.io/controller-runtime/pkg/client"
2930
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -141,11 +142,33 @@ func (r *IngestorClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
141142
&enterpriseApi.IngestorCluster{},
142143
)).
143144
Watches(&enterpriseApi.BusConfiguration{},
144-
handler.EnqueueRequestForOwner(
145-
mgr.GetScheme(),
146-
mgr.GetRESTMapper(),
147-
&enterpriseApi.IngestorCluster{},
148-
)).
145+
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
146+
bc, ok := obj.(*enterpriseApi.BusConfiguration)
147+
if !ok {
148+
return nil
149+
}
150+
var list enterpriseApi.IngestorClusterList
151+
if err := r.Client.List(ctx, &list); err != nil {
152+
return nil
153+
}
154+
var reqs []reconcile.Request
155+
for _, ic := range list.Items {
156+
ns := ic.Spec.BusConfigurationRef.Namespace
157+
if ns == "" {
158+
ns = ic.Namespace
159+
}
160+
if ic.Spec.BusConfigurationRef.Name == bc.Name && ns == bc.Namespace {
161+
reqs = append(reqs, reconcile.Request{
162+
NamespacedName: types.NamespacedName{
163+
Name: ic.Name,
164+
Namespace: ic.Namespace,
165+
},
166+
})
167+
}
168+
}
169+
return reqs
170+
}),
171+
).
149172
WithOptions(controller.Options{
150173
MaxConcurrentReconciles: enterpriseApi.TotalWorker,
151174
}).

pkg/splunk/enterprise/indexercluster.go

Lines changed: 50 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"context"
2020
"errors"
2121
"fmt"
22+
"reflect"
2223
"regexp"
2324
"sort"
2425
"strconv"
@@ -66,24 +67,8 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller
6667
// Update the CR Status
6768
defer updateCRStatus(ctx, client, cr, &err)
6869

69-
// Bus config
70-
busConfig := enterpriseApi.BusConfiguration{}
71-
if cr.Spec.BusConfigurationRef.Name != "" {
72-
ns := cr.GetNamespace()
73-
if cr.Spec.BusConfigurationRef.Namespace != "" {
74-
ns = cr.Spec.BusConfigurationRef.Namespace
75-
}
76-
err = client.Get(context.Background(), types.NamespacedName{
77-
Name: cr.Spec.BusConfigurationRef.Name,
78-
Namespace: ns,
79-
}, &busConfig)
80-
if err != nil {
81-
return result, err
82-
}
83-
}
84-
8570
// validate and updates defaults for CR
86-
err = validateIndexerClusterSpec(ctx, client, cr, &busConfig)
71+
err = validateIndexerClusterSpec(ctx, client, cr)
8772
if err != nil {
8873
eventPublisher.Warning(ctx, "validateIndexerClusterSpec", fmt.Sprintf("validate indexercluster spec failed %s", err.Error()))
8974
scopedLog.Error(err, "Failed to validate indexercluster spec")
@@ -92,6 +77,7 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller
9277

9378
// updates status after function completes
9479
cr.Status.ClusterManagerPhase = enterpriseApi.PhaseError
80+
replicasBefore := cr.Status.Replicas
9581
cr.Status.Replicas = cr.Spec.Replicas
9682
cr.Status.Selector = fmt.Sprintf("app.kubernetes.io/instance=splunk-%s-indexer", cr.GetName())
9783
if cr.Status.Peers == nil {
@@ -257,14 +243,36 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller
257243

258244
// no need to requeue if everything is ready
259245
if cr.Status.Phase == enterpriseApi.PhaseReady {
246+
// Bus config
247+
busConfig := enterpriseApi.BusConfiguration{}
260248
if cr.Spec.BusConfigurationRef.Name != "" {
261-
err = mgr.handlePullBusChange(ctx, cr, busConfig, client)
249+
ns := cr.GetNamespace()
250+
if cr.Spec.BusConfigurationRef.Namespace != "" {
251+
ns = cr.Spec.BusConfigurationRef.Namespace
252+
}
253+
err = client.Get(context.Background(), types.NamespacedName{
254+
Name: cr.Spec.BusConfigurationRef.Name,
255+
Namespace: ns,
256+
}, &busConfig)
262257
if err != nil {
263-
scopedLog.Error(err, "Failed to update conf file for Bus/Pipeline config change after pod creation")
264258
return result, err
265259
}
266260
}
267-
cr.Status.BusConfiguration = busConfig.Spec
261+
262+
// If bus config is updated
263+
if cr.Spec.BusConfigurationRef.Name != "" {
264+
if !reflect.DeepEqual(cr.Status.BusConfiguration, busConfig.Spec) || replicasBefore < cr.Spec.Replicas {
265+
mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient)
266+
267+
err = mgr.handlePullBusChange(ctx, cr, busConfig, client)
268+
if err != nil {
269+
scopedLog.Error(err, "Failed to update conf file for Bus/Pipeline config change after pod creation")
270+
return result, err
271+
}
272+
273+
cr.Status.BusConfiguration = busConfig.Spec
274+
}
275+
}
268276

269277
//update MC
270278
//Retrieve monitoring console ref from CM Spec
@@ -345,31 +353,16 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient,
345353
eventPublisher, _ := newK8EventPublisher(client, cr)
346354
cr.Kind = "IndexerCluster"
347355

348-
// Bus config
349-
busConfig := enterpriseApi.BusConfiguration{}
350-
if cr.Spec.BusConfigurationRef.Name != "" {
351-
ns := cr.GetNamespace()
352-
if cr.Spec.BusConfigurationRef.Namespace != "" {
353-
ns = cr.Spec.BusConfigurationRef.Namespace
354-
}
355-
err := client.Get(context.Background(), types.NamespacedName{
356-
Name: cr.Spec.BusConfigurationRef.Name,
357-
Namespace: ns,
358-
}, &busConfig)
359-
if err != nil {
360-
return result, err
361-
}
362-
}
363-
364356
// validate and updates defaults for CR
365-
err := validateIndexerClusterSpec(ctx, client, cr, &busConfig)
357+
err := validateIndexerClusterSpec(ctx, client, cr)
366358
if err != nil {
367359
return result, err
368360
}
369361

370362
// updates status after function completes
371363
cr.Status.Phase = enterpriseApi.PhaseError
372364
cr.Status.ClusterMasterPhase = enterpriseApi.PhaseError
365+
replicasBefore := cr.Status.Replicas
373366
cr.Status.Replicas = cr.Spec.Replicas
374367
cr.Status.Selector = fmt.Sprintf("app.kubernetes.io/instance=splunk-%s-indexer", cr.GetName())
375368
if cr.Status.Peers == nil {
@@ -538,29 +531,37 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient,
538531

539532
// no need to requeue if everything is ready
540533
if cr.Status.Phase == enterpriseApi.PhaseReady {
534+
// Bus config
535+
busConfig := enterpriseApi.BusConfiguration{}
541536
if cr.Spec.BusConfigurationRef.Name != "" {
542-
busConfig := enterpriseApi.BusConfiguration{}
543537
ns := cr.GetNamespace()
544538
if cr.Spec.BusConfigurationRef.Namespace != "" {
545539
ns = cr.Spec.BusConfigurationRef.Namespace
546540
}
547-
err := client.Get(context.Background(), types.NamespacedName{
541+
err = client.Get(context.Background(), types.NamespacedName{
548542
Name: cr.Spec.BusConfigurationRef.Name,
549543
Namespace: ns,
550544
}, &busConfig)
551545
if err != nil {
552546
return result, err
553547
}
548+
}
554549

555-
err = mgr.handlePullBusChange(ctx, cr, busConfig, client)
556-
if err != nil {
557-
scopedLog.Error(err, "Failed to update conf file for Bus/Pipeline config change after pod creation")
558-
return result, err
550+
// If bus config is updated
551+
if cr.Spec.BusConfigurationRef.Name != "" {
552+
if !reflect.DeepEqual(cr.Status.BusConfiguration, busConfig.Spec) || replicasBefore < cr.Spec.Replicas {
553+
mgr := newIndexerClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient)
554+
555+
err = mgr.handlePullBusChange(ctx, cr, busConfig, client)
556+
if err != nil {
557+
scopedLog.Error(err, "Failed to update conf file for Bus/Pipeline config change after pod creation")
558+
return result, err
559+
}
560+
561+
cr.Status.BusConfiguration = busConfig.Spec
559562
}
560563
}
561564

562-
cr.Status.BusConfiguration = busConfig.Spec
563-
564565
//update MC
565566
//Retrieve monitoring console ref from CM Spec
566567
cmMonitoringConsoleConfigRef, err := RetrieveCMSpec(ctx, client, cr)
@@ -1126,7 +1127,7 @@ func getIndexerStatefulSet(ctx context.Context, client splcommon.ControllerClien
11261127
}
11271128

11281129
// validateIndexerClusterSpec checks validity and makes default updates to a IndexerClusterSpec, and returns error if something is wrong.
1129-
func validateIndexerClusterSpec(ctx context.Context, c splcommon.ControllerClient, cr *enterpriseApi.IndexerCluster, busConfig *enterpriseApi.BusConfiguration) error {
1130+
func validateIndexerClusterSpec(ctx context.Context, c splcommon.ControllerClient, cr *enterpriseApi.IndexerCluster) error {
11301131
// We cannot have 0 replicas in IndexerCluster spec, since this refers to number of indexers in an indexer cluster
11311132
if cr.Spec.Replicas == 0 {
11321133
cr.Spec.Replicas = 1
@@ -1231,7 +1232,7 @@ func (mgr *indexerClusterPodManager) handlePullBusChange(ctx context.Context, ne
12311232
// Only update config for pods that exist
12321233
readyReplicas := newCR.Status.ReadyReplicas
12331234

1234-
// List all pods for this IngestorCluster StatefulSet
1235+
// List all pods for this IndexerCluster StatefulSet
12351236
var updateErr error
12361237
for n := 0; n < int(readyReplicas); n++ {
12371238
memberName := GetSplunkStatefulsetPodName(SplunkIndexer, newCR.GetName(), int32(n))
@@ -1245,10 +1246,10 @@ func (mgr *indexerClusterPodManager) handlePullBusChange(ctx context.Context, ne
12451246
afterDelete := false
12461247
if (busConfig.Spec.SQS.QueueName != "" && newCR.Status.BusConfiguration.SQS.QueueName != "" && busConfig.Spec.SQS.QueueName != newCR.Status.BusConfiguration.SQS.QueueName) ||
12471248
(busConfig.Spec.Type != "" && newCR.Status.BusConfiguration.Type != "" && busConfig.Spec.Type != newCR.Status.BusConfiguration.Type) {
1248-
if err := splunkClient.DeleteConfFileProperty("outputs", fmt.Sprintf("remote_queue:%s", busConfig.Spec.SQS.QueueName)); err != nil {
1249+
if err := splunkClient.DeleteConfFileProperty("outputs", fmt.Sprintf("remote_queue:%s", newCR.Status.BusConfiguration.SQS.QueueName)); err != nil {
12491250
updateErr = err
12501251
}
1251-
if err := splunkClient.DeleteConfFileProperty("inputs", fmt.Sprintf("remote_queue:%s", busConfig.Spec.SQS.QueueName)); err != nil {
1252+
if err := splunkClient.DeleteConfFileProperty("inputs", fmt.Sprintf("remote_queue:%s", newCR.Status.BusConfiguration.SQS.QueueName)); err != nil {
12521253
updateErr = err
12531254
}
12541255
afterDelete = true

pkg/splunk/enterprise/indexercluster_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1388,7 +1388,7 @@ func TestGetIndexerStatefulSet(t *testing.T) {
13881388
cr.Spec.ClusterManagerRef.Name = "manager1"
13891389
test := func(want string) {
13901390
f := func() (interface{}, error) {
1391-
if err := validateIndexerClusterSpec(ctx, c, &cr, &busConfig); err != nil {
1391+
if err := validateIndexerClusterSpec(ctx, c, &cr); err != nil {
13921392
t.Errorf("validateIndexerClusterSpec() returned error: %v", err)
13931393
}
13941394
return getIndexerStatefulSet(ctx, c, &cr)
@@ -1434,7 +1434,7 @@ func TestGetIndexerStatefulSet(t *testing.T) {
14341434
test(`{"kind":"StatefulSet","apiVersion":"apps/v1","metadata":{"name":"splunk-stack1-indexer","namespace":"test","creationTimestamp":null,"labels":{"app.kubernetes.io/component":"indexer","app.kubernetes.io/instance":"splunk-stack1-indexer","app.kubernetes.io/managed-by":"splunk-operator","app.kubernetes.io/name":"indexer","app.kubernetes.io/part-of":"splunk-manager1-indexer","app.kubernetes.io/test-extra-label":"test-extra-label-value"},"ownerReferences":[{"apiVersion":"","kind":"","name":"stack1","uid":"","controller":true}]},"spec":{"replicas":1,"selector":{"matchLabels":{"app.kubernetes.io/component":"indexer","app.kubernetes.io/instance":"splunk-stack1-indexer","app.kubernetes.io/managed-by":"splunk-operator","app.kubernetes.io/name":"indexer","app.kubernetes.io/part-of":"splunk-manager1-indexer"}},"template":{"metadata":{"creationTimestamp":null,"labels":{"app.kubernetes.io/component":"indexer","app.kubernetes.io/instance":"splunk-stack1-indexer","app.kubernetes.io/managed-by":"splunk-operator","app.kubernetes.io/name":"indexer","app.kubernetes.io/part-of":"splunk-manager1-indexer","app.kubernetes.io/test-extra-label":"test-extra-label-value"},"annotations":{"traffic.sidecar.istio.io/excludeOutboundPorts":"8089,8191,9997","traffic.sidecar.istio.io/includeInboundPorts":"8000,8088"}},"spec":{"volumes":[{"name":"splunk-test-probe-configmap","configMap":{"name":"splunk-test-probe-configmap","defaultMode":365}},{"name":"mnt-splunk-secrets","secret":{"secretName":"splunk-stack1-indexer-secret-v1","defaultMode":420}}],"containers":[{"name":"splunk","image":"splunk/splunk","ports":[{"name":"http-splunkweb","containerPort":8000,"protocol":"TCP"},{"name":"http-hec","containerPort":8088,"protocol":"TCP"},{"name":"https-splunkd","containerPort":8089,"protocol":"TCP"},{"name":"tcp-s2s","containerPort":9997,"protocol":"TCP"},{"name":"user-defined","containerPort":32000,"protocol":"UDP"}],"env":[{"name":"TEST_ENV_VAR","value":"test_value"},{"name":"SPLUNK_CLUSTER_MASTER_URL","value":"splunk-manager1-cluster-manager-service"},{"name":"SPLUNK_HOME","value":"/opt/splunk"},{"name":"SPLUNK_START_ARGS","value":"--accept-license"},{"name":"SPLUNK_DEFAULTS_URL","value":"/mnt/splunk-secrets/default.yml"},{"name":"SPLUNK_HOME_OWNERSHIP_ENFORCEMENT","value":"false"},{"name":"SPLUNK_ROLE","value":"splunk_indexer"},{"name":"SPLUNK_DECLARATIVE_ADMIN_PASSWORD","value":"true"},{"name":"SPLUNK_OPERATOR_K8_LIVENESS_DRIVER_FILE_PATH","value":"/tmp/splunk_operator_k8s/probes/k8_liveness_driver.sh"},{"name":"SPLUNK_GENERAL_TERMS","value":"--accept-sgt-current-at-splunk-com"},{"name":"SPLUNK_SKIP_CLUSTER_BUNDLE_PUSH","value":"true"}],"resources":{"limits":{"cpu":"4","memory":"8Gi"},"requests":{"cpu":"100m","memory":"512Mi"}},"volumeMounts":[{"name":"pvc-etc","mountPath":"/opt/splunk/etc"},{"name":"pvc-var","mountPath":"/opt/splunk/var"},{"name":"splunk-test-probe-configmap","mountPath":"/mnt/probes"},{"name":"mnt-splunk-secrets","mountPath":"/mnt/splunk-secrets"}],"livenessProbe":{"exec":{"command":["/mnt/probes/livenessProbe.sh"]},"initialDelaySeconds":30,"timeoutSeconds":30,"periodSeconds":30,"failureThreshold":3},"readinessProbe":{"exec":{"command":["/mnt/probes/readinessProbe.sh"]},"initialDelaySeconds":10,"timeoutSeconds":5,"periodSeconds":5,"failureThreshold":3},"startupProbe":{"exec":{"command":["/mnt/probes/startupProbe.sh"]},"initialDelaySeconds":40,"timeoutSeconds":30,"periodSeconds":30,"failureThreshold":12},"imagePullPolicy":"IfNotPresent","securityContext":{"capabilities":{"add":["NET_BIND_SERVICE"],"drop":["ALL"]},"privileged":false,"runAsUser":41812,"runAsNonRoot":true,"allowPrivilegeEscalation":false,"seccompProfile":{"type":"RuntimeDefault"}}}],"serviceAccountName":"defaults","securityContext":{"runAsUser":41812,"runAsNonRoot":true,"fsGroup":41812,"fsGroupChangePolicy":"OnRootMismatch"},"affinity":{"podAntiAffinity":{"preferredDuringSchedulingIgnoredDuringExecution":[{"weight":100,"podAffinityTerm":{"labelSelector":{"matchExpressions":[{"key":"app.kubernetes.io/instance","operator":"In","values":["splunk-stack1-indexer"]}]},"topologyKey":"kubernetes.io/hostname"}}]}},"schedulerName":"default-scheduler"}},"volumeClaimTemplates":[{"metadata":{"name":"pvc-etc","namespace":"test","creationTimestamp":null,"labels":{"app.kubernetes.io/component":"indexer","app.kubernetes.io/instance":"splunk-stack1-indexer","app.kubernetes.io/managed-by":"splunk-operator","app.kubernetes.io/name":"indexer","app.kubernetes.io/part-of":"splunk-manager1-indexer","app.kubernetes.io/test-extra-label":"test-extra-label-value"}},"spec":{"accessModes":["ReadWriteOnce"],"resources":{"requests":{"storage":"10Gi"}}},"status":{}},{"metadata":{"name":"pvc-var","namespace":"test","creationTimestamp":null,"labels":{"app.kubernetes.io/component":"indexer","app.kubernetes.io/instance":"splunk-stack1-indexer","app.kubernetes.io/managed-by":"splunk-operator","app.kubernetes.io/name":"indexer","app.kubernetes.io/part-of":"splunk-manager1-indexer","app.kubernetes.io/test-extra-label":"test-extra-label-value"}},"spec":{"accessModes":["ReadWriteOnce"],"resources":{"requests":{"storage":"100Gi"}}},"status":{}}],"serviceName":"splunk-stack1-indexer-headless","podManagementPolicy":"Parallel","updateStrategy":{"type":"OnDelete"}},"status":{"replicas":0,"availableReplicas":0}}`)
14351435

14361436
cr.Spec.ClusterManagerRef.Namespace = "other"
1437-
if err := validateIndexerClusterSpec(ctx, c, &cr, &busConfig); err == nil {
1437+
if err := validateIndexerClusterSpec(ctx, c, &cr); err == nil {
14381438
t.Errorf("validateIndexerClusterSpec() error expected on multisite IndexerCluster referencing a cluster manager located in a different namespace")
14391439
}
14401440
}

0 commit comments

Comments
 (0)