@@ -16,6 +16,8 @@ package flexcluster
1616
1717import (
1818 "context"
19+ "errors"
20+ "fmt"
1921
2022 v20250312sdk "go.mongodb.org/atlas-sdk/v20250312006/admin"
2123 controllerruntime "sigs.k8s.io/controller-runtime"
@@ -24,13 +26,18 @@ import (
2426 controller "sigs.k8s.io/controller-runtime/pkg/controller"
2527 reconcile "sigs.k8s.io/controller-runtime/pkg/reconcile"
2628
29+ "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/customresource"
2730 crapi "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/crapi"
2831 akov2generated "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/nextapi/generated/v1"
2932 ctrlstate "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/controller/state"
3033 result "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/result"
3134 state "github.com/mongodb/mongodb-atlas-kubernetes/v2/pkg/state"
3235)
3336
37+ var (
38+ errMissingFlexGroupId = reconcile .TerminalError (errors .New ("missing Flex cluster group id" ))
39+ )
40+
3441type Handlerv20250312 struct {
3542 kubeClient client.Client
3643 atlasClient * v20250312sdk.APIClient
@@ -49,65 +56,118 @@ func NewHandlerv20250312(kubeClient client.Client, atlasClient *v20250312sdk.API
4956
5057// HandleInitial handles the initial state for version v20250312
5158func (h * Handlerv20250312 ) HandleInitial (ctx context.Context , flexcluster * akov2generated.FlexCluster ) (ctrlstate.Result , error ) {
52- // TODO: Implement initial state logic
53- // TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
54- return result .NextState (state .StateUpdated , "Updated AtlasFlexCluster." )
59+ if flexcluster .Spec .V20250312 .GroupId == nil {
60+ return result .Error (state .StateInitial , errMissingFlexGroupId )
61+ }
62+ groupID := * flexcluster .Spec .V20250312 .GroupId
63+ flexClusterRequest := v20250312sdk.FlexClusterDescriptionCreate20241113 {}
64+ if err := h .translator .ToAPI (& flexClusterRequest , flexcluster ); err != nil {
65+ return result .Error (state .StateInitial , fmt .Errorf ("failed to translate flex create request: %w" , err ))
66+ }
67+ atlasFlexCluster , _ , err := h .atlasClient .FlexClustersApi .CreateFlexCluster (ctx , groupID , & flexClusterRequest ).Execute ()
68+ if err != nil {
69+ return result .Error (state .StateInitial , fmt .Errorf ("failed to create flex cluster: %w" , err ))
70+ }
71+ newFlexCluster := flexcluster .DeepCopy ()
72+ if _ , err := h .translator .FromAPI (newFlexCluster , atlasFlexCluster ); err != nil {
73+ return result .Error (state .StateInitial , fmt .Errorf ("failed to translate flex create response: %w" , err ))
74+ }
75+
76+ err = h .kubeClient .Status ().Patch (ctx , newFlexCluster , client .MergeFrom (flexcluster ))
77+ if err != nil {
78+ return result .Error (state .StateInitial , fmt .Errorf ("failed to patch flex cluster status: %w" , err ))
79+ }
80+ return result .NextState (state .StateCreating , "Creating Flex Cluster." )
5581}
5682
5783// HandleImportRequested handles the importrequested state for version v20250312
5884func (h * Handlerv20250312 ) HandleImportRequested (ctx context.Context , flexcluster * akov2generated.FlexCluster ) (ctrlstate.Result , error ) {
59- // TODO: Implement importrequested state logic
60- // TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
61- return result .NextState (state .StateImported , "Import completed" )
85+ externalName , ok := flexcluster .GetAnnotations ()["mongodb.com/external-name" ]
86+ if ! ok {
87+ return result .Error (state .StateImportRequested , errors .New ("missing mongodb.com/external-name" ))
88+ }
89+
90+ externalGroupID , ok := flexcluster .GetAnnotations ()["mongodb.com/external-group-id" ]
91+ if ! ok {
92+ return result .Error (state .StateImportRequested , errors .New ("missing mongodb.com/external-group-id" ))
93+ }
94+ flexClusterCopy := flexcluster .DeepCopy ()
95+ flexClusterCopy .Spec .V20250312 .Entry .Name = externalName
96+ flexClusterCopy .Spec .V20250312 .GroupId = & externalGroupID
97+ _ , err := h .getAndPatchStatus (ctx , flexClusterCopy )
98+ if err != nil {
99+ return result .Error (state .StateImportRequested , err )
100+ }
101+ return result .NextState (state .StateImported , "Imported Flex Cluster." )
62102}
63103
64104// HandleImported handles the imported state for version v20250312
65105func (h * Handlerv20250312 ) HandleImported (ctx context.Context , flexcluster * akov2generated.FlexCluster ) (ctrlstate.Result , error ) {
66- // TODO: Implement imported state logic
67- // TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
68- return result .NextState (state .StateUpdated , "Ready" )
106+ return h .handleIdle (ctx , flexcluster , state .StateCreated , state .StateUpdating )
69107}
70108
71109// HandleCreating handles the creating state for version v20250312
72110func (h * Handlerv20250312 ) HandleCreating (ctx context.Context , flexcluster * akov2generated.FlexCluster ) (ctrlstate.Result , error ) {
73- // TODO: Implement creating state logic
74- // TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
75- return result .NextState (state .StateCreated , "Resource created" )
111+ return h .handleUpserting (ctx , flexcluster , state .StateCreating , state .StateCreated )
76112}
77113
78114// HandleCreated handles the created state for version v20250312
79115func (h * Handlerv20250312 ) HandleCreated (ctx context.Context , flexcluster * akov2generated.FlexCluster ) (ctrlstate.Result , error ) {
80- // TODO: Implement created state logic
81- // TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
82- return result .NextState (state .StateUpdated , "Ready" )
116+ return h .handleIdle (ctx , flexcluster , state .StateCreated , state .StateUpdating )
83117}
84118
85119// HandleUpdating handles the updating state for version v20250312
86120func (h * Handlerv20250312 ) HandleUpdating (ctx context.Context , flexcluster * akov2generated.FlexCluster ) (ctrlstate.Result , error ) {
87- // TODO: Implement updating state logic
88- // TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
89- return result .NextState (state .StateUpdated , "Update completed" )
121+ return h .handleUpserting (ctx , flexcluster , state .StateUpdating , state .StateUpdated )
90122}
91123
92124// HandleUpdated handles the updated state for version v20250312
93125func (h * Handlerv20250312 ) HandleUpdated (ctx context.Context , flexcluster * akov2generated.FlexCluster ) (ctrlstate.Result , error ) {
94- // TODO: Implement updated state logic
95- // TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
96- return result .NextState (state .StateUpdated , "Ready" )
126+ return h .handleIdle (ctx , flexcluster , state .StateUpdated , state .StateUpdating )
97127}
98128
99129// HandleDeletionRequested handles the deletionrequested state for version v20250312
100130func (h * Handlerv20250312 ) HandleDeletionRequested (ctx context.Context , flexcluster * akov2generated.FlexCluster ) (ctrlstate.Result , error ) {
101- // TODO: Implement deletionrequested state logic
102- // TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
103- return result .NextState (state .StateDeleting , "Deletion started" )
131+ if customresource .IsResourcePolicyKeepOrDefault (flexcluster , h .deletionProtection ) {
132+ return result .NextState (state .StateDeleted , "Flex Cluster deleted." )
133+ }
134+
135+ if flexcluster .Status .V20250312 == nil {
136+ return result .NextState (state .StateDeleted , "Flex Cluster is unamanged." )
137+ }
138+
139+ if flexcluster .Spec .V20250312 .GroupId == nil {
140+ return result .Error (state .StateInitial , errMissingFlexGroupId )
141+ }
142+ groupID := * flexcluster .Spec .V20250312 .GroupId
143+ _ , err := h .atlasClient .FlexClustersApi .DeleteFlexCluster (
144+ ctx , groupID , flexcluster .Spec .V20250312 .Entry .Name ).Execute ()
145+
146+ switch {
147+ case v20250312sdk .IsErrorCode (err , "CLUSTER_NOT_FOUND" ):
148+ return result .NextState (state .StateDeleted , "Flex Cluster was deleted in Atlas." )
149+ case err != nil :
150+ return result .Error (state .StateDeletionRequested , fmt .Errorf ("failed to delete flex cluster: %w" , err ))
151+ }
152+
153+ return result .NextState (state .StateDeleting , "Deleting Flex Cluster." )
104154}
105155
106156// HandleDeleting handles the deleting state for version v20250312
107157func (h * Handlerv20250312 ) HandleDeleting (ctx context.Context , flexcluster * akov2generated.FlexCluster ) (ctrlstate.Result , error ) {
108- // TODO: Implement deleting state logic
109- // TODO: Use h.atlasProvider.SdkClientSet(ctx, h.globalSecretRef, h.log) to get Atlas SDK client
110- return result .NextState (state .StateDeleted , "Deleted" )
158+ if flexcluster .Spec .V20250312 .GroupId == nil {
159+ return result .Error (state .StateDeleting , errMissingFlexGroupId )
160+ }
161+ groupID := * flexcluster .Spec .V20250312 .GroupId
162+ _ , _ , err := h .atlasClient .FlexClustersApi .GetFlexCluster (
163+ ctx , groupID , flexcluster .Spec .V20250312 .Entry .Name ).Execute ()
164+ switch {
165+ case v20250312sdk .IsErrorCode (err , "CLUSTER_NOT_FOUND" ):
166+ return result .NextState (state .StateDeleted , "Deleted" )
167+ case err != nil :
168+ return result .Error (state .StateDeletionRequested , fmt .Errorf ("failed to delete flexcluster: %w" , err ))
169+ }
170+ return result .NextState (state .StateDeleting , "Deleting Flex Cluster." )
111171}
112172
113173// For returns the resource and predicates for the controller
@@ -117,6 +177,75 @@ func (h *Handlerv20250312) For() (client.Object, builder.Predicates) {
117177
118178// SetupWithManager sets up the controller with the Manager
119179func (h * Handlerv20250312 ) SetupWithManager (mgr controllerruntime.Manager , rec reconcile.Reconciler , defaultOptions controller.Options ) error {
120- // This method is not used for version-specific handlers but required by StateHandler interface
121- return nil
180+ panic ("do not setup Handlerv20250312" )
181+ }
182+
183+ // HandleUpserting handles the creating and updating state for flex version v20250312
184+ func (h * Handlerv20250312 ) handleUpserting (ctx context.Context , flexcluster * akov2generated.FlexCluster , currentState , finalState state.ResourceState ) (ctrlstate.Result , error ) {
185+ atlasFlexCluster , err := h .getAndPatchStatus (ctx , flexcluster )
186+ if err != nil {
187+ return result .Error (currentState , err )
188+ }
189+ if atlasFlexCluster .GetStateName () == "CREATING" || atlasFlexCluster .GetStateName () == "UPDATING" {
190+ return result .NextState (currentState , "Upserting Flex Cluster." )
191+ }
192+ return result .NextState (finalState , "Upserted Flex Cluster." )
193+ }
194+
195+ // HandleIdle handles the creating and updating state for flex version v20250312
196+ func (h * Handlerv20250312 ) handleIdle (ctx context.Context , flexcluster * akov2generated.FlexCluster , currentState , finalState state.ResourceState ) (ctrlstate.Result , error ) {
197+ update , err := ctrlstate .ShouldUpdate (flexcluster )
198+ if err != nil {
199+ return result .Error (currentState , reconcile .TerminalError (err ))
200+ }
201+
202+ if ! update {
203+ return result .NextState (currentState , "Flex cluster up to date. No update required." )
204+ }
205+
206+ if flexcluster .Spec .V20250312 .GroupId == nil {
207+ return result .Error (state .StateInitial , errMissingFlexGroupId )
208+ }
209+ groupID := * flexcluster .Spec .V20250312 .GroupId
210+ flexClusterUpdate := v20250312sdk.FlexClusterDescriptionUpdate20241113 {}
211+ if err := h .translator .ToAPI (& flexClusterUpdate , flexcluster ); err != nil {
212+ return result .Error (state .StateInitial , fmt .Errorf ("failed to translate update flex cluster request: %w" , err ))
213+ }
214+ atlasFlexCluster , _ , err := h .atlasClient .FlexClustersApi .UpdateFlexCluster (
215+ ctx , groupID , flexcluster .Spec .V20250312 .Entry .Name , & flexClusterUpdate ).Execute ()
216+ if err != nil {
217+ return result .Error (currentState , fmt .Errorf ("failed to get update cluster: %w" , err ))
218+ }
219+ flexclusterCopy := flexcluster .DeepCopy ()
220+ if _ , err := h .translator .FromAPI (flexclusterCopy , atlasFlexCluster ); err != nil {
221+ return result .Error (currentState , fmt .Errorf ("failed to translate update cluster response: %w" , err ))
222+ }
223+
224+ err = h .kubeClient .Status ().Patch (ctx , flexclusterCopy , client .MergeFrom (flexcluster ))
225+ if err != nil {
226+ return result .Error (currentState , fmt .Errorf ("failed to patch cluster status: %w" , err ))
227+ }
228+ return result .NextState (finalState , "Updating Flex Cluster." )
229+ }
230+
231+ func (h * Handlerv20250312 ) getAndPatchStatus (ctx context.Context , flexcluster * akov2generated.FlexCluster ) (* v20250312sdk.FlexClusterDescription20241113 , error ) {
232+ if flexcluster .Spec .V20250312 .GroupId == nil {
233+ return nil , errMissingFlexGroupId
234+ }
235+ groupID := * flexcluster .Spec .V20250312 .GroupId
236+ atlasFlexCluster , _ , err := h .atlasClient .FlexClustersApi .GetFlexCluster (
237+ ctx , groupID , flexcluster .Spec .V20250312 .Entry .Name ).Execute ()
238+ if err != nil {
239+ return nil , fmt .Errorf ("failed to get cluster: %w" , err )
240+ }
241+ flexclusterCopy := flexcluster .DeepCopy ()
242+ if _ , err := h .translator .FromAPI (flexclusterCopy , atlasFlexCluster ); err != nil {
243+ return nil , fmt .Errorf ("failed to translate get cluster response: %w" , err )
244+ }
245+
246+ err = h .kubeClient .Status ().Patch (ctx , flexclusterCopy , client .MergeFrom (flexcluster ))
247+ if err != nil {
248+ return nil , fmt .Errorf ("failed to patch cluster status: %w" , err )
249+ }
250+ return atlasFlexCluster , nil
122251}
0 commit comments