Skip to content

Commit 54eedcd

Browse files
roffemurali-reddy
authored andcommitted
Issue 572 - Graceful termination + Update to go-1.10.8, alpine-3.9 (#706)
* update netlink * update libnetwork to get ipvs stats * update gopkg.lock for libnetwork update * update libnetwork * add cli options * make endpoints delete gracefully * move conntrack flusher * get some order in the mainloop * update to alpine 3.9 & go 1.11.1 * revert to 1.10.3 just update alpine * and revert travis.yml * lock version * test 1.12 * test
1 parent 736757d commit 54eedcd

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+2697
-550
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ services:
33

44
language: go
55
go:
6-
- 1.10.x
6+
- 1.10.8
77

88
branches:
99
only:

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
FROM alpine:3.7
1+
FROM alpine:3.9
22

33
RUN apk add --no-cache \
44
iptables \

Gopkg.lock

Lines changed: 6 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Gopkg.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ required = ["github.com/osrg/gobgp/gobgp"]
6969

7070
[[override]]
7171
name = "github.com/vishvananda/netlink"
72-
version = "1.0.0"
72+
revision = "f504738125a57f35f87fc30fb69b8df75237ccde"
7373

7474
[[constraint]]
7575
branch = "master"
@@ -89,7 +89,7 @@ required = ["github.com/osrg/gobgp/gobgp"]
8989

9090
[[constraint]]
9191
name = "github.com/docker/libnetwork"
92-
revision = "14aa49f99093e1a22e65155b641103762911db8d"
92+
revision = "48f846327bbe6a0dce0c556e8dc9f5bb939d5c16"
9393

9494
[[override]]
9595
revision = "1e2f10eb65743fed02f573d31a4587de09afb20e"

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ DOCKER=$(if $(or $(IN_DOCKER_GROUP),$(IS_ROOT),$(OSX)),docker,sudo docker)
1717
MAKEFILE_DIR=$(dir $(realpath $(firstword $(MAKEFILE_LIST))))
1818
UPSTREAM_IMPORT_PATH=$(GOPATH)/src/github.com/cloudnativelabs/kube-router/
1919
BUILD_IN_DOCKER?=false
20-
DOCKER_BUILD_IMAGE?=golang:1.10.3-alpine
20+
DOCKER_BUILD_IMAGE?=golang:1.10.8-alpine3.9
2121
ifeq ($(GOARCH), arm)
2222
ARCH_TAG_PREFIX=$(GOARCH)
2323
FILE_ARCH=ARM
@@ -53,9 +53,9 @@ endif
5353
test: gofmt gomoqs ## Runs code quality pipelines (gofmt, tests, coverage, lint, etc)
5454
ifeq "$(BUILD_IN_DOCKER)" "true"
5555
$(DOCKER) run -v $(PWD):/go/src/github.com/cloudnativelabs/kube-router -w /go/src/github.com/cloudnativelabs/kube-router $(DOCKER_BUILD_IMAGE) \
56-
sh -c 'go test github.com/cloudnativelabs/kube-router/cmd/kube-router/ github.com/cloudnativelabs/kube-router/pkg/...'
56+
sh -c 'go test -v -timeout 30s github.com/cloudnativelabs/kube-router/cmd/kube-router/ github.com/cloudnativelabs/kube-router/pkg/...'
5757
else
58-
go test github.com/cloudnativelabs/kube-router/cmd/kube-router/ github.com/cloudnativelabs/kube-router/pkg/...
58+
go test -v -timeout 30s github.com/cloudnativelabs/kube-router/cmd/kube-router/ github.com/cloudnativelabs/kube-router/pkg/...
5959
endif
6060

6161
vagrant-up: export docker=$(DOCKER)

docs/user-guide.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ Usage of kube-router:
5757
-h, --help Print usage information.
5858
--hostname-override string Overrides the NodeName of the node. Set this if kube-router is unable to determine your NodeName automatically.
5959
--iptables-sync-period duration The delay between iptables rule synchronizations (e.g. '5s', '1m'). Must be greater than 0. (default 5m0s)
60+
--ipvs-graceful-period duration The graceful period before removing destinations from IPVS services (e.g. '5s', '1m', '2h22m'). Must be greater than 0. (default 30s)
61+
--ipvs-graceful-termination Enables the experimental IPVS graceful terminaton capability
6062
--ipvs-sync-period duration The delay between ipvs config synchronizations (e.g. '5s', '1m', '2h22m'). Must be greater than 0. (default 5m0s)
6163
--kubeconfig string Path to kubeconfig file with authorization information (the master location is set by the master flag).
6264
--masquerade-all SNAT all traffic to cluster IP/node port.
@@ -285,6 +287,13 @@ If you would like to use `HostPort` functionality below changes are required in
285287

286288
For an e.g manifest please look at [manifest](../daemonset/kubeadm-kuberouter-all-features-hostport.yaml) with necessary changes required for `HostPort` functionality.
287289

290+
## IPVS Graceful termination support
291+
292+
As of 0.2.6 we support experimental graceful termination of IPVS destinations. When possible the pods's TerminationGracePeriodSeconds is used, if it cannot be retrived for some reason
293+
the fallback period is 30 seconds and can be adjusted with `--ipvs-graceful-period` cli-opt
294+
295+
graceful termination works in such a way that when kube-router receives a delete endpoint notification for a service it's weight is adjusted to 0 before getting deleted after he termination grace period has passed or the Active & Inactive connections goes down to 0.
296+
288297
## BGP configuration
289298

290299
[Configuring BGP Peers](bgp.md)
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
package proxy
2+
3+
import (
4+
"fmt"
5+
"os/exec"
6+
"regexp"
7+
"strconv"
8+
"sync"
9+
"syscall"
10+
"time"
11+
12+
"github.com/docker/libnetwork/ipvs"
13+
"github.com/golang/glog"
14+
)
15+
16+
type gracefulQueue struct {
17+
mu sync.Mutex
18+
queue []gracefulRequest
19+
}
20+
21+
type gracefulQueueItem struct {
22+
added time.Time
23+
service *ipvs.Service
24+
}
25+
26+
type gracefulRequest struct {
27+
ipvsSvc *ipvs.Service
28+
ipvsDst *ipvs.Destination
29+
deletionTime time.Time
30+
gracefulTerminationPeriod time.Duration
31+
}
32+
33+
func (nsc *NetworkServicesController) ipvsDeleteDestination(svc *ipvs.Service, dst *ipvs.Destination) error {
34+
// If we have enabled graceful termination set the weight of the destination to 0
35+
// then add it to the queue for graceful termination
36+
if nsc.gracefulTermination {
37+
req := gracefulRequest{
38+
ipvsSvc: svc,
39+
ipvsDst: dst,
40+
deletionTime: time.Now(),
41+
}
42+
dst.Weight = 0
43+
err := nsc.ln.ipvsUpdateDestination(svc, dst)
44+
if err != nil {
45+
return err
46+
}
47+
nsc.addToGracefulQueue(&req)
48+
} else {
49+
err := nsc.ln.ipvsDelDestination(svc, dst)
50+
if err != nil {
51+
return err
52+
}
53+
}
54+
// flush conntrack when Destination for a UDP service changes
55+
if svc.Protocol == syscall.IPPROTO_UDP {
56+
if err := nsc.flushConntrackUDP(svc); err != nil {
57+
glog.Errorf("Failed to flush conntrack: %s", err.Error())
58+
}
59+
}
60+
return nil
61+
}
62+
63+
func (nsc *NetworkServicesController) addToGracefulQueue(req *gracefulRequest) {
64+
nsc.gracefulQueue.mu.Lock()
65+
defer nsc.gracefulQueue.mu.Unlock()
66+
var alreadyExists bool
67+
for _, jobQitem := range nsc.gracefulQueue.queue {
68+
if jobQitem.ipvsSvc.Address.Equal(req.ipvsSvc.Address) && jobQitem.ipvsSvc.Port == req.ipvsSvc.Port && jobQitem.ipvsSvc.Protocol == req.ipvsSvc.Protocol {
69+
if jobQitem.ipvsDst.Address.Equal(req.ipvsDst.Address) && jobQitem.ipvsDst.Port == req.ipvsDst.Port {
70+
glog.V(2).Infof("Endpoint already scheduled for removal %+v %+v %s", *req.ipvsSvc, *req.ipvsDst, req.gracefulTerminationPeriod.String())
71+
alreadyExists = true
72+
break
73+
}
74+
}
75+
}
76+
if !alreadyExists {
77+
// try to get get Termination grace period from the pod, if unsuccesfull use the default timeout
78+
podObj, err := nsc.getPodObjectForEndpoint(req.ipvsDst.Address.String())
79+
if err != nil {
80+
glog.V(1).Infof("Failed to find endpoint with ip: %s err: %s", req.ipvsDst.Address.String(), err.Error())
81+
req.gracefulTerminationPeriod = nsc.gracefulPeriod
82+
} else {
83+
glog.V(1).Infof("Found pod termination grace period %d for pod %s", *podObj.Spec.TerminationGracePeriodSeconds, podObj.Name)
84+
req.gracefulTerminationPeriod = time.Duration(float64(*podObj.Spec.TerminationGracePeriodSeconds) * float64(time.Second))
85+
}
86+
nsc.gracefulQueue.queue = append(nsc.gracefulQueue.queue, *req)
87+
}
88+
}
89+
90+
func (nsc *NetworkServicesController) gracefulSync() {
91+
nsc.gracefulQueue.mu.Lock()
92+
defer nsc.gracefulQueue.mu.Unlock()
93+
var newQueue []gracefulRequest
94+
// Itterate over our queued destination removals one by one, and don't add them back to the queue if they were processed
95+
for _, job := range nsc.gracefulQueue.queue {
96+
if removed := nsc.gracefulDeleteIpvsDestination(job); removed {
97+
continue
98+
}
99+
newQueue = append(newQueue, job)
100+
}
101+
nsc.gracefulQueue.queue = newQueue
102+
}
103+
104+
func (nsc *NetworkServicesController) gracefulDeleteIpvsDestination(req gracefulRequest) bool {
105+
var deleteDestination bool
106+
// Get active and inactive connections for the destination
107+
aConn, iConn, err := nsc.getIpvsDestinationConnStats(req.ipvsSvc, req.ipvsDst)
108+
if err != nil {
109+
glog.V(1).Infof("Could not get connection stats for destination: %s", err.Error())
110+
} else {
111+
// Do we have active or inactive connections to this destination
112+
// if we don't, proceed and delete the destination ahead of graceful period
113+
if aConn == 0 && iConn == 0 {
114+
deleteDestination = true
115+
}
116+
}
117+
118+
// Check if our destinations graceful termination period has passed
119+
if time.Since(req.deletionTime) > req.gracefulTerminationPeriod {
120+
deleteDestination = true
121+
}
122+
123+
//Destination has has one or more conditions for deletion
124+
if deleteDestination {
125+
glog.V(2).Infof("Deleting IPVS destination: %s", ipvsDestinationString(req.ipvsDst))
126+
if err := nsc.ln.ipvsDelDestination(req.ipvsSvc, req.ipvsDst); err != nil {
127+
glog.Errorf("Failed to delete IPVS destination: %s, %s", ipvsDestinationString(req.ipvsDst), err.Error())
128+
}
129+
}
130+
return deleteDestination
131+
}
132+
133+
// getConnStats returns the number of active & inactive connections for the IPVS destination
134+
func (nsc *NetworkServicesController) getIpvsDestinationConnStats(ipvsSvc *ipvs.Service, dest *ipvs.Destination) (int, int, error) {
135+
destStats, err := nsc.ln.ipvsGetDestinations(ipvsSvc)
136+
if err != nil {
137+
return 0, 0, fmt.Errorf("failed to get IPVS destinations for service : %s : %s", ipvsServiceString(ipvsSvc), err.Error())
138+
}
139+
140+
for _, destStat := range destStats {
141+
if destStat.Address.Equal(dest.Address) && destStat.Port == dest.Port {
142+
return destStat.ActiveConnections, destStat.InactiveConnections, nil
143+
}
144+
}
145+
return 0, 0, fmt.Errorf("destination %s not found on IPVS service %s ", ipvsDestinationString(dest), ipvsServiceString(ipvsSvc))
146+
}
147+
148+
// flushConntrackUDP flushes UDP conntrack records for the given service destination
149+
func (nsc *NetworkServicesController) flushConntrackUDP(svc *ipvs.Service) error {
150+
// Conntrack exits with non zero exit code when exiting if 0 flow entries have been deleted, use regex to check output and don't Error when matching
151+
re := regexp.MustCompile("([[:space:]]0 flow entries have been deleted.)")
152+
153+
// Shell out and flush conntrack records
154+
out, err := exec.Command("conntrack", "-D", "--orig-dst", svc.Address.String(), "-p", "udp", "--dport", strconv.Itoa(int(svc.Port))).CombinedOutput()
155+
if err != nil {
156+
if matched := re.MatchString(string(out)); !matched {
157+
return fmt.Errorf("Failed to delete conntrack entry for endpoint: %s:%d due to %s", svc.Address.String(), svc.Port, err.Error())
158+
}
159+
}
160+
glog.V(1).Infof("Deleted conntrack entry for endpoint: %s:%d", svc.Address.String(), svc.Port)
161+
return nil
162+
}

0 commit comments

Comments
 (0)