Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,35 @@ func FillDefaults(opr *operatorv1.LogStorage) {
}
}

// validateLogStorage checks a defaulted LogStorage spec for problems.
// It returns a non-nil error for configurations that must be rejected,
// or a non-empty warning string for configurations that are permitted
// but likely undesirable. A warning is only surfaced when every
// validation passes.
func validateLogStorage(spec *operatorv1.LogStorageSpec) (error, string) {
	err, warning := validateReplicasForNodeCount(spec)
	if err == nil {
		err = validateComponentResources(spec)
	}
	if err != nil {
		// Any hard validation failure suppresses the warning.
		return err, ""
	}
	return nil, warning
}

// validateReplicasForNodeCount validates the index replica count against
// the Elasticsearch node count. It returns an error when replicas cannot
// possibly be allocated (nodes <= replicas), and a warning when the node
// count only exceeds the replica count by one, which can block voluntary
// pod evictions due to PodDisruptionBudget constraints.
func validateReplicasForNodeCount(spec *operatorv1.LogStorageSpec) (error, string) {
	// Both values must be explicitly set for this check to apply.
	if spec.Nodes == nil || spec.Indices == nil || spec.Indices.Replicas == nil {
		return nil, ""
	}

	replicaCount := int(*spec.Indices.Replicas)
	nodeCount := int(spec.Nodes.Count)

	switch {
	case replicaCount <= 0:
		// Zero replicas is always satisfiable, regardless of node count.
		return nil, ""
	case nodeCount <= replicaCount:
		return fmt.Errorf("LogStorage spec.indices.replicas (%d) must be less than spec.nodes.count (%d); replica shards cannot be allocated when there are not enough nodes. For a single-node Elasticsearch cluster, set spec.indices.replicas to 0", replicaCount, nodeCount), ""
	case nodeCount == replicaCount+1:
		return nil, fmt.Sprintf("LogStorage spec.nodes.count (%d) is only 1 more than spec.indices.replicas (%d); this may prevent voluntary pod evictions (e.g., node repaving) due to PodDisruptionBudget constraints. If this is expected for your environment, no action is needed. Otherwise, consider setting spec.nodes.count to at least %d", nodeCount, replicaCount, replicaCount+2)
	default:
		return nil, ""
	}
}

