Skip to content

Commit 47a2c6f

Browse files
committed
fixup! Implement multi forwardingRule in interanal case
1 parent a068f95 commit 47a2c6f

File tree

1 file changed

+116
-111
lines changed

1 file changed

+116
-111
lines changed

providers/gce/gce_loadbalancer_internal.go

Lines changed: 116 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929

3030
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
3131
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
32-
"github.com/google/go-cmp/cmp"
3332
compute "google.golang.org/api/compute/v1"
3433
v1 "k8s.io/api/core/v1"
3534
"k8s.io/apimachinery/pkg/types"
@@ -188,121 +187,109 @@ func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v
188187

189188
// Get all existing forwarding rules for this service.
190189
// A service can have a forwarding rule with the base name, or with a protocol suffix.
191-
existingFwdRules := make(map[string]*compute.ForwardingRule)
192-
// The `existingFwdRule` is the one with the base name, passed into this function.
193-
if existingFwdRule != nil {
194-
existingFwdRules[existingFwdRule.Name] = existingFwdRule
195-
}
190+
// TODO: Remove existingFwdRules and use the one from the function.
196191
// Check for forwarding rules with protocol suffixes.
197-
if len(groupedPorts) > 1 {
198-
for protocol := range groupedPorts {
199-
frName := fmt.Sprintf("%s-%s", loadBalancerName, strings.ToLower(string(protocol)))
200-
if _, ok := existingFwdRules[frName]; ok {
201-
continue
202-
}
203-
fr, err := g.GetRegionForwardingRule(frName, g.region)
204-
if err != nil && !isNotFound(err) {
205-
return nil, err
206-
}
207-
if fr != nil {
208-
existingFwdRules[fr.Name] = fr
209-
}
210-
}
211-
}
192+
op := &ensureOperation{}
193+
g.findActualBackendServiceInternal(backendServiceName, op)
212194

