Skip to content

Commit 41d2445

Browse files
authored
Merge pull request kubernetes#71999 from mm4tt/kube-proxy
Start exporting the in-cluster network programming latency metric.
2 parents 3c2a4f0 + 7141ece commit 41d2445

File tree

5 files changed

+197
-10
lines changed

5 files changed

+197
-10
lines changed

pkg/proxy/endpoints.go

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"reflect"
2222
"strconv"
2323
"sync"
24+
"time"
2425

2526
"k8s.io/klog"
2627

@@ -92,16 +93,20 @@ type EndpointChangeTracker struct {
9293
// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
9394
isIPv6Mode *bool
9495
recorder record.EventRecorder
96+
// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
97+
// object to change. Used to calculate the network-programming-latency.
98+
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
9599
}
96100

97101
// NewEndpointChangeTracker initializes an EndpointsChangeMap
98102
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder) *EndpointChangeTracker {
99103
return &EndpointChangeTracker{
100-
hostname: hostname,
101-
items: make(map[types.NamespacedName]*endpointsChange),
102-
makeEndpointInfo: makeEndpointInfo,
103-
isIPv6Mode: isIPv6Mode,
104-
recorder: recorder,
104+
hostname: hostname,
105+
items: make(map[types.NamespacedName]*endpointsChange),
106+
makeEndpointInfo: makeEndpointInfo,
107+
isIPv6Mode: isIPv6Mode,
108+
recorder: recorder,
109+
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
105110
}
106111
}
107112

@@ -133,14 +138,38 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
133138
change.previous = ect.endpointsToEndpointsMap(previous)
134139
ect.items[namespacedName] = change
135140
}
141+
if t := getLastChangeTriggerTime(endpoints); !t.IsZero() {
142+
ect.lastChangeTriggerTimes[namespacedName] =
143+
append(ect.lastChangeTriggerTimes[namespacedName], t)
144+
}
136145
change.current = ect.endpointsToEndpointsMap(current)
137146
// if change.previous equal to change.current, it means no change
138147
if reflect.DeepEqual(change.previous, change.current) {
139148
delete(ect.items, namespacedName)
149+
// Reset the lastChangeTriggerTimes for the Endpoints object. Given that the network programming
150+
// SLI is defined as the duration between a time of an event and a time when the network was
151+
// programmed to incorporate that event, if there are events that happened between two
152+
// consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted,
153+
// there will be no network programming for them and thus no network programming latency metric
154+
// should be exported.
155+
delete(ect.lastChangeTriggerTimes, namespacedName)
140156
}
141157
return len(ect.items) > 0
142158
}
143159

160+
// getLastChangeTriggerTime returns the time.Time value of the EndpointsLastChangeTriggerTime
161+
// annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set
162+
// or was set incorrectly.
163+
func getLastChangeTriggerTime(endpoints *v1.Endpoints) time.Time {
164+
val, err := time.Parse(time.RFC3339Nano, endpoints.Annotations[v1.EndpointsLastChangeTriggerTime])
165+
if err != nil {
166+
klog.Warningf("Error while parsing EndpointsLastChangeTriggerTimeAnnotation: '%s'. Error is %v",
167+
endpoints.Annotations[v1.EndpointsLastChangeTriggerTime], err)
168+
// In case of error val = time.Zero, which is ignored in the upstream code.
169+
}
170+
return val
171+
}
172+
144173
// endpointsChange contains all changes to endpoints that happened since proxy rules were synced. For a single object,
145174
// changes are accumulated, i.e. previous is state from before applying the changes,
146175
// current is state after applying the changes.
@@ -157,14 +186,19 @@ type UpdateEndpointMapResult struct {
157186
StaleEndpoints []ServiceEndpoint
158187
// StaleServiceNames identifies if a service is stale.
159188
StaleServiceNames []ServicePortName
189+
// List of the trigger times for all endpoints objects that changed. It's used to export the
190+
// network programming latency.
191+
LastChangeTriggerTimes []time.Time
160192
}
161193

