Skip to content

Commit 4247f3d

Browse files
author
abushwang
committed
Add optional TTL-based cleanup for httpcache to reduce disk and memory usage
Signed-off-by: abushwang <abushwang@tencent.com>
1 parent 0ef3e42 commit 4247f3d

File tree

5 files changed

+316
-7
lines changed

5 files changed

+316
-7
lines changed

cache/cache.go

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"os"
2525
"path/filepath"
2626
"sync"
27+
"time"
2728

2829
"github.com/containerd/stargz-snapshotter/util/cacheutil"
2930
"github.com/containerd/stargz-snapshotter/util/namedmutex"
@@ -65,6 +66,10 @@ type DirectoryCacheConfig struct {
6566

6667
// FadvDontNeed forcefully clean fscache pagecache for saving memory.
6768
FadvDontNeed bool
69+
70+
// EntryTTL enables TTL-based cleanup for cached blob files.
71+
// When zero or negative, TTL-based cleanup is disabled.
72+
EntryTTL time.Duration
6873
}
6974

7075
// TODO: contents validation.
@@ -178,8 +183,12 @@ func NewDirectoryCache(directory string, config DirectoryCacheConfig) (BlobCache
178183
bufPool: bufPool,
179184
direct: config.Direct,
180185
fadvDontNeed: config.FadvDontNeed,
186+
entryTTL: config.EntryTTL,
181187
}
182188
dc.syncAdd = config.SyncAdd
189+
190+
dc.startCleanupIfNeeded()
191+
183192
return dc, nil
184193
}
185194

@@ -197,8 +206,11 @@ type directoryCache struct {
197206
direct bool
198207
fadvDontNeed bool
199208

200-
closed bool
201-
closedMu sync.Mutex
209+
closed bool
210+
closedMu sync.Mutex
211+
entryTTL time.Duration
212+
cleanupStopCh chan struct{}
213+
cleanupWg sync.WaitGroup
202214
}
203215

