Skip to content

Commit e6689c7

Browse files
committed
test
1 parent c66f1a9 commit e6689c7

File tree

3 files changed

+73
-45
lines changed

3 files changed

+73
-45
lines changed

internal/controller/ingestorcluster_controller.go

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
corev1 "k8s.io/api/core/v1"
2525
k8serrors "k8s.io/apimachinery/pkg/api/errors"
2626
"k8s.io/apimachinery/pkg/runtime"
27+
"k8s.io/apimachinery/pkg/types"
2728
ctrl "sigs.k8s.io/controller-runtime"
2829
"sigs.k8s.io/controller-runtime/pkg/client"
2930
"sigs.k8s.io/controller-runtime/pkg/controller"
@@ -141,11 +142,33 @@ func (r *IngestorClusterReconciler) SetupWithManager(mgr ctrl.Manager) error {
141142
&enterpriseApi.IngestorCluster{},
142143
)).
143144
Watches(&enterpriseApi.BusConfiguration{},
144-
handler.EnqueueRequestForOwner(
145-
mgr.GetScheme(),
146-
mgr.GetRESTMapper(),
147-
&enterpriseApi.IngestorCluster{},
148-
)).
145+
handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, obj client.Object) []reconcile.Request {
146+
bc, ok := obj.(*enterpriseApi.BusConfiguration)
147+
if !ok {
148+
return nil
149+
}
150+
var list enterpriseApi.IngestorClusterList
151+
if err := r.Client.List(ctx, &list); err != nil {
152+
return nil
153+
}
154+
var reqs []reconcile.Request
155+
for _, ic := range list.Items {
156+
ns := ic.Spec.BusConfigurationRef.Namespace
157+
if ns == "" {
158+
ns = ic.Namespace
159+
}
160+
if ic.Spec.BusConfigurationRef.Name == bc.Name && ns == bc.Namespace {
161+
reqs = append(reqs, reconcile.Request{
162+
NamespacedName: types.NamespacedName{
163+
Name: ic.Name,
164+
Namespace: ic.Namespace,
165+
},
166+
})
167+
}
168+
}
169+
return reqs
170+
}),
171+
).
149172
WithOptions(controller.Options{
150173
MaxConcurrentReconciles: enterpriseApi.TotalWorker,
151174
}).

pkg/splunk/enterprise/ingestorcluster.go

Lines changed: 44 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -58,50 +58,14 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
5858

5959
cr.Kind = "IngestorCluster"
6060

61-
// Bus config
62-
busConfig := enterpriseApi.BusConfiguration{}
63-
if cr.Spec.BusConfigurationRef.Name != "" {
64-
ns := cr.GetNamespace()
65-
if cr.Spec.BusConfigurationRef.Namespace != "" {
66-
ns = cr.Spec.BusConfigurationRef.Namespace
67-
}
68-
err = client.Get(context.Background(), types.NamespacedName{
69-
Name: cr.Spec.BusConfigurationRef.Name,
70-
Namespace: ns,
71-
}, &busConfig)
72-
if err != nil {
73-
return result, err
74-
}
75-
}
76-
7761
// Validate and updates defaults for CR
78-
err = validateIngestorClusterSpec(ctx, client, cr, &busConfig)
62+
err = validateIngestorClusterSpec(ctx, client, cr)
7963
if err != nil {
8064
eventPublisher.Warning(ctx, "validateIngestorClusterSpec", fmt.Sprintf("validate ingestor cluster spec failed %s", err.Error()))
8165
scopedLog.Error(err, "Failed to validate ingestor cluster spec")
8266
return result, err
8367
}
8468

85-
// If bus config is updated
86-
if !reflect.DeepEqual(cr.Status.BusConfiguration, busConfig.Spec) && cr.Status.Phase == enterpriseApi.PhaseReady {
87-
namespaceScopedSecret, err := ApplySplunkConfig(ctx, client, cr, cr.Spec.CommonSplunkSpec, SplunkIngestor)
88-
if err != nil {
89-
scopedLog.Error(err, "create or update general config failed", "error", err.Error())
90-
eventPublisher.Warning(ctx, "ApplySplunkConfig", fmt.Sprintf("create or update general config failed with error %s", err.Error()))
91-
return result, err
92-
}
93-
94-
mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient)
95-
96-
err = mgr.handlePushBusChange(ctx, cr, busConfig, client)
97-
if err != nil {
98-
scopedLog.Error(err, "Failed to update conf file for Bus/Pipeline config change after pod creation")
99-
return result, err
100-
}
101-
102-
cr.Status.BusConfiguration = busConfig.Spec
103-
}
104-
10569
// Initialize phase
10670
cr.Status.Phase = enterpriseApi.PhaseError
10771