162194
// UpdateEndpointsMap updates endpointsMap base on the given changes.
163195
func UpdateEndpointsMap(endpointsMap EndpointsMap, changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
164196
result.StaleEndpoints = make([]ServiceEndpoint, 0)
165197
result.StaleServiceNames = make([]ServicePortName, 0)
198+
result.LastChangeTriggerTimes = make([]time.Time, 0)
166199

167-
endpointsMap.apply(changes, &result.StaleEndpoints, &result.StaleServiceNames)
200+
endpointsMap.apply(
201+
changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)
168202

169203
// TODO: If this will appear to be computationally expensive, consider
170204
// computing this incrementally similarly to endpointsMap.
@@ -241,7 +275,10 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
241275
// apply the changes to EndpointsMap and updates stale endpoints and service-endpoints pair. The `staleEndpoints` argument
242276
// is passed in to store the stale udp endpoints and `staleServiceNames` argument is passed in to store the stale udp service.
243277
// The changes map is cleared after applying them.
244-
func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
278+
// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints
279+
// that were changed and will result in syncing the proxy rules.
280+
func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
281+
staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *[]time.Time) {
245282
if changes == nil {
246283
return
247284
}
@@ -253,6 +290,10 @@ func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndp
253290
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
254291
}
255292
changes.items = make(map[types.NamespacedName]*endpointsChange)
293+
for _, lastChangeTriggerTime := range changes.lastChangeTriggerTimes {
294+
*lastChangeTriggerTimes = append(*lastChangeTriggerTimes, lastChangeTriggerTime...)
295+
}
296+
changes.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
256297
}
257298

258299
// Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.

pkg/proxy/endpoints_test.go

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ package proxy
1818

