Skip to content

Commit 55ca64d

Browse files
committed
fixup! controller support for mixed protocol LB services
1 parent 72ed8d1 commit 55ca64d

File tree

2 files changed

+113
-48
lines changed

2 files changed

+113
-48
lines changed

providers/gce/gce_loadbalancer.go

Lines changed: 35 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -145,28 +145,28 @@ func (g *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, svc
145145
return nil, err
146146
}
147147

148-
// Services with multiples protocols are not supported by this controller, warn the users and sets
149-
// the corresponding Service Status Condition.
148+
// Services with multiples protocols are not supported by this controller (without AlphaFeatureMultiProtocolLB),
149+
// warn the users and set the corresponding Service Status Condition.
150150
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb
151-
if !g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) {
152-
if err := checkMixedProtocol(svc.Spec.Ports); err != nil {
153-
if hasLoadBalancerPortsError(svc) {
154-
return nil, err
155-
}
156-
klog.Warningf("Ignoring service %s/%s using different ports protocols", svc.Namespace, svc.Name)
157-
g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancers with multiple protocols are not supported.")
158-
svcApplyStatus := corev1apply.ServiceStatus().WithConditions(
159-
metav1apply.Condition().
160-
WithType(v1.LoadBalancerPortsError).
161-
WithStatus(metav1.ConditionTrue).
162-
WithReason(v1.LoadBalancerPortsErrorReason).
163-
WithMessage("LoadBalancer with multiple protocols are not supported"))
164-
svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus)
165-
if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-cloud-controller", Force: true}); errApply != nil {
166-
return nil, errApply
167-
}
151+
if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) {
152+
klog.Infof("AlphaFeatureMultiProtocolLB feature gate is enabled")
153+
} else if err := checkMixedProtocol(svc.Spec.Ports); err != nil {
154+
if hasLoadBalancerPortsError(svc) {
168155
return nil, err
169156
}
157+
klog.Warningf("Ignoring service %s/%s using different ports protocols", svc.Namespace, svc.Name)
158+
g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancers with multiple protocols are not supported.")
159+
svcApplyStatus := corev1apply.ServiceStatus().WithConditions(
160+
metav1apply.Condition().
161+
WithType(v1.LoadBalancerPortsError).
162+
WithStatus(metav1.ConditionTrue).
163+
WithReason(v1.LoadBalancerPortsErrorReason).
164+
WithMessage("LoadBalancer with multiple protocols are not supported"))
165+
svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus)
166+
if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-cloud-controller", Force: true}); errApply != nil {
167+
return nil, errApply
168+
}
169+
return nil, err
170170
}
171171

172172
klog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v): ensure %v loadbalancer", clusterName, svc.Namespace, svc.Name, loadBalancerName, g.region, desiredScheme)
@@ -230,24 +230,24 @@ func (g *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, svc
230230
return err
231231
}
232232

233-
// Services with multiples protocols are not supported by this controller, warn the users and sets
233+
// Services with multiples protocols are not supported by this controller (without AlphaFeatureMultiProtocolLB), warn the users and sets
234234
// the corresponding Service Status Condition, but keep processing the Update to not break upgrades.
235235
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1435-mixed-protocol-lb
236-
if !g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) {
237-
if err := checkMixedProtocol(svc.Spec.Ports); err != nil && !hasLoadBalancerPortsError(svc) {
238-
klog.Warningf("Ignoring update for service %s/%s using different ports protocols", svc.Namespace, svc.Name)
239-
g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancer with multiple protocols are not supported.")
240-
svcApplyStatus := corev1apply.ServiceStatus().WithConditions(
241-
metav1apply.Condition().
242-
WithType(v1.LoadBalancerPortsError).
243-
WithStatus(metav1.ConditionTrue).
244-
WithReason(v1.LoadBalancerPortsErrorReason).
245-
WithMessage("LoadBalancer with multiple protocols are not supported"))
246-
svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus)
247-
if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-cloud-controller", Force: true}); errApply != nil {
248-
// the error is retried by the controller loop
249-
return errApply
250-
}
236+
if g.AlphaFeatureGate.Enabled(AlphaFeatureMultiProtocolLB) {
237+
klog.Infof("AlphaFeatureMultiProtocolLB feature gate is enabled")
238+
} else if err := checkMixedProtocol(svc.Spec.Ports); err != nil && !hasLoadBalancerPortsError(svc) {
239+
klog.Warningf("Ignoring update for service %s/%s using different ports protocols", svc.Namespace, svc.Name)
240+
g.eventRecorder.Event(svc, v1.EventTypeWarning, v1.LoadBalancerPortsErrorReason, "LoadBalancer with multiple protocols are not supported.")
241+
svcApplyStatus := corev1apply.ServiceStatus().WithConditions(
242+
metav1apply.Condition().
243+
WithType(v1.LoadBalancerPortsError).
244+
WithStatus(metav1.ConditionTrue).
245+
WithReason(v1.LoadBalancerPortsErrorReason).
246+
WithMessage("LoadBalancer with multiple protocols are not supported"))
247+
svcApply := corev1apply.Service(svc.Name, svc.Namespace).WithStatus(svcApplyStatus)
248+
if _, errApply := g.client.CoreV1().Services(svc.Namespace).ApplyStatus(ctx, svcApply, metav1.ApplyOptions{FieldManager: "gce-cloud-controller", Force: true}); errApply != nil {
249+
// the error is retried by the controller loop
250+
return errApply
251251
}
252252
}
253253