func validateComponentResources(spec *operatorv1.LogStorageSpec) error {
if spec.ComponentResources == nil {
return fmt.Errorf("LogStorage spec.ComponentResources is nil %+v", spec)
Expand Down Expand Up @@ -232,13 +261,16 @@ func (r *LogStorageInitializer) Reconcile(ctx context.Context, request reconcile

// Default and validate the object.
FillDefaults(ls)
err = validateComponentResources(&ls.Spec)
err, warning := validateLogStorage(&ls.Spec)
if err != nil {
// Invalid - mark it as such and return.
r.setConditionDegraded(ctx, ls, reqLogger)
r.status.SetDegraded(operatorv1.ResourceValidationError, "An error occurred while validating LogStorage", err, reqLogger)
return reconcile.Result{}, err
}
if warning != "" {
reqLogger.Info(warning)
}

pullSecrets, err := utils.GetNetworkingPullSecrets(install, r.client)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,46 @@ var _ = Describe("LogStorage Initializing controller", func() {
Expect(ls.Status.State).Should(Equal(operatorv1.TigeraStatusReady))
})

// Regression test: a LogStorage whose index replica count equals (or
// exceeds) its node count must fail validation during Reconcile, report
// a degraded condition via the status manager, and return the error.
It("sets a degraded status when replicas >= node count", func() {
	var replicas int32 = 1
	ls := &operatorv1.LogStorage{}
	ls.Name = "tigera-secure"
	// Apply operator defaults first so only the fields under test deviate.
	FillDefaults(ls)
	ls.Spec.Indices.Replicas = &replicas
	ls.Spec.Nodes.Count = 1 // equal to replicas -> invalid configuration
	Expect(cli.Create(ctx, ls)).ShouldNot(HaveOccurred())

	r, err := NewTestInitializer(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain)
	Expect(err).ShouldNot(HaveOccurred())
	_, err = r.Reconcile(ctx, reconcile.Request{})
	// Reconcile should fail and SetDegraded should be called exactly once.
	Expect(err).Should(HaveOccurred())
	Expect(mockStatus.AssertNumberOfCalls(GinkgoT(), "SetDegraded", 1)).Should(BeTrue())

	// Re-fetch the object and confirm the degraded state was persisted.
	ls = &operatorv1.LogStorage{}
	Expect(cli.Get(ctx, client.ObjectKey{Name: "tigera-secure"}, ls)).ShouldNot(HaveOccurred())
	Expect(ls.Status.State).Should(Equal(operatorv1.TigeraStatusDegraded))
})

// The warning path: node count exceeding replicas by exactly one is a
// valid but fragile configuration. Reconcile must succeed, must NOT mark
// the resource degraded, and the object should end up Ready.
It("logs a warning but does not degrade when node count only exceeds replicas by 1", func() {
	var replicas int32 = 1
	ls := &operatorv1.LogStorage{}
	ls.Name = "tigera-secure"
	// Apply operator defaults first so only the fields under test deviate.
	FillDefaults(ls)
	ls.Spec.Indices.Replicas = &replicas
	ls.Spec.Nodes.Count = 2 // replicas + 1 -> warning-only configuration
	Expect(cli.Create(ctx, ls)).ShouldNot(HaveOccurred())

	r, err := NewTestInitializer(cli, scheme, mockStatus, operatorv1.ProviderNone, dns.DefaultClusterDomain)
	Expect(err).ShouldNot(HaveOccurred())
	_, err = r.Reconcile(ctx, reconcile.Request{})
	// No error and no SetDegraded calls: the warning is log-only.
	Expect(err).ShouldNot(HaveOccurred())
	Expect(mockStatus.AssertNumberOfCalls(GinkgoT(), "SetDegraded", 0)).Should(BeTrue())

	// Re-fetch the object and confirm it reached the Ready state.
	ls = &operatorv1.LogStorage{}
	Expect(cli.Get(ctx, client.ObjectKey{Name: "tigera-secure"}, ls)).ShouldNot(HaveOccurred())
	Expect(ls.Status.State).Should(Equal(operatorv1.TigeraStatusReady))
})

It("handles LogStorage deletion", func() {
// Create a LogStorage instance.
ls := &operatorv1.LogStorage{}
Expand Down Expand Up @@ -352,6 +392,93 @@ var _ = Describe("LogStorage Initializing controller", func() {
})
})

// Unit tests for validateReplicasForNodeCount, covering the three
// outcomes (error / warning / clean) plus the nil-field guard paths.
Context("validateReplicasForNodeCount", func() {
	// nodes == replicas: replica shards can never be allocated -> error.
	It("should return an error when replicas is 1 and node count is 1", func() {
		var replicas int32 = 1
		spec := &operatorv1.LogStorageSpec{
			Nodes:   &operatorv1.Nodes{Count: 1},
			Indices: &operatorv1.Indices{Replicas: &replicas},
		}
		err, warning := validateReplicasForNodeCount(spec)
		Expect(err).NotTo(BeNil())
		Expect(warning).To(BeEmpty())
	})

	// Same boundary at a higher count: equality is still invalid.
	It("should return an error when replicas equals node count", func() {
		var replicas int32 = 2
		spec := &operatorv1.LogStorageSpec{
			Nodes:   &operatorv1.Nodes{Count: 2},
			Indices: &operatorv1.Indices{Replicas: &replicas},
		}
		err, warning := validateReplicasForNodeCount(spec)
		Expect(err).NotTo(BeNil())
		Expect(warning).To(BeEmpty())
	})

	// nodes == replicas + 1: valid but fragile -> warning, no error.
	It("should return a warning when node count is only 1 more than replicas", func() {
		var replicas int32 = 1
		spec := &operatorv1.LogStorageSpec{
			Nodes:   &operatorv1.Nodes{Count: 2},
			Indices: &operatorv1.Indices{Replicas: &replicas},
		}
		err, warning := validateReplicasForNodeCount(spec)
		Expect(err).To(BeNil())
		Expect(warning).To(ContainSubstring("only 1 more than"))
	})

	// The warning boundary also applies at larger replica counts.
	It("should return a warning when node count is 3 and replicas is 2", func() {
		var replicas int32 = 2
		spec := &operatorv1.LogStorageSpec{
			Nodes:   &operatorv1.Nodes{Count: 3},
			Indices: &operatorv1.Indices{Replicas: &replicas},
		}
		err, warning := validateReplicasForNodeCount(spec)
		Expect(err).To(BeNil())
		Expect(warning).To(ContainSubstring("only 1 more than"))
	})

	// nodes >= replicas + 2: fully healthy configuration.
	It("should return no error or warning when node count exceeds replicas by 2 or more", func() {
		var replicas int32 = 1
		spec := &operatorv1.LogStorageSpec{
			Nodes:   &operatorv1.Nodes{Count: 3},
			Indices: &operatorv1.Indices{Replicas: &replicas},
		}
		err, warning := validateReplicasForNodeCount(spec)
		Expect(err).To(BeNil())
		Expect(warning).To(BeEmpty())
	})

	// Zero replicas is always satisfiable, even on a single node.
	It("should return no error or warning when replicas is 0 and node count is 1", func() {
		var replicas int32 = 0
		spec := &operatorv1.LogStorageSpec{
			Nodes:   &operatorv1.Nodes{Count: 1},
			Indices: &operatorv1.Indices{Replicas: &replicas},
		}
		err, warning := validateReplicasForNodeCount(spec)
		Expect(err).To(BeNil())
		Expect(warning).To(BeEmpty())
	})

	// Nil-guard path: missing Indices skips the check entirely.
	It("should return no error or warning when indices is nil", func() {
		spec := &operatorv1.LogStorageSpec{
			Nodes: &operatorv1.Nodes{Count: 1},
		}
		err, warning := validateReplicasForNodeCount(spec)
		Expect(err).To(BeNil())
		Expect(warning).To(BeEmpty())
	})

	// Nil-guard path: missing Nodes skips the check entirely.
	It("should return no error or warning when nodes is nil", func() {
		var replicas int32 = 1
		spec := &operatorv1.LogStorageSpec{
			Indices: &operatorv1.Indices{Replicas: &replicas},
		}
		err, warning := validateReplicasForNodeCount(spec)
		Expect(err).To(BeNil())
		Expect(warning).To(BeEmpty())
	})
})

Context("validateComponentResources", func() {
ls := operatorv1.LogStorage{Spec: operatorv1.LogStorageSpec{}}

Expand Down
Loading