Skip to content

Commit 9e7f7df

Browse files
committed
Introduce the TriggerTimeTracker util.
It will be used by the Endpoints Controller to compute and export the EndpointsLastChangeTriggerTime annotation, which in turn will be used to compute the In-Cluster Network Programming Latency SLI. See https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md for more details.
1 parent d5a1ebb commit 9e7f7df

File tree

5 files changed

+484
-1
lines changed

5 files changed

+484
-1
lines changed

pkg/controller/endpoint/BUILD

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
srcs = [
1212
"doc.go",
1313
"endpoints_controller.go",
14+
"trigger_time_tracker.go",
1415
],
1516
importpath = "k8s.io/kubernetes/pkg/controller/endpoint",
1617
deps = [
@@ -39,7 +40,10 @@ go_library(
3940

4041
go_test(
4142
name = "go_default_test",
42-
srcs = ["endpoints_controller_test.go"],
43+
srcs = [
44+
"endpoints_controller_test.go",
45+
"trigger_time_tracker_test.go",
46+
],
4347
embed = [":go_default_library"],
4448
deps = [
4549
"//pkg/api/testapi:go_default_library",

pkg/controller/endpoint/endpoints_controller.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
101101
e.endpointsLister = endpointsInformer.Lister()
102102
e.endpointsSynced = endpointsInformer.Informer().HasSynced
103103

104+
e.triggerTimeTracker = NewTriggerTimeTracker()
105+
104106
return e
105107
}
106108

@@ -138,6 +140,10 @@ type EndpointController struct {
138140

139141
// workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes.
140142
workerLoopPeriod time.Duration
143+
144+
// triggerTimeTracker is a util used to compute and export the EndpointsLastChangeTriggerTime
145+
// annotation.
146+
triggerTimeTracker *TriggerTimeTracker
141147
}
142148

143149
// Run will not return until stopCh is closed. workers determines how many
@@ -399,6 +405,7 @@ func (e *EndpointController) syncService(key string) error {
399405
if err != nil && !errors.IsNotFound(err) {
400406
return err
401407
}
408+
e.triggerTimeTracker.DeleteEndpoints(namespace, name)
402409
return nil
403410
}
404411

@@ -427,6 +434,12 @@ func (e *EndpointController) syncService(key string) error {
427434
}
428435
}
429436

437+
// We call ComputeEndpointsLastChangeTriggerTime here to make sure that the state of the trigger
438+
// time tracker gets updated even if the sync turns out to be a no-op and we don't update the
439+
// endpoints object.
440+
endpointsLastChangeTriggerTime := e.triggerTimeTracker.
441+
ComputeEndpointsLastChangeTriggerTime(namespace, name, service, pods)
442+
430443
subsets := []v1.EndpointSubset{}
431444
var totalReadyEps int
432445
var totalNotReadyEps int
@@ -506,6 +519,11 @@ func (e *EndpointController) syncService(key string) error {
506519
newEndpoints.Annotations = make(map[string]string)
507520
}
508521

522+
if !endpointsLastChangeTriggerTime.IsZero() {
523+
newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
524+
endpointsLastChangeTriggerTime.Format(time.RFC3339Nano)
525+
}
526+
509527
klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
510528
if createEndpoints {
511529
// No previous endpoints, create them

pkg/controller/endpoint/endpoints_controller_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ import (
4444
var alwaysReady = func() bool { return true }
4545
var neverReady = func() bool { return false }
4646
var emptyNodeName string
47+
var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
48+
var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
49+
var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
4750

4851
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
4952
for i := 0; i < nPods+nNotReady; i++ {
@@ -1175,3 +1178,94 @@ func TestDetermineNeededServiceUpdates(t *testing.T) {
11751178
}
11761179
}
11771180
}
1181+
1182+
func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
1183+
ns := "other"
1184+
testServer, endpointsHandler := makeTestServer(t, ns)
1185+
defer testServer.Close()
1186+
endpoints := newController(testServer.URL)
1187+
endpoints.endpointsStore.Add(&v1.Endpoints{
1188+
ObjectMeta: metav1.ObjectMeta{
1189+
Name: "foo",
1190+
Namespace: ns,
1191+
ResourceVersion: "1",
1192+
},
1193+
Subsets: []v1.EndpointSubset{{
1194+
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
1195+
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
1196+
}},
1197+
})
1198+
addPods(endpoints.podStore, ns, 1, 1, 0)
1199+
endpoints.serviceStore.Add(&v1.Service{
1200+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
1201+
Spec: v1.ServiceSpec{
1202+
Selector: map[string]string{},
1203+
Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
1204+
},
1205+
})
1206+
endpoints.syncService(ns + "/foo")
1207+
1208+
endpointsHandler.ValidateRequestCount(t, 1)
1209+
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
1210+
ObjectMeta: metav1.ObjectMeta{
1211+
Name: "foo",
1212+
Namespace: ns,
1213+
ResourceVersion: "1",
1214+
Annotations: map[string]string{
1215+
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
1216+
},
1217+
},
1218+
Subsets: []v1.EndpointSubset{{
1219+
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
1220+
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
1221+
}},
1222+
})
1223+
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
1224+
}
1225+
1226+
func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
1227+
ns := "other"
1228+
testServer, endpointsHandler := makeTestServer(t, ns)
1229+
defer testServer.Close()
1230+
endpoints := newController(testServer.URL)
1231+
endpoints.endpointsStore.Add(&v1.Endpoints{
1232+
ObjectMeta: metav1.ObjectMeta{
1233+
Name: "foo",
1234+
Namespace: ns,
1235+
ResourceVersion: "1",
1236+
Annotations: map[string]string{
1237+
v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString,
1238+
},
1239+
},
1240+
Subsets: []v1.EndpointSubset{{
1241+
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
1242+
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
1243+
}},
1244+
})
1245+
addPods(endpoints.podStore, ns, 1, 1, 0)
1246+
endpoints.serviceStore.Add(&v1.Service{
1247+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
1248+
Spec: v1.ServiceSpec{
1249+
Selector: map[string]string{},
1250+
Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
1251+
},
1252+
})
1253+
endpoints.syncService(ns + "/foo")
1254+
1255+
endpointsHandler.ValidateRequestCount(t, 1)
1256+
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
1257+
ObjectMeta: metav1.ObjectMeta{
1258+
Name: "foo",
1259+
Namespace: ns,
1260+
ResourceVersion: "1",
1261+
Annotations: map[string]string{
1262+
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
1263+
},
1264+
},
1265+
Subsets: []v1.EndpointSubset{{
1266+
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
1267+
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
1268+
}},
1269+
})
1270+
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
1271+
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package endpoint
18+
19+
import (
20+
"sync"
21+
"time"
22+
23+
"k8s.io/api/core/v1"
24+
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
25+
)
26+
27+
// TriggerTimeTracker is a util used to compute the EndpointsLastChangeTriggerTime annotation which
28+
// is exported in the endpoints controller's sync function.
29+
// See the documentation of the EndpointsLastChangeTriggerTime annotation for more details.
30+
//
31+
// Please note that this util may compute a wrong EndpointsLastChangeTriggerTime if a same object
32+
// changes multiple times between two consecutive syncs. We're aware of this limitation but we
33+
// decided to accept it, as fixing it would require a major rewrite of the endpoints controller and
34+
// Informer framework. Such situations, i.e. frequent updates of the same object in a single sync
35+
// period, should be relatively rare and therefore this util should provide a good approximation of
36+
// the EndpointsLastChangeTriggerTime.
37+
// TODO(mm4tt): Implement a more robust mechanism that is not subject to the above limitations.
38+
type TriggerTimeTracker struct {
39+
// endpointsStates is a map, indexed by Endpoints object key, storing the last known Endpoints
40+
// object state observed during the most recent call of the ComputeEndpointsLastChangeTriggerTime
41+
// function.
42+
endpointsStates map[endpointsKey]endpointsState
43+
44+
// mutex guarding the endpointsStates map.
45+
mutex sync.Mutex
46+
}
47+
48+
// NewTriggerTimeTracker creates a new instance of the TriggerTimeTracker.
49+
func NewTriggerTimeTracker() *TriggerTimeTracker {
50+
return &TriggerTimeTracker{
51+
endpointsStates: make(map[endpointsKey]endpointsState),
52+
}
53+
}
54+
55+
// endpointsKey identifies a single Endpoints object by its namespaced name. It is used as the key
// of the tracker's state map.
type endpointsKey struct {
	// namespace is the namespace of the Endpoints object.
	namespace string
	// name is the name of the Endpoints object within its namespace.
	name string
}
60+
61+
// endpointsState is the tracker's record of one Endpoints object, as captured by the most recent
// ComputeEndpointsLastChangeTriggerTime call for that object.
type endpointsState struct {
	// lastServiceTriggerTime is the most recently observed service trigger time.
	lastServiceTriggerTime time.Time
	// lastPodTriggerTimes maps a Pod name to the pod trigger time observed during the most recent
	// ComputeEndpointsLastChangeTriggerTime call.
	lastPodTriggerTimes map[string]time.Time
}
69+
70+
// ComputeEndpointsLastChangeTriggerTime updates the state of the Endpoints object being synced
71+
// and returns the time that should be exported as the EndpointsLastChangeTriggerTime annotation.
72+
//
73+
// If the method returns a 'zero' time the EndpointsLastChangeTriggerTime annotation shouldn't be
74+
// exported.
75+
//
76+
// Please note that this function may compute a wrong EndpointsLastChangeTriggerTime value if the
77+
// same object (pod/service) changes multiple times between two consecutive syncs.
78+
//
79+
// Important: This method is go-routing safe but only when called for different keys. The method
80+
// shouldn't be called concurrently for the same key! This contract is fulfilled in the current
81+
// implementation of the endpoints controller.
82+
func (t *TriggerTimeTracker) ComputeEndpointsLastChangeTriggerTime(
83+
namespace, name string, service *v1.Service, pods []*v1.Pod) time.Time {
84+
85+
key := endpointsKey{namespace: namespace, name: name}
86+
// As there won't be any concurrent calls for the same key, we need to guard access only to the
87+
// endpointsStates map.
88+
t.mutex.Lock()
89+
state, wasKnown := t.endpointsStates[key]
90+
t.mutex.Unlock()
91+
92+
// Update the state before returning.
93+
defer func() {
94+
t.mutex.Lock()
95+
t.endpointsStates[key] = state
96+
t.mutex.Unlock()
97+
}()
98+
99+
// minChangedTriggerTime is the min trigger time of all trigger times that have changed since the
100+
// last sync.
101+
var minChangedTriggerTime time.Time
102+
// TODO(mm4tt): If memory allocation / GC performance impact of recreating map in every call
103+
// turns out to be too expensive, we should consider rewriting this to reuse the existing map.
104+
podTriggerTimes := make(map[string]time.Time)
105+
for _, pod := range pods {
106+
if podTriggerTime := getPodTriggerTime(pod); !podTriggerTime.IsZero() {
107+
podTriggerTimes[pod.Name] = podTriggerTime
108+
if podTriggerTime.After(state.lastPodTriggerTimes[pod.Name]) {
109+
// Pod trigger time has changed since the last sync, update minChangedTriggerTime.
110+
minChangedTriggerTime = min(minChangedTriggerTime, podTriggerTime)
111+
}
112+
}
113+
}
114+
serviceTriggerTime := getServiceTriggerTime(service)
115+
if serviceTriggerTime.After(state.lastServiceTriggerTime) {
116+
// Service trigger time has changed since the last sync, update minChangedTriggerTime.
117+
minChangedTriggerTime = min(minChangedTriggerTime, serviceTriggerTime)
118+
}
119+
120+
state.lastPodTriggerTimes = podTriggerTimes
121+
state.lastServiceTriggerTime = serviceTriggerTime
122+
123+
if !wasKnown {
124+
// New Endpoints object / new Service, use Service creationTimestamp.
125+
return service.CreationTimestamp.Time
126+
} else {
127+
// Regular update of the Endpoints object, return min of changed trigger times.
128+
return minChangedTriggerTime
129+
}
130+
}
131+
132+
// DeleteEndpoints deletes endpoints state stored in this util.
133+
func (t *TriggerTimeTracker) DeleteEndpoints(namespace, name string) {
134+
key := endpointsKey{namespace: namespace, name: name}
135+
t.mutex.Lock()
136+
defer t.mutex.Unlock()
137+
delete(t.endpointsStates, key)
138+
}
139+
140+
// getPodTriggerTime returns the time of the pod change (trigger) that resulted or will result in
141+
// the endpoints object change.
142+
func getPodTriggerTime(pod *v1.Pod) (triggerTime time.Time) {
143+
if readyCondition := podutil.GetPodReadyCondition(pod.Status); readyCondition != nil {
144+
triggerTime = readyCondition.LastTransitionTime.Time
145+
}
146+
// TODO(mm4tt): Implement missing cases: deletionTime set, pod label change
147+
return triggerTime
148+
}
149+
150+
// getServiceTriggerTime returns the time of the service change (trigger) that resulted or will
151+
// result in the endpoints object change.
152+
func getServiceTriggerTime(service *v1.Service) (triggerTime time.Time) {
153+
// TODO(mm4tt): Ideally we should look at service.LastUpdateTime, but such thing doesn't exist.
154+
return service.CreationTimestamp.Time
155+
}
156+
157+
// min returns the earlier of currentMin and newValue, where a zero time means "not set".
// If currentMin is not set, newValue is returned. If newValue is not set, currentMin is returned
// unchanged, so an unset value can never displace a valid minimum.
func min(currentMin, newValue time.Time) time.Time {
	switch {
	case newValue.IsZero():
		// Nothing to compare against; keep whatever minimum we already have.
		return currentMin
	case currentMin.IsZero() || newValue.Before(currentMin):
		return newValue
	default:
		return currentMin
	}
}

0 commit comments

Comments
 (0)