Skip to content

Commit f0bae6e

Browse files
authored
Merge pull request kubernetes#71573 from JacobTanenbaum/UDP_conntrack
Correctly Clear conntrack entry on endpoint changes when using nodeport
2 parents e94befa + 144280e commit f0bae6e

File tree

5 files changed

+77
-1
lines changed

5 files changed

+77
-1
lines changed

pkg/proxy/iptables/proxier.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,13 @@ func (proxier *Proxier) deleteEndpointConnections(connectionMap []proxy.ServiceE
607607
for _, epSvcPair := range connectionMap {
608608
if svcInfo, ok := proxier.serviceMap[epSvcPair.ServicePortName]; ok && svcInfo.GetProtocol() == v1.ProtocolUDP {
609609
endpointIP := utilproxy.IPPart(epSvcPair.Endpoint)
610-
err := conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP)
610+
nodePort := svcInfo.GetNodePort()
611+
var err error
612+
if nodePort != 0 {
613+
err = conntrack.ClearEntriesForPortNAT(proxier.exec, endpointIP, nodePort, v1.ProtocolUDP)
614+
} else {
615+
err = conntrack.ClearEntriesForNAT(proxier.exec, svcInfo.ClusterIPString(), endpointIP, v1.ProtocolUDP)
616+
}
611617
if err != nil {
612618
klog.Errorf("Failed to delete %s endpoint connections, error: %v", epSvcPair.ServicePortName.String(), err)
613619
}

pkg/proxy/service.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ func (info *BaseServiceInfo) GetHealthCheckNodePort() int {
7474
return info.HealthCheckNodePort
7575
}
7676

77+
// GetNodePort is part of the ServicePort interface.
78+
func (info *BaseServiceInfo) GetNodePort() int {
79+
return info.NodePort
80+
}
81+
7782
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo {
7883
onlyNodeLocalEndpoints := false
7984
if apiservice.RequestsOnlyLocalTraffic(service) {

pkg/proxy/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ type ServicePort interface {
5454
GetProtocol() v1.Protocol
5555
// GetHealthCheckNodePort returns service health check node port if present. If return 0, it means not present.
5656
GetHealthCheckNodePort() int
57+
// GetNodePort returns a service Node port if present. If return 0, it means not present.
58+
GetNodePort() int
5759
}
5860

5961
// Endpoint in an interface which abstracts information about an endpoint.

pkg/util/conntrack/conntrack.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,19 @@ func ClearEntriesForNAT(execer exec.Interface, origin, dest string, protocol v1.
107107
}
108108
return nil
109109
}
110+
111+
// ClearEntriesForPortNAT uses the conntrack tool to delete the contrack entries
112+
// for connections specified by the {dest IP, port} pair.
113+
// Known issue:
114+
// https://github.com/kubernetes/kubernetes/issues/59368
115+
func ClearEntriesForPortNAT(execer exec.Interface, dest string, port int, protocol v1.Protocol) error {
116+
if port <= 0 {
117+
return fmt.Errorf("Wrong port number. The port number must be greater then zero")
118+
}
119+
parameters := parametersWithFamily(utilnet.IsIPv6String(dest), "-D", "-p", protoStr(protocol), "--dport", strconv.Itoa(port), "--dst-nat", dest)
120+
err := Exec(execer, parameters...)
121+
if err != nil && !strings.Contains(err.Error(), NoConnectionToDelete) {
122+
return fmt.Errorf("error deleting conntrack entries for UDP port: %d, error: %v", port, err)
123+
}
124+
return nil
125+
}

pkg/util/conntrack/conntrack_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,3 +234,50 @@ func TestDeleteUDPConnections(t *testing.T) {
234234
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
235235
}
236236
}
237+
238+
func TestClearUDPConntrackForPortNAT(t *testing.T) {
239+
fcmd := fakeexec.FakeCmd{
240+
CombinedOutputScript: []fakeexec.FakeCombinedOutputAction{
241+
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
242+
func() ([]byte, error) {
243+
return []byte(""), fmt.Errorf("conntrack v1.4.2 (conntrack-tools): 0 flow entries have been deleted")
244+
},
245+
func() ([]byte, error) { return []byte("1 flow entries have been deleted"), nil },
246+
},
247+
}
248+
fexec := fakeexec.FakeExec{
249+
CommandScript: []fakeexec.FakeCommandAction{
250+
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
251+
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
252+
func(cmd string, args ...string) exec.Cmd { return fakeexec.InitFakeCmd(&fcmd, cmd, args...) },
253+
},
254+
LookPathFunc: func(cmd string) (string, error) { return cmd, nil },
255+
}
256+
testCases := []struct {
257+
name string
258+
port int
259+
dest string
260+
}{
261+
{
262+
name: "IPv4 success",
263+
port: 30211,
264+
dest: "1.2.3.4",
265+
},
266+
}
267+
svcCount := 0
268+
for i, tc := range testCases {
269+
err := ClearEntriesForPortNAT(&fexec, tc.dest, tc.port, v1.ProtocolUDP)
270+
if err != nil {
271+
t.Errorf("%s test case: unexpected error: %v", tc.name, err)
272+
}
273+
expectCommand := fmt.Sprintf("conntrack -D -p udp --dport %d --dst-nat %s", tc.port, tc.dest) + familyParamStr(utilnet.IsIPv6String(tc.dest))
274+
execCommand := strings.Join(fcmd.CombinedOutputLog[i], " ")
275+
if expectCommand != execCommand {
276+
t.Errorf("%s test case: Expect command: %s, but executed %s", tc.name, expectCommand, execCommand)
277+
}
278+
svcCount++
279+
}
280+
if svcCount != fexec.CommandCalls {
281+
t.Errorf("Expect command executed %d times, but got %d", svcCount, fexec.CommandCalls)
282+
}
283+
}

0 commit comments

Comments
 (0)