@@ -2,6 +2,7 @@ package atlasproject
22
33import (
44 "context"
5+ "encoding/json"
56 "errors"
67 "fmt"
78 "net/http"
@@ -15,6 +16,7 @@ import (
1516 "github.com/mongodb/mongodb-atlas-kubernetes/v2/api/v1/provider"
1617 "github.com/mongodb/mongodb-atlas-kubernetes/v2/api/v1/status"
1718 "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/compare"
19+ "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/customresource"
1820 "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/controller/workflow"
1921 "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/pointer"
2022 "github.com/mongodb/mongodb-atlas-kubernetes/v2/internal/translation/paging"
@@ -27,17 +29,61 @@ const (
2729 StatusTerminating = "TERMINATING"
2830)
2931
32+ var errNortFound = errors .New ("not found" )
33+
3034type networkPeerDiff struct {
3135 PeersToDelete []string
3236 PeersToCreate []akov2.NetworkPeer
3337 PeersToUpdate []admin.BaseNetworkPeeringConnectionSettings
3438}
3539
40+ func isSkippedNetworkPeersEmpty (atlasProject * akov2.AtlasProject ) (bool , error ) {
41+ lastSkippedSpec := akov2.AtlasProjectSpec {}
42+ lastSkippedSpecString , ok := atlasProject .Annotations [customresource .AnnotationLastSkippedConfiguration ]
43+ if ! ok {
44+ return false , nil
45+ }
46+
47+ if err := json .Unmarshal ([]byte (lastSkippedSpecString ), & lastSkippedSpec ); err != nil {
48+ return false , fmt .Errorf ("failed to parse last skipped configuration: %w" , err )
49+ }
50+
51+ return len (lastSkippedSpec .NetworkPeers ) == 0 , nil
52+ }
53+
54+ func hasLastAppliedNetworkPeers (atlasProject * akov2.AtlasProject ) (bool , error ) {
55+ lastAppliedSpec := akov2.AtlasProjectSpec {}
56+ lastAppliedSpecStr , ok := atlasProject .Annotations [customresource .AnnotationLastAppliedConfiguration ]
57+ if ! ok {
58+ return false , nil
59+ }
60+
61+ if err := json .Unmarshal ([]byte (lastAppliedSpecStr ), & lastAppliedSpec ); err != nil {
62+ return false , fmt .Errorf ("failed to parse last applied configuration: %w" , err )
63+ }
64+
65+ return len (lastAppliedSpec .NetworkPeers ) > 0 , nil
66+ }
67+
3668func ensureNetworkPeers (workflowCtx * workflow.Context , akoProject * akov2.AtlasProject ) workflow.Result {
69+ shouldSkip , err := isSkippedNetworkPeersEmpty (akoProject )
70+ if err != nil {
71+ return workflow .Terminate (workflow .Internal , err )
72+ }
73+ if shouldSkip {
74+ workflowCtx .UnsetCondition (api .NetworkPeerReadyType )
75+ return workflow .OK ()
76+ }
77+
78+ configuredBefore , err := hasLastAppliedNetworkPeers (akoProject )
79+ if err != nil {
80+ return workflow .Terminate (workflow .Internal , err )
81+ }
82+
3783 networkPeerStatus := akoProject .Status .DeepCopy ().NetworkPeers
3884 networkPeerSpec := akoProject .Spec .DeepCopy ().NetworkPeers
3985
40- result , condition := SyncNetworkPeer (workflowCtx , akoProject .ID (), networkPeerStatus , networkPeerSpec )
86+ result , condition := SyncNetworkPeer (workflowCtx , akoProject .ID (), networkPeerStatus , networkPeerSpec , configuredBefore )
4187 if ! result .IsOk () {
4288 workflowCtx .SetConditionFromResult (condition , result )
4389 return result
@@ -68,7 +114,7 @@ func failedPeerStatus(errMessage string, peer akov2.NetworkPeer) status.AtlasNet
68114 }
69115}
70116
71- func SyncNetworkPeer (workflowCtx * workflow.Context , groupID string , peerStatuses []status.AtlasNetworkPeer , peerSpecs []akov2.NetworkPeer ) (workflow.Result , api.ConditionType ) {
117+ func SyncNetworkPeer (workflowCtx * workflow.Context , groupID string , peerStatuses []status.AtlasNetworkPeer , peerSpecs []akov2.NetworkPeer , configuredBefore bool ) (workflow.Result , api.ConditionType ) {
72118 defer workflowCtx .EnsureStatusOption (status .AtlasProjectSetNetworkPeerOption (& peerStatuses ))
73119 logger := workflowCtx .Log
74120 mongoClient := workflowCtx .SdkClient
@@ -100,11 +146,13 @@ func SyncNetworkPeer(workflowCtx *workflow.Context, groupID string, peerStatuses
100146 return workflow .Terminate (workflow .ProjectNetworkPeerIsNotReadyInAtlas ,
101147 errors .New ("failed to update network peer statuses" )), api .NetworkPeerReadyType
102148 }
103- err = deleteUnusedContainers (workflowCtx .Context , mongoClient .NetworkPeeringApi , groupID , getPeerIDs (peerStatuses ))
104- if err != nil {
105- logger .Errorf ("failed to delete unused containers: %v" , err )
106- return workflow .Terminate (workflow .ProjectNetworkPeerIsNotReadyInAtlas ,
107- fmt .Errorf ("failed to delete unused containers: %w" , err )), api .NetworkPeerReadyType
149+ if configuredBefore {
150+ err = deleteUnusedContainers (workflowCtx .Context , mongoClient .NetworkPeeringApi , groupID , getPeerIDs (peerStatuses ))
151+ if err != nil {
152+ logger .Errorf ("failed to delete unused containers: %v" , err )
153+ return workflow .Terminate (workflow .ProjectNetworkPeerIsNotReadyInAtlas ,
154+ fmt .Errorf ("failed to delete unused containers: %w" , err )), api .NetworkPeerReadyType
155+ }
108156 }
109157 return ensurePeerStatus (peerStatuses , len (peerSpecs ), logger ), api .NetworkPeerReadyType
110158}
@@ -410,8 +458,11 @@ func comparePeersPair(ctx context.Context, existedPeer, expectedPeer akov2.Netwo
410458}
411459
412460func deletePeerByID (ctx context.Context , peerService admin.NetworkPeeringApi , groupID string , containerID string , logger * zap.SugaredLogger ) error {
413- _ , _ , err := peerService .DeletePeeringConnection (ctx , groupID , containerID ).Execute ()
461+ _ , response , err := peerService .DeletePeeringConnection (ctx , groupID , containerID ).Execute ()
414462 if err != nil {
463+ if response .StatusCode == http .StatusNotFound {
464+ return errors .Join (err , errNortFound )
465+ }
415466 logger .Errorf ("failed to delete peering container %s: %v" , containerID , err )
416467 return err
417468 }
@@ -548,27 +599,23 @@ func validateInitNetworkPeer(peer akov2.NetworkPeer) error {
548599 return fmt .Errorf ("unsupported provider: %s" , peer .ProviderName )
549600}
550601
551- func DeleteAllNetworkPeers (ctx context.Context , groupID string , service admin.NetworkPeeringApi , logger * zap.SugaredLogger ) workflow.Result {
552- result := workflow .OK ()
553- err := deleteAllNetworkPeers (ctx , groupID , service , logger )
602+ func DeleteOwnedNetworkPeers (ctx context.Context , project * akov2.AtlasProject , service admin.NetworkPeeringApi , logger * zap.SugaredLogger ) workflow.Result {
603+ shouldSkip , err := isSkippedNetworkPeersEmpty (project )
554604 if err != nil {
555- result = workflow .Terminate (workflow .ProjectNetworkPeerIsNotReadyInAtlas , errors .New ("failed to delete NetworkPeers" ))
605+ workflow .Terminate (workflow .ProjectNetworkPeerIsNotReadyInAtlas ,
606+ fmt .Errorf ("failed to delete NetworkPeers: %w" , err ))
556607 }
557- return result
558- }
559-
560- func deleteAllNetworkPeers (ctx context.Context , groupID string , service admin.NetworkPeeringApi , logger * zap.SugaredLogger ) error {
561- peers , err := GetAllExistedNetworkPeer (ctx , service , groupID )
562- if err != nil {
563- logger .Errorf ("failed to list network peers for project %s: %v" , groupID , err )
564- return err
608+ if shouldSkip {
609+ logger .Debug ("Nothing to do, Network Peers projects subresouedes are disabled" )
610+ return workflow .OK ()
565611 }
566- for _ , peer := range peers {
567- errDelete := deletePeerByID (ctx , service , groupID , peer .GetId (), logger )
568- if errDelete != nil {
569- logger .Errorf ("failed to delete network peer %s: %v" , peer .GetId (), errDelete )
570- return errDelete
612+ for _ , peerStatus := range project .Status .NetworkPeers {
613+ errDelete := deletePeerByID (ctx , service , project .ID (), peerStatus .ID , logger )
614+ if errDelete != nil && ! errors .Is (errDelete , errNortFound ) {
615+ logger .Errorf ("failed to delete network peer %s: %v" , peerStatus .ID , errDelete )
616+ return workflow .Terminate (workflow .ProjectNetworkPeerIsNotReadyInAtlas ,
617+ fmt .Errorf ("failed to delete NetworkPeers: %w" , errDelete ))
571618 }
572619 }
573- return nil
620+ return workflow . OK ()
574621}
0 commit comments