Skip to content

Commit a6b4aa7

Browse files
committed
proxy/conntrack: consolidate flow cleanup
Signed-off-by: Daman Arora <[email protected]>
1 parent b0f823e commit a6b4aa7

File tree

5 files changed

+341
-354
lines changed

5 files changed

+341
-354
lines changed

pkg/proxy/conntrack/cleanup.go

Lines changed: 93 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ limitations under the License.
2020
package conntrack
2121

2222
import (
23+
"github.com/vishvananda/netlink"
24+
"golang.org/x/sys/unix"
25+
2326
v1 "k8s.io/api/core/v1"
2427
"k8s.io/apimachinery/pkg/util/sets"
2528
"k8s.io/klog/v2"
@@ -40,6 +43,7 @@ func CleanStaleEntries(ct Interface, svcPortMap proxy.ServicePortMap,
4043
// may create "black hole" entries for that IP+port. When the service gets endpoints we
4144
// need to delete those entries so further traffic doesn't get dropped.
4245
func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePortMap, serviceUpdateResult proxy.UpdateServiceMapResult, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
46+
var filters []netlink.CustomConntrackFilter
4347
conntrackCleanupServiceIPs := serviceUpdateResult.DeletedUDPClusterIPs
4448
conntrackCleanupServiceNodePorts := sets.New[int]()
4549
isIPv6 := false
@@ -48,6 +52,7 @@ func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePo
4852
// a UDP service that changes from 0 to non-0 endpoints is newly active.
4953
for _, svcPortName := range endpointsUpdateResult.NewlyActiveUDPServices {
5054
if svcInfo, ok := svcPortMap[svcPortName]; ok {
55+
isIPv6 = netutils.IsIPv6(svcInfo.ClusterIP())
5156
klog.V(4).InfoS("Newly-active UDP service may have stale conntrack entries", "servicePortName", svcPortName)
5257
conntrackCleanupServiceIPs.Insert(svcInfo.ClusterIP().String())
5358
for _, extIP := range svcInfo.ExternalIPs() {
@@ -59,57 +64,120 @@ func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePo
5964
nodePort := svcInfo.NodePort()
6065
if svcInfo.Protocol() == v1.ProtocolUDP && nodePort != 0 {
6166
conntrackCleanupServiceNodePorts.Insert(nodePort)
62-
isIPv6 = netutils.IsIPv6(svcInfo.ClusterIP())
6367
}
6468
}
6569
}
6670

6771
klog.V(4).InfoS("Deleting conntrack stale entries for services", "IPs", conntrackCleanupServiceIPs.UnsortedList())
6872
for _, svcIP := range conntrackCleanupServiceIPs.UnsortedList() {
69-
if err := ct.ClearEntriesForIP(svcIP, v1.ProtocolUDP); err != nil {
70-
klog.ErrorS(err, "Failed to delete stale service connections", "IP", svcIP)
71-
}
73+
filters = append(filters, filterForIP(svcIP, v1.ProtocolUDP))
7274
}
7375
klog.V(4).InfoS("Deleting conntrack stale entries for services", "nodePorts", conntrackCleanupServiceNodePorts.UnsortedList())
7476
for _, nodePort := range conntrackCleanupServiceNodePorts.UnsortedList() {
75-
err := ct.ClearEntriesForPort(nodePort, isIPv6, v1.ProtocolUDP)
76-
if err != nil {
77-
klog.ErrorS(err, "Failed to clear udp conntrack", "nodePort", nodePort)
78-
}
77+
filters = append(filters, filterForPort(nodePort, v1.ProtocolUDP))
78+
}
79+
80+
if err := ct.ClearEntries(getUnixIPFamily(isIPv6), filters...); err != nil {
81+
klog.ErrorS(err, "Failed to delete stale service connections")
7982
}
8083
}
8184

8285
// deleteStaleEndpointConntrackEntries takes care of flushing stale conntrack entries related
8386
// to UDP endpoints. After a UDP endpoint is removed we must flush any conntrack entries
8487
// for it so that if the same client keeps sending, the packets will get routed to a new endpoint.
8588
func deleteStaleEndpointConntrackEntries(ct Interface, svcPortMap proxy.ServicePortMap, endpointsUpdateResult proxy.UpdateEndpointsMapResult) {
89+
var filters []netlink.CustomConntrackFilter
90+
isIPv6 := false
8691
for _, epSvcPair := range endpointsUpdateResult.DeletedUDPEndpoints {
8792
if svcInfo, ok := svcPortMap[epSvcPair.ServicePortName]; ok {
93+
isIPv6 = netutils.IsIPv6(svcInfo.ClusterIP())
8894
endpointIP := proxyutil.IPPart(epSvcPair.Endpoint)
8995
nodePort := svcInfo.NodePort()
90-
var err error
9196
if nodePort != 0 {
92-
err = ct.ClearEntriesForPortNAT(endpointIP, nodePort, v1.ProtocolUDP)
93-
if err != nil {
94-
klog.ErrorS(err, "Failed to delete nodeport-related endpoint connections", "servicePortName", epSvcPair.ServicePortName)
95-
}
96-
}
97-
err = ct.ClearEntriesForNAT(svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP)
98-
if err != nil {
99-
klog.ErrorS(err, "Failed to delete endpoint connections", "servicePortName", epSvcPair.ServicePortName)
97+
filters = append(filters, filterForPortNAT(endpointIP, nodePort, v1.ProtocolUDP))
98+
10099
}
100+
filters = append(filters, filterForNAT(svcInfo.ClusterIP().String(), endpointIP, v1.ProtocolUDP))
101101
for _, extIP := range svcInfo.ExternalIPs() {
102-
err := ct.ClearEntriesForNAT(extIP.String(), endpointIP, v1.ProtocolUDP)
103-
if err != nil {
104-
klog.ErrorS(err, "Failed to delete endpoint connections for externalIP", "servicePortName", epSvcPair.ServicePortName, "externalIP", extIP)
105-
}
102+
filters = append(filters, filterForNAT(extIP.String(), endpointIP, v1.ProtocolUDP))
106103
}
107104
for _, lbIP := range svcInfo.LoadBalancerVIPs() {
108-
err := ct.ClearEntriesForNAT(lbIP.String(), endpointIP, v1.ProtocolUDP)
109-
if err != nil {
110-
klog.ErrorS(err, "Failed to delete endpoint connections for LoadBalancerIP", "servicePortName", epSvcPair.ServicePortName, "loadBalancerIP", lbIP)
111-
}
105+
filters = append(filters, filterForNAT(lbIP.String(), endpointIP, v1.ProtocolUDP))
112106
}
113107
}
114108
}
109+
110+
if err := ct.ClearEntries(getUnixIPFamily(isIPv6), filters...); err != nil {
111+
klog.ErrorS(err, "Failed to delete stale endpoint connections")
112+
}
113+
}
114+
115+
// getUnixIPFamily returns the unix IPFamily constant.
116+
func getUnixIPFamily(isIPv6 bool) uint8 {
117+
if isIPv6 {
118+
return unix.AF_INET6
119+
}
120+
return unix.AF_INET
121+
}
122+
123+
// protocolMap maps v1.Protocol to the Assigned Internet Protocol Number.
124+
// https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
125+
var protocolMap = map[v1.Protocol]uint8{
126+
v1.ProtocolTCP: unix.IPPROTO_TCP,
127+
v1.ProtocolUDP: unix.IPPROTO_UDP,
128+
v1.ProtocolSCTP: unix.IPPROTO_SCTP,
129+
}
130+
131+
// filterForIP returns *conntrackFilter to delete the conntrack entries for connections
132+
// specified by the destination IP (original direction).
133+
func filterForIP(ip string, protocol v1.Protocol) *conntrackFilter {
134+
klog.V(4).InfoS("Adding conntrack filter for cleanup", "org-dst", ip, "protocol", protocol)
135+
return &conntrackFilter{
136+
protocol: protocolMap[protocol],
137+
original: &connectionTuple{
138+
dstIP: netutils.ParseIPSloppy(ip),
139+
},
140+
}
141+
}
142+
143+
// filterForPort returns *conntrackFilter to delete the conntrack entries for connections
144+
// specified by the destination Port (original direction).
145+
func filterForPort(port int, protocol v1.Protocol) *conntrackFilter {
146+
klog.V(4).InfoS("Adding conntrack filter for cleanup", "org-port-dst", port, "protocol", protocol)
147+
return &conntrackFilter{
148+
protocol: protocolMap[protocol],
149+
original: &connectionTuple{
150+
dstPort: uint16(port),
151+
},
152+
}
153+
}
154+
155+
// filterForNAT returns *conntrackFilter to delete the conntrack entries for connections
156+
// specified by the destination IP (original direction) and source IP (reply direction).
157+
func filterForNAT(origin, dest string, protocol v1.Protocol) *conntrackFilter {
158+
klog.V(4).InfoS("Adding conntrack filter for cleanup", "org-dst", origin, "reply-src", dest, "protocol", protocol)
159+
return &conntrackFilter{
160+
protocol: protocolMap[protocol],
161+
original: &connectionTuple{
162+
dstIP: netutils.ParseIPSloppy(origin),
163+
},
164+
reply: &connectionTuple{
165+
srcIP: netutils.ParseIPSloppy(dest),
166+
},
167+
}
168+
}
169+
170+
// filterForPortNAT returns *conntrackFilter to delete the conntrack entries for connections
171+
// specified by the destination Port (original direction) and source IP (reply direction).
172+
func filterForPortNAT(dest string, port int, protocol v1.Protocol) *conntrackFilter {
173+
klog.V(4).InfoS("Adding conntrack filter for cleanup", "org-port-dst", port, "reply-src", dest, "protocol", protocol)
174+
return &conntrackFilter{
175+
protocol: protocolMap[protocol],
176+
original: &connectionTuple{
177+
dstPort: uint16(port),
178+
},
179+
reply: &connectionTuple{
180+
srcIP: netutils.ParseIPSloppy(dest),
181+
},
182+
}
115183
}

pkg/proxy/conntrack/cleanup_test.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,15 @@ import (
2424
"reflect"
2525
"testing"
2626

27+
"github.com/stretchr/testify/require"
28+
"github.com/vishvananda/netlink"
29+
2730
v1 "k8s.io/api/core/v1"
2831
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2932
"k8s.io/apimachinery/pkg/types"
3033
"k8s.io/apimachinery/pkg/util/sets"
3134
"k8s.io/kubernetes/pkg/proxy"
35+
netutils "k8s.io/utils/net"
3236
)
3337

3438
const (
@@ -261,3 +265,152 @@ func TestCleanStaleEntries(t *testing.T) {
261265
})
262266
}
263267
}
268+
269+
func TestFilterForIP(t *testing.T) {
270+
testCases := []struct {
271+
name string
272+
ip string
273+
protocol v1.Protocol
274+
expectedFamily netlink.InetFamily
275+
expectedFilter *conntrackFilter
276+
}{
277+
{
278+
name: "ipv4 + UDP",
279+
ip: "10.96.0.10",
280+
protocol: v1.ProtocolUDP,
281+
expectedFilter: &conntrackFilter{
282+
protocol: 17,
283+
original: &connectionTuple{dstIP: netutils.ParseIPSloppy("10.96.0.10")},
284+
},
285+
},
286+
{
287+
name: "ipv6 + TCP",
288+
ip: "2001:db8:1::2",
289+
protocol: v1.ProtocolTCP,
290+
expectedFilter: &conntrackFilter{
291+
protocol: 6,
292+
original: &connectionTuple{dstIP: netutils.ParseIPSloppy("2001:db8:1::2")},
293+
},
294+
},
295+
}
296+
297+
for _, tc := range testCases {
298+
t.Run(tc.name, func(t *testing.T) {
299+
require.Equal(t, tc.expectedFilter, filterForIP(tc.ip, tc.protocol))
300+
})
301+
}
302+
}
303+
304+
func TestFilterForPort(t *testing.T) {
305+
testCases := []struct {
306+
name string
307+
port int
308+
protocol v1.Protocol
309+
expectedFilter *conntrackFilter
310+
}{
311+
{
312+
name: "UDP",
313+
port: 5000,
314+
protocol: v1.ProtocolUDP,
315+
316+
expectedFilter: &conntrackFilter{
317+
protocol: 17,
318+
original: &connectionTuple{dstPort: 5000},
319+
},
320+
},
321+
{
322+
name: "SCTP",
323+
port: 3000,
324+
protocol: v1.ProtocolSCTP,
325+
expectedFilter: &conntrackFilter{
326+
protocol: 132,
327+
original: &connectionTuple{dstPort: 3000},
328+
},
329+
},
330+
}
331+
332+
for _, tc := range testCases {
333+
t.Run(tc.name, func(t *testing.T) {
334+
require.Equal(t, tc.expectedFilter, filterForPort(tc.port, tc.protocol))
335+
})
336+
}
337+
}
338+
339+
func TestFilterForNAT(t *testing.T) {
340+
testCases := []struct {
341+
name string
342+
orig string
343+
dest string
344+
protocol v1.Protocol
345+
expectedFilter *conntrackFilter
346+
}{
347+
{
348+
name: "ipv4 + SCTP",
349+
orig: "10.96.0.10",
350+
dest: "10.244.0.3",
351+
protocol: v1.ProtocolSCTP,
352+
expectedFilter: &conntrackFilter{
353+
protocol: 132,
354+
original: &connectionTuple{dstIP: netutils.ParseIPSloppy("10.96.0.10")},
355+
reply: &connectionTuple{srcIP: netutils.ParseIPSloppy("10.244.0.3")},
356+
},
357+
},
358+
{
359+
name: "ipv6 + UDP",
360+
orig: "2001:db8:1::2",
361+
dest: "4001:ab8::2",
362+
protocol: v1.ProtocolUDP,
363+
expectedFilter: &conntrackFilter{
364+
protocol: 17,
365+
original: &connectionTuple{dstIP: netutils.ParseIPSloppy("2001:db8:1::2")},
366+
reply: &connectionTuple{srcIP: netutils.ParseIPSloppy("4001:ab8::2")},
367+
},
368+
},
369+
}
370+
371+
for _, tc := range testCases {
372+
t.Run(tc.name, func(t *testing.T) {
373+
require.Equal(t, tc.expectedFilter, filterForNAT(tc.orig, tc.dest, tc.protocol))
374+
})
375+
}
376+
}
377+
378+
func TestFilterForPortNAT(t *testing.T) {
379+
testCases := []struct {
380+
name string
381+
dest string
382+
port int
383+
protocol v1.Protocol
384+
expectedFamily netlink.InetFamily
385+
expectedFilter *conntrackFilter
386+
}{
387+
{
388+
name: "ipv4 + TCP",
389+
dest: "10.96.0.10",
390+
port: 80,
391+
protocol: v1.ProtocolTCP,
392+
expectedFilter: &conntrackFilter{
393+
protocol: 6,
394+
original: &connectionTuple{dstPort: 80},
395+
reply: &connectionTuple{srcIP: netutils.ParseIPSloppy("10.96.0.10")},
396+
},
397+
},
398+
{
399+
name: "ipv6 + UDP",
400+
dest: "2001:db8:1::2",
401+
port: 8000,
402+
protocol: v1.ProtocolUDP,
403+
expectedFilter: &conntrackFilter{
404+
protocol: 17,
405+
original: &connectionTuple{dstPort: 8000},
406+
reply: &connectionTuple{srcIP: netutils.ParseIPSloppy("2001:db8:1::2")},
407+
},
408+
},
409+
}
410+
411+
for _, tc := range testCases {
412+
t.Run(tc.name, func(t *testing.T) {
413+
require.Equal(t, tc.expectedFilter, filterForPortNAT(tc.dest, tc.port, tc.protocol))
414+
})
415+
}
416+
}

0 commit comments

Comments
 (0)