@@ -131,7 +95,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
13195
cr.Status.Selector = fmt.Sprintf("app.kubernetes.io/instance=splunk-%s-ingestor", cr.GetName())
13296

13397
// Create or update general config resources
134-
_, err = ApplySplunkConfig(ctx, client, cr, cr.Spec.CommonSplunkSpec, SplunkIngestor)
98+
namespaceScopedSecret, err := ApplySplunkConfig(ctx, client, cr, cr.Spec.CommonSplunkSpec, SplunkIngestor)
13599
if err != nil {
136100
scopedLog.Error(err, "create or update general config failed", "error", err.Error())
137101
eventPublisher.Warning(ctx, "ApplySplunkConfig", fmt.Sprintf("create or update general config failed with error %s", err.Error()))
@@ -241,6 +205,47 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
241205
}
242206
cr.Status.Phase = phase
243207

208+
// Bus config
209+
busConfig := enterpriseApi.BusConfiguration{}
210+
if cr.Spec.BusConfigurationRef.Name != "" {
211+
ns := cr.GetNamespace()
212+
if cr.Spec.BusConfigurationRef.Namespace != "" {
213+
ns = cr.Spec.BusConfigurationRef.Namespace
214+
}
215+
err = client.Get(ctx, types.NamespacedName{
216+
Name: cr.Spec.BusConfigurationRef.Name,
217+
Namespace: ns,
218+
}, &busConfig)
219+
if err != nil {
220+
return result, err
221+
}
222+
}
223+
224+
scopedLog.Info("Reconciling IngestorCluster", "busConfig.Spec", busConfig.Spec)
225+
scopedLog.Info("Reconciling IngestorCluster", "cr.Status.BusConfiguration", cr.Status.BusConfiguration)
226+
227+
// If bus config is updated
228+
if !reflect.DeepEqual(cr.Status.BusConfiguration, busConfig.Spec) && cr.Status.Phase == enterpriseApi.PhaseReady {
229+
scopedLog.Info("Reconciling IngestorCluster", "!reflect.DeepEqual(cr.Status.BusConfiguration, busConfig.Spec) && cr.Status.Phase == enterpriseApi.PhaseReady", !reflect.DeepEqual(cr.Status.BusConfiguration, busConfig.Spec) && cr.Status.Phase == enterpriseApi.PhaseReady)
230+
231+
// namespaceScopedSecret, err := ApplySplunkConfig(ctx, client, cr, cr.Spec.CommonSplunkSpec, SplunkIngestor)
232+
// if err != nil {
233+
// scopedLog.Error(err, "create or update general config failed", "error", err.Error())
234+
// eventPublisher.Warning(ctx, "ApplySplunkConfig", fmt.Sprintf("create or update general config failed with error %s", err.Error()))
235+
// return result, err
236+
// }
237+
238+
mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient)
239+
240+
err = mgr.handlePushBusChange(ctx, cr, busConfig, client)
241+
if err != nil {
242+
scopedLog.Error(err, "Failed to update conf file for Bus/Pipeline config change after pod creation")
243+
return result, err
244+
}
245+
246+
cr.Status.BusConfiguration = busConfig.Spec
247+
}
248+
244249
// No need to requeue if everything is ready
245250
if cr.Status.Phase == enterpriseApi.PhaseReady {
246251
// Upgrade fron automated MC to MC CRD
@@ -284,7 +289,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
284289
}
285290

286291
// validateIngestorClusterSpec checks validity and makes default updates to a IngestorClusterSpec and returns error if something is wrong
287-
func validateIngestorClusterSpec(ctx context.Context, c splcommon.ControllerClient, cr *enterpriseApi.IngestorCluster, busConfig *enterpriseApi.BusConfiguration) error {
292+
func validateIngestorClusterSpec(ctx context.Context, c splcommon.ControllerClient, cr *enterpriseApi.IngestorCluster) error {
288293
// We cannot have 0 replicas in IngestorCluster spec since this refers to number of ingestion pods in an ingestor cluster
289294
if cr.Spec.Replicas < 3 {
290295
cr.Spec.Replicas = 3

pkg/splunk/enterprise/ingestorcluster_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,7 @@ func TestGetIngestorStatefulSet(t *testing.T) {
357357

358358
test := func(want string) {
359359
f := func() (interface{}, error) {
360-
if err := validateIngestorClusterSpec(ctx, c, &cr, &busConfig); err != nil {
360+
if err := validateIngestorClusterSpec(ctx, c, &cr); err != nil {
361361
t.Errorf("validateIngestorClusterSpec() returned error: %v", err)
362362
}
363363
return getIngestorStatefulSet(ctx, c, &cr)

0 commit comments

Comments
 (0)