providers/gce/gce_loadbalancer_external.go

Lines changed: 78 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,9 @@ const (
5252
// IP address, a firewall rule, a target pool, and a forwarding rule. This
5353
// function has to manage all of them.
5454
//
55-
// Due to an interesting series of design decisions, this handles both creating
56-
// new load balancers and updating existing load balancers, recognizing when
57-
// each is needed.
55+
// This function handles both creating new load balancers and updating existing load balancers,
56+
// recognizing when each is needed.
57+
// This approach is resilient, for example if we are interrupted part-way during creation.
5858
func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string, apiService *v1.Service, existingFwdRule *compute.ForwardingRule, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
5959
// Process services with LoadBalancerClass "networking.gke.io/l4-regional-external-legacy" used for this controller.
6060
// LoadBalancerClass can't be updated so we know this controller should process the NetLB.
@@ -253,11 +253,37 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string,
253253
}
254254
}
255255

256-
ipAddressToUse, isSafeToReleaseIP, err := g.ensureIPAddress(loadBalancerName, lbRefStr, requestedIP, fwdRuleIP, netTier)
257-
if err != nil {
258-
return nil, err
259-
}
256+
// Make sure we know which IP address will be used and have properly reserved
257+
// it as static before moving forward with the rest of our operations.
258+
//
259+
// We use static IP addresses when updating a load balancer to ensure that we
260+
// can replace the load balancer's other components without changing the
261+
// address its service is reachable on. We do it this way rather than always
262+
// keeping the static IP around even though this is more complicated because
263+
// it makes it less likely that we'll run into quota issues. Only 7 static
264+
// IP addresses are allowed per region by default.
265+
//
266+
// We could let an IP be allocated for us when the forwarding rule is created,
267+
// but we need the IP to set up the firewall rule, and we want to keep the
268+
// forwarding rule creation as the last thing that needs to be done in this
269+
// function in order to maintain the invariant that "if the forwarding rule
270+
// exists, the LB has been fully created".
271+
ipAddressToUse := ""
272+
273+
// Through this process we try to keep track of whether it is safe to
274+
// release the IP that was allocated. If the user specifically asked for
275+
// an IP, we assume they are managing it themselves. Otherwise, we will
276+
// release the IP in case of early-terminating failure or upon successful
277+
// creating of the LB.
278+
// TODO(#36535): boil this logic down into a set of component functions
279+
// and key the flag values off of errors returned.
280+
isUserOwnedIP := false // if this is set, we never release the IP
281+
isSafeToReleaseIP := false
282+
260283
defer func() {
284+
if isUserOwnedIP {
285+
return
286+
}
261287
if isSafeToReleaseIP {
262288
if err := g.DeleteRegionAddress(loadBalancerName, g.region); err != nil && !isNotFound(err) {
263289
klog.Errorf("ensureExternalLoadBalancer(%s): Failed to release static IP %s in region %v: %v.", lbRefStr, ipAddressToUse, g.region, err)
@@ -271,6 +297,36 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string,
271297
}
272298
}()
273299

300+
if requestedIP != "" {
301+
// If user requests a specific IP address, verify first. No mutation to
302+
// the GCE resources will be performed in the verification process.
303+
isUserOwnedIP, err = verifyUserRequestedIP(g, g.region, requestedIP, fwdRuleIP, lbRefStr, netTier)
304+
if err != nil {
305+
return nil, err
306+
}
307+
ipAddressToUse = requestedIP
308+
}
309+
310+
if !isUserOwnedIP {
311+
// If we are not using the user-owned IP, either promote the
312+
// emphemeral IP used by the fwd rule, or create a new static IP.
313+
ipAddr, existed, err := ensureStaticIP(g, loadBalancerName, serviceName.String(), g.region, fwdRuleIP, netTier)
314+
if err != nil {
315+
return nil, fmt.Errorf("failed to ensure a static IP for load balancer (%s): %v", lbRefStr, err)
316+
}
317+
klog.Infof("ensureExternalLoadBalancer(%s): Ensured IP address %s (tier: %s).", lbRefStr, ipAddr, netTier)
318+
// If the IP was not owned by the user, but it already existed, it
319+
// could indicate that the previous update cycle failed. We can use
320+
// this IP and try to run through the process again, but we should
321+
// not release the IP unless it is explicitly flagged as OK.
322+
isSafeToReleaseIP = !existed
323+
ipAddressToUse = ipAddr
324+
}
325+
326+
// Deal with the firewall next. The reason we do this here rather than last
327+
// is because the forwarding rule is used as the indicator that the load
328+
// balancer is fully created - it's what getLoadBalancer checks for.
329+
// Check if user specified the allow source range
274330
sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(apiService)
275331
if err != nil {
276332
return nil, err
@@ -308,7 +364,8 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string,
308364
klog.Infof("ensureExternalLoadBalancer(%s): Target pool for service doesn't exist.", lbRefStr)
309365
}
310366

311-
// Health check logic...
367+
// Check which health check needs to create and which health check needs to delete.
368+
// Health check management is coupled with target pool operation to prevent leaking.
312369
var hcToCreate, hcToDelete *compute.HttpHealthCheck
313370
hcLocalTrafficExisting, err := g.GetHTTPHealthCheck(loadBalancerName)
314371
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
@@ -317,6 +374,9 @@ func (g *Cloud) ensureExternalLoadBalancer(clusterName string, clusterID string,
317374
if path, healthCheckNodePort := servicehelpers.GetServiceHealthCheckPathPort(apiService); path != "" {
318375
klog.V(4).Infof("ensureExternalLoadBalancer(%s): Service needs local traffic health checks on: %d%s.", lbRefStr, healthCheckNodePort, path)
319376
if hcLocalTrafficExisting == nil {
377+
// This logic exists to detect a transition for non-OnlyLocal to OnlyLocal service
378+
// turn on the tpNeedsRecreation flag to delete/recreate fwdrule/tpool updating the
379+
// target pool to use local traffic health check.
320380
klog.V(2).Infof("ensureExternalLoadBalancer(%s): Updating from nodes health checks to local traffic health checks.", lbRefStr)
321381
hcToDelete = makeHTTPHealthCheck(MakeNodesHealthCheckName(clusterID), GetNodesHealthCheckPath(), GetNodesHealthCheckPort())
322382
tpNeedsRecreation = true
@@ -534,8 +594,13 @@ func (g *Cloud) ensureExternalLoadBalancerDeleted(clusterName, clusterID string,
534594
},
535595
func() error {
536596
klog.Infof("ensureExternalLoadBalancerDeleted(%s): Deleting forwarding rules.", lbRefStr)
537-
// The forwarding rule must be deleted before either the target pool can,
538-
// unfortunately, so we have to do these two serially.
597+
// The forwarding rule must be deleted before the target pool can be deleted,
598+
// unfortunately, so we have to delete forwarding rules then target pools serially.
599+
if err := ignoreNotFound(g.DeleteRegionForwardingRule(loadBalancerName, g.region)); err != nil {
600+
return err
601+
}
602+
603+
// TODO: Always or just with alpha feature flag?
539604
frs, err := g.ListRegionForwardingRules(g.region)
540605
if err != nil {
541606
return err
@@ -1125,8 +1190,7 @@ func equalPorts(existingPorts, newPorts []string, existingPortRange, newPortRang
11251190

11261191
func groupPortsByProtocol(ports []v1.ServicePort) map[v1.Protocol][]v1.ServicePort {
11271192
grouped := make(map[v1.Protocol][]v1.ServicePort)
1128-
for _, p := range ports {
1129-
port := p
1193+
for _, port := range ports {
11301194
grouped[port.Protocol] = append(grouped[port.Protocol], port)
11311195
}
11321196
return grouped
@@ -1195,7 +1259,6 @@ func (g *Cloud) firewallNeedsUpdate(name, serviceName, ipAddress string, ports [
11951259
return true, false, nil
11961260
}
11971261

1198-
11991262
func (g *Cloud) ensureHTTPHealthCheckFirewall(svc *v1.Service, serviceName, ipAddress, region, clusterID string, hosts []*gceInstance, hcName string, hcPort int32, isNodesHealthCheck bool) error {
12001263
// Prepare the firewall params for creating / checking.
12011264
desc := fmt.Sprintf(`{"kubernetes.io/cluster-id":"%s"}`, clusterID)
@@ -1310,6 +1373,8 @@ func (g *Cloud) updateFirewall(svc *v1.Service, name, desc, destinationIP string
13101373
func (g *Cloud) firewallObject(name, desc, destinationIP string, sourceRanges utilnet.IPNetSet, ports []v1.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
13111374
// destinationIP can be empty string "" and this means that it is not set.
13121375
// GCE considers empty destinationRanges as "all" for ingress firewall-rules.
1376+
// Concatenate service ports into port ranges. This help to workaround the gce firewall limitation where only
1377+
// 100 ports or port ranges can be used in a firewall rule.
13131378
groupedPorts := groupPortsByProtocol(ports)
13141379
var allowed []*compute.FirewallAllowed
13151380
for protocol, protocolPorts := range groupedPorts {

0 commit comments

Comments
 (0)