Skip to content

Commit 366f5cf

Browse files
committed
Cache HelmRepository index files
If implemented, will provide users with a way to cache index files. This addresses issues where the index file is loaded and unmarshalled in concurrent reconciliation resulting in a heavy memory footprint. The caching strategy used is cache aside, and the cache is a k/v store with expiration. The cache number of entries and ttl for entries are configurable. The cache is optional and is disabled by default Signed-off-by: Soule BA <[email protected]>
1 parent 65b7468 commit 366f5cf

File tree

7 files changed

+397
-4
lines changed

7 files changed

+397
-4
lines changed

api/v1beta2/condition_types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,6 @@ const (
9797
// ArtifactUpToDateReason signals that an existing Artifact is up-to-date
9898
// with the Source.
9999
ArtifactUpToDateReason string = "ArtifactUpToDate"
100+
// CacheOperationFailedReason signals a failure in cache operation.
101+
CacheOperationFailedReason string = "CacheOperationFailed"
100102
)

controllers/helmchart_controller.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030

3131
securejoin "github.com/cyphar/filepath-securejoin"
3232
helmgetter "helm.sh/helm/v3/pkg/getter"
33+
helmrepo "helm.sh/helm/v3/pkg/repo"
3334
corev1 "k8s.io/api/core/v1"
3435
apierrs "k8s.io/apimachinery/pkg/api/errors"
3536
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -55,6 +56,7 @@ import (
5556
"github.com/fluxcd/pkg/untar"
5657

5758
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
59+
"github.com/fluxcd/source-controller/internal/cache"
5860
serror "github.com/fluxcd/source-controller/internal/error"
5961
"github.com/fluxcd/source-controller/internal/helm/chart"
6062
"github.com/fluxcd/source-controller/internal/helm/getter"
@@ -111,6 +113,9 @@ type HelmChartReconciler struct {
111113
Storage *Storage
112114
Getters helmgetter.Providers
113115
ControllerName string
116+
117+
Cache *cache.Cache
118+
TTL time.Duration
114119
}
115120

116121
func (r *HelmChartReconciler) SetupWithManager(mgr ctrl.Manager) error {
@@ -456,6 +461,15 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
456461
}
457462
}
458463

464+
// Try to retrieve the repository index from the cache
465+
if r.Cache != nil {
466+
if index, found := r.Cache.Get(r.Storage.LocalPath(*repo.GetArtifact())); err == nil {
467+
if found {
468+
chartRepo.Index = index.(*helmrepo.IndexFile)
469+
}
470+
}
471+
}
472+
459473
// Construct the chart builder with scoped configuration
460474
cb := chart.NewRemoteBuilder(chartRepo)
461475
opts := chart.BuildOptions{
@@ -479,6 +493,26 @@ func (r *HelmChartReconciler) buildFromHelmRepository(ctx context.Context, obj *
479493
return sreconcile.ResultEmpty, err
480494
}
481495

496+
defer func() {
497+
// Cache the index if it was successfully retrieved
498+
// and the chart was successfully built
499+
if r.Cache != nil && chartRepo.Index != nil {
500+
// The cache key have to be safe in multi-tenancy environments,
501+
// as otherwise it could be used as a vector to bypass the helm repository's authentication.
502+
// Using r.Storage.LocalPath(*repo.GetArtifact() is safe as the path is in the format /<helm-repository-name>/<chart-name>/<filename>.
503+
err := r.Cache.Set(r.Storage.LocalPath(*repo.GetArtifact()), chartRepo.Index, r.TTL)
504+
if err != nil {
505+
r.eventLogf(ctx, obj, events.EventTypeTrace, sourcev1.CacheOperationFailedReason, "failed to cache index: %v", err)
506+
}
507+
508+
}
509+
510+
// Delete the index reference
511+
if chartRepo.Index != nil {
512+
chartRepo.Unload()
513+
}
514+
}()
515+
482516
*b = *build
483517
return sreconcile.ResultSuccess, nil
484518
}

controllers/suite_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
"github.com/fluxcd/pkg/testserver"
3636

3737
sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"
38+
"github.com/fluxcd/source-controller/internal/cache"
3839
// +kubebuilder:scaffold:imports
3940
)
4041

@@ -126,12 +127,15 @@ func TestMain(m *testing.M) {
126127
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
127128
}
128129

130+
cache := cache.New(5, 1*time.Second)
129131
if err := (&HelmChartReconciler{
130132
Client: testEnv,
131133
EventRecorder: record.NewFakeRecorder(32),
132134
Metrics: testMetricsH,
133135
Getters: testGetters,
134136
Storage: testStorage,
137+
Cache: cache,
138+
TTL: 1 * time.Second,
135139
}).SetupWithManager(testEnv); err != nil {
136140
panic(fmt.Sprintf("Failed to start HelmRepositoryReconciler: %v", err))
137141
}

