Skip to content

Commit ec4c4d3

Browse files
authored
CLOUDP-124554: Improve the way PE conditions are set (#555)
* Make sure PE conditions are set properly * Simplify assert statement for one HELM test
1 parent f41ceb4 commit ec4c4d3

File tree

5 files changed

+80
-65
lines changed

5 files changed

+80
-65
lines changed

pkg/controller/atlasproject/atlasproject_controller.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ func (r *AtlasProjectReconciler) Reconcile(context context.Context, req ctrl.Req
202202
}
203203

204204
if result = r.ensurePrivateEndpoint(ctx, projectID, project); !result.IsOk() {
205-
setCondition(ctx, status.PrivateEndpointReadyType, result)
206205
return result.ReconcileResult(), nil
207206
}
208207
r.EventRecorder.Event(project, "Normal", string(status.PrivateEndpointReadyType), "")

pkg/controller/atlasproject/private_endpoint.go

Lines changed: 71 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -37,67 +37,77 @@ func createOrDeletePEInAtlas(ctx *workflow.Context, projectID string, specPEs []
3737
log.Debugw("Updated PE Connections", "atlasPeConnections", atlasPeConnections, "statusPEs", statusPEs)
3838

3939
if result := clearOutNotLinkedPEs(ctx, projectID, atlasPeConnections, statusPEs); !result.IsOk() {
40+
ctx.SetConditionFromResult(status.PrivateEndpointServiceReadyType, result)
4041
return result
4142
}
4243

4344
endpointsToDelete := set.Difference(statusPEs, specPEs)
4445
log.Debugw("Private Endpoints to delete", "difference", endpointsToDelete)
4546
if result := deletePrivateEndpointsFromAtlas(ctx, projectID, endpointsToDelete); !result.IsOk() {
47+
ctx.SetConditionFromResult(status.PrivateEndpointServiceReadyType, result)
4648
return result
4749
}
4850

4951
endpointsToCreate := set.Difference(specPEs, statusPEs)
5052
log.Debugw("Private Endpoints to create", "difference", endpointsToCreate)
5153
newConnections, err := createPeServiceInAtlas(ctx.Client, projectID, endpointsToCreate)
5254
if err != nil {
53-
log.Debugw("Failed to create PE Service in Atlas", "error", err)
55+
return terminateWithError(ctx, status.PrivateEndpointServiceReadyType, "Failed to create PE Service in Atlas", err)
5456
}
5557
ctx.EnsureStatusOption(status.AtlasProjectAddPrivateEnpointsOption(convertAllToStatus(ctx, projectID, newConnections)))
5658

5759
endpointsToUpdate := set.Intersection(specPEs, statusPEs)
5860
log.Debugw("Private Endpoints to update", "difference", endpointsToUpdate)
59-
if err = createPrivateEndpointInAtlas(ctx.Client, projectID, endpointsToUpdate, log); err != nil {
60-
log.Debugw("Failed to create PE Interface in Atlas", "error", err)
61+
if err = createPeInterfaceInAtlas(ctx.Client, projectID, endpointsToUpdate, log); err != nil {
62+
return terminateWithError(ctx, status.PrivateEndpointReadyType, "Failed to update PE Interface in Atlas", err)
6163
}
6264

63-
return getStatusForInterfaceConnections(ctx, projectID)
65+
return getStatusForInterfaceConnections(ctx, projectID, specPEs)
6466
}
6567

66-
func getStatusForInterfaceConnections(ctx *workflow.Context, projectID string) workflow.Result {
67-
atlasPeConnections, err := getAllPrivateEndpoints(ctx.Client, projectID)
68+
func getStatusForInterfaceConnections(ctx *workflow.Context, projectID string, specPEs []mdbv1.PrivateEndpoint) workflow.Result {
69+
atlasPEs, err := getAllPrivateEndpoints(ctx.Client, projectID)
6870
if err != nil {
6971
return workflow.Terminate(workflow.Internal, err.Error())
7072
}
7173

72-
if len(atlasPeConnections) != 0 {
73-
allAvailable, anyFailed := areServicesAvailableOrFailed(atlasPeConnections)
74-
if anyFailed {
75-
result := workflow.Terminate(workflow.ProjectPEServiceIsNotReadyInAtlas, "Private Endpoint Service failed")
76-
ctx.SetConditionFromResult(status.PrivateEndpointServiceReadyType, result)
77-
return result
78-
}
79-
if !allAvailable {
80-
result := workflow.InProgress(workflow.ProjectPEServiceIsNotReadyInAtlas, "Private Endpoint Service is not ready")
81-
ctx.SetConditionFromResult(status.PrivateEndpointServiceReadyType, result)
82-
return result
83-
}
74+
if len(atlasPEs) != len(specPEs) {
75+
result := workflow.InProgress(workflow.ProjectPEServiceIsNotReadyInAtlas, "Not all Private Endpoints are created")
76+
ctx.SetConditionFromResult(status.PrivateEndpointServiceReadyType, result)
77+
return result
78+
}
8479

80+
allAvailable, failureMessage := areServicesAvailableOrFailed(atlasPEs)
81+
if failureMessage != "" {
82+
result := workflow.Terminate(workflow.ProjectPEServiceIsNotReadyInAtlas, failureMessage)
83+
ctx.SetConditionFromResult(status.PrivateEndpointServiceReadyType, result)
84+
return result
85+
}
86+
if !allAvailable {
87+
result := workflow.InProgress(workflow.ProjectPEServiceIsNotReadyInAtlas, "Private Endpoint Service is not ready")
88+
ctx.SetConditionFromResult(status.PrivateEndpointServiceReadyType, result)
89+
return result
90+
}
91+
92+
if len(atlasPEs) != 0 {
8593
ctx.SetConditionTrue(status.PrivateEndpointServiceReadyType)
8694
}
8795

88-
for _, statusPeService := range convertAllToStatus(ctx, projectID, atlasPeConnections) {
96+
for _, statusPeService := range convertAllToStatus(ctx, projectID, atlasPEs) {
8997
if statusPeService.InterfaceEndpointID == "" {
90-
continue
98+
result := workflow.OK().WithMessage("Interface Private Endpoint awaits configuration")
99+
ctx.SetConditionFromResult(status.PrivateEndpointReadyType, result)
100+
return result
91101
}
92102

93103
interfaceEndpoint, _, err := ctx.Client.PrivateEndpoints.GetOnePrivateEndpoint(context.Background(), projectID, string(statusPeService.Provider), statusPeService.ID, statusPeService.InterfaceEndpointID)
94104
if err != nil {
95105
return workflow.Terminate(workflow.Internal, err.Error())
96106
}
97107

98-
interfaceIsAvailable, interfaceHasFailed := isInterfaceAvailableOrFailed(ctx, statusPeService.Provider, interfaceEndpoint)
99-
if interfaceHasFailed {
100-
result := workflow.Terminate(workflow.ProjectPrivateEndpointIsNotReadyInAtlas, "Interface Private Endpoint has failed")
108+
interfaceIsAvailable, interfaceFailureMessage := checkIfInterfaceIsAvailable(interfaceEndpoint)
109+
if interfaceFailureMessage != "" {
110+
result := workflow.Terminate(workflow.ProjectPrivateEndpointIsNotReadyInAtlas, interfaceFailureMessage)
101111
ctx.SetConditionFromResult(status.PrivateEndpointReadyType, result)
102112
return result
103113
}
@@ -106,42 +116,28 @@ func getStatusForInterfaceConnections(ctx *workflow.Context, projectID string) w
106116
ctx.SetConditionFromResult(status.PrivateEndpointReadyType, result)
107117
return result
108118
}
119+
}
109120

121+
if len(atlasPEs) != 0 {
110122
ctx.SetConditionTrue(status.PrivateEndpointReadyType)
111123
}
112124

113125
return workflow.OK()
114126
}
115127

116-
func areServicesAvailableOrFailed(atlasPeConnections []mongodbatlas.PrivateEndpointConnection) (allAvailable, anyFailed bool) {
128+
func areServicesAvailableOrFailed(atlasPeConnections []mongodbatlas.PrivateEndpointConnection) (allAvailable bool, failureMessage string) {
117129
allAvailable = true
118-
anyFailed = false
119130

120131
for _, conn := range atlasPeConnections {
132+
if isFailed(conn.Status) {
133+
return false, conn.ErrorMessage
134+
}
121135
if !isAvailable(conn.Status) {
122136
allAvailable = false
123137
}
124-
if isFailed(conn.Status) {
125-
anyFailed = true
126-
return
127-
}
128138
}
129139

130-
return allAvailable, anyFailed
131-
}
132-
133-
func isInterfaceAvailableOrFailed(ctx *workflow.Context, peProvider provider.ProviderName, interfaceEndpoint *mongodbatlas.InterfaceEndpointConnection) (interfaceAvailable, hasFailed bool) {
134-
switch peProvider {
135-
case provider.ProviderAWS:
136-
return isAvailable(interfaceEndpoint.AWSConnectionStatus), isFailed(interfaceEndpoint.AWSConnectionStatus)
137-
case provider.ProviderAzure:
138-
return isAvailable(interfaceEndpoint.Status), isFailed(interfaceEndpoint.Status)
139-
case provider.ProviderGCP:
140-
return isGCPInterfaceAvailableOrFailed(interfaceEndpoint)
141-
default:
142-
ctx.Log.Warnf("unsupported provider value for Private Endpoints: %s", peProvider)
143-
return false, true
144-
}
140+
return allAvailable, ""
145141
}
146142

147143
func syncPEConnections(ctx *workflow.Context, projectID string) ([]mongodbatlas.PrivateEndpointConnection, error) {
@@ -194,7 +190,7 @@ func createPeServiceInAtlas(client mongodbatlas.Client, projectID string, endpoi
194190
return newConnections, nil
195191
}
196192

197-
func createPrivateEndpointInAtlas(client mongodbatlas.Client, projectID string, endpointsToUpdate [][]set.Identifiable, log *zap.SugaredLogger) error {
193+
func createPeInterfaceInAtlas(client mongodbatlas.Client, projectID string, endpointsToUpdate [][]set.Identifiable, log *zap.SugaredLogger) error {
198194
for _, pair := range endpointsToUpdate {
199195
specPeService := pair[0].(mdbv1.PrivateEndpoint)
200196
statusPeService := pair[1].(status.ProjectPrivateEndpoint)
@@ -210,9 +206,14 @@ func createPrivateEndpointInAtlas(client mongodbatlas.Client, projectID string,
210206
if gcpEndpoints, err := specPeService.Endpoints.ConvertToAtlas(); err == nil {
211207
interfaceConn.Endpoints = gcpEndpoints
212208
}
213-
interfaceConn, _, err := client.PrivateEndpoints.AddOnePrivateEndpoint(context.Background(), projectID, string(specPeService.Provider), statusPeService.ID, interfaceConn)
209+
interfaceConn, response, err := client.PrivateEndpoints.AddOnePrivateEndpoint(context.Background(), projectID, string(specPeService.Provider), statusPeService.ID, interfaceConn)
214210
log.Debugw("AddOnePrivateEndpoint Reply", "interfaceConn", interfaceConn, "err", err)
215211
if err != nil {
212+
if response.StatusCode == 400 || response.StatusCode == 409 {
213+
log.Debugw("failed to create PE Interface", "error", err)
214+
continue
215+
}
216+
216217
return err
217218
}
218219
}
@@ -222,9 +223,14 @@ func createPrivateEndpointInAtlas(client mongodbatlas.Client, projectID string,
222223
}
223224

224225
func endpointsAreNotFullyConfigured(specPeService mdbv1.PrivateEndpoint, statusPeService status.ProjectPrivateEndpoint) bool {
225-
awsOrAzureCondition := specPeService.ID != "" && statusPeService.InterfaceEndpointID == ""
226-
gcpCondition := specPeService.GCPProjectID != "" && specPeService.EndpointGroupName != "" && len(specPeService.Endpoints) != 0 && len(statusPeService.Endpoints) != len(specPeService.Endpoints)
227-
return awsOrAzureCondition || gcpCondition
226+
switch specPeService.Provider {
227+
case provider.ProviderAWS, provider.ProviderAzure:
228+
return specPeService.ID != "" && statusPeService.InterfaceEndpointID == ""
229+
case provider.ProviderGCP:
230+
return specPeService.GCPProjectID != "" && specPeService.EndpointGroupName != "" && len(specPeService.Endpoints) != 0 && len(statusPeService.Endpoints) != len(specPeService.Endpoints)
231+
}
232+
233+
return false
228234
}
229235

230236
func DeleteAllPrivateEndpoints(ctx *workflow.Context, client mongodbatlas.Client, projectID string, statusPE []status.ProjectPrivateEndpoint, log *zap.SugaredLogger) workflow.Result {
@@ -365,26 +371,24 @@ func getGCPInterfaceEndpoint(ctx *workflow.Context, projectID string, endpoint s
365371
return listOfInterfaces, nil
366372
}
367373

368-
func isGCPInterfaceAvailableOrFailed(interfaceEndpointConn *mongodbatlas.InterfaceEndpointConnection) (allAvailable, anyFailed bool) {
374+
// checkIfInterfaceIsAvailable checks if an interface and all of its nested endpoints are available and also returns an error message
375+
func checkIfInterfaceIsAvailable(interfaceEndpointConn *mongodbatlas.InterfaceEndpointConnection) (allAvailable bool, failureMessage string) {
369376
allAvailable = true
370-
anyFailed = false
371377

372-
if !isAvailable(interfaceEndpointConn.Status) {
373-
allAvailable = false
374-
}
375378
if isFailed(interfaceEndpointConn.Status) {
376-
anyFailed = true
377-
return
379+
return false, interfaceEndpointConn.ErrorMessage
380+
}
381+
if !isAvailable(interfaceEndpointConn.Status) && !isAvailable(interfaceEndpointConn.AWSConnectionStatus) {
382+
allAvailable = false
378383
}
379384

380385
for _, endpoint := range interfaceEndpointConn.Endpoints {
386+
if isFailed(endpoint.Status) {
387+
return false, interfaceEndpointConn.ErrorMessage
388+
}
381389
if !isAvailable(endpoint.Status) {
382390
allAvailable = false
383391
}
384-
if isFailed(endpoint.Status) {
385-
anyFailed = true
386-
return
387-
}
388392
}
389393

390394
return
@@ -397,3 +401,10 @@ func isAvailable(status string) bool {
397401
func isFailed(status string) bool {
398402
return status == "FAILED"
399403
}
404+
405+
func terminateWithError(ctx *workflow.Context, conditionType status.ConditionType, message string, err error) workflow.Result {
406+
ctx.Log.Debugw(message, "error", err)
407+
result := workflow.Terminate(workflow.ProjectPEServiceIsNotReadyInAtlas, err.Error()).WithoutRetry()
408+
ctx.SetConditionFromResult(conditionType, result)
409+
return result
410+
}

pkg/controller/workflow/result.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ func (r Result) WithoutRetry() Result {
7474
return r
7575
}
7676

77+
func (r Result) WithMessage(message string) Result {
78+
r.message = message
79+
return r
80+
}
81+
7782
func (r Result) IsOk() bool {
7883
return !r.terminated
7984
}

test/e2e/cli/helm/helm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func Install(args ...string) {
5656
args = append([]string{"install"}, args...)
5757
session := cli.Execute("helm", args...)
5858
msg := cli.GetSessionExitMsg(session)
59-
ExpectWithOffset(1, msg).Should(SatisfyAny(Say("STATUS: deployed"), Say("resource that already exists"), BeEmpty()),
59+
ExpectWithOffset(1, msg).Should(SatisfyAny(Say("STATUS: deployed"), Say("already exists"), BeEmpty()),
6060
"HELM. Can't install release",
6161
)
6262
}

test/int/deployment_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,8 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment"), func() {
435435
})
436436
})
437437

438-
Describe("Create/Update the cluster (GCP)", func() {
439-
It("Should fail, then be fixed", func() {
438+
Describe("Create/Update the cluster", func() {
439+
It("Should fail, then be fixed (GCP)", func() {
440440
createdCluster = mdbv1.DefaultGCPCluster(namespace.Name, createdProject.Name).WithAtlasName("")
441441

442442
By(fmt.Sprintf("Creating the Cluster %s with invalid parameters", kube.ObjectKeyFromObject(createdCluster)), func() {
@@ -472,7 +472,7 @@ var _ = Describe("AtlasDeployment", Label("int", "AtlasDeployment"), func() {
472472
})
473473
})
474474

475-
It("Should Succeed", func() {
475+
It("Should Succeed (AWS)", func() {
476476
createdCluster = mdbv1.DefaultAWSCluster(namespace.Name, createdProject.Name)
477477

478478
By(fmt.Sprintf("Creating the Cluster %s", kube.ObjectKeyFromObject(createdCluster)), func() {

0 commit comments

Comments
 (0)