1919
import (
2020
"reflect"
21+
"sort"
2122
"testing"
23+
"time"
2224

2325
"github.com/davecgh/go-spew/spew"
2426

@@ -121,8 +123,9 @@ func TestGetLocalEndpointIPs(t *testing.T) {
121123
func makeTestEndpoints(namespace, name string, eptFunc func(*v1.Endpoints)) *v1.Endpoints {
122124
ept := &v1.Endpoints{
123125
ObjectMeta: metav1.ObjectMeta{
124-
Name: name,
125-
Namespace: namespace,
126+
Name: name,
127+
Namespace: namespace,
128+
Annotations: make(map[string]string),
126129
},
127130
}
128131
eptFunc(ept)
@@ -1268,6 +1271,120 @@ func TestUpdateEndpointsMap(t *testing.T) {
12681271
}
12691272
}
12701273

1274+
func TestLastChangeTriggerTime(t *testing.T) {
1275+
t0 := time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
1276+
t1 := t0.Add(time.Second)
1277+
t2 := t1.Add(time.Second)
1278+
t3 := t2.Add(time.Second)
1279+
1280+
createEndpoints := func(namespace, name string, triggerTime time.Time) *v1.Endpoints {
1281+
e := makeTestEndpoints(namespace, name, func(ept *v1.Endpoints) {
1282+
ept.Subsets = []v1.EndpointSubset{{
1283+
Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}},
1284+
Ports: []v1.EndpointPort{{Port: 11}},
1285+
}}
1286+
})
1287+
e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano)
1288+
return e
1289+
}
1290+
1291+
modifyEndpoints := func(endpoints *v1.Endpoints, triggerTime time.Time) *v1.Endpoints {
1292+
e := endpoints.DeepCopy()
1293+
e.Subsets[0].Ports[0].Port++
1294+
e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano)
1295+
return e
1296+
}
1297+
1298+
sortTimeSlice := func(data []time.Time) {
1299+
sort.Slice(data, func(i, j int) bool { return data[i].Before(data[j]) })
1300+
}
1301+
1302+
testCases := []struct {
1303+
name string
1304+
scenario func(fp *FakeProxier)
1305+
expected []time.Time
1306+
}{
1307+
{
1308+
name: "Single addEndpoints",
1309+
scenario: func(fp *FakeProxier) {
1310+
e := createEndpoints("ns", "ep1", t0)
1311+
fp.addEndpoints(e)
1312+
},
1313+
expected: []time.Time{t0},
1314+
},
1315+
{
1316+
name: "addEndpoints then updatedEndpoints",
1317+
scenario: func(fp *FakeProxier) {
1318+
e := createEndpoints("ns", "ep1", t0)
1319+
fp.addEndpoints(e)
1320+
1321+
e1 := modifyEndpoints(e, t1)
1322+
fp.updateEndpoints(e, e1)
1323+
},
1324+
expected: []time.Time{t0, t1},
1325+
},
1326+
{
1327+
name: "Add two endpoints then modify one",
1328+
scenario: func(fp *FakeProxier) {
1329+
e1 := createEndpoints("ns", "ep1", t1)
1330+
fp.addEndpoints(e1)
1331+
1332+
e2 := createEndpoints("ns", "ep2", t2)
1333+
fp.addEndpoints(e2)
1334+
1335+
e11 := modifyEndpoints(e1, t3)
1336+
fp.updateEndpoints(e1, e11)
1337+
},
1338+
expected: []time.Time{t1, t2, t3},
1339+
},
1340+
{
1341+
name: "Endpoints without annotation set",
1342+
scenario: func(fp *FakeProxier) {
1343+
e := createEndpoints("ns", "ep1", t1)
1344+
delete(e.Annotations, v1.EndpointsLastChangeTriggerTime)
1345+
fp.addEndpoints(e)
1346+
},
1347+
expected: []time.Time{},
1348+
},
1349+
{
1350+
name: "addEndpoints then deleteEndpoints",
1351+
scenario: func(fp *FakeProxier) {
1352+
e := createEndpoints("ns", "ep1", t1)
1353+
fp.addEndpoints(e)
1354+
fp.deleteEndpoints(e)
1355+
},
1356+
expected: []time.Time{},
1357+
},
1358+
{
1359+
name: "add then delete then add again",
1360+
scenario: func(fp *FakeProxier) {
1361+
e := createEndpoints("ns", "ep1", t1)
1362+
fp.addEndpoints(e)
1363+
fp.deleteEndpoints(e)
1364+
e = modifyEndpoints(e, t2)
1365+
fp.addEndpoints(e)
1366+
},
1367+
expected: []time.Time{t2},
1368+
},
1369+
}
1370+
1371+
for _, tc := range testCases {
1372+
fp := newFakeProxier()
1373+
1374+
tc.scenario(fp)
1375+
1376+
result := UpdateEndpointsMap(fp.endpointsMap, fp.endpointsChanges)
1377+
got := result.LastChangeTriggerTimes
1378+
sortTimeSlice(got)
1379+
sortTimeSlice(tc.expected)
1380+
1381+
if !reflect.DeepEqual(got, tc.expected) {
1382+
t.Errorf("%s: Invalid LastChangeTriggerTimes, expected: %v, got: %v",
1383+
tc.name, tc.expected, result.LastChangeTriggerTimes)
1384+
}
1385+
}
1386+
}
1387+
12711388
func compareEndpointsMaps(t *testing.T, tci int, newMap EndpointsMap, expected map[ServicePortName][]*BaseEndpointInfo) {
12721389
if len(newMap) != len(expected) {
12731390
t.Errorf("[%d] expected %d results, got %d: %v", tci, len(expected), len(newMap), newMap)

pkg/proxy/iptables/proxier.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1341,6 +1341,11 @@ func (proxier *Proxier) syncProxyRules() {
13411341
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
13421342
return
13431343
}
1344+
for _, lastChangeTriggerTime := range endpointUpdateResult.LastChangeTriggerTimes {
1345+
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
1346+
metrics.NetworkProgrammingLatency.Observe(latency)
1347+
klog.V(4).Infof("Network programming took %f seconds", latency)
1348+
}
13441349

13451350
// Close old local ports and save new ones.
13461351
for k, v := range proxier.portsMap {

pkg/proxy/ipvs/proxier.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1200,6 +1200,11 @@ func (proxier *Proxier) syncProxyRules() {
12001200
utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap)
12011201
return
12021202
}
1203+
for _, lastChangeTriggerTime := range endpointUpdateResult.LastChangeTriggerTimes {
1204+
latency := metrics.SinceInSeconds(lastChangeTriggerTime)
1205+
metrics.NetworkProgrammingLatency.Observe(latency)
1206+
klog.V(4).Infof("Network programming took %f seconds", latency)
1207+
}
12031208

12041209
// Close old local ports and save new ones.
12051210
for k, v := range proxier.portsMap {

pkg/proxy/metrics/metrics.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,34 @@ var (
4545
Buckets: prometheus.ExponentialBuckets(1000, 2, 15),
4646
},
4747
)
48+
49+
// NetworkProgrammingLatency is defined as the time it took to program the network - from the time
50+
// the service or pod has changed to the time the change was propagated and the proper kube-proxy
51+
// rules were synced. Exported for each endpoints object that were part of the rules sync.
52+
// See https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md
53+
// Note that the metrics is partially based on the time exported by the endpoints controller on
54+
// the master machine. The measurement may be inaccurate if there is a clock drift between the
55+
// node and master machine.
56+
NetworkProgrammingLatency = prometheus.NewHistogram(
57+
prometheus.HistogramOpts{
58+
Subsystem: kubeProxySubsystem,
59+
Name: "network_programming_latency_seconds",
60+
Help: "In Cluster Network Programming Latency in seconds",
61+
// TODO(mm4tt): Reevaluate buckets before 1.14 release.
62+
// The last bucket will be [0.001s*2^20 ~= 17min, +inf)
63+
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20),
64+
},
65+
)
4866
)
4967

5068
var registerMetricsOnce sync.Once
5169

52-
// RegisterMetrics registers sync proxy rules latency metrics
70+
// RegisterMetrics registers kube-proxy metrics.
5371
func RegisterMetrics() {
5472
registerMetricsOnce.Do(func() {
5573
prometheus.MustRegister(SyncProxyRulesLatency)
5674
prometheus.MustRegister(DeprecatedSyncProxyRulesLatency)
75+
prometheus.MustRegister(NetworkProgrammingLatency)
5776
})
5877
}
5978

0 commit comments

Comments
 (0)