-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathendpoints.go
More file actions
198 lines (166 loc) · 6 KB
/
endpoints.go
File metadata and controls
198 lines (166 loc) · 6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
package managerdriver
import (
"context"
"reflect"
ngrokv1alpha1 "github.com/ngrok/ngrok-operator/api/ngrok/v1alpha1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// SyncEndpoints translates the driver's store into the desired sets of
// AgentEndpoints and CloudEndpoints, lists the endpoints that currently match
// the controller labels selector, and applies the desired sets against them
// through c.
//
// Unless d.syncAllowConcurrent is set, the call is gated by d.syncStart: when
// syncStart does not let this call proceed, the result of the wait function it
// returned is returned instead, and d.syncDone is deferred otherwise.
//
// The first error from listing or applying is logged and returned; later steps
// are not attempted.
func (d *Driver) SyncEndpoints(ctx context.Context, c client.Client) error {
	if !d.syncAllowConcurrent {
		canRun, waitForSync := d.syncStart(true)
		if !canRun {
			return waitForSync(ctx)
		}
		defer d.syncDone()
	}

	d.log.Info("syncing cloud and agent endpoints state!!")

	// Compute the desired state before looking at what exists in the cluster.
	wanted := NewTranslator(
		d.log,
		d.store,
		d.controllerLabels.Labels(),
		d.ingressNgrokMetadata,
		d.gatewayNgrokMetadata,
		d.clusterDomain,
		d.disableGatewayReferenceGrants,
	).Translate()

	var (
		agentEndpoints ngrokv1alpha1.AgentEndpointList
		cloudEndpoints ngrokv1alpha1.CloudEndpointList
	)
	selector := d.controllerLabels.Selector()

	// Steps run strictly in order; each closure reads the lists only when it is
	// invoked, i.e. after the corresponding List call has filled them in.
	steps := []struct {
		failure string
		run     func() error
	}{
		{"error listing agent endpoints", func() error {
			return c.List(ctx, &agentEndpoints, selector)
		}},
		{"error listing cloud endpoints", func() error {
			return c.List(ctx, &cloudEndpoints, selector)
		}},
		{"applying agent endpoints", func() error {
			return d.applyAgentEndpoints(ctx, c, wanted.AgentEndpoints, agentEndpoints.Items)
		}},
		{"applying cloud endpoints", func() error {
			return d.applyCloudEndpoints(ctx, c, wanted.CloudEndpoints, cloudEndpoints.Items)
		}},
	}

	for _, step := range steps {
		if err := step.run(); err != nil {
			d.log.Error(err, step.failure)
			return err
		}
	}

	return nil
}
// applyAgentEndpoints reconciles the existing AgentEndpoints in current with
// the desired set.
//
// For every existing endpoint that is not skipped by shouldBeSkippedInApply:
//   - if a desired endpoint exists under the same namespace/name, the existing
//     object's Spec, Labels and Annotations are overwritten wherever they
//     differ and the object is updated through c when anything changed; the
//     entry is then removed from desired;
//   - otherwise the existing endpoint is deleted (errors filtered out by
//     client.IgnoreNotFound are not treated as failures).
//
// Whatever is left in desired afterwards is created. The desired map is
// mutated by this call. The first failing client call is logged and its error
// returned.
func (d *Driver) applyAgentEndpoints(ctx context.Context, c client.Client, desired map[types.NamespacedName]*ngrokv1alpha1.AgentEndpoint, current []ngrokv1alpha1.AgentEndpoint) error {
	// Pass 1: update or delete what already exists.
	for _, existing := range current {
		// Endpoints that are user-created or otherwise not managed here are left alone.
		if d.shouldBeSkippedInApply(&existing) {
			continue
		}

		key := types.NamespacedName{
			Namespace: existing.Namespace,
			Name:      existing.Name,
		}

		want, stillWanted := desired[key]
		if !stillWanted {
			// No longer desired: remove it.
			if err := client.IgnoreNotFound(c.Delete(ctx, &existing)); err != nil {
				d.log.Error(err, "error deleting agent endpoint", "current agent endpoints", existing)
				return err
			}
			continue
		}

		specChanged := !reflect.DeepEqual(want.Spec, existing.Spec)
		labelsChanged := !reflect.DeepEqual(want.Labels, existing.Labels)
		annotationsChanged := !reflect.DeepEqual(want.Annotations, existing.Annotations)

		if specChanged {
			existing.Spec = want.Spec
		}
		if labelsChanged {
			existing.Labels = want.Labels
		}
		if annotationsChanged {
			existing.Annotations = want.Annotations
		}

		if specChanged || labelsChanged || annotationsChanged {
			if err := c.Update(ctx, &existing); err != nil {
				d.log.Error(err, "error updating agent endpoint", "desired", want, "current", existing)
				return err
			}
		}

		// Handled; drop it so that only brand-new endpoints remain in desired.
		delete(desired, key)
	}

	// Pass 2: everything still in desired has no existing counterpart, so create it.
	for _, toCreate := range desired {
		if err := c.Create(ctx, toCreate); err != nil {
			d.log.Error(err, "error creating agent endpoint", "agent endpoint", toCreate)
			return err
		}
	}

	return nil
}
// shouldBeSkippedInApply reports whether obj must be left untouched during
// apply. That is the case when either:
//  1. obj does not carry the controller labels, i.e. it was created manually
//     by the user and is not managed by the operator, or
//  2. obj has an owner reference of kind "Service", i.e. it is owned by the
//     LoadBalancer controller; modifying it from the managerdriver would make
//     the two controllers fight over the object.
func (d *Driver) shouldBeSkippedInApply(obj client.Object) bool {
	if !d.controllerLabels.HasLabels(obj) {
		return true
	}

	owners := obj.GetOwnerReferences()
	for i := range owners {
		// A Service owner means the LoadBalancer controller manages this object.
		if owners[i].Kind == "Service" {
			return true
		}
	}

	return false
}
// applyCloudEndpoints reconciles the existing CloudEndpoints in current with
// the desired set.
//
// For every existing endpoint that is not skipped by shouldBeSkippedInApply:
//   - if a desired endpoint exists under the same namespace/name, the existing
//     Status.ID is copied onto the desired object, the existing object's Spec,
//     Labels and Annotations are overwritten wherever they differ, and the
//     object is updated through c when anything changed; the entry is then
//     removed from desired;
//   - otherwise the existing endpoint is deleted (errors filtered out by
//     client.IgnoreNotFound are not treated as failures).
//
// Whatever is left in desired afterwards is created. The desired map and the
// matched desired objects are mutated by this call. The first failing client
// call is logged and its error returned.
func (d *Driver) applyCloudEndpoints(ctx context.Context, c client.Client, desired map[types.NamespacedName]*ngrokv1alpha1.CloudEndpoint, current []ngrokv1alpha1.CloudEndpoint) error {
	// Pass 1: update or delete what already exists.
	for _, existing := range current {
		// Endpoints that are user-created or otherwise not managed here are left alone.
		if d.shouldBeSkippedInApply(&existing) {
			continue
		}

		key := types.NamespacedName{
			Namespace: existing.Namespace,
			Name:      existing.Name,
		}

		want, stillWanted := desired[key]
		if !stillWanted {
			// No longer desired: remove it.
			if err := client.IgnoreNotFound(c.Delete(ctx, &existing)); err != nil {
				d.log.Error(err, "error deleting cloud endpoint", "cloud endpoint", existing)
				return err
			}
			continue
		}

		// Carry the status ID over from the existing endpoint to the desired one.
		// The ID is set by the controller in the operator-agent pod, and the
		// controller in the operator-manager pod (where this code runs) must not
		// erase it.
		want.Status.ID = existing.Status.ID

		specChanged := !reflect.DeepEqual(want.Spec, existing.Spec)
		labelsChanged := !reflect.DeepEqual(want.Labels, existing.Labels)
		annotationsChanged := !reflect.DeepEqual(want.Annotations, existing.Annotations)

		if specChanged {
			existing.Spec = want.Spec
		}
		if labelsChanged {
			existing.Labels = want.Labels
		}
		if annotationsChanged {
			existing.Annotations = want.Annotations
		}

		if specChanged || labelsChanged || annotationsChanged {
			if err := c.Update(ctx, &existing); err != nil {
				d.log.Error(err, "error updating cloud endpoint", "desired", want, "current", existing)
				return err
			}
		}

		// Handled; drop it so that only brand-new endpoints remain in desired.
		delete(desired, key)
	}

	// Pass 2: everything still in desired has no existing counterpart, so create it.
	for _, toCreate := range desired {
		if err := c.Create(ctx, toCreate); err != nil {
			d.log.Error(err, "error creating cloud endpoint", "cloud endpoint", toCreate)
			return err
		}
	}

	return nil
}