Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions operator/internal/lifecycle/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,14 @@ func (r *ResourceClient[T, U]) WatchResources(builder Builder, cluster client.Ob
// custom mappings
builder.Watches(resourceType, handler.EnqueueRequestsFromMapFunc(func(ctx context.Context, o client.Object) []reconcile.Request {
if owner := r.ownershipResolver.OwnerForObject(o); owner != nil {
// NB: we do a Get here to make sure we've watched the given
// namespace for the client-side cache. If we haven't set up
// the namespace to be cached then we'll error, just ignore it.
toReconcile := cluster.DeepCopyObject().(client.Object)
if err := r.client.Get(context.Background(), *owner, toReconcile); err != nil {
return nil
}

return []reconcile.Request{{
NamespacedName: *owner,
}}
Expand Down Expand Up @@ -293,9 +301,27 @@ func (r *ResourceClient[T, U]) listAllOwnedResources(ctx context.Context, owner
if err != nil {
return nil, err
}
if legacyResolver, ok := r.ownershipResolver.(LegacyOwnershipResolver[T, U]); ok {
legacyMatching, err := r.listResources(ctx, resourceType, client.MatchingLabels(legacyResolver.GetLegacyOwnerLabels(owner)))
if err != nil {
return nil, err
}
// this may have duplicate entriea, make sure we filter them in the loop below
matching = append(matching, legacyMatching...)
}
filtered := []client.Object{}
seen := map[gvkObject]struct{}{}
for i := range matching {
object := matching[i]
key := gvkObject{
gvk: object.GetObjectKind().GroupVersionKind(),
nn: client.ObjectKeyFromObject(object),
}

if _, ok := seen[key]; ok {
continue
}
seen[key] = struct{}{}

// filter out unowned resources
mapping, err := getResourceScope(r.mapper, r.scheme, object)
Expand Down Expand Up @@ -397,9 +423,27 @@ func (r *ResourceClient[T, U]) fetchExistingPools(ctx context.Context, cluster U
if err != nil {
return nil, fmt.Errorf("listing StatefulSets: %w", err)
}
if legacyResolver, ok := r.ownershipResolver.(LegacyOwnershipResolver[T, U]); ok {
legacySets, err := r.listResources(ctx, &appsv1.StatefulSet{}, client.MatchingLabels(legacyResolver.GetLegacyOwnerLabels(cluster)))
if err != nil {
return nil, fmt.Errorf("listing legacy StatefulSets: %w", err)
}
// this may have duplicate entriea, make sure we filter them in the loop below
sets = append(sets, legacySets...)
}

existing := []*poolWithOrdinals{}
seen := map[gvkObject]struct{}{}
for _, set := range sets {
key := gvkObject{
gvk: set.GetObjectKind().GroupVersionKind(),
nn: client.ObjectKeyFromObject(set),
}
if _, ok := seen[key]; ok {
continue
}
seen[key] = struct{}{}

statefulSet := set.(*appsv1.StatefulSet)

if !r.nodePoolRenderer.IsNodePool(statefulSet) {
Expand Down
9 changes: 9 additions & 0 deletions operator/internal/lifecycle/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ type OwnershipResolver[T any, U Cluster[T]] interface {
OwnerForObject(object client.Object) *types.NamespacedName
}

// LegacyOwnershipResolver is an OwnershipResolver that also has supporrt
// for fetching resources provisioned by legacy implementations.
type LegacyOwnershipResolver[T any, U Cluster[T]] interface {
OwnershipResolver[T, U]
// GetLegacyOwnerLabels returns the minimal set of labels that
// can identify ownership of an object from legacy implementations.
GetLegacyOwnerLabels(cluster U) map[string]string
}

// SimpleResourceRenderer handles compilation of all desired
// resources to be created by a cluster. These resources should
// be "simple" in nature in that we don't need to manually control
Expand Down
27 changes: 14 additions & 13 deletions operator/internal/lifecycle/v2_ownership.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ func (m *V2OwnershipResolver) AddLabels(cluster *ClusterWithPools) map[string]st
// GetOwnerLabels returns the labels that can identify a resource belonging
// to a given cluster.
func (m *V2OwnershipResolver) GetOwnerLabels(cluster *ClusterWithPools) map[string]string {
return map[string]string{
m.namespaceLabel: cluster.GetNamespace(),
m.ownerLabel: cluster.GetName(),
m.operatorLabel: "v2",
}
}

// GetLegacyOwnerLabels returns the labels that can identify a resource belonging
// to a given cluster coming from our old flux-based operator.
func (m *V2OwnershipResolver) GetLegacyOwnerLabels(cluster *ClusterWithPools) map[string]string {
return map[string]string{
fluxNameLabel: cluster.Name,
fluxNamespaceLabel: cluster.Namespace,
Expand All @@ -56,21 +66,12 @@ func (m *V2OwnershipResolver) GetOwnerLabels(cluster *ClusterWithPools) map[stri

// ownerFromLabels returns the v2 cluster based on a resource's labels.
func (m *V2OwnershipResolver) ownerFromLabels(labels map[string]string) types.NamespacedName {
owner := labels[m.ownerLabel]
if owner == "" {
// fallback to flux labels
owner = labels[fluxNameLabel]
if labels[m.operatorLabel] != "v2" {
return types.NamespacedName{}
}

namespace := labels[m.namespaceLabel]
if namespace == "" {
// fallback to flux labels
namespace = labels[fluxNamespaceLabel]
}

return types.NamespacedName{
Namespace: namespace,
Name: owner,
Namespace: labels[m.namespaceLabel],
Name: labels[m.ownerLabel],
}
}

Expand Down
18 changes: 15 additions & 3 deletions operator/internal/lifecycle/v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,9 @@ func TestV2ResourceClient(t *testing.T) {
redpanda.Namespace = file.Name
cluster := NewClusterWithPools(redpanda)

ownerLabels := resourceClient.ownershipResolver.GetOwnerLabels(cluster)
legacyResolver := resourceClient.ownershipResolver.(LegacyOwnershipResolver[ClusterWithPools, *ClusterWithPools])
legacyOwnerLabels := legacyResolver.GetLegacyOwnerLabels(cluster)
ownerLabels := legacyResolver.GetOwnerLabels(cluster)

pools, err := resourceClient.nodePoolRenderer.Render(ctx, cluster)
require.NoError(t, err)
Expand All @@ -125,23 +127,33 @@ func TestV2ResourceClient(t *testing.T) {
if labels == nil {
labels = map[string]string{}
}

// copied from the original redpanda_controller normalization code
labels["helm.toolkit.fluxcd.io/name"] = cluster.Name
labels["helm.toolkit.fluxcd.io/namespace"] = cluster.Namespace
object.SetLabels(labels)

for label, value := range ownerLabels {
for label, value := range legacyOwnerLabels {
objectLabel, ok := labels[label]
require.True(t, ok, "no label %q found on %q: %s", label, object.GetObjectKind().GroupVersionKind().String(), client.ObjectKeyFromObject(object).String())
require.Equal(t, objectLabel, value)
}

require.Equal(t, client.ObjectKeyFromObject(cluster), *resourceClient.ownershipResolver.OwnerForObject(object))
// we don't pick up ownership based on legacy labels
require.Nil(t, resourceClient.ownershipResolver.OwnerForObject(object))

for label, value := range resourceClient.ownershipResolver.AddLabels(cluster) {
labels[label] = value
}
object.SetLabels(labels)

for label, value := range ownerLabels {
objectLabel, ok := labels[label]
require.True(t, ok, "no label %q found on %q: %s", label, object.GetObjectKind().GroupVersionKind().String(), client.ObjectKeyFromObject(object).String())
require.Equal(t, objectLabel, value)
}

require.Equal(t, client.ObjectKeyFromObject(cluster), *resourceClient.ownershipResolver.OwnerForObject(object))
}

for _, pool := range pools {
Expand Down