@@ -21,13 +21,13 @@ package conntrack
21
21
22
22
import (
23
23
"fmt"
24
- "strconv"
25
- "strings"
24
+
25
+ "github.com/vishvananda/netlink"
26
+ "golang.org/x/sys/unix"
26
27
27
28
v1 "k8s.io/api/core/v1"
28
29
"k8s.io/klog/v2"
29
- "k8s.io/utils/exec"
30
- utilnet "k8s.io/utils/net"
30
+ netutils "k8s.io/utils/net"
31
31
)
32
32
33
33
// Interface for dealing with conntrack
@@ -49,95 +49,131 @@ type Interface interface {
49
49
ClearEntriesForPortNAT (dest string , port int , protocol v1.Protocol ) error
50
50
}
51
51
52
- // execCT implements Interface by execing the conntrack tool
53
- type execCT struct {
54
- execer exec. Interface
52
+ // netlinkHandler allows consuming real and mockable implementation for testing.
53
+ type netlinkHandler interface {
54
+ ConntrackDeleteFilters (netlink. ConntrackTableType , netlink. InetFamily , ... netlink. CustomConntrackFilter ) ( uint , error )
55
55
}
56
56
57
- var _ Interface = & execCT {}
58
-
59
- func NewExec (execer exec.Interface ) Interface {
60
- return & execCT {execer : execer }
57
+ // conntracker implements Interface by using netlink APIs.
58
+ type conntracker struct {
59
+ handler netlinkHandler
61
60
}
62
61
63
- // noConnectionToDelete is the error string returned by conntrack when no matching connections are found
64
- const noConnectionToDelete = "0 flow entries have been deleted"
62
+ var _ Interface = & conntracker {}
65
63
66
- func protoStr ( proto v1. Protocol ) string {
67
- return strings . ToLower ( string ( proto ) )
64
+ func New () Interface {
65
+ return newConntracker ( & netlink. Handle {} )
68
66
}
69
67
70
- func parametersWithFamily (isIPv6 bool , parameters ... string ) []string {
68
+ func newConntracker (handler netlinkHandler ) Interface {
69
+ return & conntracker {handler : handler }
70
+ }
71
+
72
+ // getNetlinkFamily returns the Netlink IP family constant
73
+ func getNetlinkFamily (isIPv6 bool ) netlink.InetFamily {
71
74
if isIPv6 {
72
- parameters = append ( parameters , "-f" , "ipv6" )
75
+ return unix . AF_INET6
73
76
}
74
- return parameters
77
+ return unix . AF_INET
75
78
}
76
79
77
- // exec executes the conntrack tool using the given parameters
78
- func (ct * execCT ) exec (parameters ... string ) error {
79
- conntrackPath , err := ct .execer .LookPath ("conntrack" )
80
- if err != nil {
81
- return fmt .Errorf ("error looking for path of conntrack: %v" , err )
82
- }
83
- klog .V (4 ).InfoS ("Clearing conntrack entries" , "parameters" , parameters )
84
- output , err := ct .execer .Command (conntrackPath , parameters ... ).CombinedOutput ()
85
- if err != nil {
86
- return fmt .Errorf ("conntrack command returned: %q, error message: %s" , string (output ), err )
87
- }
88
- klog .V (4 ).InfoS ("Conntrack entries deleted" , "output" , string (output ))
89
- return nil
80
+ // protocolMap maps v1.Protocol to the Assigned Internet Protocol Number.
81
+ // https://www.iana.org/assignments/protocol-numbers/protocol-numbers.xhtml
82
+ var protocolMap = map [v1.Protocol ]uint8 {
83
+ v1 .ProtocolTCP : unix .IPPROTO_TCP ,
84
+ v1 .ProtocolUDP : unix .IPPROTO_UDP ,
85
+ v1 .ProtocolSCTP : unix .IPPROTO_SCTP ,
90
86
}
91
87
92
- // ClearEntriesForIP is part of Interface
93
- func (ct * execCT ) ClearEntriesForIP (ip string , protocol v1.Protocol ) error {
94
- parameters := parametersWithFamily (utilnet .IsIPv6String (ip ), "-D" , "--orig-dst" , ip , "-p" , protoStr (protocol ))
95
- err := ct .exec (parameters ... )
96
- if err != nil && ! strings .Contains (err .Error (), noConnectionToDelete ) {
88
+ // ClearEntriesForIP delete the conntrack entries for connections specified by the
89
+ // destination IP(original direction).
90
+ func (ct * conntracker ) ClearEntriesForIP (ip string , protocol v1.Protocol ) error {
91
+ filter := & conntrackFilter {
92
+ protocol : protocolMap [protocol ],
93
+ original : & connectionTuple {
94
+ dstIP : netutils .ParseIPSloppy (ip ),
95
+ },
96
+ }
97
+ klog .V (4 ).InfoS ("Clearing conntrack entries" , "org-dst" , ip , "protocol" , protocol )
98
+
99
+ n , err := ct .handler .ConntrackDeleteFilters (netlink .ConntrackTable , getNetlinkFamily (netutils .IsIPv6String (ip )), filter )
100
+ if err != nil {
97
101
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
98
102
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
99
103
// is expensive to baby-sit all udp connections to kubernetes services.
100
- return fmt .Errorf ("error deleting connection tracking state for UDP service IP: %s, error: %v" , ip , err )
104
+ return fmt .Errorf ("error deleting connection tracking state for %s service IP: %s, error: %w" , protocol , ip , err )
101
105
}
106
+ klog .V (4 ).InfoS ("Cleared conntrack entries" , "count" , n )
102
107
return nil
103
108
}
104
109
105
- // ClearEntriesForPort is part of Interface
106
- func (ct * execCT ) ClearEntriesForPort (port int , isIPv6 bool , protocol v1.Protocol ) error {
110
+ // ClearEntriesForPort delete the conntrack entries for connections specified by the
111
+ // destination Port(original direction) and IPFamily.
112
+ func (ct * conntracker ) ClearEntriesForPort (port int , isIPv6 bool , protocol v1.Protocol ) error {
113
+ filter := & conntrackFilter {
114
+ protocol : protocolMap [protocol ],
115
+ original : & connectionTuple {
116
+ dstPort : uint16 (port ),
117
+ },
118
+ }
107
119
if port <= 0 {
108
120
return fmt .Errorf ("wrong port number. The port number must be greater than zero" )
109
121
}
110
- parameters := parametersWithFamily (isIPv6 , "-D" , "-p" , protoStr (protocol ), "--dport" , strconv .Itoa (port ))
111
- err := ct .exec (parameters ... )
112
- if err != nil && ! strings .Contains (err .Error (), noConnectionToDelete ) {
113
- return fmt .Errorf ("error deleting conntrack entries for UDP port: %d, error: %v" , port , err )
122
+
123
+ klog .V (4 ).InfoS ("Clearing conntrack entries" , "org-port-dst" , port , "protocol" , protocol )
124
+ n , err := ct .handler .ConntrackDeleteFilters (netlink .ConntrackTable , getNetlinkFamily (isIPv6 ), filter )
125
+ if err != nil {
126
+ return fmt .Errorf ("error deleting connection tracking state for %s port: %d, error: %w" , protocol , port , err )
114
127
}
128
+ klog .V (4 ).InfoS ("Cleared conntrack entries" , "count" , n )
115
129
return nil
116
130
}
117
131
118
- // ClearEntriesForNAT is part of Interface
119
- func (ct * execCT ) ClearEntriesForNAT (origin , dest string , protocol v1.Protocol ) error {
120
- parameters := parametersWithFamily (utilnet .IsIPv6String (origin ), "-D" , "--orig-dst" , origin , "--dst-nat" , dest ,
121
- "-p" , protoStr (protocol ))
122
- err := ct .exec (parameters ... )
123
- if err != nil && ! strings .Contains (err .Error (), noConnectionToDelete ) {
132
+ // ClearEntriesForNAT delete the conntrack entries for connections specified by the
133
+ // destination IP(original direction) and source IP(reply direction).
134
+ func (ct * conntracker ) ClearEntriesForNAT (origin , dest string , protocol v1.Protocol ) error {
135
+ filter := & conntrackFilter {
136
+ protocol : protocolMap [protocol ],
137
+ original : & connectionTuple {
138
+ dstIP : netutils .ParseIPSloppy (origin ),
139
+ },
140
+ reply : & connectionTuple {
141
+ srcIP : netutils .ParseIPSloppy (dest ),
142
+ },
143
+ }
144
+
145
+ klog .V (4 ).InfoS ("Clearing conntrack entries" , "org-dst" , origin , "reply-src" , dest , "protocol" , protocol )
146
+ n , err := ct .handler .ConntrackDeleteFilters (netlink .ConntrackTable , getNetlinkFamily (netutils .IsIPv6String (origin )), filter )
147
+ if err != nil {
124
148
// TODO: Better handling for deletion failure. When failure occur, stale udp connection may not get flushed.
125
149
// These stale udp connection will keep black hole traffic. Making this a best effort operation for now, since it
126
150
// is expensive to baby sit all udp connections to kubernetes services.
127
- return fmt .Errorf ("error deleting conntrack entries for UDP peer {%s, %s}, error: %v" , origin , dest , err )
151
+ return fmt .Errorf ("error deleting conntrack entries for %s peer {%s, %s}, error: %w" , protocol , origin , dest , err )
128
152
}
153
+ klog .V (4 ).InfoS ("Cleared conntrack entries" , "count" , n )
129
154
return nil
130
155
}
131
156
132
- // ClearEntriesForPortNAT is part of Interface
133
- func (ct * execCT ) ClearEntriesForPortNAT (dest string , port int , protocol v1.Protocol ) error {
157
+ // ClearEntriesForPortNAT delete the conntrack entries for connections specified by the
158
+ // destination Port(original direction) and source IP(reply direction).
159
+ func (ct * conntracker ) ClearEntriesForPortNAT (dest string , port int , protocol v1.Protocol ) error {
134
160
if port <= 0 {
135
161
return fmt .Errorf ("wrong port number. The port number must be greater than zero" )
136
162
}
137
- parameters := parametersWithFamily (utilnet .IsIPv6String (dest ), "-D" , "-p" , protoStr (protocol ), "--dport" , strconv .Itoa (port ), "--dst-nat" , dest )
138
- err := ct .exec (parameters ... )
139
- if err != nil && ! strings .Contains (err .Error (), noConnectionToDelete ) {
140
- return fmt .Errorf ("error deleting conntrack entries for UDP port: %d, error: %v" , port , err )
163
+ filter := & conntrackFilter {
164
+ protocol : protocolMap [protocol ],
165
+ original : & connectionTuple {
166
+ dstPort : uint16 (port ),
167
+ },
168
+ reply : & connectionTuple {
169
+ srcIP : netutils .ParseIPSloppy (dest ),
170
+ },
171
+ }
172
+ klog .V (4 ).InfoS ("Clearing conntrack entries" , "reply-src" , dest , "org-port-dst" , port , "protocol" , protocol )
173
+ n , err := ct .handler .ConntrackDeleteFilters (netlink .ConntrackTable , getNetlinkFamily (netutils .IsIPv6String (dest )), filter )
174
+ if err != nil {
175
+ return fmt .Errorf ("error deleting conntrack entries for %s port: %d, error: %w" , protocol , port , err )
141
176
}
177
+ klog .V (4 ).InfoS ("Cleared conntrack entries" , "count" , n )
142
178
return nil
143
179
}
0 commit comments