Skip to content

Commit 0464b6a

Browse files
authored
CLOUDP-359047: Add Flex Reconciler (#2955)
* CLOUDP-359047: Add Flex Cluster reconciler support Signed-off-by: jose.vazquez <[email protected]> * Register flexcluster * Add manual group id resolution * Fix time until lint hint --------- Signed-off-by: jose.vazquez <[email protected]>
1 parent 717b4ef commit 0464b6a

File tree

5 files changed

+239
-38
lines changed

5 files changed

+239
-38
lines changed

internal/controller/registry.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import (
4747
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/dryrun"
4848
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/featureflags"
4949
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/generated/controller/connectionsecret"
50+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/generated/controller/flexcluster"
5051
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/generated/controller/group"
5152
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer"
5253
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/version"
@@ -147,7 +148,14 @@ func (r *Registry) registerControllers(c cluster.Cluster, ap atlas.Provider) err
147148
if err != nil {
148149
return fmt.Errorf("error creating group reconciler: %w", err)
149150
}
150-
reconcilers = append(reconcilers, newCtrlStateReconciler(groupReconciler, r.maxConcurrentReconciles))
151+
flexController, err := flexcluster.NewFlexClusterReconciler(c, ap, r.logger, r.globalSecretRef, r.deletionProtection, true, r.defaultPredicates())
152+
if err != nil {
153+
return fmt.Errorf("error creating group reconciler: %w", err)
154+
}
155+
reconcilers = append(reconcilers,
156+
newCtrlStateReconciler(groupReconciler, r.maxConcurrentReconciles),
157+
newCtrlStateReconciler(flexController, r.maxConcurrentReconciles),
158+
)
151159
}
152160

153161
r.reconcilers = reconcilers

internal/generated/controller/flexcluster/handler_v20250312.go

Lines changed: 173 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package flexcluster
1616

1717
import (
1818
"context"
19+
"errors"
20+
"fmt"
1921

2022
v20250312sdk "go.mongodb.org/atlas-sdk/v20250312006/admin"
2123
controllerruntime "sigs.k8s.io/controller-runtime"
@@ -24,7 +26,9 @@ import (
2426
controller "sigs.k8s.io/controller-runtime/pkg/controller"
2527
reconcile "sigs.k8s.io/controller-runtime/pkg/reconcile"
2628

29+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/customresource"
2730
crapi "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/crapi"
31+
"github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/generated/references"
2832
akov2generated "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/nextapi/generated/v1"
2933
ctrlstate "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/state"
3034
result "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/result"
@@ -47,67 +51,138 @@ func NewHandlerv20250312(kubeClient client.Client, atlasClient *v20250312sdk.API
4751
}
4852
}
4953

54+
// TODO: Autogenerate with Scaffolder
55+
func (h *Handlerv20250312) getGroupId(ctx context.Context, flexcluster *akov2generated.FlexCluster) (string, error) {
56+
groupID := ""
57+
58+
if flexcluster.Spec.V20250312.GroupId != nil {
59+
groupID = *flexcluster.Spec.V20250312.GroupId
60+
} else {
61+
var err error
62+
groupID, err = references.GetGroupID(
63+
ctx, h.kubeClient, flexcluster.Spec.V20250312.GroupRef, flexcluster.GetNamespace())
64+
if err != nil {
65+
return "", err
66+
}
67+
}
68+
69+
return groupID, nil
70+
}
71+
5072
// HandleInitial handles the initial state for version v20250312
5173
func (h *Handlerv20250312) HandleInitial(ctx context.Context, flexcluster *akov2generated.FlexCluster) (ctrlstate.Result, error) {
52-
// TODO: Implement initial state logic
53-
// TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
54-
return result.NextState(state.StateUpdated, "Updated AtlasFlexCluster.")
74+
groupID, err := h.getGroupId(ctx, flexcluster)
75+
if err != nil {
76+
return result.Error(state.StateInitial, reconcile.TerminalError(err))
77+
}
78+
flexClusterRequest := v20250312sdk.FlexClusterDescriptionCreate20241113{}
79+
if err := h.translator.ToAPI(&flexClusterRequest, flexcluster); err != nil {
80+
return result.Error(state.StateInitial, fmt.Errorf("failed to translate flex create request: %w", err))
81+
}
82+
atlasFlexCluster, _, err := h.atlasClient.FlexClustersApi.CreateFlexCluster(ctx, groupID, &flexClusterRequest).Execute()
83+
if err != nil {
84+
return result.Error(state.StateInitial, fmt.Errorf("failed to create flex cluster: %w", err))
85+
}
86+
newFlexCluster := flexcluster.DeepCopy()
87+
if _, err := h.translator.FromAPI(newFlexCluster, atlasFlexCluster); err != nil {
88+
return result.Error(state.StateInitial, fmt.Errorf("failed to translate flex create response: %w", err))
89+
}
90+
91+
err = h.kubeClient.Status().Patch(ctx, newFlexCluster, client.MergeFrom(flexcluster))
92+
if err != nil {
93+
return result.Error(state.StateInitial, fmt.Errorf("failed to patch flex cluster status: %w", err))
94+
}
95+
return result.NextState(state.StateCreating, "Creating Flex Cluster.")
5596
}
5697

5798
// HandleImportRequested handles the importrequested state for version v20250312
5899
func (h *Handlerv20250312) HandleImportRequested(ctx context.Context, flexcluster *akov2generated.FlexCluster) (ctrlstate.Result, error) {
59-
// TODO: Implement importrequested state logic
60-
// TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
61-
return result.NextState(state.StateImported, "Import completed")
100+
externalName, ok := flexcluster.GetAnnotations()["mongodb.com/external-name"]
101+
if !ok {
102+
return result.Error(state.StateImportRequested, errors.New("missing mongodb.com/external-name"))
103+
}
104+
105+
externalGroupID, ok := flexcluster.GetAnnotations()["mongodb.com/external-group-id"]
106+
if !ok {
107+
return result.Error(state.StateImportRequested, errors.New("missing mongodb.com/external-group-id"))
108+
}
109+
flexClusterCopy := flexcluster.DeepCopy()
110+
flexClusterCopy.Spec.V20250312.Entry.Name = externalName
111+
flexClusterCopy.Spec.V20250312.GroupId = &externalGroupID
112+
_, err := h.getToPatchStatus(ctx, flexClusterCopy)
113+
if err != nil {
114+
return result.Error(state.StateImportRequested, err)
115+
}
116+
return result.NextState(state.StateImported, "Imported Flex Cluster.")
62117
}
63118

64119
// HandleImported handles the imported state for version v20250312
65120
func (h *Handlerv20250312) HandleImported(ctx context.Context, flexcluster *akov2generated.FlexCluster) (ctrlstate.Result, error) {
66-
// TODO: Implement imported state logic
67-
// TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
68-
return result.NextState(state.StateUpdated, "Ready")
121+
return h.handleIdle(ctx, flexcluster, state.StateCreated, state.StateUpdating)
69122
}
70123

71124
// HandleCreating handles the creating state for version v20250312
72125
func (h *Handlerv20250312) HandleCreating(ctx context.Context, flexcluster *akov2generated.FlexCluster) (ctrlstate.Result, error) {
73-
// TODO: Implement creating state logic
74-
// TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
75-
return result.NextState(state.StateCreated, "Resource created")
126+
return h.handleUpserting(ctx, flexcluster, state.StateCreating, state.StateCreated)
76127
}
77128

78129
// HandleCreated handles the created state for version v20250312
79130
func (h *Handlerv20250312) HandleCreated(ctx context.Context, flexcluster *akov2generated.FlexCluster) (ctrlstate.Result, error) {
80-
// TODO: Implement created state logic
81-
// TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
82-
return result.NextState(state.StateUpdated, "Ready")
131+
return h.handleIdle(ctx, flexcluster, state.StateCreated, state.StateUpdating)
83132
}
84133

85134
// HandleUpdating handles the updating state for version v20250312
86135
func (h *Handlerv20250312) HandleUpdating(ctx context.Context, flexcluster *akov2generated.FlexCluster) (ctrlstate.Result, error) {
87-
// TODO: Implement updating state logic
88-
// TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
89-
return result.NextState(state.StateUpdated, "Update completed")
136+
return h.handleUpserting(ctx, flexcluster, state.StateUpdating, state.StateUpdated)
90137
}
91138

92139
// HandleUpdated handles the updated state for version v20250312
93140
func (h *Handlerv20250312) HandleUpdated(ctx context.Context, flexcluster *akov2generated.FlexCluster) (ctrlstate.Result, error) {
94-
// TODO: Implement updated state logic
95-
// TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
96-
return result.NextState(state.StateUpdated, "Ready")
141+
return h.handleIdle(ctx, flexcluster, state.StateUpdated, state.StateUpdating)
97142
}
98143

99144
// HandleDeletionRequested handles the deletionrequested state for version v20250312
100145
func (h *Handlerv20250312) HandleDeletionRequested(ctx context.Context, flexcluster *akov2generated.FlexCluster) (ctrlstate.Result, error) {
101-
// TODO: Implement deletionrequested state logic
102-
// TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
103-
return result.NextState(state.StateDeleting, "Deletion started")
146+
if customresource.IsResourcePolicyKeepOrDefault(flexcluster, h.deletionProtection) {
147+
return result.NextState(state.StateDeleted, "Flex Cluster deleted.")
148+
}
149+
150+
if flexcluster.Status.V20250312 == nil {
151+
return result.NextState(state.StateDeleted, "Flex Cluster is unamanged.")
152+
}
153+
154+
groupID, err := h.getGroupId(ctx, flexcluster)
155+
if err != nil {
156+
return result.Error(state.StateInitial, reconcile.TerminalError(err))
157+
}
158+
_, err = h.atlasClient.FlexClustersApi.DeleteFlexCluster(
159+
ctx, groupID, flexcluster.Spec.V20250312.Entry.Name).Execute()
160+
161+
switch {
162+
case v20250312sdk.IsErrorCode(err, "CLUSTER_NOT_FOUND"):
163+
return result.NextState(state.StateDeleted, "Flex Cluster was deleted in Atlas.")
164+
case err != nil:
165+
return result.Error(state.StateDeletionRequested, fmt.Errorf("failed to delete flex cluster: %w", err))
166+
}
167+
168+
return result.NextState(state.StateDeleting, "Deleting Flex Cluster.")
104169
}
105170

106171
// HandleDeleting handles the deleting state for version v20250312
107172
func (h *Handlerv20250312) HandleDeleting(ctx context.Context, flexcluster *akov2generated.FlexCluster) (ctrlstate.Result, error) {
108-
// TODO: Implement deleting state logic
109-
// TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
110-
return result.NextState(state.StateDeleted, "Deleted")
173+
groupID, err := h.getGroupId(ctx, flexcluster)
174+
if err != nil {
175+
return result.Error(state.StateInitial, reconcile.TerminalError(err))
176+
}
177+
_, _, err = h.atlasClient.FlexClustersApi.GetFlexCluster(
178+
ctx, groupID, flexcluster.Spec.V20250312.Entry.Name).Execute()
179+
switch {
180+
case v20250312sdk.IsErrorCode(err, "CLUSTER_NOT_FOUND"):
181+
return result.NextState(state.StateDeleted, "Deleted")
182+
case err != nil:
183+
return result.Error(state.StateDeletionRequested, fmt.Errorf("failed to delete flexcluster: %w", err))
184+
}
185+
return result.NextState(state.StateDeleting, "Deleting Flex Cluster.")
111186
}
112187

113188
// For returns the resource and predicates for the controller
@@ -117,6 +192,75 @@ func (h *Handlerv20250312) For() (client.Object, builder.Predicates) {
117192

118193
// SetupWithManager sets up the controller with the Manager
119194
func (h *Handlerv20250312) SetupWithManager(mgr controllerruntime.Manager, rec reconcile.Reconciler, defaultOptions controller.Options) error {
120-
// This method is not used for version-specific handlers but required by StateHandler interface
121-
return nil
195+
panic("do not setup Handlerv20250312")
196+
}
197+
198+
// HandleUpserting handles the creating and updating state for flex version v20250312
199+
func (h *Handlerv20250312) handleUpserting(ctx context.Context, flexcluster *akov2generated.FlexCluster, currentState, finalState state.ResourceState) (ctrlstate.Result, error) {
200+
atlasFlexCluster, err := h.getToPatchStatus(ctx, flexcluster)
201+
if err != nil {
202+
return result.Error(currentState, err)
203+
}
204+
if atlasFlexCluster.GetStateName() == "CREATING" || atlasFlexCluster.GetStateName() == "UPDATING" {
205+
return result.NextState(currentState, "Upserting Flex Cluster.")
206+
}
207+
return result.NextState(finalState, "Upserted Flex Cluster.")
208+
}
209+
210+
// HandleIdle handles the creating and updating state for flex version v20250312
211+
func (h *Handlerv20250312) handleIdle(ctx context.Context, flexcluster *akov2generated.FlexCluster, currentState, finalState state.ResourceState) (ctrlstate.Result, error) {
212+
update, err := ctrlstate.ShouldUpdate(flexcluster)
213+
if err != nil {
214+
return result.Error(currentState, reconcile.TerminalError(err))
215+
}
216+
217+
if !update {
218+
return result.NextState(currentState, "Flex cluster up to date. No update required.")
219+
}
220+
221+
groupID, err := h.getGroupId(ctx, flexcluster)
222+
if err != nil {
223+
return result.Error(state.StateInitial, reconcile.TerminalError(err))
224+
}
225+
flexClusterUpdate := v20250312sdk.FlexClusterDescriptionUpdate20241113{}
226+
if err := h.translator.ToAPI(&flexClusterUpdate, flexcluster); err != nil {
227+
return result.Error(state.StateInitial, fmt.Errorf("failed to translate update flex cluster request: %w", err))
228+
}
229+
atlasFlexCluster, _, err := h.atlasClient.FlexClustersApi.UpdateFlexCluster(
230+
ctx, groupID, flexcluster.Spec.V20250312.Entry.Name, &flexClusterUpdate).Execute()
231+
if err != nil {
232+
return result.Error(currentState, fmt.Errorf("failed to get update cluster: %w", err))
233+
}
234+
flexclusterCopy := flexcluster.DeepCopy()
235+
if _, err := h.translator.FromAPI(flexclusterCopy, atlasFlexCluster); err != nil {
236+
return result.Error(currentState, fmt.Errorf("failed to translate update cluster response: %w", err))
237+
}
238+
239+
err = h.kubeClient.Status().Patch(ctx, flexclusterCopy, client.MergeFrom(flexcluster))
240+
if err != nil {
241+
return result.Error(currentState, fmt.Errorf("failed to patch cluster status: %w", err))
242+
}
243+
return result.NextState(finalState, "Updating Flex Cluster.")
244+
}
245+
246+
func (h *Handlerv20250312) getToPatchStatus(ctx context.Context, flexcluster *akov2generated.FlexCluster) (*v20250312sdk.FlexClusterDescription20241113, error) {
247+
groupID, err := h.getGroupId(ctx, flexcluster)
248+
if err != nil {
249+
return nil, err
250+
}
251+
atlasFlexCluster, _, err := h.atlasClient.FlexClustersApi.GetFlexCluster(
252+
ctx, groupID, flexcluster.Spec.V20250312.Entry.Name).Execute()
253+
if err != nil {
254+
return nil, fmt.Errorf("failed to get cluster: %w", err)
255+
}
256+
flexclusterCopy := flexcluster.DeepCopy()
257+
if _, err := h.translator.FromAPI(flexclusterCopy, atlasFlexCluster); err != nil {
258+
return nil, fmt.Errorf("failed to translate get cluster response: %w", err)
259+
}
260+
261+
err = h.kubeClient.Status().Patch(ctx, flexclusterCopy, client.MergeFrom(flexcluster))
262+
if err != nil {
263+
return nil, fmt.Errorf("failed to patch cluster status: %w", err)
264+
}
265+
return atlasFlexCluster, nil
122266
}

internal/generated/controller/group/handler_v20250312.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -150,15 +150,13 @@ func (h *Handlerv20250312) HandleDeletionRequested(ctx context.Context, group *a
150150
// HandleDeleting handles the deleting state for version v20250312
151151
func (h *Handlerv20250312) HandleDeleting(ctx context.Context, group *akov2generated.Group) (ctrlstate.Result, error) {
152152
_, _, err := h.atlasClient.ProjectsApi.GetProject(ctx, *group.Status.V20250312.Id).Execute()
153-
if err == nil {
154-
return result.Error(state.StateDeletionRequested, fmt.Errorf("failed to delete group: %w deletion", err))
155-
}
156-
157-
if !v20250312sdk.IsErrorCode(err, "GROUP_NOT_FOUND") {
153+
switch {
154+
case v20250312sdk.IsErrorCode(err, "GROUP_NOT_FOUND"):
155+
return result.NextState(state.StateDeleted, "Deleted")
156+
case err != nil:
158157
return result.Error(state.StateDeletionRequested, fmt.Errorf("failed to delete group: %w", err))
159158
}
160-
161-
return result.NextState(state.StateDeleted, "Group deleted.")
159+
return result.NextState(state.StateDeleting, "Deleting Group.")
162160
}
163161

164162
// For returns the resource and predicates for the controller
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright 2025 MongoDB Inc
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package references
16+
17+
import (
18+
"context"
19+
"errors"
20+
"fmt"
21+
22+
"github.com/crd2go/crd2go/k8s"
23+
"sigs.k8s.io/controller-runtime/pkg/client"
24+
25+
akov2generated "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/nextapi/generated/v1"
26+
)
27+
28+
// GetGroupID returns the group IP (if present) from a given Kubernetes group reference
29+
// TODO: Autogenerate with Scaffolder
30+
func GetGroupID(ctx context.Context, c client.Client, groupRef *k8s.LocalReference, namespace string) (string, error) {
31+
if groupRef == nil {
32+
return "", errors.New("group reference is nil")
33+
}
34+
35+
group := &akov2generated.Group{}
36+
err := c.Get(ctx, client.ObjectKey{
37+
Namespace: groupRef.Name,
38+
Name: namespace,
39+
}, group)
40+
41+
if err != nil {
42+
return "", fmt.Errorf("failed to get object: %w", err)
43+
}
44+
45+
// for each suported Group version...
46+
if group.Status.V20250312 != nil && group.Status.V20250312.Id != nil {
47+
return *group.Status.V20250312.Id, nil
48+
}
49+
50+
return "", errors.New("group ID is not available")
51+
}

pkg/controller/state/reapply.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func ShouldReapply(obj metav1.Object) (bool, error) {
4848
return false, nil
4949
}
5050

51-
diff := timestamp.Add(period).Sub(time.Now())
51+
diff := time.Until(timestamp.Add(period))
5252

5353
return diff <= 0, nil
5454
}

0 commit comments

Comments
 (0)