213-
desiredFwdRuleNames := sets.NewString()
214-
var desiredFwdRuleProtocols []v1.Protocol
215-
for protocol := range groupedPorts {
216-
desiredFwdRuleProtocols = append(desiredFwdRuleProtocols, protocol)
195+
existingForwardingRules, err := g.getExistingForwardingRules(loadBalancerName)
196+
if err != nil {
197+
return nil, err
217198
}
218-
// Sort protocols to have a stable order for naming and processing.
219-
sort.Slice(desiredFwdRuleProtocols, func(i, j int) bool {
220-
return desiredFwdRuleProtocols[i] < desiredFwdRuleProtocols[j]
221-
})
199+
op.forwardingRulesActual = existingForwardingRules
222200

223201
var createdFwdRules []*compute.ForwardingRule
224-
var desiredBackendServices = make(map[string]bool)
225-
226-
for _, protocol := range desiredFwdRuleProtocols {
227-
portStruct := groupedPorts[protocol]
228-
ports := portStruct.portRanges
229-
230-
// Each protocol gets its own backend service.
231-
// The backend service name must be unique per protocol.
232-
// Pass a single-protocol map to makeBackendServiceName.
233-
singleProtocolGroupedPorts := map[v1.Protocol]ProtocolPorts{protocol: portStruct}
234-
backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, singleProtocolGroupedPorts, svc.Spec.SessionAffinity)
235-
desiredBackendServices[backendServiceName] = true
236-
backendServiceLink := g.getBackendServiceLink(backendServiceName)
237-
238-
bsDescription := makeBackendServiceDescription(nm, sharedBackend)
239-
err = g.ensureInternalBackendService(backendServiceName, bsDescription, svc.Spec.SessionAffinity, scheme, protocol, igLinks, hc.SelfLink)
240-
if err != nil {
241-
return nil, err
242-
}
243202

203+
var desiredForwardingRules []*compute.ForwardingRule
204+
for protocol, protocolPorts := range groupedPorts {
244205
// Each protocol gets its own forwarding rule.
245206
// If there's only one protocol, the forwarding rule name is the load balancer name.
246207
// Otherwise, it's load-balancer-name-<protocol>.
247208
frName := loadBalancerName
248209
if len(groupedPorts) > 1 {
249210
frName = fmt.Sprintf("%s-%s", loadBalancerName, strings.ToLower(string(protocol)))
250211
}
251-
desiredFwdRuleNames.Insert(frName)
252212

253-
newFwdRule := &compute.ForwardingRule{
213+
forwardingRule := &compute.ForwardingRule{
254214
Name: frName,
255215
Description: fwdRuleDescriptionString,
256216
IPAddress: ipToUse,
257217
BackendService: backendServiceLink,
258-
Ports: ports,
259218
IPProtocol: string(protocol),
260219
LoadBalancingScheme: string(scheme),
261220
Subnetwork: subnetworkURL,
262221
Network: g.networkURL,
263222
}
264223
if options.AllowGlobalAccess {
265-
newFwdRule.AllowGlobalAccess = options.AllowGlobalAccess
224+
forwardingRule.AllowGlobalAccess = options.AllowGlobalAccess
225+
}
226+
if len(protocolPorts.ports) > maxL4ILBPorts {
227+
forwardingRule.Ports = nil
228+
forwardingRule.AllPorts = true
229+
} else {
230+
for _, port := range protocolPorts.ports {
231+
forwardingRule.Ports = append(forwardingRule.Ports, strconv.Itoa(port))
232+
}
266233
}
267-
if len(ports) > maxL4ILBPorts {
268-
newFwdRule.Ports = nil
269-
newFwdRule.AllPorts = true
234+
235+
// Check if a forwarding rule for this protocol already exists; reuse the same name if possible.
236+
for _, existingFwdRule := range existingForwardingRules {
237+
if existingFwdRule.IPProtocol == forwardingRule.IPProtocol {
238+
forwardingRule.Name = existingFwdRule.Name
239+
break
240+
}
270241
}
271242

272-
// Check if a forwarding rule for this protocol already exists.
243+
desiredForwardingRules = append(desiredForwardingRules, forwardingRule)
244+
}
245+
op.forwardingRulesDesired = desiredForwardingRules
246+
247+
bsDescription := makeBackendServiceDescription(nm, sharedBackend)
248+
backendServiceProtocol := "UNSPECIFIED"
249+
if len(desiredForwardingRules) == 1 {
250+
backendServiceProtocol = desiredForwardingRules[0].IPProtocol
251+
}
252+
g.buildDesiredBackendServiceInternal(backendServiceName, bsDescription, svc.Spec.SessionAffinity, scheme, backendServiceProtocol, igLinks, hc.SelfLink, op)
253+
254+
if err := g.ensureInternalBackendService(backendServiceName, op); err != nil {
255+
return nil, err
256+
}
257+
258+
for _, desiredForwardingRule := range desiredForwardingRules {
273259
var existingFwdRuleForProtocol *compute.ForwardingRule
274-
if fr, ok := existingFwdRules[frName]; ok {
275-
existingFwdRuleForProtocol = fr
260+
for _, existingFwdRule := range existingForwardingRules {
261+
if existingFwdRule.IPProtocol == desiredForwardingRule.IPProtocol {
262+
existingFwdRuleForProtocol = existingFwdRule
263+
break
264+
}
276265
}
277266

278-
if err := g.ensureInternalForwardingRule(existingFwdRuleForProtocol, newFwdRule); err != nil {
267+
if err := g.ensureInternalForwardingRule(existingFwdRuleForProtocol, desiredForwardingRule); err != nil {
279268
return nil, err
280269
}
281-
createdFwdRules = append(createdFwdRules, newFwdRule)
270+
createdFwdRules = append(createdFwdRules, desiredForwardingRule)
282271
}
283272

284273
// Delete any forwarding rules that are no longer needed.
285-
for frName, fr := range existingFwdRules {
286-
if desiredFwdRuleNames.Has(frName) {
274+
for frName, fr := range existingForwardingRules {
275+
var matching *compute.ForwardingRule
276+
for _, desiredForwardingRule := range desiredForwardingRules {
277+
if desiredForwardingRule.Name == fr.Name {
278+
matching = desiredForwardingRule
279+
continue
280+
}
281+
}
282+
if matching != nil {
287283
continue
288284
}
285+
289286
klog.V(2).Infof("ensureInternalLoadBalancer(%v): deleting stale forwarding rule %s", loadBalancerName, frName)
290287
if err := ignoreNotFound(g.DeleteRegionForwardingRule(frName, g.region)); err != nil {
291288
return nil, err
292289
}
293-
// Also delete the associated backend service if it's not used by other forwarding rules.
294-
if fr.BackendService != "" {
295-
bsName := getNameFromLink(fr.BackendService)
296-
if !desiredBackendServices[bsName] {
297-
klog.V(2).Infof("ensureInternalLoadBalancer(%v): deleting stale backend service %s", loadBalancerName, bsName)
298-
if err := g.teardownInternalBackendService(bsName); err != nil {
299-
klog.Warningf("ensureInternalLoadBalancer: could not delete old backend service %s: %v", bsName, err)
300-
}
301-
}
302-
}
303290
}
304291

305-
if len(createdFwdRules) == 0 {
292+
if len(desiredForwardingRules) == 0 {
306293
klog.V(2).Infof("ensureInternalLoadBalancer(%v): no forwarding rules needed, all deleted.", loadBalancerName)
307294
return &v1.LoadBalancerStatus{}, nil
308295
}
@@ -476,36 +463,25 @@ func (g *Cloud) ensureInternalLoadBalancerDeleted(clusterName, clusterID string,
476463
klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): attempting delete of region internal address", loadBalancerName)
477464
ensureAddressDeleted(g, loadBalancerName, g.region)
478465

479-
frNames := sets.NewString(loadBalancerName)
480-
if len(groupedPorts) > 1 {
481-
for protocol := range groupedPorts {
482-
frNames.Insert(fmt.Sprintf("%s-%s", loadBalancerName, strings.ToLower(string(protocol))))
483-
}
466+
// Get existing forwarding rules.
467+
existingForwardingRules, err := g.getExistingForwardingRules(loadBalancerName)
468+
if err != nil {
469+
return err
484470
}
485-
// Sort for deterministic deletion order.
486-
sortedFrNames := frNames.List()
487-
sort.Strings(sortedFrNames)
488-
for _, frName := range sortedFrNames {
489-
klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region internal forwarding rule %s", loadBalancerName, frName)
490-
if err := ignoreNotFound(g.DeleteRegionForwardingRule(frName, g.region)); err != nil {
471+
472+
// Delete existing forwarding rules.
473+
for _, existingForwardingRule := range existingForwardingRules {
474+
klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region internal forwarding rule %s", loadBalancerName, existingForwardingRule.Name)
475+
if err := ignoreNotFound(g.DeleteRegionForwardingRule(existingForwardingRule.Name, g.region)); err != nil {
491476
return err
492477
}
493478
}
494479

495-
var protocols []v1.Protocol
496-
for p := range groupedPorts {
497-
protocols = append(protocols, p)
498-
}
499-
sort.Slice(protocols, func(i, j int) bool { return protocols[i] < protocols[j] })
500-
501-
for _, protocol := range protocols {
502-
portStruct := groupedPorts[protocol]
503-
singleProtocolGroupedPorts := map[v1.Protocol]ProtocolPorts{protocol: portStruct}
504-
backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, singleProtocolGroupedPorts, svc.Spec.SessionAffinity)
505-
klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region backend service %v", loadBalancerName, backendServiceName)
506-
if err := g.teardownInternalBackendService(backendServiceName); err != nil {
507-
return err
508-
}
480+
// Delete existing backend service.
481+
backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, groupedPorts, svc.Spec.SessionAffinity)
482+
klog.V(2).Infof("ensureInternalLoadBalancerDeleted(%v): deleting region backend service %v", loadBalancerName, backendServiceName)
483+
if err := g.teardownInternalBackendService(backendServiceName); err != nil {
484+
return err
509485
}
510486

511487
deleteFunc := func(fwName string) error {
@@ -881,43 +857,72 @@ func (g *Cloud) ensureInternalInstanceGroupsDeleted(name string) error {
881857
return nil
882858
}
883859

884-
func (g *Cloud) ensureInternalBackendService(name, description string, affinityType v1.ServiceAffinity, scheme cloud.LbScheme, protocol v1.Protocol, igLinks []string, hcLink string) error {
885-
klog.V(2).Infof("ensureInternalBackendService(%v, %v, %v): checking existing backend service with %d groups", name, scheme, protocol, len(igLinks))
886-
bs, err := g.GetRegionBackendService(name, g.region)
887-
if err != nil && !isNotFound(err) {
888-
return err
889-
}
860+
type ensureOperation struct {
861+
backendServiceDesired *compute.BackendService
862+
backendServiceActual *compute.BackendService
863+
forwardingRulesDesired []*compute.ForwardingRule
864+
forwardingRulesActual map[string]*compute.ForwardingRule
865+
}
890866

867+
func (g *Cloud) buildDesiredBackendServiceInternal(name, description string, affinityType v1.ServiceAffinity, scheme cloud.LbScheme, protocol string, igLinks []string, hcLink string, op *ensureOperation) error {
891868
backends := backendsFromGroupLinks(igLinks)
892-
expectedBS := &compute.BackendService{
869+
bs := &compute.BackendService{
893870
Name: name,
894-
Protocol: string(protocol),
871+
Protocol: protocol,
895872
Description: description,
896873
HealthChecks: []string{hcLink},
897874
Backends: backends,
898875
SessionAffinity: translateAffinityType(affinityType),
899876
LoadBalancingScheme: string(scheme),
900877
}
901878

879+
op.backendServiceDesired = bs
880+
return nil
881+
}
882+
883+
func (g *Cloud) findActualBackendServiceInternal(name string, op *ensureOperation) error {
884+
klog.V(2).Infof("findActualBackendServiceInternal(%v): checking existing backend services", name)
885+
886+
bs, err := g.GetRegionBackendService(name, g.region)
887+
if err != nil && !isNotFound(err) {
888+
return err
889+
}
890+
891+
op.backendServiceActual = bs
892+
return nil
893+
}
894+
895+
func (g *Cloud) ensureInternalBackendService(name string, op *ensureOperation) error {
902896
// Create backend service if none was found
903-
if bs == nil {
897+
if op.backendServiceActual == nil {
904898
klog.V(2).Infof("ensureInternalBackendService: creating backend service %v", name)
905-
err := g.CreateRegionBackendService(expectedBS, g.region)
899+
err := g.CreateRegionBackendService(op.backendServiceDesired, g.region)
906900
if err != nil {
907901
return err
908902
}
909903
klog.V(2).Infof("ensureInternalBackendService: created backend service %v successfully", name)
910904
return nil
911905
}
912906

913-
if backendSvcEqual(expectedBS, bs) {
907+
if backendSvcEqual(op.backendServiceActual, op.backendServiceDesired) {
914908
return nil
915909
}
916910

911+
// Delete existing forwarding rule before making changes to the backend service. For example - changing protocol
912+
// of backend service without first deleting forwarding rule will throw an error since the linked forwarding
913+
// rule would show the old protocol.
914+
for _, fr := range op.forwardingRulesActual {
915+
klog.V(2).Infof("ensureInternalBackendService: deleting forwarding rule %v", fr.Name)
916+
if err := g.DeleteRegionForwardingRule(fr.Name, g.region); ignoreNotFound(err) != nil {
917+
return err
918+
}
919+
}
920+
op.forwardingRulesActual = nil
921+
917922
klog.V(2).Infof("ensureInternalBackendService: updating backend service %v", name)
918923
// Set fingerprint for optimistic locking
919-
expectedBS.Fingerprint = bs.Fingerprint
920-
if err := g.UpdateRegionBackendService(expectedBS, g.region); err != nil {
924+
op.backendServiceDesired.Fingerprint = op.backendServiceActual.Fingerprint
925+
if err := g.UpdateRegionBackendService(op.backendServiceDesired, g.region); err != nil {
921926
return err
922927
}
923928
klog.V(2).Infof("ensureInternalBackendService: updated backend service %v successfully", name)

0 commit comments

Comments
 (0)