@@ -20,6 +20,9 @@ limitations under the License.
20
20
package conntrack
21
21
22
22
import (
23
+ "github.com/vishvananda/netlink"
24
+ "golang.org/x/sys/unix"
25
+
23
26
v1 "k8s.io/api/core/v1"
24
27
"k8s.io/apimachinery/pkg/util/sets"
25
28
"k8s.io/klog/v2"
@@ -29,20 +32,20 @@ import (
29
32
)
30
33
31
34
// CleanStaleEntries takes care of flushing stale conntrack entries for services and endpoints.
32
- func CleanStaleEntries (ct Interface , svcPortMap proxy.ServicePortMap ,
35
+ func CleanStaleEntries (ct Interface , ipFamily v1. IPFamily , svcPortMap proxy.ServicePortMap ,
33
36
serviceUpdateResult proxy.UpdateServiceMapResult , endpointsUpdateResult proxy.UpdateEndpointsMapResult ) {
34
- deleteStaleServiceConntrackEntries (ct , svcPortMap , serviceUpdateResult , endpointsUpdateResult )
35
- deleteStaleEndpointConntrackEntries (ct , svcPortMap , endpointsUpdateResult )
37
+ deleteStaleServiceConntrackEntries (ct , ipFamily , svcPortMap , serviceUpdateResult , endpointsUpdateResult )
38
+ deleteStaleEndpointConntrackEntries (ct , ipFamily , svcPortMap , endpointsUpdateResult )
36
39
}
37
40
38
41
// deleteStaleServiceConntrackEntries takes care of flushing stale conntrack entries related
39
42
// to UDP Service IPs. When a service has no endpoints and we drop traffic to it, conntrack
40
43
// may create "black hole" entries for that IP+port. When the service gets endpoints we
41
44
// need to delete those entries so further traffic doesn't get dropped.
42
- func deleteStaleServiceConntrackEntries (ct Interface , svcPortMap proxy.ServicePortMap , serviceUpdateResult proxy.UpdateServiceMapResult , endpointsUpdateResult proxy.UpdateEndpointsMapResult ) {
45
+ func deleteStaleServiceConntrackEntries (ct Interface , ipFamily v1.IPFamily , svcPortMap proxy.ServicePortMap , serviceUpdateResult proxy.UpdateServiceMapResult , endpointsUpdateResult proxy.UpdateEndpointsMapResult ) {
46
+ var filters []netlink.CustomConntrackFilter
43
47
conntrackCleanupServiceIPs := serviceUpdateResult .DeletedUDPClusterIPs
44
48
conntrackCleanupServiceNodePorts := sets .New [int ]()
45
- isIPv6 := false
46
49
47
50
// merge newly active services gathered from endpointsUpdateResult
48
51
// a UDP service that changes from 0 to non-0 endpoints is newly active.
@@ -59,57 +62,116 @@ func deleteStaleServiceConntrackEntries(ct Interface, svcPortMap proxy.ServicePo
59
62
nodePort := svcInfo .NodePort ()
60
63
if svcInfo .Protocol () == v1 .ProtocolUDP && nodePort != 0 {
61
64
conntrackCleanupServiceNodePorts .Insert (nodePort )
62
- isIPv6 = netutils .IsIPv6 (svcInfo .ClusterIP ())
63
65
}
64
66
}
65
67
}
66
68
67
69
klog .V (4 ).InfoS ("Deleting conntrack stale entries for services" , "IPs" , conntrackCleanupServiceIPs .UnsortedList ())
68
70
for _ , svcIP := range conntrackCleanupServiceIPs .UnsortedList () {
69
- if err := ct .ClearEntriesForIP (svcIP , v1 .ProtocolUDP ); err != nil {
70
- klog .ErrorS (err , "Failed to delete stale service connections" , "IP" , svcIP )
71
- }
71
+ filters = append (filters , filterForIP (svcIP , v1 .ProtocolUDP ))
72
72
}
73
73
klog .V (4 ).InfoS ("Deleting conntrack stale entries for services" , "nodePorts" , conntrackCleanupServiceNodePorts .UnsortedList ())
74
74
for _ , nodePort := range conntrackCleanupServiceNodePorts .UnsortedList () {
75
- err := ct .ClearEntriesForPort (nodePort , isIPv6 , v1 .ProtocolUDP )
76
- if err != nil {
77
- klog .ErrorS (err , "Failed to clear udp conntrack" , "nodePort" , nodePort )
78
- }
75
+ filters = append (filters , filterForPort (nodePort , v1 .ProtocolUDP ))
76
+ }
77
+
78
+ if err := ct .ClearEntries (ipFamilyMap [ipFamily ], filters ... ); err != nil {
79
+ klog .ErrorS (err , "Failed to delete stale service connections" )
79
80
}
80
81
}
81
82
82
83
// deleteStaleEndpointConntrackEntries takes care of flushing stale conntrack entries related
83
84
// to UDP endpoints. After a UDP endpoint is removed we must flush any conntrack entries
84
85
// for it so that if the same client keeps sending, the packets will get routed to a new endpoint.
85
- func deleteStaleEndpointConntrackEntries (ct Interface , svcPortMap proxy.ServicePortMap , endpointsUpdateResult proxy.UpdateEndpointsMapResult ) {
86
+ func deleteStaleEndpointConntrackEntries (ct Interface , ipFamily v1.IPFamily , svcPortMap proxy.ServicePortMap , endpointsUpdateResult proxy.UpdateEndpointsMapResult ) {
87
+ var filters []netlink.CustomConntrackFilter
86
88
for _ , epSvcPair := range endpointsUpdateResult .DeletedUDPEndpoints {
87
89
if svcInfo , ok := svcPortMap [epSvcPair .ServicePortName ]; ok {
88
90
endpointIP := proxyutil .IPPart (epSvcPair .Endpoint )
89
91
nodePort := svcInfo .NodePort ()
90
- var err error
91
92
if nodePort != 0 {
92
- err = ct .ClearEntriesForPortNAT (endpointIP , nodePort , v1 .ProtocolUDP )
93
- if err != nil {
94
- klog .ErrorS (err , "Failed to delete nodeport-related endpoint connections" , "servicePortName" , epSvcPair .ServicePortName )
95
- }
96
- }
97
- err = ct .ClearEntriesForNAT (svcInfo .ClusterIP ().String (), endpointIP , v1 .ProtocolUDP )
98
- if err != nil {
99
- klog .ErrorS (err , "Failed to delete endpoint connections" , "servicePortName" , epSvcPair .ServicePortName )
93
+ filters = append (filters , filterForPortNAT (endpointIP , nodePort , v1 .ProtocolUDP ))
94
+
100
95
}
96
+ filters = append (filters , filterForNAT (svcInfo .ClusterIP ().String (), endpointIP , v1 .ProtocolUDP ))
101
97
for _ , extIP := range svcInfo .ExternalIPs () {
102
- err := ct .ClearEntriesForNAT (extIP .String (), endpointIP , v1 .ProtocolUDP )
103
- if err != nil {
104
- klog .ErrorS (err , "Failed to delete endpoint connections for externalIP" , "servicePortName" , epSvcPair .ServicePortName , "externalIP" , extIP )
105
- }
98
+ filters = append (filters , filterForNAT (extIP .String (), endpointIP , v1 .ProtocolUDP ))
106
99
}
107
100
for _ , lbIP := range svcInfo .LoadBalancerVIPs () {
108
- err := ct .ClearEntriesForNAT (lbIP .String (), endpointIP , v1 .ProtocolUDP )
109
- if err != nil {
110
- klog .ErrorS (err , "Failed to delete endpoint connections for LoadBalancerIP" , "servicePortName" , epSvcPair .ServicePortName , "loadBalancerIP" , lbIP )
111
- }
101
+ filters = append (filters , filterForNAT (lbIP .String (), endpointIP , v1 .ProtocolUDP ))
112
102
}
113
103
}
114
104
}
105
+
106
+ if err := ct .ClearEntries (ipFamilyMap [ipFamily ], filters ... ); err != nil {
107
+ klog .ErrorS (err , "Failed to delete stale endpoint connections" )
108
+ }
109
+ }
110
+
111
+ // ipFamilyMap maps v1.IPFamily to the corresponding unix constant.
112
+ var ipFamilyMap = map [v1.IPFamily ]uint8 {
113
+ v1 .IPv4Protocol : unix .AF_INET ,
114
+ v1 .IPv6Protocol : unix .AF_INET6 ,
115
+ }
116
+
117
+ // protocolMap maps v1.Protocol to the Assigned Internet Protocol Number.
118
+ // https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
119
+ var protocolMap = map [v1.Protocol ]uint8 {
120
+ v1 .ProtocolTCP : unix .IPPROTO_TCP ,
121
+ v1 .ProtocolUDP : unix .IPPROTO_UDP ,
122
+ v1 .ProtocolSCTP : unix .IPPROTO_SCTP ,
123
+ }
124
+
125
+ // filterForIP returns *conntrackFilter to delete the conntrack entries for connections
126
+ // specified by the destination IP (original direction).
127
+ func filterForIP (ip string , protocol v1.Protocol ) * conntrackFilter {
128
+ klog .V (4 ).InfoS ("Adding conntrack filter for cleanup" , "org-dst" , ip , "protocol" , protocol )
129
+ return & conntrackFilter {
130
+ protocol : protocolMap [protocol ],
131
+ original : & connectionTuple {
132
+ dstIP : netutils .ParseIPSloppy (ip ),
133
+ },
134
+ }
135
+ }
136
+
137
+ // filterForPort returns *conntrackFilter to delete the conntrack entries for connections
138
+ // specified by the destination Port (original direction).
139
+ func filterForPort (port int , protocol v1.Protocol ) * conntrackFilter {
140
+ klog .V (4 ).InfoS ("Adding conntrack filter for cleanup" , "org-port-dst" , port , "protocol" , protocol )
141
+ return & conntrackFilter {
142
+ protocol : protocolMap [protocol ],
143
+ original : & connectionTuple {
144
+ dstPort : uint16 (port ),
145
+ },
146
+ }
147
+ }
148
+
149
+ // filterForNAT returns *conntrackFilter to delete the conntrack entries for connections
150
+ // specified by the destination IP (original direction) and source IP (reply direction).
151
+ func filterForNAT (origin , dest string , protocol v1.Protocol ) * conntrackFilter {
152
+ klog .V (4 ).InfoS ("Adding conntrack filter for cleanup" , "org-dst" , origin , "reply-src" , dest , "protocol" , protocol )
153
+ return & conntrackFilter {
154
+ protocol : protocolMap [protocol ],
155
+ original : & connectionTuple {
156
+ dstIP : netutils .ParseIPSloppy (origin ),
157
+ },
158
+ reply : & connectionTuple {
159
+ srcIP : netutils .ParseIPSloppy (dest ),
160
+ },
161
+ }
162
+ }
163
+
164
+ // filterForPortNAT returns *conntrackFilter to delete the conntrack entries for connections
165
+ // specified by the destination Port (original direction) and source IP (reply direction).
166
+ func filterForPortNAT (dest string , port int , protocol v1.Protocol ) * conntrackFilter {
167
+ klog .V (4 ).InfoS ("Adding conntrack filter for cleanup" , "org-port-dst" , port , "reply-src" , dest , "protocol" , protocol )
168
+ return & conntrackFilter {
169
+ protocol : protocolMap [protocol ],
170
+ original : & connectionTuple {
171
+ dstPort : uint16 (port ),
172
+ },
173
+ reply : & connectionTuple {
174
+ srcIP : netutils .ParseIPSloppy (dest ),
175
+ },
176
+ }
115
177
}
0 commit comments