204216
func (dc *directoryCache) Get(key string, opts ...Option) (Reader, error) {
@@ -384,11 +396,19 @@ func (dc *directoryCache) putBuffer(b *bytes.Buffer) {
384396

385397
func (dc *directoryCache) Close() error {
386398
dc.closedMu.Lock()
387-
defer dc.closedMu.Unlock()
388399
if dc.closed {
400+
dc.closedMu.Unlock()
389401
return nil
390402
}
391403
dc.closed = true
404+
stopCh := dc.cleanupStopCh
405+
dc.cleanupStopCh = nil
406+
dc.closedMu.Unlock()
407+
408+
if stopCh != nil {
409+
close(stopCh)
410+
dc.cleanupWg.Wait()
411+
}
392412
return os.RemoveAll(dc.directory)
393413
}
394414

cache/httpcache_ttl.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
package cache
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
"io"
7+
"io/fs"
8+
"os"
9+
"path/filepath"
10+
"sync"
11+
"time"
12+
13+
"github.com/containerd/log"
14+
"github.com/containerd/stargz-snapshotter/util/cacheutil"
15+
)
16+
17+
func (dc *directoryCache) startCleanupIfNeeded() {
18+
if dc.entryTTL <= 0 {
19+
return
20+
}
21+
dc.closedMu.Lock()
22+
if dc.closed || dc.cleanupStopCh != nil {
23+
dc.closedMu.Unlock()
24+
return
25+
}
26+
stopCh := make(chan struct{})
27+
dc.cleanupStopCh = stopCh
28+
dc.cleanupWg.Add(1)
29+
dc.closedMu.Unlock()
30+
31+
interval := dc.entryTTL
32+
go func() {
33+
defer dc.cleanupWg.Done()
34+
35+
ticker := time.NewTicker(interval)
36+
defer ticker.Stop()
37+
38+
dc.cleanupOnce()
39+
for {
40+
select {
41+
case <-ticker.C:
42+
dc.cleanupOnce()
43+
case <-stopCh:
44+
return
45+
}
46+
}
47+
}()
48+
}
49+
50+
func (dc *directoryCache) cleanupOnce() {
51+
if dc.entryTTL <= 0 {
52+
return
53+
}
54+
if dc.isClosed() {
55+
return
56+
}
57+
58+
cutoff := time.Now().Add(-dc.entryTTL)
59+
wipBase := dc.wipDirectory
60+
61+
_ = filepath.WalkDir(dc.directory, func(path string, d fs.DirEntry, walkErr error) error {
62+
if walkErr != nil {
63+
return nil
64+
}
65+
if dc.isClosed() {
66+
return fs.SkipAll
67+
}
68+
if d.IsDir() {
69+
if path == wipBase {
70+
return fs.SkipDir
71+
}
72+
return nil
73+
}
74+
75+
info, err := d.Info()
76+
if err != nil {
77+
return nil
78+
}
79+
if info.ModTime().After(cutoff) {
80+
return nil
81+
}
82+
83+
if err := os.Remove(path); err != nil && !os.IsNotExist(err) {
84+
log.L.WithError(err).Debugf("failed to remove expired cache entry %q", path)
85+
}
86+
return nil
87+
})
88+
89+
_ = filepath.WalkDir(wipBase, func(path string, d fs.DirEntry, walkErr error) error {
90+
if walkErr != nil {
91+
return nil
92+
}
93+
if dc.isClosed() {
94+
return fs.SkipAll
95+
}
96+
if d.IsDir() {
97+
return nil
98+
}
99+
100+
info, err := d.Info()
101+
if err != nil {
102+
return nil
103+
}
104+
if info.ModTime().After(cutoff) {
105+
return nil
106+
}
107+
_ = os.Remove(path)
108+
return nil
109+
})
110+
}
111+
112+
func NewMemoryCacheWithTTL(ttl time.Duration) BlobCache {
113+
if ttl <= 0 {
114+
return NewMemoryCache()
115+
}
116+
bufPool := &sync.Pool{
117+
New: func() interface{} {
118+
return new(bytes.Buffer)
119+
},
120+
}
121+
c := cacheutil.NewTTLCache(ttl)
122+
c.OnEvicted = func(key string, value interface{}) {
123+
b := value.(*bytes.Buffer)
124+
b.Reset()
125+
bufPool.Put(b)
126+
}
127+
return &ttlMemoryCache{
128+
c: c,
129+
bufPool: bufPool,
130+
}
131+
}
132+
133+
type ttlMemoryCache struct {
134+
c *cacheutil.TTLCache
135+
bufPool *sync.Pool
136+
}
137+
138+
func (mc *ttlMemoryCache) Get(key string, opts ...Option) (Reader, error) {
139+
v, done, ok := mc.c.Get(key)
140+
if !ok {
141+
return nil, fmt.Errorf("missed cache: %q", key)
142+
}
143+
b := v.(*bytes.Buffer)
144+
return &reader{
145+
ReaderAt: bytes.NewReader(b.Bytes()),
146+
closeFunc: func() error {
147+
done(false)
148+
return nil
149+
},
150+
}, nil
151+
}
152+
153+
func (mc *ttlMemoryCache) Add(key string, opts ...Option) (Writer, error) {
154+
b := mc.bufPool.Get().(*bytes.Buffer)
155+
b.Reset()
156+
return &writer{
157+
WriteCloser: nopWriteCloser(io.Writer(b)),
158+
commitFunc: func() error {
159+
mc.c.Remove(key)
160+
_, done, _ := mc.c.Add(key, b)
161+
done(false)
162+
return nil
163+
},
164+
abortFunc: func() error {
165+
b.Reset()
166+
mc.bufPool.Put(b)
167+
return nil
168+
},
169+
}, nil
170+
}
171+
172+
func (mc *ttlMemoryCache) Close() error {
173+
return nil
174+
}

cache/httpcache_ttl_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package cache
2+
3+
import (
4+
"os"
5+
"path/filepath"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestNewMemoryCacheWithTTL_Disabled(t *testing.T) {
11+
c := NewMemoryCacheWithTTL(0)
12+
if _, ok := c.(*MemoryCache); !ok {
13+
t.Fatalf("expected *MemoryCache when ttl is disabled; got %T", c)
14+
}
15+
}
16+
17+
func TestNewMemoryCacheWithTTL_Expires(t *testing.T) {
18+
ttl := 30 * time.Millisecond
19+
c := NewMemoryCacheWithTTL(ttl)
20+
21+
w, err := c.Add("k1")
22+
if err != nil {
23+
t.Fatalf("Add failed: %v", err)
24+
}
25+
if _, err := w.Write([]byte("abc")); err != nil {
26+
t.Fatalf("Write failed: %v", err)
27+
}
28+
if err := w.Commit(); err != nil {
29+
t.Fatalf("Commit failed: %v", err)
30+
}
31+
_ = w.Close()
32+
33+
r, err := c.Get("k1")
34+
if err != nil {
35+
t.Fatalf("Get failed: %v", err)
36+
}
37+
_ = r.Close()
38+
39+
deadline := time.Now().Add(2 * time.Second)
40+
for {
41+
if time.Now().After(deadline) {
42+
t.Fatalf("entry did not expire within deadline")
43+
}
44+
time.Sleep(10 * time.Millisecond)
45+
if _, err := c.Get("k1"); err == nil {
46+
continue
47+
}
48+
break
49+
}
50+
}
51+
52+
func TestDirectoryCacheCleanupOnce_RemovesExpiredFiles(t *testing.T) {
53+
base := t.TempDir()
54+
wip := filepath.Join(base, "wip")
55+
if err := os.MkdirAll(wip, 0700); err != nil {
56+
t.Fatalf("mkdir wip failed: %v", err)
57+
}
58+
59+
dc := &directoryCache{
60+
directory: base,
61+
wipDirectory: wip,
62+
entryTTL: 100 * time.Millisecond,
63+
}
64+
65+
expired := filepath.Join(base, "aa", "expired")
66+
if err := os.MkdirAll(filepath.Dir(expired), 0700); err != nil {
67+
t.Fatalf("mkdir expired dir failed: %v", err)
68+
}
69+
if err := os.WriteFile(expired, []byte("x"), 0600); err != nil {
70+
t.Fatalf("write expired failed: %v", err)
71+
}
72+
old := time.Now().Add(-2 * time.Second)
73+
if err := os.Chtimes(expired, old, old); err != nil {
74+
t.Fatalf("chtimes expired failed: %v", err)
75+
}
76+
77+
fresh := filepath.Join(base, "bb", "fresh")
78+
if err := os.MkdirAll(filepath.Dir(fresh), 0700); err != nil {
79+
t.Fatalf("mkdir fresh dir failed: %v", err)
80+
}
81+
if err := os.WriteFile(fresh, []byte("y"), 0600); err != nil {
82+
t.Fatalf("write fresh failed: %v", err)
83+
}
84+
85+
expiredWip := filepath.Join(wip, "tmp-expired")
86+
if err := os.WriteFile(expiredWip, []byte("z"), 0600); err != nil {
87+
t.Fatalf("write expired wip failed: %v", err)
88+
}
89+
if err := os.Chtimes(expiredWip, old, old); err != nil {
90+
t.Fatalf("chtimes expired wip failed: %v", err)
91+
}
92+
93+
dc.cleanupOnce()
94+
95+
if _, err := os.Stat(expired); err == nil {
96+
t.Fatalf("expected expired file to be removed")
97+
}
98+
if _, err := os.Stat(expiredWip); err == nil {
99+
t.Fatalf("expected expired wip file to be removed")
100+
}
101+
if _, err := os.Stat(fresh); err != nil {
102+
t.Fatalf("expected fresh file to remain; err=%v", err)
103+
}
104+
}

fs/config/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ type Config struct {
3939
// Other values default to cache them on disk.
4040
HTTPCacheType string `toml:"http_cache_type" json:"http_cache_type"`
4141

42+
// HTTPCacheChunkTTLSec specifies TTL (in sec) for each http cache chunk.
43+
// Default is 0, which disables TTL-based cleanup.
44+
// When zero or negative, TTL-based cleanup is disabled.
45+
HTTPCacheChunkTTLSec int `toml:"http_cache_chunk_ttl_sec" json:"http_cache_chunk_ttl_sec"`
46+
4247
// Type of cache for uncompressed files contents. "memory" stores them on memory. Other values
4348
// default to cache them on disk.
4449
FSCacheType string `toml:"filesystem_cache_type" json:"filesystem_cache_type"`

0 commit comments

Comments
 (0)