Skip to content

Commit 117bece

Browse files
authored
Refactor catalogmetadata client and cache (#1318)
It is now possible to cache errors occured on the cache population. This will be useful when we start proactively populating cache from `ClusterCatalogReconciler` instead of doing this on-demand. This refactoring also moves the responsibility of performing network requests out of the cache backend into the client. Signed-off-by: Mikalai Radchuk <[email protected]>
1 parent ab99642 commit 117bece

File tree

5 files changed

+499
-403
lines changed

5 files changed

+499
-403
lines changed

cmd/manager/main.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,10 +222,10 @@ func main() {
222222
setupLog.Error(err, "unable to create catalogs cache directory")
223223
os.Exit(1)
224224
}
225-
cacheFetcher := cache.NewFilesystemCache(catalogsCachePath, func() (*http.Client, error) {
225+
catalogClientBackend := cache.NewFilesystemCache(catalogsCachePath)
226+
catalogClient := catalogclient.New(catalogClientBackend, func() (*http.Client, error) {
226227
return httputil.BuildHTTPClient(certPoolWatcher)
227228
})
228-
catalogClient := catalogclient.New(cacheFetcher)
229229

230230
resolver := &resolve.CatalogResolver{
231231
WalkCatalogsFunc: resolve.CatalogWalker(
@@ -284,7 +284,7 @@ func main() {
284284

285285
if err = (&controllers.ClusterCatalogReconciler{
286286
Client: cl,
287-
Cache: cacheFetcher,
287+
Cache: catalogClientBackend,
288288
}).SetupWithManager(mgr); err != nil {
289289
setupLog.Error(err, "unable to create controller", "controller", "ClusterCatalog")
290290
os.Exit(1)

internal/catalogmetadata/cache/cache.go

Lines changed: 72 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,24 @@
11
package cache
22

33
import (
4-
"context"
54
"fmt"
5+
"io"
66
"io/fs"
7-
"net/http"
87
"os"
98
"path/filepath"
109
"sync"
1110

12-
catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"
1311
"github.com/operator-framework/operator-registry/alpha/declcfg"
1412

1513
"github.com/operator-framework/operator-controller/internal/catalogmetadata/client"
1614
)
1715

18-
var _ client.Fetcher = &filesystemCache{}
19-
20-
// NewFilesystemCache returns a client.Fetcher implementation that uses a
21-
// local filesystem to cache Catalog contents. When fetching the Catalog contents
22-
// it will:
23-
// - Check if the Catalog is cached
24-
// - IF !cached it will fetch from the catalogd HTTP server and cache the response
25-
// - IF cached it will verify the cache is up to date. If it is up to date it will return
26-
// the cached contents, if not it will fetch the new contents from the catalogd HTTP
27-
// server and update the cached contents.
28-
func NewFilesystemCache(cachePath string, clientFunc func() (*http.Client, error)) *filesystemCache {
16+
var _ client.Cache = &filesystemCache{}
17+
18+
func NewFilesystemCache(cachePath string) *filesystemCache {
2919
return &filesystemCache{
3020
cachePath: cachePath,
3121
mutex: sync.RWMutex{},
32-
getClient: clientFunc,
3322
cacheDataByCatalogName: map[string]cacheData{},
3423
}
3524
}
@@ -40,75 +29,34 @@ func NewFilesystemCache(cachePath string, clientFunc func() (*http.Client, error
4029
// the cache.
4130
type cacheData struct {
4231
ResolvedRef string
32+
Error error
4333
}
4434

4535
// FilesystemCache is a cache that
4636
// uses the local filesystem for caching
47-
// catalog contents. It will fetch catalog
48-
// contents if the catalog does not already
49-
// exist in the cache.
37+
// catalog contents.
5038
type filesystemCache struct {
5139
mutex sync.RWMutex
5240
cachePath string
53-
getClient func() (*http.Client, error)
5441
cacheDataByCatalogName map[string]cacheData
5542
}
5643

57-
// FetchCatalogContents implements the client.Fetcher interface and
58-
// will fetch the contents for the provided Catalog from the filesystem.
59-
// If the provided Catalog has not yet been cached, it will make a GET
60-
// request to the Catalogd HTTP server to get the Catalog contents and cache
61-
// them. The cache will be updated automatically if a Catalog is noticed to
62-
// have a different resolved image reference.
63-
// The Catalog provided to this function is expected to:
64-
// - Be non-nil
65-
// - Have a non-nil Catalog.Status.ResolvedSource.Image
66-
// This ensures that we are only attempting to fetch catalog contents for Catalog
67-
// resources that have been successfully reconciled, unpacked, and are being served.
68-
// These requirements help ensure that we can rely on status conditions to determine
69-
// when to issue a request to update the cached Catalog contents.
70-
func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *catalogd.ClusterCatalog) (fs.FS, error) {
71-
if catalog == nil {
72-
return nil, fmt.Errorf("error: provided catalog must be non-nil")
73-
}
74-
75-
if catalog.Status.ResolvedSource == nil {
76-
return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource value", catalog.Name)
77-
}
78-
79-
if catalog.Status.ResolvedSource.Image == nil {
80-
return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource.image value", catalog.Name)
81-
}
82-
83-
cacheDir := fsc.cacheDir(catalog.Name)
84-
fsc.mutex.RLock()
85-
if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok {
86-
if catalog.Status.ResolvedSource.Image.ResolvedRef == data.ResolvedRef {
87-
fsc.mutex.RUnlock()
88-
return os.DirFS(cacheDir), nil
89-
}
90-
}
91-
fsc.mutex.RUnlock()
92-
93-
req, err := http.NewRequestWithContext(ctx, http.MethodGet, catalog.Status.ContentURL, nil)
94-
if err != nil {
95-
return nil, fmt.Errorf("error forming request: %v", err)
96-
}
97-
98-
client, err := fsc.getClient()
99-
if err != nil {
100-
return nil, fmt.Errorf("error getting HTTP client: %w", err)
101-
}
102-
resp, err := client.Do(req)
103-
if err != nil {
104-
return nil, fmt.Errorf("error performing request: %v", err)
105-
}
106-
defer resp.Body.Close()
107-
108-
if resp.StatusCode != http.StatusOK {
109-
return nil, fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode)
110-
}
111-
44+
// Put writes content from source to the filesystem and stores errToCache
45+
// for a specified catalog name and version (resolvedRef).
46+
//
47+
// Method behaviour is as follows:
48+
// - If successfully populated cache for catalogName and resolvedRef exists,
49+
// errToCache is ignored and existing cache returned with nil error
50+
// - If existing cache for catalogName and resolvedRef exists but
51+
// is populated with an error, update the cache with either
52+
// new content from source or errToCache.
53+
// - If cache doesn't exist, populate it with either new content
54+
// from source or errToCache.
55+
//
56+
// This cache implementation tracks only one version of cache per catalog,
57+
// so Put will override any existing cache on the filesystem for catalogName
58+
// if resolvedRef does not match the one which is already tracked.
59+
func (fsc *filesystemCache) Put(catalogName, resolvedRef string, source io.Reader, errToCache error) (fs.FS, error) {
11260
fsc.mutex.Lock()
11361
defer fsc.mutex.Unlock()
11462

@@ -117,19 +65,35 @@ func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *c
11765
// updating this, has no way to tell if the current ref is the
11866
// newest possible ref. If another thread has already updated
11967
// this to be the same value, skip the write logic and return
120-
// the cached contents
121-
if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok {
122-
if data.ResolvedRef == catalog.Status.ResolvedSource.Image.ResolvedRef {
123-
return os.DirFS(cacheDir), nil
124-
}
68+
// the cached contents.
69+
if cache, err := fsc.get(catalogName, resolvedRef); err == nil && cache != nil {
70+
// We only return here if the was no error during
71+
// the previous (likely concurrent) cache population attempt.
72+
// If there was an error - we want to try and populate the cache again.
73+
return cache, nil
74+
}
75+
76+
var cacheFS fs.FS
77+
if errToCache == nil {
78+
cacheFS, errToCache = fsc.writeFS(catalogName, source)
12579
}
80+
fsc.cacheDataByCatalogName[catalogName] = cacheData{
81+
ResolvedRef: resolvedRef,
82+
Error: errToCache,
83+
}
84+
85+
return cacheFS, errToCache
86+
}
87+
88+
func (fsc *filesystemCache) writeFS(catalogName string, source io.Reader) (fs.FS, error) {
89+
cacheDir := fsc.cacheDir(catalogName)
12690

127-
tmpDir, err := os.MkdirTemp(fsc.cachePath, fmt.Sprintf(".%s-", catalog.Name))
91+
tmpDir, err := os.MkdirTemp(fsc.cachePath, fmt.Sprintf(".%s-", catalogName))
12892
if err != nil {
12993
return nil, fmt.Errorf("error creating temporary directory to unpack catalog metadata: %v", err)
13094
}
13195

132-
if err := declcfg.WalkMetasReader(resp.Body, func(meta *declcfg.Meta, err error) error {
96+
if err := declcfg.WalkMetasReader(source, func(meta *declcfg.Meta, err error) error {
13397
if err != nil {
13498
return fmt.Errorf("error parsing catalog contents: %v", err)
13599
}
@@ -160,11 +124,35 @@ func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *c
160124
return nil, fmt.Errorf("error moving temporary directory to cache directory: %v", err)
161125
}
162126

163-
fsc.cacheDataByCatalogName[catalog.Name] = cacheData{
164-
ResolvedRef: catalog.Status.ResolvedSource.Image.ResolvedRef,
127+
return os.DirFS(cacheDir), nil
128+
}
129+
130+
// Get returns cache for a specified catalog name and version (resolvedRef).
131+
//
132+
// Method behaviour is as follows:
133+
// - If cache exists, it returns a non-nil fs.FS and nil error
134+
// - If cache doesn't exist, it returns nil fs.FS and nil error
135+
// - If there was an error during cache population,
136+
// it returns nil fs.FS and the error from the cache population.
137+
// In other words - cache population errors are also cached.
138+
func (fsc *filesystemCache) Get(catalogName, resolvedRef string) (fs.FS, error) {
139+
fsc.mutex.RLock()
140+
defer fsc.mutex.RUnlock()
141+
return fsc.get(catalogName, resolvedRef)
142+
}
143+
144+
func (fsc *filesystemCache) get(catalogName, resolvedRef string) (fs.FS, error) {
145+
cacheDir := fsc.cacheDir(catalogName)
146+
if data, ok := fsc.cacheDataByCatalogName[catalogName]; ok {
147+
if resolvedRef == data.ResolvedRef {
148+
if data.Error != nil {
149+
return nil, data.Error
150+
}
151+
return os.DirFS(cacheDir), nil
152+
}
165153
}
166154

167-
return os.DirFS(cacheDir), nil
155+
return nil, nil
168156
}
169157

170158
// Remove deletes cache directory for a given catalog from the filesystem

0 commit comments

Comments
 (0)