Skip to content

Commit d195fc2

Browse files
committed
Ensure runtimeCache contains all observed started containers on pod delete
1 parent de18bd6 commit d195fc2

File tree

4 files changed

+142
-0
lines changed

4 files changed

+142
-0
lines changed

pkg/kubelet/BUILD

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ go_library(
2929
"reason_cache.go",
3030
"runonce.go",
3131
"runtime.go",
32+
"time_cache.go",
3233
"util.go",
3334
"volume_host.go",
3435
],
@@ -179,6 +180,7 @@ go_test(
179180
"pod_workers_test.go",
180181
"reason_cache_test.go",
181182
"runonce_test.go",
183+
"time_cache_test.go",
182184
],
183185
embed = [":go_default_library"],
184186
deps = [
@@ -251,6 +253,7 @@ go_test(
251253
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
252254
"//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
253255
"//staging/src/k8s.io/component-base/version:go_default_library",
256+
"//vendor/github.com/golang/groupcache/lru:go_default_library",
254257
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
255258
"//vendor/github.com/google/cadvisor/info/v2:go_default_library",
256259
"//vendor/github.com/stretchr/testify/assert:go_default_library",

pkg/kubelet/kubelet.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
525525
experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
526526
keepTerminatedPodVolumes: keepTerminatedPodVolumes,
527527
nodeStatusMaxImages: nodeStatusMaxImages,
528+
lastContainerStartedTime: newTimeCache(),
528529
}
529530

530531
if klet.cloud != nil {
@@ -975,6 +976,9 @@ type Kubelet struct {
975976
// lastStatusReportTime is the time when node status was last reported.
976977
lastStatusReportTime time.Time
977978

979+
// lastContainerStartedTime is the time of the last ContainerStarted event observed per pod
980+
lastContainerStartedTime *timeCache
981+
978982
// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
979983
// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
980984
syncNodeStatusMux sync.Mutex
@@ -1655,6 +1659,13 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error {
16551659
}
16561660
kl.podWorkers.ForgetWorker(pod.UID)
16571661

1662+
// make sure our runtimeCache is at least as fresh as the last container started event we observed.
1663+
// this ensures we correctly send graceful deletion signals to all containers we've reported started.
1664+
if lastContainerStarted, ok := kl.lastContainerStartedTime.Get(pod.UID); ok {
1665+
if err := kl.runtimeCache.ForceUpdateIfOlder(lastContainerStarted); err != nil {
1666+
return fmt.Errorf("error updating containers: %v", err)
1667+
}
1668+
}
16581669
// Runtime cache may not have been updated to with the pod, but it's okay
16591670
// because the periodic cleanup routine will attempt to delete again later.
16601671
runningPods, err := kl.runtimeCache.GetPods()
@@ -1839,6 +1850,12 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
18391850
kl.sourcesReady.AddSource(u.Source)
18401851

18411852
case e := <-plegCh:
1853+
if e.Type == pleg.ContainerStarted {
1854+
// record the most recent time we observed a container start for this pod.
1855+
// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
1856+
// to make sure we don't miss handling graceful termination for containers we reported as having started.
1857+
kl.lastContainerStartedTime.Add(e.ID, time.Now())
1858+
}
18421859
if isSyncPodWorthy(e) {
18431860
// PLEG event for a pod; sync it.
18441861
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {

pkg/kubelet/time_cache.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kubelet
18+
19+
import (
20+
"sync"
21+
"time"
22+
23+
"github.com/golang/groupcache/lru"
24+
25+
"k8s.io/apimachinery/pkg/types"
26+
)
27+
28+
// timeCache stores a time keyed by uid
29+
type timeCache struct {
30+
lock sync.RWMutex
31+
cache *lru.Cache
32+
}
33+
34+
// maxTimeCacheEntries is the cache entry number in lru cache. 1000 is a proper number
35+
// for our 100 pods per node target. If we support more pods per node in the future, we
36+
// may want to increase the number.
37+
const maxTimeCacheEntries = 1000
38+
39+
func newTimeCache() *timeCache {
40+
return &timeCache{cache: lru.New(maxTimeCacheEntries)}
41+
}
42+
43+
func (c *timeCache) Add(uid types.UID, t time.Time) {
44+
c.lock.Lock()
45+
defer c.lock.Unlock()
46+
c.cache.Add(uid, t)
47+
}
48+
49+
func (c *timeCache) Remove(uid types.UID) {
50+
c.lock.Lock()
51+
defer c.lock.Unlock()
52+
c.cache.Remove(uid)
53+
}
54+
55+
func (c *timeCache) Get(uid types.UID) (time.Time, bool) {
56+
c.lock.RLock()
57+
defer c.lock.RUnlock()
58+
value, ok := c.cache.Get(uid)
59+
if !ok {
60+
return time.Time{}, false
61+
}
62+
t, ok := value.(time.Time)
63+
if !ok {
64+
return time.Time{}, false
65+
}
66+
return t, true
67+
}

pkg/kubelet/time_cache_test.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
Copyright 2020 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package kubelet
18+
19+
import (
20+
"testing"
21+
"time"
22+
23+
"github.com/golang/groupcache/lru"
24+
)
25+
26+
func TestTimeCache(t *testing.T) {
27+
cache := &timeCache{cache: lru.New(2)}
28+
if a, ok := cache.Get("123"); ok {
29+
t.Errorf("expected cache miss, got %v, %v", a, ok)
30+
}
31+
32+
now := time.Now()
33+
soon := now.Add(time.Minute)
34+
cache.Add("now", now)
35+
cache.Add("soon", soon)
36+
37+
if a, ok := cache.Get("now"); !ok || !a.Equal(now) {
38+
t.Errorf("expected cache hit matching %v, got %v, %v", now, a, ok)
39+
}
40+
if a, ok := cache.Get("soon"); !ok || !a.Equal(soon) {
41+
t.Errorf("expected cache hit matching %v, got %v, %v", soon, a, ok)
42+
}
43+
44+
then := now.Add(-time.Minute)
45+
cache.Add("then", then)
46+
if a, ok := cache.Get("now"); ok {
47+
t.Errorf("expected cache miss from oldest evicted value, got %v, %v", a, ok)
48+
}
49+
if a, ok := cache.Get("soon"); !ok || !a.Equal(soon) {
50+
t.Errorf("expected cache hit matching %v, got %v, %v", soon, a, ok)
51+
}
52+
if a, ok := cache.Get("then"); !ok || !a.Equal(then) {
53+
t.Errorf("expected cache hit matching %v, got %v, %v", then, a, ok)
54+
}
55+
}

0 commit comments

Comments
 (0)