Skip to content

Commit 5037ba1

Browse files
committed
use state tracker annotation
1 parent f3a9380 commit 5037ba1

File tree

14 files changed

+618
-55
lines changed

14 files changed

+618
-55
lines changed

internal/generated/controller/cluster/handler_v20250312.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,12 @@ func (h *Handlerv20250312) HandleDeleting(ctx context.Context, cluster *akov2gen
254254
}
255255

256256
func (h *Handlerv20250312) handleUpserted(ctx context.Context, currentState state.ResourceState, cluster *akov2generated.Cluster) (ctrlstate.Result, error) {
257-
update, err := ctrlstate.ShouldUpdate(cluster)
257+
deps, err := h.getDependencies(ctx, cluster)
258+
if err != nil {
259+
return result.Error(currentState, fmt.Errorf("failed to resolve Cluster dependencies: %w", err))
260+
}
261+
262+
update, err := ctrlstate.ShouldUpdate(cluster, deps...)
258263
if err != nil {
259264
return result.Error(currentState, reconcile.TerminalError(err))
260265
}
@@ -263,11 +268,6 @@ func (h *Handlerv20250312) handleUpserted(ctx context.Context, currentState stat
263268
return result.NextState(currentState, "Cluster is up to date. No update required.")
264269
}
265270

266-
deps, err := h.getDependencies(ctx, cluster)
267-
if err != nil {
268-
return result.Error(currentState, fmt.Errorf("failed to resolve Cluster dependencies: %w", err))
269-
}
270-
271271
body := &v20250312sdk.ClusterDescription20240805{}
272272
params := &v20250312sdk.UpdateClusterApiParams{
273273
ClusterName: *cluster.Spec.V20250312.Entry.Name,
@@ -319,12 +319,15 @@ func (h *Handlerv20250312) patchStatus(ctx context.Context, cluster *akov2genera
319319
return fmt.Errorf("failed to translate Cluster from Atlas: %w", err)
320320
}
321321

322-
err = h.kubeClient.Status().Patch(ctx, clusterCopy, client.MergeFrom(cluster))
322+
deps, err := h.getDependencies(ctx, cluster)
323323
if err != nil {
324-
return fmt.Errorf("failed to patch Cluster status: %w", err)
324+
return fmt.Errorf("failed to resolve Cluster dependencies: %w", err)
325325
}
326326

327-
return nil
327+
return ctrlstate.NewPatcher(cluster).
328+
UpdateStateTracker(deps...).
329+
UpdateStatus().
330+
Patch(ctx, h.kubeClient)
328331
}
329332

330333
func (h *Handlerv20250312) handleAtlasClusterState(

internal/generated/controller/flexcluster/handler_v20250312.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,12 @@ func (h *Handlerv20250312) handleUpserting(ctx context.Context, flexcluster *ako
231231

232232
// HandleIdle handles the creating and updating state for flex version v20250312
233233
func (h *Handlerv20250312) handleIdle(ctx context.Context, flexcluster *akov2generated.FlexCluster, currentState, finalState state.ResourceState) (ctrlstate.Result, error) {
234-
update, err := ctrlstate.ShouldUpdate(flexcluster)
234+
deps, err := h.getDependencies(ctx, flexcluster)
235+
if err != nil {
236+
return result.Error(currentState, fmt.Errorf("failed to get dependencies: %w", err))
237+
}
238+
239+
update, err := ctrlstate.ShouldUpdate(flexcluster, deps...)
235240
if err != nil {
236241
return result.Error(currentState, reconcile.TerminalError(err))
237242
}
@@ -240,11 +245,6 @@ func (h *Handlerv20250312) handleIdle(ctx context.Context, flexcluster *akov2gen
240245
return result.NextState(currentState, "Flex cluster up to date. No update required.")
241246
}
242247

243-
deps, err := h.getDependencies(ctx, flexcluster)
244-
if err != nil {
245-
return result.Error(currentState, fmt.Errorf("failed to get dependencies: %w", err))
246-
}
247-
248248
body := &v20250312sdk.FlexClusterDescriptionUpdate20241113{}
249249
params := &v20250312sdk.UpdateFlexClusterApiParams{
250250
FlexClusterDescriptionUpdate20241113: body,
@@ -264,15 +264,20 @@ func (h *Handlerv20250312) handleIdle(ctx context.Context, flexcluster *akov2gen
264264
if err != nil {
265265
return result.Error(currentState, fmt.Errorf("failed to get update cluster: %w", err))
266266
}
267+
267268
flexclusterCopy := flexcluster.DeepCopy()
268269
if _, err := h.translator.FromAPI(flexclusterCopy, atlasFlexCluster); err != nil {
269270
return result.Error(currentState, fmt.Errorf("failed to translate update cluster response: %w", err))
270271
}
271272

272-
err = h.kubeClient.Status().Patch(ctx, flexclusterCopy, client.MergeFrom(flexcluster))
273-
if err != nil {
274-
return result.Error(currentState, fmt.Errorf("failed to patch cluster status: %w", err))
273+
if err := ctrlstate.
274+
NewPatcher(flexclusterCopy).
275+
UpdateStateTracker(deps...).
276+
UpdateStatus().
277+
Patch(ctx, h.kubeClient); err != nil {
278+
return result.Error(currentState, fmt.Errorf("failed to patch cluster: %w", err))
275279
}
280+
276281
return result.NextState(finalState, "Updating Flex Cluster.")
277282
}
278283

@@ -296,9 +301,13 @@ func (h *Handlerv20250312) patchStatus(ctx context.Context, flexcluster *akov2ge
296301
return nil, fmt.Errorf("failed to translate get cluster response: %w", err)
297302
}
298303

299-
err = h.kubeClient.Status().Patch(ctx, flexclusterCopy, client.MergeFrom(flexcluster))
300-
if err != nil {
301-
return nil, fmt.Errorf("failed to patch cluster status: %w", err)
304+
if err := ctrlstate.
305+
NewPatcher(flexclusterCopy).
306+
UpdateStateTracker(deps...).
307+
UpdateStatus().
308+
Patch(ctx, h.kubeClient); err != nil {
309+
return nil, fmt.Errorf("failed to patch cluster: %w", err)
302310
}
311+
303312
return atlasFlexCluster, nil
304313
}

internal/generated/controller/group/handler_v20250312.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@ func (h *Handlerv20250312) HandleInitial(ctx context.Context, group *akov2genera
7070
return result.Error(state.StateInitial, fmt.Errorf("failed to translate group from Atlas: %w", err))
7171
}
7272

73-
err = h.kubeClient.Status().Patch(ctx, groupCopy, client.MergeFrom(group))
74-
if err != nil {
73+
if err := ctrlstate.NewPatcher(groupCopy).UpdateStatus().UpdateStateTracker().Patch(ctx, h.kubeClient); err != nil {
7574
return result.Error(state.StateInitial, fmt.Errorf("failed to patch group status: %w", err))
7675
}
7776

@@ -96,8 +95,7 @@ func (h *Handlerv20250312) HandleImportRequested(ctx context.Context, group *ako
9695
return result.Error(state.StateImportRequested, fmt.Errorf("failed to translate Group from Atlas: %w", err))
9796
}
9897

99-
err = h.kubeClient.Status().Patch(ctx, groupCopy, client.MergeFrom(group))
100-
if err != nil {
98+
if err := ctrlstate.NewPatcher(groupCopy).UpdateStatus().UpdateStateTracker().Patch(ctx, h.kubeClient); err != nil {
10199
return result.Error(state.StateImportRequested, fmt.Errorf("failed to patch Group status: %w", err))
102100
}
103101

@@ -203,9 +201,8 @@ func (h *Handlerv20250312) handleUpserted(ctx context.Context, currentState stat
203201
return result.Error(currentState, err)
204202
}
205203

206-
err = h.kubeClient.Status().Patch(ctx, groupCopy, client.MergeFrom(group))
207-
if err != nil {
208-
return result.Error(currentState, err)
204+
if err := ctrlstate.NewPatcher(groupCopy).UpdateStateTracker().UpdateStatus().Patch(ctx, h.kubeClient); err != nil {
205+
return result.Error(currentState, fmt.Errorf("failed to patch group: %w", err))
209206
}
210207

211208
return result.NextState(state.StateUpdated, "Group is updated.")

internal/generated/crds/crds.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2007,7 +2007,7 @@ spec:
20072007
Alphanumeric string that authenticates this database user against the database specified in `databaseName`. To authenticate with SCRAM-SHA, you must specify this parameter. This parameter doesn't appear in this response.
20082008
properties:
20092009
key:
2010-
default: .data.password
2010+
default: password
20112011
description: Key of the secret data containing the sensitive
20122012
field value, defaults to "password".
20132013
type: string

internal/indexer/indexer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func RegisterAll(ctx context.Context, c cluster.Cluster, logger *zap.Logger) err
7878
indexers = append(indexers,
7979
NewAtlasDataFederationByProjectIDIndexer(ctx, c.GetClient(), logger),
8080
indexer.NewFlexClusterByGroupIndexer(logger),
81+
indexer.NewClusterByGroupIndexer(logger),
8182
)
8283
}
8384
return Register(ctx, c, indexers...)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package state
2+
3+
import "strings"
4+
5+
const (
6+
AnnotationReapplyTimestamp = "mongodb.internal.com/reapply-timestamp"
7+
AnnotationStateTracker = "mongodb.internal.com/state-tracker"
8+
)
9+
10+
var jsonPatchReplacer = strings.NewReplacer("/", "~1", "~", "~0")

pkg/controller/state/patcher.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package state
2+
3+
import (
4+
"context"
5+
6+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
7+
"k8s.io/apimachinery/pkg/runtime"
8+
"sigs.k8s.io/controller-runtime/pkg/client"
9+
)
10+
11+
const FieldOwner = "mongodb-atlas-kubernetes"
12+
13+
type Patcher struct {
14+
patchedObj *unstructured.Unstructured
15+
obj client.Object
16+
statusChanged, objectChanged bool
17+
fieldOwner string
18+
err error
19+
}
20+
21+
func NewPatcher(obj client.Object) *Patcher {
22+
patchedObj := &unstructured.Unstructured{}
23+
patchedObj.SetAPIVersion(obj.GetObjectKind().GroupVersionKind().GroupVersion().String())
24+
patchedObj.SetKind(obj.GetObjectKind().GroupVersionKind().Kind)
25+
patchedObj.SetName(obj.GetName())
26+
patchedObj.SetNamespace(obj.GetNamespace())
27+
patchedObj.SetGeneration(obj.GetGeneration())
28+
return &Patcher{patchedObj: patchedObj, obj: obj, fieldOwner: FieldOwner}
29+
}
30+
31+
// UpdateStateTracker updates the state tracker annotation on the given object.
32+
func (p *Patcher) UpdateStateTracker(dependencies ...client.Object) *Patcher {
33+
if p.err != nil {
34+
return p
35+
}
36+
37+
stateTracker := ComputeStateTracker(p.obj, dependencies...)
38+
39+
annotations := p.patchedObj.GetAnnotations()
40+
if annotations == nil {
41+
annotations = make(map[string]string)
42+
}
43+
annotations[AnnotationStateTracker] = stateTracker
44+
p.patchedObj.SetAnnotations(annotations)
45+
46+
p.objectChanged = true
47+
return p
48+
}
49+
50+
// UpdateStatus updates the status of the given object.
51+
func (p *Patcher) UpdateStatus() *Patcher {
52+
if p.err != nil {
53+
return p
54+
}
55+
56+
content, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p.obj)
57+
if err != nil {
58+
p.err = err
59+
return p
60+
}
61+
62+
if err := unstructured.SetNestedField(p.patchedObj.Object, content["status"], "status"); err != nil {
63+
p.err = err
64+
return p
65+
}
66+
67+
unstructured.RemoveNestedField(p.patchedObj.Object, "status", "conditions")
68+
69+
p.statusChanged = true
70+
return p
71+
}
72+
73+
func (p *Patcher) patchObject(ctx context.Context, c client.Client) {
74+
if p.err != nil || !p.objectChanged {
75+
return
76+
}
77+
78+
applyConfig := client.ApplyConfigurationFromUnstructured(p.patchedObj)
79+
err := c.Apply(ctx, applyConfig, client.FieldOwner(FieldOwner), client.ForceOwnership)
80+
p.err = err
81+
}
82+
83+
func (p *Patcher) patchStatus(ctx context.Context, c client.Client) {
84+
if p.err != nil || !p.statusChanged {
85+
return
86+
}
87+
88+
// SSA Apply() method for sub-resources is not yet supported, so we use Patch here.
89+
// See the following issue for more details: https://github.com/kubernetes-sigs/controller-runtime/issues/3183
90+
patchedCopy := p.patchedObj.DeepCopy()
91+
err := c.Status().Patch(ctx, patchedCopy, client.Apply, client.FieldOwner(FieldOwner), client.ForceOwnership)
92+
p.err = err
93+
}
94+
95+
// Patch applies the patches to the given object and updates both status and the annotations if they were modified.
96+
func (p *Patcher) Patch(ctx context.Context, c client.Client) error {
97+
if p.err != nil {
98+
return p.err
99+
}
100+
101+
p.patchStatus(ctx, c)
102+
p.patchObject(ctx, c)
103+
104+
return p.err
105+
}

0 commit comments

Comments
 (0)