internal/cache/cache.go

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
/*
2+
Copyright 2022 The Flux authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package cache
18+
19+
import (
20+
"fmt"
21+
"runtime"
22+
"sync"
23+
"time"
24+
)
25+
26+
// NOTE: this is heavily based on patrickmn/go-cache:
27+
// https://github.com/patrickmn/go-cache
28+
29+
// Cache is a thread-safe in-memory key/value store.
30+
type Cache struct {
31+
*cache
32+
}
33+
34+
// Item is an item stored in the cache.
35+
type Item struct {
36+
Object interface{}
37+
Expiration int64
38+
}
39+
40+
type cache struct {
41+
// Items holds the elements in the cache.
42+
Items map[string]Item
43+
// Maximum number of items the cache can hold.
44+
MaxItems int
45+
mu sync.RWMutex
46+
janitor *janitor
47+
}
48+
49+
// ItemCount returns the number of items in the cache.
50+
// This may include items that have expired, but have not yet been cleaned up.
51+
func (c *cache) ItemCount() int {
52+
c.mu.RLock()
53+
n := len(c.Items)
54+
c.mu.RUnlock()
55+
return n
56+
}
57+
58+
func (c *cache) set(key string, value interface{}, expiration time.Duration) {
59+
var e int64
60+
if expiration > 0 {
61+
e = time.Now().Add(expiration).UnixNano()
62+
}
63+
64+
c.Items[key] = Item{
65+
Object: value,
66+
Expiration: e,
67+
}
68+
}
69+
70+
// Set adds an item to the cache, replacing any existing item.
71+
// If expiration is zero, the item never expires.
72+
// If the cache is full, Set will return an error.
73+
func (c *cache) Set(key string, value interface{}, expiration time.Duration) error {
74+
c.mu.Lock()
75+
_, found := c.Items[key]
76+
if found {
77+
c.set(key, value, expiration)
78+
c.mu.Unlock()
79+
return nil
80+
}
81+
82+
if c.MaxItems > 0 && len(c.Items) < c.MaxItems {
83+
c.set(key, value, expiration)
84+
c.mu.Unlock()
85+
return nil
86+
}
87+
88+
c.mu.Unlock()
89+
return fmt.Errorf("Cache is full")
90+
}
91+
92+
func (c *cache) Add(key string, value interface{}, expiration time.Duration) error {
93+
c.mu.Lock()
94+
_, found := c.Items[key]
95+
if found {
96+
c.mu.Unlock()
97+
return fmt.Errorf("Item %s already exists", key)
98+
}
99+
100+
if c.MaxItems > 0 && len(c.Items) < c.MaxItems {
101+
c.set(key, value, expiration)
102+
c.mu.Unlock()
103+
return nil
104+
}
105+
106+
c.mu.Unlock()
107+
return fmt.Errorf("Cache is full")
108+
}
109+
110+
func (c *cache) Get(key string) (interface{}, bool) {
111+
c.mu.RLock()
112+
item, found := c.Items[key]
113+
if !found {
114+
c.mu.RUnlock()
115+
return nil, false
116+
}
117+
if item.Expiration > 0 {
118+
if item.Expiration < time.Now().UnixNano() {
119+
c.mu.RUnlock()
120+
return nil, false
121+
}
122+
}
123+
c.mu.RUnlock()
124+
return item.Object, true
125+
}
126+
127+
func (c *cache) Delete(key string) {
128+
c.mu.Lock()
129+
delete(c.Items, key)
130+
c.mu.Unlock()
131+
}
132+
133+
func (c *cache) Clear() {
134+
c.mu.Lock()
135+
c.Items = make(map[string]Item)
136+
c.mu.Unlock()
137+
}
138+
139+
func (c *cache) HasExpired(key string) bool {
140+
c.mu.RLock()
141+
item, ok := c.Items[key]
142+
if !ok {
143+
c.mu.RUnlock()
144+
return true
145+
}
146+
if item.Expiration > 0 {
147+
if item.Expiration < time.Now().UnixNano() {
148+
c.mu.RUnlock()
149+
return true
150+
}
151+
}
152+
c.mu.RUnlock()
153+
return false
154+
}
155+
156+
func (c *cache) SetExpiration(key string, expiration time.Duration) {
157+
c.mu.Lock()
158+
item, ok := c.Items[key]
159+
if !ok {
160+
c.mu.Unlock()
161+
return
162+
}
163+
item.Expiration = time.Now().Add(expiration).UnixNano()
164+
c.mu.Unlock()
165+
}
166+
167+
func (c *cache) GetExpiration(key string) time.Duration {
168+
c.mu.RLock()
169+
item, ok := c.Items[key]
170+
if !ok {
171+
c.mu.RUnlock()
172+
return 0
173+
}
174+
if item.Expiration > 0 {
175+
if item.Expiration < time.Now().UnixNano() {
176+
c.mu.RUnlock()
177+
return 0
178+
}
179+
}
180+
c.mu.RUnlock()
181+
return time.Duration(item.Expiration - time.Now().UnixNano())
182+
}
183+
184+
func (c *cache) DeleteExpired() {
185+
c.mu.Lock()
186+
for k, v := range c.Items {
187+
if v.Expiration > 0 && v.Expiration < time.Now().UnixNano() {
188+
delete(c.Items, k)
189+
}
190+
}
191+
c.mu.Unlock()
192+
}
193+
194+
type janitor struct {
195+
Interval time.Duration
196+
stop chan bool
197+
}
198+
199+
func (j *janitor) Run(c *cache) {
200+
ticker := time.NewTicker(j.Interval)
201+
for {
202+
select {
203+
case <-ticker.C:
204+
c.DeleteExpired()
205+
case <-j.stop:
206+
ticker.Stop()
207+
return
208+
}
209+
}
210+
}
211+
212+
func stopJanitor(c *Cache) {
213+
c.janitor.stop <- true
214+
}
215+
216+
func New(maxItems int, interval time.Duration) *Cache {
217+
c := &cache{
218+
Items: make(map[string]Item),
219+
MaxItems: maxItems,
220+
janitor: &janitor{
221+
Interval: interval,
222+
stop: make(chan bool),
223+
},
224+
}
225+
226+
C := &Cache{c}
227+
228+
if interval > 0 {
229+
go c.janitor.Run(c)
230+
runtime.SetFinalizer(C, stopJanitor)
231+
}
232+
233+
return C
234+
}

0 commit comments

Comments
 (0)