Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions operator/pkg/labels/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ const (
// PodNodeIDKey is used to store the Redpanda NodeID of this pod.
PodNodeIDKey = "operator.redpanda.com/node-id"

// NodePoolSpecKey is used to store the NodePoolSpec in a StatefulSet's annotations.
// This allows the operator to correctly reconstruct a NodePoolSpec even
// after it was removed from Spec already.
NodePoolSpecKey = "cluster.redpanda.com/node-pool-spec"

nameKeyRedpandaVal = "redpanda"
nameKeyConsoleVal = "redpanda-console"
managedByOperatorVal = "redpanda-operator"
Expand Down
75 changes: 13 additions & 62 deletions operator/pkg/nodepools/pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ package nodepools

import (
"context"
"encoding/json"
"fmt"
"slices"
"strings"

vectorizedv1alpha1 "github.com/redpanda-data/redpanda-operator/operator/api/vectorized/v1alpha1"
"github.com/redpanda-data/redpanda-operator/operator/pkg/labels"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -89,70 +88,22 @@ outer:
continue
}

replicas := sts.Spec.Replicas
if st, ok := cluster.Status.NodePools[npName]; ok {
replicas = &st.CurrentReplicas
}

var redpandaContainer *corev1.Container
for i := range sts.Spec.Template.Spec.Containers {
container := sts.Spec.Template.Spec.Containers[i]
if container.Name == "redpanda" {
redpandaContainer = &container
break
var np vectorizedv1alpha1.NodePoolSpec
if nodePoolSpecJSON, ok := sts.Annotations[labels.NodePoolSpecKey]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to work with already existing NodePool specs? If there is no annotation here, what's the expected behavior?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As part of the reconciliation, this is set. So once this version is deployed, it will update the StatefulSet and add the annotation.in addition, NodePools are not yet in production. The only case not covered is already in-deleting nodepools - but since there's no prod clusters yet, we'll be fine

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we expect it to always be present, I feel we should handle the else case here either either an error or panic.

if err := json.Unmarshal([]byte(nodePoolSpecJSON), &np); err != nil {
return nil, fmt.Errorf("failed to synthesize deleted nodePool %s from its annotation %s", npName, labels.NodePoolSpecKey)
}
}
if redpandaContainer == nil {
return nil, fmt.Errorf("redpanda container not defined in STS %s template", sts.Name)
} else {
return nil, fmt.Errorf("could not find annotation %s on StatefulSet with name %s", labels.NodePoolSpecKey, sts.Name)
}

var datadirVcCapacity resource.Quantity
var datadirVcStorageClassName string
// Desired replicas for deleted NodePools is always zero.
np.Replicas = ptr.To(int32(0))

var cacheVcExists bool
var cacheVcCapacity resource.Quantity
var cacheVcStorageClassName string

for i := range sts.Spec.VolumeClaimTemplates {
vct := sts.Spec.VolumeClaimTemplates[i]
if vct.Name == "datadir" {
datadirVcCapacity = vct.Spec.Resources.Requests[corev1.ResourceStorage]
if vct.Spec.StorageClassName != nil {
datadirVcStorageClassName = ptr.Deref(vct.Spec.StorageClassName, "")
}
}
if vct.Name == "shadow-index-cache" {
cacheVcExists = true
cacheVcCapacity = vct.Spec.Resources.Requests[corev1.ResourceStorage]
if vct.Spec.StorageClassName != nil {
cacheVcStorageClassName = ptr.Deref(vct.Spec.StorageClassName, "")
}
}
}

np := vectorizedv1alpha1.NodePoolSpecWithDeleted{
NodePoolSpec: vectorizedv1alpha1.NodePoolSpec{
Name: npName,
Replicas: replicas,
Resources: vectorizedv1alpha1.RedpandaResourceRequirements{
ResourceRequirements: redpandaContainer.Resources,
},
Tolerations: sts.Spec.Template.Spec.Tolerations,
NodeSelector: sts.Spec.Template.Spec.NodeSelector,
Storage: vectorizedv1alpha1.StorageSpec{
Capacity: datadirVcCapacity,
StorageClassName: datadirVcStorageClassName,
},
},
Deleted: true,
}
if cacheVcExists {
np.CloudCacheStorage = vectorizedv1alpha1.StorageSpec{
Capacity: cacheVcCapacity,
StorageClassName: cacheVcStorageClassName,
}
}
nodePoolsWithDeleted = append(nodePoolsWithDeleted, &np)
nodePoolsWithDeleted = append(nodePoolsWithDeleted, &vectorizedv1alpha1.NodePoolSpecWithDeleted{
NodePoolSpec: np,
Deleted: true,
})
}
return nodePoolsWithDeleted, nil
}
8 changes: 8 additions & 0 deletions operator/pkg/resources/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,11 +428,19 @@ func (r *StatefulSetResource) obj(
nodePoolSelector = clusterLabels.AsAPISelector()
}

nodePoolSpecJSON, err := json.Marshal(r.nodePool.NodePoolSpec)
if err != nil {
return nil, fmt.Errorf("failed to marshal NodePoolSpec as JSON: %w", err)
}

ss := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: r.Key().Namespace,
Name: r.Key().Name,
Labels: nodePoolLabels,
Annotations: map[string]string{
labels.NodePoolSpecKey: string(nodePoolSpecJSON),
},
},
TypeMeta: metav1.TypeMeta{
Kind: "StatefulSet",
Expand Down
3 changes: 1 addition & 2 deletions operator/pkg/resources/statefulset_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ func (r *StatefulSetResource) handleScaling(ctx context.Context) error {
return r.setCurrentReplicas(ctx, *r.nodePool.Replicas, r.nodePool.Name, r.logger)
}

if ptr.Deref(r.nodePool.Replicas, 0) == npCurrentReplicas && !r.nodePool.Deleted {
if ptr.Deref(r.nodePool.Replicas, 0) == npCurrentReplicas {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not need this special case anymore (great!)

log.V(logger.DebugLevel).Info("No scaling changes required for this nodepool", "replicas", *r.nodePool.Replicas, "spec replicas", *r.LastObservedState.Spec.Replicas) // No changes to replicas, we do nothing here

return nil
}

Expand Down