
Commit 143dbe0

CSPL-4360 Add Splunk restart
1 parent f992c40 commit 143dbe0

6 files changed: +77 -20 lines changed

docs/IndexIngestionSeparation.md

Lines changed: 16 additions & 2 deletions
@@ -1,3 +1,9 @@
+---
+title: Index and Ingestion Separation
+parent: Deploy & Configure
+nav_order: 6
+---
+
 # Background
 
 Separation between ingestion and indexing services within Splunk Operator for Kubernetes enables the operator to independently manage the ingestion service while maintaining seamless integration with the indexing service.
@@ -10,7 +16,7 @@ This separation enables:
 # Important Note
 
 > [!WARNING]
-> **As of now, only brand new deployments are supported for Index and Ingestion Separation. No migration path is implemented, described or tested for existing deployments to move from a standard model to Index & Ingestion separation model.**
+> **For customers deploying SmartBus on CMP, the Splunk Operator for Kubernetes (SOK) manages the configuration and lifecycle of the ingestor tier. The following SOK guide provides implementation details for setting up ingestion separation and integrating with existing indexers. This reference is primarily intended for CMP users leveraging SOK-managed ingestors.**
 
 # Document Variables
 
@@ -38,7 +44,7 @@ SQS message bus inputs can be found in the table below.
 | endpoint | string | [Optional, if not provided formed based on region] AWS SQS Service endpoint |
 | dlq | string | [Required] Name of the dead letter queue |
 
-Change of any of the bus inputs triggers the restart of Splunk so that appropriate .conf files are correctly refreshed and consumed.
+**First provisioning or update of any of the bus inputs requires an Ingestor Cluster and Indexer Cluster Splunkd restart; this restart is performed automatically by SOK.**
 
 ## Example
 ```
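
The automatic restart described in the hunk above is driven through each member's Splunk management API: the operator builds a SplunkClient per pod and calls RestartSplunk(), exactly as the Go hunks later in this commit show. As a rough standalone sketch of that per-member call — the pod FQDN and password below are placeholders, not values from the commit; in the operator they are resolved from the StatefulSet pod name, the headless service, and the pod's secret:

```go
package main

import (
	"fmt"

	splclient "github.com/splunk/splunk-operator/pkg/splunk/client"
)

func main() {
	// Placeholder values; the operator derives these per pod (see the getClient
	// addition in pkg/splunk/enterprise/ingestorcluster.go below).
	fqdn := "splunk-example-ingestor-0.splunk-example-ingestor-headless.example-ns.svc.cluster.local"
	adminPwd := "placeholder-password"

	// Build a management-API client for the pod and ask splunkd to restart.
	c := splclient.NewSplunkClient(fmt.Sprintf("https://%s:8089", fqdn), "admin", adminPwd)
	if err := c.RestartSplunk(); err != nil {
		fmt.Println("restart request failed:", err)
		return
	}
	fmt.Println("splunkd restart requested")
}
```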
@@ -425,6 +431,14 @@ In the following example, the dashboard presents ingestion and indexing data in
 
 - [kube-prometheus-stack](https://github.com/prometheus-community/helm-charts/tree/main/charts/kube-prometheus-stack)
 
+# App Installation for Ingestor Cluster Instances
+
+Application installation is supported for Ingestor Cluster instances. However, applications are currently installed with local scope, and if an application requires a Splunk restart, the Splunk Operator has no automated way to detect this and trigger the restart.
+
+Therefore, to enforce a Splunk restart on each of the Ingestor Cluster pods, it is recommended to add or update annotations/labels on the IngestorCluster CR and apply the new configuration, which triggers a rolling restart of the Ingestor Cluster Splunk pods.
+
+We are investigating how to make this fully automated. Ideally, an update of annotations and labels should not trigger a pod restart at all, and we are also looking into fixing this behaviour.
+
 # Example
 
 1. Install CRDs and Splunk Operator for Kubernetes.
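
The annotation/label workaround recommended in the hunk above can be applied with any Kubernetes client. A minimal sketch using controller-runtime, assuming a reachable cluster; the CR name, namespace, and annotation key are hypothetical placeholders and do not come from this commit:

```go
package main

import (
	"context"
	"time"

	enterpriseApi "github.com/splunk/splunk-operator/api/v4"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
)

func main() {
	ctx := context.Background()

	// Register the operator API types so the client can decode IngestorCluster.
	scheme := runtime.NewScheme()
	_ = enterpriseApi.AddToScheme(scheme)

	c, err := client.New(config.GetConfigOrDie(), client.Options{Scheme: scheme})
	if err != nil {
		panic(err)
	}

	// Fetch the existing IngestorCluster CR (placeholder name/namespace).
	var ic enterpriseApi.IngestorCluster
	if err := c.Get(ctx, types.NamespacedName{Namespace: "splunk", Name: "example-ingestor"}, &ic); err != nil {
		panic(err)
	}

	// Add or update an annotation; applying this change is what triggers the
	// rolling restart of the Ingestor Cluster pods per the documentation above.
	patch := client.MergeFrom(ic.DeepCopy())
	annotations := ic.GetAnnotations()
	if annotations == nil {
		annotations = map[string]string{}
	}
	annotations["example.com/restartedAt"] = time.Now().Format(time.RFC3339) // hypothetical key
	ic.SetAnnotations(annotations)

	if err := c.Patch(ctx, &ic, patch); err != nil {
		panic(err)
	}
}
```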

pkg/splunk/enterprise/indexercluster.go

Lines changed: 18 additions & 0 deletions
@@ -305,6 +305,15 @@ func ApplyIndexerClusterManager(ctx context.Context, client splcommon.Controller
 		}
 
 		cr.Status.Bus = &bus.Spec
+
+		for i := int32(0); i < cr.Spec.Replicas; i++ {
+			idxcClient := mgr.getClient(ctx, i)
+			err = idxcClient.RestartSplunk()
+			if err != nil {
+				return result, err
+			}
+			scopedLog.Info("Restarted splunk", "indexer", i)
+		}
 	}
 }
 
@@ -627,6 +636,15 @@ func ApplyIndexerCluster(ctx context.Context, client splcommon.ControllerClient,
 		}
 
 		cr.Status.Bus = &bus.Spec
+
+		for i := int32(0); i < cr.Spec.Replicas; i++ {
+			idxcClient := mgr.getClient(ctx, i)
+			err = idxcClient.RestartSplunk()
+			if err != nil {
+				return result, err
+			}
+			scopedLog.Info("Restarted splunk", "indexer", i)
+		}
 	}
 }
 
pkg/splunk/enterprise/ingestorcluster.go

Lines changed: 34 additions & 2 deletions
@@ -260,7 +260,7 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
 
 	// If bus is updated
 	if !reflect.DeepEqual(cr.Status.Bus, bus.Spec) || !reflect.DeepEqual(cr.Status.LargeMessageStore, lms.Spec) {
-		mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient)
+		mgr := newIngestorClusterPodManager(scopedLog, cr, namespaceScopedSecret, splclient.NewSplunkClient, client)
 		err = mgr.handlePushBusChange(ctx, cr, busCopy, lmsCopy, client)
 		if err != nil {
 			eventPublisher.Warning(ctx, "ApplyIngestorCluster", fmt.Sprintf("Failed to update conf file for Bus/Pipeline config change after pod creation: %s", err.Error()))
@@ -269,6 +269,15 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
 		}
 
 		cr.Status.Bus = &bus.Spec
+
+		for i := int32(0); i < cr.Spec.Replicas; i++ {
+			ingClient := mgr.getClient(ctx, i)
+			err = ingClient.RestartSplunk()
+			if err != nil {
+				return result, err
+			}
+			scopedLog.Info("Restarted splunk", "ingestor", i)
+		}
 	}
 
 	// Upgrade fron automated MC to MC CRD
@@ -311,6 +320,27 @@ func ApplyIngestorCluster(ctx context.Context, client client.Client, cr *enterpr
 	return result, nil
 }
 
+// getClient for ingestorClusterPodManager returns a SplunkClient for the member n
+func (mgr *ingestorClusterPodManager) getClient(ctx context.Context, n int32) *splclient.SplunkClient {
+	reqLogger := log.FromContext(ctx)
+	scopedLog := reqLogger.WithName("ingestorClusterPodManager.getClient").WithValues("name", mgr.cr.GetName(), "namespace", mgr.cr.GetNamespace())
+
+	// Get Pod Name
+	memberName := GetSplunkStatefulsetPodName(SplunkIngestor, mgr.cr.GetName(), n)
+
+	// Get Fully Qualified Domain Name
+	fqdnName := splcommon.GetServiceFQDN(mgr.cr.GetNamespace(),
+		fmt.Sprintf("%s.%s", memberName, GetSplunkServiceName(SplunkIngestor, mgr.cr.GetName(), true)))
+
+	// Retrieve admin password from Pod
+	adminPwd, err := splutil.GetSpecificSecretTokenFromPod(ctx, mgr.c, memberName, mgr.cr.GetNamespace(), "password")
+	if err != nil {
+		scopedLog.Error(err, "Couldn't retrieve the admin password from pod")
+	}
+
+	return mgr.newSplunkClient(fmt.Sprintf("https://%s:8089", fqdnName), "admin", adminPwd)
+}
+
 // validateIngestorClusterSpec checks validity and makes default updates to a IngestorClusterSpec and returns error if something is wrong
 func validateIngestorClusterSpec(ctx context.Context, c splcommon.ControllerClient, cr *enterpriseApi.IngestorCluster) error {
 	// We cannot have 0 replicas in IngestorCluster spec since this refers to number of ingestion pods in an ingestor cluster
@@ -426,19 +456,21 @@ func getChangedBusFieldsForIngestor(bus *enterpriseApi.Bus, lms *enterpriseApi.L
 }
 
 type ingestorClusterPodManager struct {
+	c               splcommon.ControllerClient
 	log             logr.Logger
 	cr              *enterpriseApi.IngestorCluster
 	secrets         *corev1.Secret
 	newSplunkClient func(managementURI, username, password string) *splclient.SplunkClient
 }
 
 // newIngestorClusterPodManager function to create pod manager this is added to write unit test case
-var newIngestorClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc) ingestorClusterPodManager {
+var newIngestorClusterPodManager = func(log logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, newSplunkClient NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager {
 	return ingestorClusterPodManager{
 		log:             log,
 		cr:              cr,
 		secrets:         secret,
 		newSplunkClient: newSplunkClient,
+		c:               c,
 	}
 }
 
pkg/splunk/enterprise/ingestorcluster_test.go

Lines changed: 4 additions & 8 deletions
@@ -25,15 +25,14 @@ import (
 	"github.com/go-logr/logr"
 	enterpriseApi "github.com/splunk/splunk-operator/api/v4"
 	splclient "github.com/splunk/splunk-operator/pkg/splunk/client"
+	splcommon "github.com/splunk/splunk-operator/pkg/splunk/common"
 	spltest "github.com/splunk/splunk-operator/pkg/splunk/test"
 	splutil "github.com/splunk/splunk-operator/pkg/splunk/util"
 	"github.com/stretchr/testify/assert"
 	appsv1 "k8s.io/api/apps/v1"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
-	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 )
 
 func init() {
@@ -56,11 +55,7 @@ func TestApplyIngestorCluster(t *testing.T) {
 
 	ctx := context.TODO()
 
-	scheme := runtime.NewScheme()
-	_ = enterpriseApi.AddToScheme(scheme)
-	_ = corev1.AddToScheme(scheme)
-	_ = appsv1.AddToScheme(scheme)
-	c := fake.NewClientBuilder().WithScheme(scheme).Build()
+	c := spltest.NewMockClient()
 
 	// Object definitions
 	provider := "sqs_smartbus"
@@ -273,8 +268,9 @@ func TestApplyIngestorCluster(t *testing.T) {
 	// outputs.conf
 	origNew := newIngestorClusterPodManager
 	mockHTTPClient := &spltest.MockHTTPClient{}
-	newIngestorClusterPodManager = func(l logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, _ NewSplunkClientFunc) ingestorClusterPodManager {
+	newIngestorClusterPodManager = func(l logr.Logger, cr *enterpriseApi.IngestorCluster, secret *corev1.Secret, _ NewSplunkClientFunc, c splcommon.ControllerClient) ingestorClusterPodManager {
 		return ingestorClusterPodManager{
+			c:   c,
 			log: l, cr: cr, secrets: secret,
 			newSplunkClient: func(uri, user, pass string) *splclient.SplunkClient {
 				return &splclient.SplunkClient{ManagementURI: uri, Username: user, Password: pass, Client: mockHTTPClient}

pkg/splunk/enterprise/util_test.go

Lines changed: 5 additions & 0 deletions
@@ -2624,6 +2624,9 @@ func TestUpdateCRStatus(t *testing.T) {
 		WithStatusSubresource(&enterpriseApi.Standalone{}).
 		WithStatusSubresource(&enterpriseApi.MonitoringConsole{}).
 		WithStatusSubresource(&enterpriseApi.IndexerCluster{}).
+		WithStatusSubresource(&enterpriseApi.Bus{}).
+		WithStatusSubresource(&enterpriseApi.LargeMessageStore{}).
+		WithStatusSubresource(&enterpriseApi.IngestorCluster{}).
 		WithStatusSubresource(&enterpriseApi.SearchHeadCluster{})
 	c := builder.Build()
 	ctx := context.TODO()
@@ -3304,6 +3307,8 @@ func TestGetCurrentImage(t *testing.T) {
 		WithStatusSubresource(&enterpriseApi.MonitoringConsole{}).
 		WithStatusSubresource(&enterpriseApi.IndexerCluster{}).
 		WithStatusSubresource(&enterpriseApi.SearchHeadCluster{}).
+		WithStatusSubresource(&enterpriseApi.Bus{}).
+		WithStatusSubresource(&enterpriseApi.LargeMessageStore{}).
 		WithStatusSubresource(&enterpriseApi.IngestorCluster{})
 	client := builder.Build()
 	client.Create(ctx, &current)
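
For context on the two hunks above (this note is editorial, not part of the commit): in recent controller-runtime versions the fake client rejects status writes for types that were not registered via WithStatusSubresource, which is why Bus, LargeMessageStore, and IngestorCluster need to be added once the tests exercise code that writes cr.Status.Bus. A minimal sketch of the pattern, assuming the same enterpriseApi scheme registration used elsewhere in these tests:

```go
package main

import (
	"context"

	enterpriseApi "github.com/splunk/splunk-operator/api/v4"
	"k8s.io/apimachinery/pkg/runtime"
	"sigs.k8s.io/controller-runtime/pkg/client/fake"
)

func main() {
	ctx := context.TODO()

	// Register the operator API types, as the tests do in their setup.
	scheme := runtime.NewScheme()
	_ = enterpriseApi.AddToScheme(scheme)

	// Registering the CRD as a status subresource lets Status().Update() work
	// against the fake client, mirroring the builder changes in these hunks.
	c := fake.NewClientBuilder().
		WithScheme(scheme).
		WithStatusSubresource(&enterpriseApi.IngestorCluster{}).
		Build()

	ic := &enterpriseApi.IngestorCluster{}
	ic.SetName("example")
	ic.SetNamespace("splunk")

	if err := c.Create(ctx, ic); err != nil {
		panic(err)
	}
	if err := c.Status().Update(ctx, ic); err != nil {
		panic(err)
	}
}
```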

test/index_and_ingestion_separation/index_and_ingestion_separation_test.go

Lines changed: 0 additions & 8 deletions
@@ -433,14 +433,6 @@ var _ = Describe("indingsep test", func() {
 			err = deployment.UpdateCR(ctx, bus)
 			Expect(err).To(Succeed(), "Unable to deploy Bus with updated CR")
 
-			// Ensure that Ingestor Cluster has not been restarted
-			testcaseEnvInst.Log.Info("Ensure that Ingestor Cluster has not been restarted")
-			testenv.IngestorReady(ctx, deployment, testcaseEnvInst)
-
-			// Ensure that Indexer Cluster has not been restarted
-			testcaseEnvInst.Log.Info("Ensure that Indexer Cluster has not been restarted")
-			testenv.SingleSiteIndexersReady(ctx, deployment, testcaseEnvInst)
-
 			// Get instance of current Ingestor Cluster CR with latest config
 			testcaseEnvInst.Log.Info("Get instance of current Ingestor Cluster CR with latest config")
 			ingest := &enterpriseApi.IngestorCluster{}