Skip to content

Commit 9d7134d

Browse files
committed
use state tracker annotation
1 parent f3a9380 commit 9d7134d

File tree

14 files changed

+684
-56
lines changed

14 files changed

+684
-56
lines changed

internal/generated/controller/cluster/handler_v20250312.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,12 @@ func (h *Handlerv20250312) HandleDeleting(ctx context.Context, cluster *akov2gen
254254
}
255255

256256
func (h *Handlerv20250312) handleUpserted(ctx context.Context, currentState state.ResourceState, cluster *akov2generated.Cluster) (ctrlstate.Result, error) {
257-
update, err := ctrlstate.ShouldUpdate(cluster)
257+
deps, err := h.getDependencies(ctx, cluster)
258+
if err != nil {
259+
return result.Error(currentState, fmt.Errorf("failed to resolve Cluster dependencies: %w", err))
260+
}
261+
262+
update, err := ctrlstate.ShouldUpdate(cluster, deps...)
258263
if err != nil {
259264
return result.Error(currentState, reconcile.TerminalError(err))
260265
}
@@ -263,11 +268,6 @@ func (h *Handlerv20250312) handleUpserted(ctx context.Context, currentState stat
263268
return result.NextState(currentState, "Cluster is up to date. No update required.")
264269
}
265270

266-
deps, err := h.getDependencies(ctx, cluster)
267-
if err != nil {
268-
return result.Error(currentState, fmt.Errorf("failed to resolve Cluster dependencies: %w", err))
269-
}
270-
271271
body := &v20250312sdk.ClusterDescription20240805{}
272272
params := &v20250312sdk.UpdateClusterApiParams{
273273
ClusterName: *cluster.Spec.V20250312.Entry.Name,
@@ -319,12 +319,15 @@ func (h *Handlerv20250312) patchStatus(ctx context.Context, cluster *akov2genera
319319
return fmt.Errorf("failed to translate Cluster from Atlas: %w", err)
320320
}
321321

322-
err = h.kubeClient.Status().Patch(ctx, clusterCopy, client.MergeFrom(cluster))
322+
deps, err := h.getDependencies(ctx, cluster)
323323
if err != nil {
324-
return fmt.Errorf("failed to patch Cluster status: %w", err)
324+
return fmt.Errorf("failed to resolve Cluster dependencies: %w", err)
325325
}
326326

327-
return nil
327+
return ctrlstate.NewPatcher(cluster).
328+
UpdateStateTracker(deps...).
329+
UpdateStatus().
330+
Patch(ctx, h.kubeClient)
328331
}
329332

330333
func (h *Handlerv20250312) handleAtlasClusterState(

internal/generated/controller/flexcluster/handler_v20250312.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,12 @@ func (h *Handlerv20250312) handleUpserting(ctx context.Context, flexcluster *ako
231231

232232
// HandleIdle handles the creating and updating state for flex version v20250312
233233
func (h *Handlerv20250312) handleIdle(ctx context.Context, flexcluster *akov2generated.FlexCluster, currentState, finalState state.ResourceState) (ctrlstate.Result, error) {
234-
update, err := ctrlstate.ShouldUpdate(flexcluster)
234+
deps, err := h.getDependencies(ctx, flexcluster)
235+
if err != nil {
236+
return result.Error(currentState, fmt.Errorf("failed to get dependencies: %w", err))
237+
}
238+
239+
update, err := ctrlstate.ShouldUpdate(flexcluster, deps...)
235240
if err != nil {
236241
return result.Error(currentState, reconcile.TerminalError(err))
237242
}
@@ -240,11 +245,6 @@ func (h *Handlerv20250312) handleIdle(ctx context.Context, flexcluster *akov2gen
240245
return result.NextState(currentState, "Flex cluster up to date. No update required.")
241246
}
242247

243-
deps, err := h.getDependencies(ctx, flexcluster)
244-
if err != nil {
245-
return result.Error(currentState, fmt.Errorf("failed to get dependencies: %w", err))
246-
}
247-
248248
body := &v20250312sdk.FlexClusterDescriptionUpdate20241113{}
249249
params := &v20250312sdk.UpdateFlexClusterApiParams{
250250
FlexClusterDescriptionUpdate20241113: body,
@@ -264,15 +264,20 @@ func (h *Handlerv20250312) handleIdle(ctx context.Context, flexcluster *akov2gen
264264
if err != nil {
265265
return result.Error(currentState, fmt.Errorf("failed to get update cluster: %w", err))
266266
}
267+
267268
flexclusterCopy := flexcluster.DeepCopy()
268269
if _, err := h.translator.FromAPI(flexclusterCopy, atlasFlexCluster); err != nil {
269270
return result.Error(currentState, fmt.Errorf("failed to translate update cluster response: %w", err))
270271
}
271272

272-
err = h.kubeClient.Status().Patch(ctx, flexclusterCopy, client.MergeFrom(flexcluster))
273-
if err != nil {
274-
return result.Error(currentState, fmt.Errorf("failed to patch cluster status: %w", err))
273+
if err := ctrlstate.
274+
NewPatcher(flexclusterCopy).
275+
UpdateStateTracker(deps...).
276+
UpdateStatus().
277+
Patch(ctx, h.kubeClient); err != nil {
278+
return result.Error(currentState, fmt.Errorf("failed to patch cluster: %w", err))
275279
}
280+
276281
return result.NextState(finalState, "Updating Flex Cluster.")
277282
}
278283

@@ -296,9 +301,13 @@ func (h *Handlerv20250312) patchStatus(ctx context.Context, flexcluster *akov2ge
296301
return nil, fmt.Errorf("failed to translate get cluster response: %w", err)
297302
}
298303

299-
err = h.kubeClient.Status().Patch(ctx, flexclusterCopy, client.MergeFrom(flexcluster))
300-
if err != nil {
301-
return nil, fmt.Errorf("failed to patch cluster status: %w", err)
304+
if err := ctrlstate.
305+
NewPatcher(flexclusterCopy).
306+
UpdateStateTracker(deps...).
307+
UpdateStatus().
308+
Patch(ctx, h.kubeClient); err != nil {
309+
return nil, fmt.Errorf("failed to patch cluster: %w", err)
302310
}
311+
303312
return atlasFlexCluster, nil
304313
}

internal/generated/controller/group/handler_v20250312.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,7 @@ func (h *Handlerv20250312) HandleInitial(ctx context.Context, group *akov2genera
7070
return result.Error(state.StateInitial, fmt.Errorf("failed to translate group from Atlas: %w", err))
7171
}
7272

73-
err = h.kubeClient.Status().Patch(ctx, groupCopy, client.MergeFrom(group))
74-
if err != nil {
73+
if err := ctrlstate.NewPatcher(groupCopy).UpdateStatus().UpdateStateTracker().Patch(ctx, h.kubeClient); err != nil {
7574
return result.Error(state.StateInitial, fmt.Errorf("failed to patch group status: %w", err))
7675
}
7776

@@ -96,8 +95,7 @@ func (h *Handlerv20250312) HandleImportRequested(ctx context.Context, group *ako
9695
return result.Error(state.StateImportRequested, fmt.Errorf("failed to translate Group from Atlas: %w", err))
9796
}
9897

99-
err = h.kubeClient.Status().Patch(ctx, groupCopy, client.MergeFrom(group))
100-
if err != nil {
98+
if err := ctrlstate.NewPatcher(groupCopy).UpdateStatus().UpdateStateTracker().Patch(ctx, h.kubeClient); err != nil {
10199
return result.Error(state.StateImportRequested, fmt.Errorf("failed to patch Group status: %w", err))
102100
}
103101

@@ -203,9 +201,8 @@ func (h *Handlerv20250312) handleUpserted(ctx context.Context, currentState stat
203201
return result.Error(currentState, err)
204202
}
205203

206-
err = h.kubeClient.Status().Patch(ctx, groupCopy, client.MergeFrom(group))
207-
if err != nil {
208-
return result.Error(currentState, err)
204+
if err := ctrlstate.NewPatcher(groupCopy).UpdateStateTracker().UpdateStatus().Patch(ctx, h.kubeClient); err != nil {
205+
return result.Error(currentState, fmt.Errorf("failed to patch group: %w", err))
209206
}
210207

211208
return result.NextState(state.StateUpdated, "Group is updated.")

internal/indexer/indexer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ func RegisterAll(ctx context.Context, c cluster.Cluster, logger *zap.Logger) err
7878
indexers = append(indexers,
7979
NewAtlasDataFederationByProjectIDIndexer(ctx, c.GetClient(), logger),
8080
indexer.NewFlexClusterByGroupIndexer(logger),
81+
indexer.NewClusterByGroupIndexer(logger),
8182
)
8283
}
8384
return Register(ctx, c, indexers...)
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright 2025 MongoDB Inc
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package state
16+
17+
import "strings"
18+
19+
const (
20+
AnnotationReapplyTimestamp = "mongodb.internal.com/reapply-timestamp"
21+
AnnotationStateTracker = "mongodb.internal.com/state-tracker"
22+
)
23+
24+
var jsonPatchReplacer = strings.NewReplacer("/", "~1", "~", "~0")

pkg/controller/state/patcher.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Copyright 2025 MongoDB Inc
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package state
16+
17+
import (
18+
"context"
19+
20+
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
21+
"k8s.io/apimachinery/pkg/runtime"
22+
"sigs.k8s.io/controller-runtime/pkg/client"
23+
)
24+
25+
const FieldOwner = "mongodb-atlas-kubernetes"
26+
27+
type Patcher struct {
28+
patchedObj *unstructured.Unstructured
29+
obj client.Object
30+
statusChanged, objectChanged bool
31+
fieldOwner string
32+
err error
33+
}
34+
35+
func NewPatcher(obj client.Object) *Patcher {
36+
patchedObj := &unstructured.Unstructured{}
37+
patchedObj.SetAPIVersion(obj.GetObjectKind().GroupVersionKind().GroupVersion().String())
38+
patchedObj.SetKind(obj.GetObjectKind().GroupVersionKind().Kind)
39+
patchedObj.SetName(obj.GetName())
40+
patchedObj.SetNamespace(obj.GetNamespace())
41+
patchedObj.SetGeneration(obj.GetGeneration())
42+
return &Patcher{patchedObj: patchedObj, obj: obj, fieldOwner: FieldOwner}
43+
}
44+
45+
// UpdateStateTracker updates the state tracker annotation on the given object.
46+
func (p *Patcher) UpdateStateTracker(dependencies ...client.Object) *Patcher {
47+
if p.err != nil {
48+
return p
49+
}
50+
51+
stateTracker := ComputeStateTracker(p.obj, dependencies...)
52+
53+
annotations := p.patchedObj.GetAnnotations()
54+
if annotations == nil {
55+
annotations = make(map[string]string)
56+
}
57+
annotations[AnnotationStateTracker] = stateTracker
58+
p.patchedObj.SetAnnotations(annotations)
59+
60+
p.objectChanged = true
61+
return p
62+
}
63+
64+
// UpdateStatus updates the status of the given object.
65+
func (p *Patcher) UpdateStatus() *Patcher {
66+
if p.err != nil {
67+
return p
68+
}
69+
70+
content, err := runtime.DefaultUnstructuredConverter.ToUnstructured(p.obj)
71+
if err != nil {
72+
p.err = err
73+
return p
74+
}
75+
76+
if err := unstructured.SetNestedField(p.patchedObj.Object, content["status"], "status"); err != nil {
77+
p.err = err
78+
return p
79+
}
80+
81+
unstructured.RemoveNestedField(p.patchedObj.Object, "status", "conditions")
82+
83+
p.statusChanged = true
84+
return p
85+
}
86+
87+
func (p *Patcher) patchObject(ctx context.Context, c client.Client) {
88+
if p.err != nil || !p.objectChanged {
89+
return
90+
}
91+
92+
applyConfig := client.ApplyConfigurationFromUnstructured(p.patchedObj)
93+
err := c.Apply(ctx, applyConfig, client.FieldOwner(FieldOwner), client.ForceOwnership)
94+
p.err = err
95+
}
96+
97+
func (p *Patcher) patchStatus(ctx context.Context, c client.Client) {
98+
if p.err != nil || !p.statusChanged {
99+
return
100+
}
101+
102+
// SSA Apply() method for sub-resources is not yet supported, so we use Patch here.
103+
// See the following issue for more details: https://github.com/kubernetes-sigs/controller-runtime/issues/3183
104+
patchedCopy := p.patchedObj.DeepCopy()
105+
err := c.Status().Patch(ctx, patchedCopy, client.Apply, client.FieldOwner(FieldOwner), client.ForceOwnership)
106+
p.err = err
107+
}
108+
109+
// Patch applies the patches to the given object and updates both status and the annotations if they were modified.
110+
func (p *Patcher) Patch(ctx context.Context, c client.Client) error {
111+
if p.err != nil {
112+
return p.err
113+
}
114+
115+
p.patchStatus(ctx, c)
116+
p.patchObject(ctx, c)
117+
118+
return p.err
119+
}

0 commit comments

Comments
 (0)