Skip to content

Commit 3224275

Browse files
committed
fixup! refactor into designed declarative approach
1 parent 9327597 commit 3224275

File tree

1 file changed

+124
-99
lines changed

1 file changed

+124
-99
lines changed

providers/gce/gce_loadbalancer_external.go

Lines changed: 124 additions & 99 deletions
Original file line numberDiff line numberDiff line change
@@ -112,46 +112,49 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string,
112112
g.deleteWrongNetworkTieredResources(loadBalancerName, lbRefStr, netTier)
113113
}
114114

115-
groupedPorts := groupPortsByProtocol(ports)
116-
if !g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) && len(groupedPorts) > 1 {
117-
var protocols []string
118-
for p := range groupedPorts {
119-
protocols = append(protocols, string(p))
120-
}
121-
return nil, fmt.Errorf("load balancer with multiple protocols (%s) is not supported when AlphaFeatureMultiProtocolLB is disabled", strings.Join(protocols, ","))
115+
existingFRs, err := g.getExistingForwardingRules(loadBalancerName)
116+
if err != nil {
117+
return nil, fmt.Errorf("error getting existing forwarding rules for %s: %v", loadBalancerName, err)
122118
}
123119

120+
// Check if a forwarding rule exists, and if so, what the IP is.
124121
var fwdRuleIP string
125-
// We need to determine the IP address for all forwarding rules.
126-
// We check if any of the forwarding rules already exist and use their IP.
127-
for protocol := range groupedPorts {
128-
frName := g.getProtocolForwardingRuleName(loadBalancerName, protocol)
129-
fwd, err := g.GetRegionForwardingRule(frName, g.region)
130-
if err != nil && !isNotFound(err) {
131-
return nil, err
132-
}
133-
if fwd != nil {
134-
fwdRuleIP = fwd.IPAddress
135-
break
136-
}
137-
}
138-
// If no forwarding rule exists, check for the old one.
139-
if fwdRuleIP == "" {
140-
fwd, err := g.GetRegionForwardingRule(loadBalancerName, g.region)
141-
if err != nil && !isNotFound(err) {
142-
return nil, err
143-
}
144-
if fwd != nil {
145-
fwdRuleIP = fwd.IPAddress
122+
for _, fr := range existingFRs {
123+
if fr.IPAddress != "" {
124+
if fwdRuleIP == "" {
125+
fwdRuleIP = fr.IPAddress
126+
} else if fwdRuleIP != fr.IPAddress {
127+
return nil, fmt.Errorf("found multiple forwarding rules with different IP addresses (%s and %s) for load balancer %s", fwdRuleIP, fr.IPAddress, loadBalancerName)
128+
}
146129
}
147130
}
148131

149132
// Make sure we know which IP address will be used and have properly reserved
150133
// it as static before moving forward with the rest of our operations.
134+
//
135+
// We use static IP addresses when updating a load balancer to ensure that we
136+
// can replace the load balancer's other components without changing the
137+
// address its service is reachable on. We do it this way rather than always
138+
// keeping the static IP around even though this is more complicated because
139+
// it makes it less likely that we'll run into quota issues. Only 7 static
140+
// IP addresses are allowed per region by default.
141+
//
142+
// We could let an IP be allocated for us when the forwarding rule is created,
143+
// but we need the IP to set up the firewall rule, and we want to keep the
144+
// forwarding rule creation as the last thing that needs to be done in this
145+
// function in order to maintain the invariant that "if the forwarding rule
146+
// exists, the LB has been fully created".
151147
ipAddressToUse := ""
152-
isUserOwnedIP := false
153-
isSafeToReleaseIP := false
154148

149+
// Through this process we try to keep track of whether it is safe to
150+
// release the IP that was allocated. If the user specifically asked for
151+
// an IP, we assume they are managing it themselves. Otherwise, we will
152+
// release the IP in case of early-terminating failure or upon successful
153+
// creating of the LB.
154+
// TODO(#36535): boil this logic down into a set of component functions
155+
// and key the flag values off of errors returned.
156+
isUserOwnedIP := false // if this is set, we never release the IP
157+
isSafeToReleaseIP := false
155158
defer func() {
156159
if isUserOwnedIP {
157160
return
@@ -195,6 +198,12 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string,
195198
ipAddressToUse = ipAddr
196199
}
197200

201+
// Now we have the IP address we can build the desired forwarding rules.
202+
desiredFRs, err := g.buildDesiredForwardingRules(loadBalancerName, serviceName.String(), ipAddressToUse, g.targetPoolURL(loadBalancerName), apiService, netTier, existingFRs)
203+
if err != nil {
204+
return nil, fmt.Errorf("error building desired forwarding rules for %s: %v", loadBalancerName, err)
205+
}
206+
198207
// Deal with the firewall next. The reason we do this here rather than last
199208
// is because the forwarding rule is used as the indicator that the load
200209
// balancer is fully created - it's what getLoadBalancer checks for.
@@ -257,33 +266,42 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string,
257266
} else {
258267
klog.V(4).Infof("ensureExternalLoadBalancer(%s): Service needs nodes health checks.", lbRefStr)
259268
if hcLocalTrafficExisting != nil {
269+
// This logic exists to detect a transition from OnlyLocal to non-OnlyLocal service
270+
// and turn on the tpNeedsRecreation flag to delete/recreate fwdrule/tpool updating the
271+
// target pool to use nodes health check.
260272
klog.V(2).Infof("ensureExternalLoadBalancer(%s): Updating from local traffic health checks to nodes health checks.", lbRefStr)
261273
hcToDelete = hcLocalTrafficExisting
262274
tpNeedsRecreation = true
263275
}
264276
hcToCreate = makeHTTPHealthCheck(MakeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort())
265277
}
266278

267-
if err := g.ensureTargetPoolAndHealthCheck(tpExists, tpNeedsRecreation, apiService, loadBalancerName, clusterID, ipAddressToUse, hosts, hcToCreate, hcToDelete); err != nil {
268-
return nil, err
269-
}
270-
271-
// Forwarding rule logic
272-
// We build the desired forwarding rules and then sync them with what exists.
273-
existingFRs, err := g.getExistingForwardingRules(loadBalancerName)
274-
if err != nil {
275-
return nil, fmt.Errorf("error getting existing forwarding rules for %s: %v", loadBalancerName, err)
276-
}
279+
// Now we get to some slightly more interesting logic.
280+
// First, neither target pools nor forwarding rules can be updated in place -
281+
// they have to be deleted and recreated.
282+
// Second, forwarding rules are layered on top of target pools in that you
283+
// can't delete a target pool that's currently in use by a forwarding rule.
284+
// Thus, we have to tear down a forwarding rule if either it or the target
285+
// pool needs to be updated.
286+
if len(existingFRs) > 0 && tpNeedsRecreation {
287+
// Begin critical section. If we have to delete the forwarding rule,
288+
// and something should fail before we recreate it, don't release the
289+
// IP. That way we can come back to it later.
290+
isSafeToReleaseIP = false
291+
292+
for _, fr := range existingFRs {
293+
if err := g.DeleteRegionForwardingRule(fr.Name, g.region); err != nil && !isNotFound(err) {
294+
return nil, fmt.Errorf("failed to delete existing forwarding rule for load balancer (%s) update: %v", lbRefStr, err)
295+
}
296+
}
297+
klog.Infof("ensureExternalLoadBalancer(%s): Deleted forwarding rule(s).", lbRefStr)
277298

278-
// Get the old forwarding rule for backward compatibility.
279-
var oldFwdRule *compute.ForwardingRule
280-
if fwd, ok := existingFRs[loadBalancerName]; ok {
281-
oldFwdRule = fwd
299+
// Clear the existing forwarding rules so we don't try to delete them again.
300+
existingFRs = nil
282301
}
283302

284-
desiredFRs, err := g.buildDesiredForwardingRules(loadBalancerName, serviceName.String(), ipAddressToUse, g.targetPoolURL(loadBalancerName), apiService, netTier, oldFwdRule)
285-
if err != nil {
286-
return nil, fmt.Errorf("error building desired forwarding rules for %s: %v", loadBalancerName, err)
303+
if err := g.ensureTargetPoolAndHealthCheck(tpExists, tpNeedsRecreation, apiService, loadBalancerName, clusterID, ipAddressToUse, hosts, hcToCreate, hcToDelete); err != nil {
304+
return nil, err
287305
}
288306

289307
// Delete unwanted forwarding rules.
@@ -299,31 +317,34 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string,
299317
// Create or update forwarding rules.
300318
for _, desiredFR := range desiredFRs {
301319
existingFR, exists := existingFRs[desiredFR.Name]
302-
303-
needsUpdate := false
304320
if exists {
305-
if !g.forwardingRulesEqual(existingFR, desiredFR) {
306-
needsUpdate = true
321+
if g.forwardingRulesEqual(existingFR, desiredFR) {
322+
continue
307323
}
308324
}
309325

310-
if !exists || needsUpdate || tpNeedsRecreation {
311-
if exists {
312-
klog.Infof("ensureExternalLoadBalancer(%s): Deleting forwarding rule %s to update.", lbRefStr, desiredFR.Name)
313-
if err := g.DeleteRegionForwardingRule(desiredFR.Name, g.region); err != nil && !isNotFound(err) {
314-
return nil, err
315-
}
316-
}
317-
klog.Infof("ensureExternalLoadBalancer(%s): Creating forwarding rule %s for protocol %s, IP %s (tier: %s).", lbRefStr, desiredFR.Name, desiredFR.IPProtocol, ipAddressToUse, netTier)
318-
// The desiredFR is a complete spec, so we can just pass it to CreateRegionForwardingRule.
319-
if err := g.CreateRegionForwardingRule(desiredFR, g.region); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
320-
return nil, fmt.Errorf("failed to create forwarding rule for load balancer (%s): %v", lbRefStr, err)
326+
// We can't update a forwarding rule in place, so we have to delete it and recreate it.
327+
if exists {
328+
klog.Infof("ensureExternalLoadBalancer(%s): Deleting forwarding rule %s to update.", lbRefStr, desiredFR.Name)
329+
if err := g.DeleteRegionForwardingRule(desiredFR.Name, g.region); err != nil && !isNotFound(err) {
330+
return nil, err
321331
}
322-
klog.Infof("ensureExternalLoadBalancer(%s): Created forwarding rule %s.", lbRefStr, desiredFR.Name)
323332
}
333+
klog.Infof("ensureExternalLoadBalancer(%s): Creating forwarding rule %s for protocol %s, IP %s (tier: %s).", lbRefStr, desiredFR.Name, desiredFR.IPProtocol, ipAddressToUse, netTier)
334+
// The desiredFR is a complete spec, so we can just pass it to CreateRegionForwardingRule.
335+
// TODO: Why do we ignore the conflict error?
336+
if err := g.CreateRegionForwardingRule(desiredFR, g.region); err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
337+
return nil, fmt.Errorf("failed to create forwarding rule for load balancer (%s): %v", lbRefStr, err)
338+
}
339+
klog.Infof("ensureExternalLoadBalancer(%s): Created forwarding rule %s.", lbRefStr, desiredFR.Name)
324340
}
325341

342+
// End critical section. It is safe to release the static IP (which
343+
// just demotes it to ephemeral) now that it is attached. In the case
344+
// of a user-requested IP, the "is user-owned" flag will be set,
345+
// preventing it from actually being released.
326346
isSafeToReleaseIP = true
347+
327348
status := &v1.LoadBalancerStatus{}
328349
status.Ingress = []v1.LoadBalancerIngress{{IP: ipAddressToUse}}
329350

@@ -339,19 +360,32 @@ func (g *Cloud) getExistingForwardingRules(loadBalancerName string) (map[string]
339360

340361
existingFRs := make(map[string]*compute.ForwardingRule)
341362
for _, fr := range frs {
342-
if strings.HasPrefix(fr.Name, loadBalancerName) {
363+
isMatch := false
364+
if fr.Name == loadBalancerName {
365+
isMatch = true
366+
} else if fr.Name == g.getProtocolForwardingRuleName(loadBalancerName, v1.Protocol(fr.IPProtocol)) {
367+
isMatch = true
368+
}
369+
370+
if isMatch {
343371
existingFRs[fr.Name] = fr
344372
}
345373
}
346374
return existingFRs, nil
347375
}
348376

349377
// buildDesiredForwardingRules builds the desired forwarding rules for the given load balancer.
350-
func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAddress, targetPoolURL string, apiService *v1.Service, netTier cloud.NetworkTier, oldFwdRule *compute.ForwardingRule) (map[string]*compute.ForwardingRule, error) {
378+
func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAddress, targetPoolURL string, apiService *v1.Service, netTier cloud.NetworkTier, existingFRs map[string]*compute.ForwardingRule) (map[string]*compute.ForwardingRule, error) {
351379
desiredFRs := make(map[string]*compute.ForwardingRule)
352380
groupedPorts := groupPortsByProtocol(apiService.Spec.Ports)
353381
desc := makeServiceDescription(serviceName)
354382

383+
// Find the legacy forwarding rule to minimize changes.
384+
var oldFwdRule *compute.ForwardingRule
385+
if fwd, ok := existingFRs[loadBalancerName]; ok {
386+
oldFwdRule = fwd
387+
}
388+
355389
for protocol, protocolPorts := range groupedPorts {
356390
var frName string
357391
if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) {
@@ -364,7 +398,6 @@ func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAdd
364398
frName = loadBalancerName
365399
}
366400

367-
frPorts := getPorts(protocolPorts)
368401
portRange, err := loadBalancerPortRange(protocolPorts)
369402
if err != nil {
370403
return nil, err
@@ -380,8 +413,10 @@ func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAdd
380413
NetworkTier: netTier.ToGCEValue(),
381414
}
382415

383-
if len(frPorts) <= maxForwardedPorts && g.enableDiscretePortForwarding {
384-
rule.Ports = frPorts
416+
if len(protocolPorts) <= maxForwardedPorts && g.enableDiscretePortForwarding {
417+
for _, p := range protocolPorts {
418+
rule.Ports = append(rule.Ports, strconv.Itoa(int(p.Port)))
419+
}
385420
rule.PortRange = ""
386421
}
387422

@@ -392,6 +427,14 @@ func (g *Cloud) buildDesiredForwardingRules(loadBalancerName, serviceName, ipAdd
392427

393428
// forwardingRulesEqual checks if two forwarding rules are equal.
394429
func (g *Cloud) forwardingRulesEqual(existing, desired *compute.ForwardingRule) bool {
430+
if existing.Name != desired.Name {
431+
klog.V(3).Infof("Forwarding rule %s name changed from %s to %s", existing.Name, existing.Name, desired.Name)
432+
return false
433+
}
434+
if existing.Description != desired.Description {
435+
klog.V(3).Infof("Forwarding rule %s description changed from %s to %s", existing.Name, existing.Description, desired.Description)
436+
return false
437+
}
395438
if existing.IPAddress != desired.IPAddress {
396439
klog.V(3).Infof("Forwarding rule %s IP changed from %s to %s", existing.Name, existing.IPAddress, desired.IPAddress)
397440
return false
@@ -415,7 +458,6 @@ func (g *Cloud) forwardingRulesEqual(existing, desired *compute.ForwardingRule)
415458
return true
416459
}
417460

418-
419461
// updateExternalLoadBalancer is the external implementation of LoadBalancer.UpdateLoadBalancer.
420462
func (g *Cloud) updateExternalLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
421463
// Process services with LoadBalancerClass "networking.gke.io/l4-regional-external-legacy" used for this controller.
@@ -500,17 +542,16 @@ func (g *Cloud) ensureExternalLoadBalancerDeleted(clusterName, clusterID string,
500542
}
501543

502544
// TODO: Always or just with alpha feature flag?
503-
frs, err := g.ListRegionForwardingRules(g.region)
545+
existingFRs, err := g.getExistingForwardingRules(loadBalancerName)
504546
if err != nil {
505547
return err
506548
}
549+
507550
var deleteErrs []error
508-
for _, fr := range frs {
509-
if strings.HasPrefix(fr.Name, loadBalancerName) {
510-
klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting forwarding rule %s.", lbRefStr, fr.Name)
511-
if err := ignoreNotFound(g.DeleteRegionForwardingRule(fr.Name, g.region)); err != nil {
512-
deleteErrs = append(deleteErrs, err)
513-
}
551+
for _, fr := range existingFRs {
552+
klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting forwarding rule %s.", lbRefStr, fr.Name)
553+
if err := ignoreNotFound(g.DeleteRegionForwardingRule(fr.Name, g.region)); err != nil {
554+
deleteErrs = append(deleteErrs, err)
514555
}
515556
}
516557
if len(deleteErrs) > 0 {
@@ -908,8 +949,6 @@ func (g *Cloud) ensureHTTPHealthCheck(name, path string, port int32) (hc *comput
908949
return hc, nil
909950
}
910951

911-
912-
913952
// Doesn't check whether the hosts have changed, since host updating is handled
914953
// separately.
915954
func (g *Cloud) targetPoolNeedsRecreation(name, region string, affinityType v1.ServiceAffinity) (exists bool, needsRecreation bool, err error) {
@@ -964,26 +1003,14 @@ func hostURLToComparablePath(hostURL string) string {
9641003
return hostURL[idx:]
9651004
}
9661005

967-
func getProtocol(svcPorts []v1.ServicePort) (v1.Protocol, error) {
968-
if len(svcPorts) == 0 {
969-
return v1.ProtocolTCP, nil
970-
}
971-
// The service controller verified all the protocols match on the ports, just check and use the first one
972-
protocol := svcPorts[0].Protocol
973-
if protocol != v1.ProtocolTCP && protocol != v1.ProtocolUDP {
974-
return v1.ProtocolTCP, fmt.Errorf("invalid protocol %s, only TCP and UDP are supported", string(protocol))
975-
}
976-
return protocol, nil
977-
}
978-
979-
func getPorts(svcPorts []v1.ServicePort) []string {
980-
ports := []string{}
981-
for _, p := range svcPorts {
982-
ports = append(ports, strconv.Itoa(int(p.Port)))
983-
}
1006+
// func getPorts(svcPorts []v1.ServicePort) []string {
1007+
// ports := []string{}
1008+
// for _, p := range svcPorts {
1009+
// ports = append(ports, strconv.Itoa(int(p.Port)))
1010+
// }
9841011

985-
return ports
986-
}
1012+
// return ports
1013+
// }
9871014

9881015
func minMaxPort[T v1.ServicePort | string](svcPorts []T) (int32, int32) {
9891016
minPort := int32(65536)
@@ -1145,8 +1172,6 @@ func (g *Cloud) ensureHTTPHealthCheckFirewall(svc *v1.Service, serviceName, ipAd
11451172
return nil
11461173
}
11471174

1148-
1149-
11501175
func (g *Cloud) createFirewall(svc *v1.Service, name, desc, destinationIP string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) error {
11511176
firewall, err := g.firewallObject(name, desc, destinationIP, sourceRanges, ports, hosts)
11521177
if err != nil {

0 commit comments

Comments
 (0)