Skip to content

Commit bf8dd69

Browse files
authored
Merge pull request kubernetes#73314 from mm4tt/trigger_time_tracker
Introduce the TriggerTimeTracker util in the Endpoints Controller.
2 parents fcb0d60 + 9e7f7df commit bf8dd69

File tree

5 files changed

+484
-1
lines changed

5 files changed

+484
-1
lines changed

pkg/controller/endpoint/BUILD

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ go_library(
1111
srcs = [
1212
"doc.go",
1313
"endpoints_controller.go",
14+
"trigger_time_tracker.go",
1415
],
1516
importpath = "k8s.io/kubernetes/pkg/controller/endpoint",
1617
deps = [
@@ -39,7 +40,10 @@ go_library(
3940

4041
go_test(
4142
name = "go_default_test",
42-
srcs = ["endpoints_controller_test.go"],
43+
srcs = [
44+
"endpoints_controller_test.go",
45+
"trigger_time_tracker_test.go",
46+
],
4347
embed = [":go_default_library"],
4448
deps = [
4549
"//pkg/api/testapi:go_default_library",

pkg/controller/endpoint/endpoints_controller.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,8 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
101101
e.endpointsLister = endpointsInformer.Lister()
102102
e.endpointsSynced = endpointsInformer.Informer().HasSynced
103103

104+
e.triggerTimeTracker = NewTriggerTimeTracker()
105+
104106
return e
105107
}
106108

@@ -138,6 +140,10 @@ type EndpointController struct {
138140

139141
// workerLoopPeriod is the time between worker runs. The workers process the queue of service and pod changes.
140142
workerLoopPeriod time.Duration
143+
144+
// triggerTimeTracker is an util used to compute and export the EndpointsLastChangeTriggerTime
145+
// annotation.
146+
triggerTimeTracker *TriggerTimeTracker
141147
}
142148

143149
// Run will not return until stopCh is closed. workers determines how many
@@ -399,6 +405,7 @@ func (e *EndpointController) syncService(key string) error {
399405
if err != nil && !errors.IsNotFound(err) {
400406
return err
401407
}
408+
e.triggerTimeTracker.DeleteEndpoints(namespace, name)
402409
return nil
403410
}
404411

@@ -427,6 +434,12 @@ func (e *EndpointController) syncService(key string) error {
427434
}
428435
}
429436

437+
// We call ComputeEndpointsLastChangeTriggerTime here to make sure that the state of the trigger
438+
// time tracker gets updated even if the sync turns out to be no-op and we don't update the
439+
// endpoints object.
440+
endpointsLastChangeTriggerTime := e.triggerTimeTracker.
441+
ComputeEndpointsLastChangeTriggerTime(namespace, name, service, pods)
442+
430443
subsets := []v1.EndpointSubset{}
431444
var totalReadyEps int
432445
var totalNotReadyEps int
@@ -506,6 +519,11 @@ func (e *EndpointController) syncService(key string) error {
506519
newEndpoints.Annotations = make(map[string]string)
507520
}
508521

522+
if !endpointsLastChangeTriggerTime.IsZero() {
523+
newEndpoints.Annotations[v1.EndpointsLastChangeTriggerTime] =
524+
endpointsLastChangeTriggerTime.Format(time.RFC3339Nano)
525+
}
526+
509527
klog.V(4).Infof("Update endpoints for %v/%v, ready: %d not ready: %d", service.Namespace, service.Name, totalReadyEps, totalNotReadyEps)
510528
if createEndpoints {
511529
// No previous endpoints, create them

pkg/controller/endpoint/endpoints_controller_test.go

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@ import (
4444
var alwaysReady = func() bool { return true }
4545
var neverReady = func() bool { return false }
4646
var emptyNodeName string
47+
var triggerTime = time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
48+
var triggerTimeString = triggerTime.Format(time.RFC3339Nano)
49+
var oldTriggerTimeString = triggerTime.Add(-time.Hour).Format(time.RFC3339Nano)
4750

4851
func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) {
4952
for i := 0; i < nPods+nNotReady; i++ {
@@ -1175,3 +1178,94 @@ func TestDetermineNeededServiceUpdates(t *testing.T) {
11751178
}
11761179
}
11771180
}
1181+
1182+
func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
1183+
ns := "other"
1184+
testServer, endpointsHandler := makeTestServer(t, ns)
1185+
defer testServer.Close()
1186+
endpoints := newController(testServer.URL)
1187+
endpoints.endpointsStore.Add(&v1.Endpoints{
1188+
ObjectMeta: metav1.ObjectMeta{
1189+
Name: "foo",
1190+
Namespace: ns,
1191+
ResourceVersion: "1",
1192+
},
1193+
Subsets: []v1.EndpointSubset{{
1194+
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
1195+
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
1196+
}},
1197+
})
1198+
addPods(endpoints.podStore, ns, 1, 1, 0)
1199+
endpoints.serviceStore.Add(&v1.Service{
1200+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
1201+
Spec: v1.ServiceSpec{
1202+
Selector: map[string]string{},
1203+
Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
1204+
},
1205+
})
1206+
endpoints.syncService(ns + "/foo")
1207+
1208+
endpointsHandler.ValidateRequestCount(t, 1)
1209+
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
1210+
ObjectMeta: metav1.ObjectMeta{
1211+
Name: "foo",
1212+
Namespace: ns,
1213+
ResourceVersion: "1",
1214+
Annotations: map[string]string{
1215+
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
1216+
},
1217+
},
1218+
Subsets: []v1.EndpointSubset{{
1219+
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
1220+
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
1221+
}},
1222+
})
1223+
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
1224+
}
1225+
1226+
func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
1227+
ns := "other"
1228+
testServer, endpointsHandler := makeTestServer(t, ns)
1229+
defer testServer.Close()
1230+
endpoints := newController(testServer.URL)
1231+
endpoints.endpointsStore.Add(&v1.Endpoints{
1232+
ObjectMeta: metav1.ObjectMeta{
1233+
Name: "foo",
1234+
Namespace: ns,
1235+
ResourceVersion: "1",
1236+
Annotations: map[string]string{
1237+
v1.EndpointsLastChangeTriggerTime: oldTriggerTimeString,
1238+
},
1239+
},
1240+
Subsets: []v1.EndpointSubset{{
1241+
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
1242+
Ports: []v1.EndpointPort{{Port: 1000, Protocol: "TCP"}},
1243+
}},
1244+
})
1245+
addPods(endpoints.podStore, ns, 1, 1, 0)
1246+
endpoints.serviceStore.Add(&v1.Service{
1247+
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, CreationTimestamp: metav1.NewTime(triggerTime)},
1248+
Spec: v1.ServiceSpec{
1249+
Selector: map[string]string{},
1250+
Ports: []v1.ServicePort{{Port: 80, TargetPort: intstr.FromInt(8080), Protocol: "TCP"}},
1251+
},
1252+
})
1253+
endpoints.syncService(ns + "/foo")
1254+
1255+
endpointsHandler.ValidateRequestCount(t, 1)
1256+
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
1257+
ObjectMeta: metav1.ObjectMeta{
1258+
Name: "foo",
1259+
Namespace: ns,
1260+
ResourceVersion: "1",
1261+
Annotations: map[string]string{
1262+
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
1263+
},
1264+
},
1265+
Subsets: []v1.EndpointSubset{{
1266+
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
1267+
Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}},
1268+
}},
1269+
})
1270+
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
1271+
}
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
/*
2+
Copyright 2019 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package endpoint
18+
19+
import (
20+
"sync"
21+
"time"
22+
23+
"k8s.io/api/core/v1"
24+
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
25+
)
26+
27+
// TriggerTimeTracker is a util used to compute the EndpointsLastChangeTriggerTime annotation which
28+
// is exported in the endpoints controller's sync function.
29+
// See the documentation of the EndpointsLastChangeTriggerTime annotation for more details.
30+
//
31+
// Please note that this util may compute a wrong EndpointsLastChangeTriggerTime if a same object
32+
// changes multiple times between two consecutive syncs. We're aware of this limitation but we
33+
// decided to accept it, as fixing it would require a major rewrite of the endpoints controller and
34+
// Informer framework. Such situations, i.e. frequent updates of the same object in a single sync
35+
// period, should be relatively rare and therefore this util should provide a good approximation of
36+
// the EndpointsLastChangeTriggerTime.
37+
// TODO(mm4tt): Implement a more robust mechanism that is not subject to the above limitations.
38+
type TriggerTimeTracker struct {
39+
// endpointsStates is a map, indexed by Endpoints object key, storing the last known Endpoints
40+
// object state observed during the most recent call of the ComputeEndpointsLastChangeTriggerTime
41+
// function.
42+
endpointsStates map[endpointsKey]endpointsState
43+
44+
// mutex guarding the endpointsStates map.
45+
mutex sync.Mutex
46+
}
47+
48+
// NewTriggerTimeTracker creates a new instance of the TriggerTimeTracker.
49+
func NewTriggerTimeTracker() *TriggerTimeTracker {
50+
return &TriggerTimeTracker{
51+
endpointsStates: make(map[endpointsKey]endpointsState),
52+
}
53+
}
54+
55+
// endpointsKey is a key uniquely identifying an Endpoints object.
56+
type endpointsKey struct {
57+
// namespace, name composing a namespaced name - an unique identifier of every Endpoints object.
58+
namespace, name string
59+
}
60+
61+
// endpointsState represents a state of an Endpoints object that is known to this util.
62+
type endpointsState struct {
63+
// lastServiceTriggerTime is a service trigger time observed most recently.
64+
lastServiceTriggerTime time.Time
65+
// lastPodTriggerTimes is a map (Pod name -> time) storing the pod trigger times that were
66+
// observed during the most recent call of the ComputeEndpointsLastChangeTriggerTime function.
67+
lastPodTriggerTimes map[string]time.Time
68+
}
69+
70+
// ComputeEndpointsLastChangeTriggerTime updates the state of the Endpoints object being synced
71+
// and returns the time that should be exported as the EndpointsLastChangeTriggerTime annotation.
72+
//
73+
// If the method returns a 'zero' time the EndpointsLastChangeTriggerTime annotation shouldn't be
74+
// exported.
75+
//
76+
// Please note that this function may compute a wrong EndpointsLastChangeTriggerTime value if the
77+
// same object (pod/service) changes multiple times between two consecutive syncs.
78+
//
79+
// Important: This method is go-routing safe but only when called for different keys. The method
80+
// shouldn't be called concurrently for the same key! This contract is fulfilled in the current
81+
// implementation of the endpoints controller.
82+
func (t *TriggerTimeTracker) ComputeEndpointsLastChangeTriggerTime(
83+
namespace, name string, service *v1.Service, pods []*v1.Pod) time.Time {
84+
85+
key := endpointsKey{namespace: namespace, name: name}
86+
// As there won't be any concurrent calls for the same key, we need to guard access only to the
87+
// endpointsStates map.
88+
t.mutex.Lock()
89+
state, wasKnown := t.endpointsStates[key]
90+
t.mutex.Unlock()
91+
92+
// Update the state before returning.
93+
defer func() {
94+
t.mutex.Lock()
95+
t.endpointsStates[key] = state
96+
t.mutex.Unlock()
97+
}()
98+
99+
// minChangedTriggerTime is the min trigger time of all trigger times that have changed since the
100+
// last sync.
101+
var minChangedTriggerTime time.Time
102+
// TODO(mm4tt): If memory allocation / GC performance impact of recreating map in every call
103+
// turns out to be too expensive, we should consider rewriting this to reuse the existing map.
104+
podTriggerTimes := make(map[string]time.Time)
105+
for _, pod := range pods {
106+
if podTriggerTime := getPodTriggerTime(pod); !podTriggerTime.IsZero() {
107+
podTriggerTimes[pod.Name] = podTriggerTime
108+
if podTriggerTime.After(state.lastPodTriggerTimes[pod.Name]) {
109+
// Pod trigger time has changed since the last sync, update minChangedTriggerTime.
110+
minChangedTriggerTime = min(minChangedTriggerTime, podTriggerTime)
111+
}
112+
}
113+
}
114+
serviceTriggerTime := getServiceTriggerTime(service)
115+
if serviceTriggerTime.After(state.lastServiceTriggerTime) {
116+
// Service trigger time has changed since the last sync, update minChangedTriggerTime.
117+
minChangedTriggerTime = min(minChangedTriggerTime, serviceTriggerTime)
118+
}
119+
120+
state.lastPodTriggerTimes = podTriggerTimes
121+
state.lastServiceTriggerTime = serviceTriggerTime
122+
123+
if !wasKnown {
124+
// New Endpoints object / new Service, use Service creationTimestamp.
125+
return service.CreationTimestamp.Time
126+
} else {
127+
// Regular update of the Endpoints object, return min of changed trigger times.
128+
return minChangedTriggerTime
129+
}
130+
}
131+
132+
// DeleteEndpoints deletes endpoints state stored in this util.
133+
func (t *TriggerTimeTracker) DeleteEndpoints(namespace, name string) {
134+
key := endpointsKey{namespace: namespace, name: name}
135+
t.mutex.Lock()
136+
defer t.mutex.Unlock()
137+
delete(t.endpointsStates, key)
138+
}
139+
140+
// getPodTriggerTime returns the time of the pod change (trigger) that resulted or will result in
141+
// the endpoints object change.
142+
func getPodTriggerTime(pod *v1.Pod) (triggerTime time.Time) {
143+
if readyCondition := podutil.GetPodReadyCondition(pod.Status); readyCondition != nil {
144+
triggerTime = readyCondition.LastTransitionTime.Time
145+
}
146+
// TODO(mm4tt): Implement missing cases: deletionTime set, pod label change
147+
return triggerTime
148+
}
149+
150+
// getServiceTriggerTime returns the time of the service change (trigger) that resulted or will
151+
// result in the endpoints object change.
152+
func getServiceTriggerTime(service *v1.Service) (triggerTime time.Time) {
153+
// TODO(mm4tt): Ideally we should look at service.LastUpdateTime, but such thing doesn't exist.
154+
return service.CreationTimestamp.Time
155+
}
156+
157+
// min returns minimum of the currentMin and newValue or newValue if the currentMin is not set.
158+
func min(currentMin, newValue time.Time) time.Time {
159+
if currentMin.IsZero() || newValue.Before(currentMin) {
160+
return newValue
161+
}
162+
return currentMin
163+
}

0 commit comments

Comments
 (0)