Skip to content

Commit 4652554

Browse files
khenidakLars Ekman
andcommitted
create meta-proxy for proxy-mode=ipvs (dualstack)
co-authored-by: Lars Ekman <[email protected]>
1 parent 4495d09 commit 4652554

File tree

6 files changed

+515
-24
lines changed

6 files changed

+515
-24
lines changed

cmd/kube-proxy/app/server_others.go

Lines changed: 99 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,15 @@ import (
2424
"errors"
2525
"fmt"
2626
"net"
27+
"strings"
2728

2829
"k8s.io/api/core/v1"
2930
"k8s.io/apimachinery/pkg/types"
3031
utilnet "k8s.io/apimachinery/pkg/util/net"
3132
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
33+
utilfeature "k8s.io/apiserver/pkg/util/feature"
3234
"k8s.io/client-go/tools/record"
35+
"k8s.io/kubernetes/pkg/features"
3336
"k8s.io/kubernetes/pkg/proxy"
3437
proxyconfigapi "k8s.io/kubernetes/pkg/proxy/apis/config"
3538
proxyconfigscheme "k8s.io/kubernetes/pkg/proxy/apis/config/scheme"
@@ -46,6 +49,7 @@ import (
4649
utilnode "k8s.io/kubernetes/pkg/util/node"
4750
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
4851
"k8s.io/utils/exec"
52+
utilsnet "k8s.io/utils/net"
4953

5054
"k8s.io/klog"
5155
)
@@ -170,26 +174,61 @@ func newProxyServer(
170174
metrics.RegisterMetrics()
171175
} else if proxyMode == proxyModeIPVS {
172176
klog.V(0).Info("Using ipvs Proxier.")
173-
proxier, err = ipvs.NewProxier(
174-
iptInterface,
175-
ipvsInterface,
176-
ipsetInterface,
177-
utilsysctl.New(),
178-
execer,
179-
config.IPVS.SyncPeriod.Duration,
180-
config.IPVS.MinSyncPeriod.Duration,
181-
config.IPVS.ExcludeCIDRs,
182-
config.IPVS.StrictARP,
183-
config.IPTables.MasqueradeAll,
184-
int(*config.IPTables.MasqueradeBit),
185-
config.ClusterCIDR,
186-
hostname,
187-
nodeIP,
188-
recorder,
189-
healthzServer,
190-
config.IPVS.Scheduler,
191-
config.NodePortAddresses,
192-
)
177+
if utilfeature.DefaultFeatureGate.Enabled(features.IPv6DualStack) {
178+
klog.V(0).Info("creating dualStackProxier for ipvs.")
179+
180+
// Create iptables handlers for both families, one is already created
181+
var ipt [2]utiliptables.Interface
182+
if iptInterface.IsIpv6() {
183+
ipt[1] = iptInterface
184+
ipt[0] = utiliptables.New(execer, dbus, utiliptables.ProtocolIpv4)
185+
} else {
186+
ipt[0] = iptInterface
187+
ipt[1] = utiliptables.New(execer, dbus, utiliptables.ProtocolIpv6)
188+
}
189+
190+
proxier, err = ipvs.NewDualStackProxier(
191+
ipt,
192+
ipvsInterface,
193+
ipsetInterface,
194+
utilsysctl.New(),
195+
execer,
196+
config.IPVS.SyncPeriod.Duration,
197+
config.IPVS.MinSyncPeriod.Duration,
198+
config.IPVS.ExcludeCIDRs,
199+
config.IPVS.StrictARP,
200+
config.IPTables.MasqueradeAll,
201+
int(*config.IPTables.MasqueradeBit),
202+
cidrTuple(config.ClusterCIDR),
203+
hostname,
204+
nodeIPTuple(config.BindAddress),
205+
recorder,
206+
healthzServer,
207+
config.IPVS.Scheduler,
208+
config.NodePortAddresses,
209+
)
210+
} else {
211+
proxier, err = ipvs.NewProxier(
212+
iptInterface,
213+
ipvsInterface,
214+
ipsetInterface,
215+
utilsysctl.New(),
216+
execer,
217+
config.IPVS.SyncPeriod.Duration,
218+
config.IPVS.MinSyncPeriod.Duration,
219+
config.IPVS.ExcludeCIDRs,
220+
config.IPVS.StrictARP,
221+
config.IPTables.MasqueradeAll,
222+
int(*config.IPTables.MasqueradeBit),
223+
config.ClusterCIDR,
224+
hostname,
225+
nodeIP,
226+
recorder,
227+
healthzServer,
228+
config.IPVS.Scheduler,
229+
config.NodePortAddresses,
230+
)
231+
}
193232
if err != nil {
194233
return nil, fmt.Errorf("unable to create proxier: %v", err)
195234
}
@@ -238,6 +277,46 @@ func newProxyServer(
238277
}, nil
239278
}
240279

280+
// cidrTuple takes a comma separated list of CIDRs and return a tuple (ipv4cidr,ipv6cidr)
281+
// The returned tuple is guaranteed to have the order (ipv4,ipv6) and if no cidr from a family is found an
282+
// empty string "" is inserted.
283+
func cidrTuple(cidrList string) [2]string {
284+
cidrs := [2]string{"", ""}
285+
foundIPv4 := false
286+
foundIPv6 := false
287+
288+
for _, cidr := range strings.Split(cidrList, ",") {
289+
if utilsnet.IsIPv6CIDRString(cidr) && !foundIPv6 {
290+
cidrs[1] = cidr
291+
foundIPv6 = true
292+
} else if !foundIPv4 {
293+
cidrs[0] = cidr
294+
foundIPv4 = true
295+
}
296+
if foundIPv6 && foundIPv4 {
297+
break
298+
}
299+
}
300+
301+
return cidrs
302+
}
303+
304+
// nodeIPTuple takes an addresses and return a tuple (ipv4,ipv6)
305+
// The returned tuple is guaranteed to have the order (ipv4,ipv6). The address NOT of the passed address
306+
// will have "any" address (0.0.0.0 or ::) inserted.
307+
func nodeIPTuple(bindAddress string) [2]net.IP {
308+
nodes := [2]net.IP{net.IPv4zero, net.IPv6zero}
309+
310+
adr := net.ParseIP(bindAddress)
311+
if utilsnet.IsIPv6(adr) {
312+
nodes[1] = adr
313+
} else {
314+
nodes[0] = adr
315+
}
316+
317+
return nodes
318+
}
319+
241320
func getProxyMode(proxyMode string, khandle ipvs.KernelHandler, ipsetver ipvs.IPSetVersioner, kcompat iptables.KernelCompatTester) string {
242321
switch proxyMode {
243322
case proxyModeUserspace:

pkg/proxy/ipvs/ipset.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323

2424
"fmt"
2525
"k8s.io/klog"
26+
"strings"
2627
)
2728

2829
const (
@@ -102,6 +103,20 @@ func NewIPSet(handle utilipset.Interface, name string, setType utilipset.Type, i
102103
hashFamily := utilipset.ProtocolFamilyIPV4
103104
if isIPv6 {
104105
hashFamily = utilipset.ProtocolFamilyIPV6
106+
// In dual-stack both ipv4 and ipv6 ipset's can co-exist. To
107+
// ensure unique names the prefix for ipv6 is changed from
108+
// "KUBE-" to "KUBE-6-". The "KUBE-" prefix is kept for
109+
// backward compatibility. The maximum name length of an ipset
110+
// is 31 characters which must be taken into account. The
111+
// ipv4 names are not altered to minimize the risk for
112+
// problems on upgrades.
113+
if strings.HasPrefix(name, "KUBE-") {
114+
name = strings.Replace(name, "KUBE-", "KUBE-6-", 1)
115+
if len(name) > 31 {
116+
klog.Warningf("ipset name truncated; [%s] -> [%s]", name, name[:31])
117+
name = name[:31]
118+
}
119+
}
105120
}
106121
set := &IPSet{
107122
IPSet: utilipset.IPSet{

pkg/proxy/ipvs/meta_proxier.go

Lines changed: 203 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,203 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package ipvs
18+
19+
import (
20+
"fmt"
21+
22+
"k8s.io/api/core/v1"
23+
"k8s.io/klog"
24+
"k8s.io/kubernetes/pkg/proxy"
25+
utilnet "k8s.io/utils/net"
26+
27+
discovery "k8s.io/api/discovery/v1alpha1"
28+
)
29+
30+
type metaProxier struct {
31+
ipv4Proxier proxy.Provider
32+
ipv6Proxier proxy.Provider
33+
}
34+
35+
// NewMetaProxier returns a dual-stack "meta-proxier". Proxier API
36+
// calls will be dispatched to the ProxyProvider instances depending
37+
// on address family.
38+
func NewMetaProxier(ipv4Proxier, ipv6Proxier proxy.Provider) proxy.Provider {
39+
return proxy.Provider(&metaProxier{
40+
ipv4Proxier: ipv4Proxier,
41+
ipv6Proxier: ipv6Proxier,
42+
})
43+
}
44+
45+
// Sync immediately synchronizes the ProxyProvider's current state to
46+
// proxy rules.
47+
func (proxier *metaProxier) Sync() {
48+
proxier.ipv4Proxier.Sync()
49+
proxier.ipv6Proxier.Sync()
50+
}
51+
52+
// SyncLoop runs periodic work. This is expected to run as a
53+
// goroutine or as the main loop of the app. It does not return.
54+
func (proxier *metaProxier) SyncLoop() {
55+
go proxier.ipv6Proxier.SyncLoop() // Use go-routine here!
56+
proxier.ipv4Proxier.SyncLoop() // never returns
57+
}
58+
59+
// OnServiceAdd is called whenever creation of new service object is observed.
60+
func (proxier *metaProxier) OnServiceAdd(service *v1.Service) {
61+
if *(service.Spec.IPFamily) == v1.IPv4Protocol {
62+
proxier.ipv4Proxier.OnServiceAdd(service)
63+
return
64+
}
65+
proxier.ipv6Proxier.OnServiceAdd(service)
66+
}
67+
68+
// OnServiceUpdate is called whenever modification of an existing
69+
// service object is observed.
70+
func (proxier *metaProxier) OnServiceUpdate(oldService, service *v1.Service) {
71+
// IPFamily is immutable, hence we only need to check on the new service
72+
if *(service.Spec.IPFamily) == v1.IPv4Protocol {
73+
proxier.ipv4Proxier.OnServiceUpdate(oldService, service)
74+
return
75+
}
76+
77+
proxier.ipv6Proxier.OnServiceUpdate(oldService, service)
78+
}
79+
80+
// OnServiceDelete is called whenever deletion of an existing service
81+
// object is observed.
82+
func (proxier *metaProxier) OnServiceDelete(service *v1.Service) {
83+
if *(service.Spec.IPFamily) == v1.IPv4Protocol {
84+
proxier.ipv4Proxier.OnServiceDelete(service)
85+
return
86+
}
87+
proxier.ipv6Proxier.OnServiceDelete(service)
88+
}
89+
90+
// OnServiceSynced is called once all the initial event handlers were
91+
// called and the state is fully propagated to local cache.
92+
func (proxier *metaProxier) OnServiceSynced() {
93+
proxier.ipv4Proxier.OnServiceSynced()
94+
proxier.ipv6Proxier.OnServiceSynced()
95+
}
96+
97+
// OnEndpointsAdd is called whenever creation of new endpoints object
98+
// is observed.
99+
func (proxier *metaProxier) OnEndpointsAdd(endpoints *v1.Endpoints) {
100+
ipFamily, err := endpointsIPFamily(endpoints)
101+
if err != nil {
102+
klog.Warningf("failed to add endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err)
103+
return
104+
}
105+
if *ipFamily == v1.IPv4Protocol {
106+
proxier.ipv4Proxier.OnEndpointsAdd(endpoints)
107+
return
108+
}
109+
proxier.ipv6Proxier.OnEndpointsAdd(endpoints)
110+
}
111+
112+
// OnEndpointsUpdate is called whenever modification of an existing
113+
// endpoints object is observed.
114+
func (proxier *metaProxier) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoints) {
115+
ipFamily, err := endpointsIPFamily(endpoints)
116+
if err != nil {
117+
klog.Warningf("failed to update endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err)
118+
return
119+
}
120+
121+
if *ipFamily == v1.IPv4Protocol {
122+
proxier.ipv4Proxier.OnEndpointsUpdate(oldEndpoints, endpoints)
123+
return
124+
}
125+
proxier.ipv6Proxier.OnEndpointsUpdate(oldEndpoints, endpoints)
126+
}
127+
128+
// OnEndpointsDelete is called whenever deletion of an existing
129+
// endpoints object is observed.
130+
func (proxier *metaProxier) OnEndpointsDelete(endpoints *v1.Endpoints) {
131+
ipFamily, err := endpointsIPFamily(endpoints)
132+
if err != nil {
133+
klog.Warningf("failed to delete endpoints %s/%s with error %v", endpoints.ObjectMeta.Namespace, endpoints.ObjectMeta.Name, err)
134+
return
135+
}
136+
137+
if *ipFamily == v1.IPv4Protocol {
138+
proxier.ipv4Proxier.OnEndpointsDelete(endpoints)
139+
return
140+
}
141+
proxier.ipv6Proxier.OnEndpointsDelete(endpoints)
142+
}
143+
144+
// OnEndpointsSynced is called once all the initial event handlers
145+
// were called and the state is fully propagated to local cache.
146+
func (proxier *metaProxier) OnEndpointsSynced() {
147+
proxier.ipv4Proxier.OnEndpointsSynced()
148+
proxier.ipv6Proxier.OnEndpointsSynced()
149+
}
150+
151+
// TODO: (khenidak) implement EndpointSlice handling
152+
153+
// OnEndpointSliceAdd is called whenever creation of a new endpoint slice object
154+
// is observed.
155+
func (proxier *metaProxier) OnEndpointSliceAdd(endpointSlice *discovery.EndpointSlice) {
156+
// noop
157+
}
158+
159+
// OnEndpointSliceUpdate is called whenever modification of an existing endpoint
160+
// slice object is observed.
161+
func (proxier *metaProxier) OnEndpointSliceUpdate(_, endpointSlice *discovery.EndpointSlice) {
162+
//noop
163+
}
164+
165+
// OnEndpointSliceDelete is called whenever deletion of an existing endpoint slice
166+
// object is observed.
167+
func (proxier *metaProxier) OnEndpointSliceDelete(endpointSlice *discovery.EndpointSlice) {
168+
//noop
169+
}
170+
171+
// OnEndpointSlicesSynced is called once all the initial event handlers were
172+
// called and the state is fully propagated to local cache.
173+
func (proxier *metaProxier) OnEndpointSlicesSynced() {
174+
//noop
175+
}
176+
177+
// endpointsIPFamily that returns IPFamily of endpoints or error if
178+
// failed to identify the IP family.
179+
func endpointsIPFamily(endpoints *v1.Endpoints) (*v1.IPFamily, error) {
180+
if len(endpoints.Subsets) == 0 {
181+
return nil, fmt.Errorf("failed to identify ipfamily for endpoints (no subsets)")
182+
}
183+
184+
// we only need to work with subset [0],endpoint controller
185+
// ensures that endpoints selected are of the same family.
186+
subset := endpoints.Subsets[0]
187+
if len(subset.Addresses) == 0 {
188+
return nil, fmt.Errorf("failed to identify ipfamily for endpoints (no addresses)")
189+
}
190+
// same apply on addresses
191+
address := subset.Addresses[0]
192+
if len(address.IP) == 0 {
193+
return nil, fmt.Errorf("failed to identify ipfamily for endpoints (address has no ip)")
194+
}
195+
196+
ipv4 := v1.IPv4Protocol
197+
ipv6 := v1.IPv6Protocol
198+
if utilnet.IsIPv6String(address.IP) {
199+
return &ipv6, nil
200+
}
201+
202+
return &ipv4, nil
203+
}

0 commit comments

Comments
 (0)