diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 72465b4dd..947fca826 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -222,10 +222,10 @@ func main() { setupLog.Error(err, "unable to create catalogs cache directory") os.Exit(1) } - cacheFetcher := cache.NewFilesystemCache(catalogsCachePath, func() (*http.Client, error) { + catalogClientBackend := cache.NewFilesystemCache(catalogsCachePath) + catalogClient := catalogclient.New(catalogClientBackend, func() (*http.Client, error) { return httputil.BuildHTTPClient(certPoolWatcher) }) - catalogClient := catalogclient.New(cacheFetcher) resolver := &resolve.CatalogResolver{ WalkCatalogsFunc: resolve.CatalogWalker( @@ -284,7 +284,7 @@ func main() { if err = (&controllers.ClusterCatalogReconciler{ Client: cl, - Cache: cacheFetcher, + Cache: catalogClientBackend, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "ClusterCatalog") os.Exit(1) diff --git a/internal/catalogmetadata/cache/cache.go b/internal/catalogmetadata/cache/cache.go index e986c9714..70dbe51f8 100644 --- a/internal/catalogmetadata/cache/cache.go +++ b/internal/catalogmetadata/cache/cache.go @@ -1,35 +1,24 @@ package cache import ( - "context" "fmt" + "io" "io/fs" - "net/http" "os" "path/filepath" "sync" - catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" "github.com/operator-framework/operator-registry/alpha/declcfg" "github.com/operator-framework/operator-controller/internal/catalogmetadata/client" ) -var _ client.Fetcher = &filesystemCache{} - -// NewFilesystemCache returns a client.Fetcher implementation that uses a -// local filesystem to cache Catalog contents. When fetching the Catalog contents -// it will: -// - Check if the Catalog is cached -// - IF !cached it will fetch from the catalogd HTTP server and cache the response -// - IF cached it will verify the cache is up to date. If it is up to date it will return -// the cached contents, if not it will fetch the new contents from the catalogd HTTP -// server and update the cached contents. -func NewFilesystemCache(cachePath string, clientFunc func() (*http.Client, error)) *filesystemCache { +var _ client.Cache = &filesystemCache{} + +func NewFilesystemCache(cachePath string) *filesystemCache { return &filesystemCache{ cachePath: cachePath, mutex: sync.RWMutex{}, - getClient: clientFunc, cacheDataByCatalogName: map[string]cacheData{}, } } @@ -40,75 +29,34 @@ func NewFilesystemCache(cachePath string, clientFunc func() (*http.Client, error // the cache. type cacheData struct { ResolvedRef string + Error error } // FilesystemCache is a cache that // uses the local filesystem for caching -// catalog contents. It will fetch catalog -// contents if the catalog does not already -// exist in the cache. +// catalog contents. type filesystemCache struct { mutex sync.RWMutex cachePath string - getClient func() (*http.Client, error) cacheDataByCatalogName map[string]cacheData } -// FetchCatalogContents implements the client.Fetcher interface and -// will fetch the contents for the provided Catalog from the filesystem. -// If the provided Catalog has not yet been cached, it will make a GET -// request to the Catalogd HTTP server to get the Catalog contents and cache -// them. The cache will be updated automatically if a Catalog is noticed to -// have a different resolved image reference. -// The Catalog provided to this function is expected to: -// - Be non-nil -// - Have a non-nil Catalog.Status.ResolvedSource.Image -// This ensures that we are only attempting to fetch catalog contents for Catalog -// resources that have been successfully reconciled, unpacked, and are being served. -// These requirements help ensure that we can rely on status conditions to determine -// when to issue a request to update the cached Catalog contents. -func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *catalogd.ClusterCatalog) (fs.FS, error) { - if catalog == nil { - return nil, fmt.Errorf("error: provided catalog must be non-nil") - } - - if catalog.Status.ResolvedSource == nil { - return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource value", catalog.Name) - } - - if catalog.Status.ResolvedSource.Image == nil { - return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource.image value", catalog.Name) - } - - cacheDir := fsc.cacheDir(catalog.Name) - fsc.mutex.RLock() - if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok { - if catalog.Status.ResolvedSource.Image.ResolvedRef == data.ResolvedRef { - fsc.mutex.RUnlock() - return os.DirFS(cacheDir), nil - } - } - fsc.mutex.RUnlock() - - req, err := http.NewRequestWithContext(ctx, http.MethodGet, catalog.Status.ContentURL, nil) - if err != nil { - return nil, fmt.Errorf("error forming request: %v", err) - } - - client, err := fsc.getClient() - if err != nil { - return nil, fmt.Errorf("error getting HTTP client: %w", err) - } - resp, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("error performing request: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode) - } - +// Put writes content from source to the filesystem and stores errToCache +// for a specified catalog name and version (resolvedRef). +// +// Method behaviour is as follows: +// - If successfully populated cache for catalogName and resolvedRef exists, +// errToCache is ignored and existing cache returned with nil error +// - If existing cache for catalogName and resolvedRef exists but +// is populated with an error, update the cache with either +// new content from source or errToCache. +// - If cache doesn't exist, populate it with either new content +// from source or errToCache. +// +// This cache implementation tracks only one version of cache per catalog, +// so Put will override any existing cache on the filesystem for catalogName +// if resolvedRef does not match the one which is already tracked. +func (fsc *filesystemCache) Put(catalogName, resolvedRef string, source io.Reader, errToCache error) (fs.FS, error) { fsc.mutex.Lock() defer fsc.mutex.Unlock() @@ -117,19 +65,35 @@ func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *c // updating this, has no way to tell if the current ref is the // newest possible ref. If another thread has already updated // this to be the same value, skip the write logic and return - // the cached contents - if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok { - if data.ResolvedRef == catalog.Status.ResolvedSource.Image.ResolvedRef { - return os.DirFS(cacheDir), nil - } + // the cached contents. + if cache, err := fsc.get(catalogName, resolvedRef); err == nil && cache != nil { + // We only return here if the was no error during + // the previous (likely concurrent) cache population attempt. + // If there was an error - we want to try and populate the cache again. + return cache, nil + } + + var cacheFS fs.FS + if errToCache == nil { + cacheFS, errToCache = fsc.writeFS(catalogName, source) } + fsc.cacheDataByCatalogName[catalogName] = cacheData{ + ResolvedRef: resolvedRef, + Error: errToCache, + } + + return cacheFS, errToCache +} + +func (fsc *filesystemCache) writeFS(catalogName string, source io.Reader) (fs.FS, error) { + cacheDir := fsc.cacheDir(catalogName) - tmpDir, err := os.MkdirTemp(fsc.cachePath, fmt.Sprintf(".%s-", catalog.Name)) + tmpDir, err := os.MkdirTemp(fsc.cachePath, fmt.Sprintf(".%s-", catalogName)) if err != nil { return nil, fmt.Errorf("error creating temporary directory to unpack catalog metadata: %v", err) } - if err := declcfg.WalkMetasReader(resp.Body, func(meta *declcfg.Meta, err error) error { + if err := declcfg.WalkMetasReader(source, func(meta *declcfg.Meta, err error) error { if err != nil { return fmt.Errorf("error parsing catalog contents: %v", err) } @@ -160,11 +124,35 @@ func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *c return nil, fmt.Errorf("error moving temporary directory to cache directory: %v", err) } - fsc.cacheDataByCatalogName[catalog.Name] = cacheData{ - ResolvedRef: catalog.Status.ResolvedSource.Image.ResolvedRef, + return os.DirFS(cacheDir), nil +} + +// Get returns cache for a specified catalog name and version (resolvedRef). +// +// Method behaviour is as follows: +// - If cache exists, it returns a non-nil fs.FS and nil error +// - If cache doesn't exist, it returns nil fs.FS and nil error +// - If there was an error during cache population, +// it returns nil fs.FS and the error from the cache population. +// In other words - cache population errors are also cached. +func (fsc *filesystemCache) Get(catalogName, resolvedRef string) (fs.FS, error) { + fsc.mutex.RLock() + defer fsc.mutex.RUnlock() + return fsc.get(catalogName, resolvedRef) +} + +func (fsc *filesystemCache) get(catalogName, resolvedRef string) (fs.FS, error) { + cacheDir := fsc.cacheDir(catalogName) + if data, ok := fsc.cacheDataByCatalogName[catalogName]; ok { + if resolvedRef == data.ResolvedRef { + if data.Error != nil { + return nil, data.Error + } + return os.DirFS(cacheDir), nil + } } - return os.DirFS(cacheDir), nil + return nil, nil } // Remove deletes cache directory for a given catalog from the filesystem diff --git a/internal/catalogmetadata/cache/cache_test.go b/internal/catalogmetadata/cache/cache_test.go index 74f7d79c0..f898e23e3 100644 --- a/internal/catalogmetadata/cache/cache_test.go +++ b/internal/catalogmetadata/cache/cache_test.go @@ -2,28 +2,22 @@ package cache_test import ( "bytes" - "context" "encoding/json" "errors" "fmt" "io" "io/fs" - "maps" - "net/http" "os" "path/filepath" + "strings" "testing" "testing/fstest" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" - catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1" - "github.com/operator-framework/operator-registry/alpha/declcfg" - "github.com/operator-framework/operator-controller/internal/catalogmetadata/cache" ) @@ -58,233 +52,105 @@ const ( }` ) -var defaultFS = fstest.MapFS{ - "fake1/olm.package/fake1.json": &fstest.MapFile{Data: []byte(package1)}, - "fake1/olm.bundle/fake1.v1.0.0.json": &fstest.MapFile{Data: []byte(bundle1)}, - "fake1/olm.channel/stable.json": &fstest.MapFile{Data: []byte(stableChannel)}, +func defaultContent() io.Reader { + return strings.NewReader(package1 + bundle1 + stableChannel) } -func TestFilesystemCacheFetchCatalogContents(t *testing.T) { - type test struct { - name string - catalog *catalogd.ClusterCatalog - contents fstest.MapFS - wantErr bool - tripper *mockTripper - testCaching bool - shouldHitCache bool +func defaultFS() fstest.MapFS { + return fstest.MapFS{ + "fake1/olm.package/fake1.json": &fstest.MapFile{Data: []byte(package1)}, + "fake1/olm.bundle/fake1.v1.0.0.json": &fstest.MapFile{Data: []byte(bundle1)}, + "fake1/olm.channel/stable.json": &fstest.MapFile{Data: []byte(stableChannel)}, } - for _, tt := range []test{ - { - name: "valid non-cached fetch", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: &catalogd.ResolvedCatalogSource{ - Type: catalogd.SourceTypeImage, - Image: &catalogd.ResolvedImageSource{ - ResolvedRef: "fake/catalog@sha256:fakesha", - }, - }, - }, - }, - contents: defaultFS, - tripper: &mockTripper{}, - }, - { - name: "valid cached fetch", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: &catalogd.ResolvedCatalogSource{ - Type: catalogd.SourceTypeImage, - Image: &catalogd.ResolvedImageSource{ - ResolvedRef: "fake/catalog@sha256:fakesha", - }, - }, - }, - }, - contents: defaultFS, - tripper: &mockTripper{}, - testCaching: true, - shouldHitCache: true, - }, - { - name: "cached update fetch with changes", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: &catalogd.ResolvedCatalogSource{ - Type: catalogd.SourceTypeImage, - Image: &catalogd.ResolvedImageSource{ - ResolvedRef: "fake/catalog@sha256:fakesha", - }, - }, - }, - }, - contents: defaultFS, - tripper: &mockTripper{}, - testCaching: true, - shouldHitCache: false, - }, - { - name: "fetch error", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: &catalogd.ResolvedCatalogSource{ - Type: catalogd.SourceTypeImage, - Image: &catalogd.ResolvedImageSource{ - ResolvedRef: "fake/catalog@sha256:fakesha", - }, - }, - }, - }, - contents: defaultFS, - tripper: &mockTripper{shouldError: true}, - wantErr: true, - }, - { - name: "fetch internal server error response", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: &catalogd.ResolvedCatalogSource{ - Type: catalogd.SourceTypeImage, - Image: &catalogd.ResolvedImageSource{ - ResolvedRef: "fake/catalog@sha256:fakesha", - }, - }, - }, - }, - contents: defaultFS, - tripper: &mockTripper{serverError: true}, - wantErr: true, - }, - { - name: "nil catalog", - catalog: nil, - contents: defaultFS, - tripper: &mockTripper{serverError: true}, - wantErr: true, - }, - { - name: "nil catalog.status.resolvedSource", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: nil, - }, - }, - contents: defaultFS, - tripper: &mockTripper{serverError: true}, - wantErr: true, - }, - { - name: "nil catalog.status.resolvedSource.image", - catalog: &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: &catalogd.ResolvedCatalogSource{ - Image: nil, - }, - }, - }, - contents: defaultFS, - tripper: &mockTripper{serverError: true}, - wantErr: true, - }, - } { - t.Run(tt.name, func(t *testing.T) { - ctx := context.Background() - cacheDir := t.TempDir() - tt.tripper.content = make(fstest.MapFS) - maps.Copy(tt.tripper.content, tt.contents) - httpClient := &http.Client{ - Transport: tt.tripper, - } - c := cache.NewFilesystemCache(cacheDir, func() (*http.Client, error) { - return httpClient, nil - }) - - actualFS, err := c.FetchCatalogContents(ctx, tt.catalog) - if !tt.wantErr { - assert.NoError(t, err) - assert.NoError(t, equalFilesystems(tt.contents, actualFS)) - } else { - assert.Error(t, err) - } +} - if tt.testCaching { - if !tt.shouldHitCache { - tt.catalog.Status.ResolvedSource.Image.ResolvedRef = "fake/catalog@sha256:shafake" - } - tt.tripper.content["foobar/olm.package/foobar.json"] = &fstest.MapFile{Data: []byte(`{"schema": "olm.package", "name": "foobar"}`)} - actualFS, err := c.FetchCatalogContents(ctx, tt.catalog) - assert.NoError(t, err) - if !tt.shouldHitCache { - assert.NoError(t, equalFilesystems(tt.tripper.content, actualFS)) - assert.ErrorContains(t, equalFilesystems(tt.contents, actualFS), "foobar/olm.package/foobar.json") - } else { - assert.NoError(t, equalFilesystems(tt.contents, actualFS)) - } - } - }) - } +func TestFilesystemCachePutAndGet(t *testing.T) { + const ( + catalogName = "test-catalog" + resolvedRef1 = "fake/catalog@sha256:fakesha1" + resolvedRef2 = "fake/catalog@sha256:fakesha2" + ) + + cacheDir := t.TempDir() + c := cache.NewFilesystemCache(cacheDir) + + catalogCachePath := filepath.Join(cacheDir, catalogName) + require.NoDirExists(t, catalogCachePath) + + t.Log("Get empty v1 cache") + actualFSGet, err := c.Get(catalogName, resolvedRef1) + assert.NoError(t, err) + assert.Nil(t, actualFSGet) + + t.Log("Put v1 content into cache") + actualFSPut, err := c.Put(catalogName, resolvedRef1, defaultContent(), nil) + assert.NoError(t, err) + require.NotNil(t, actualFSPut) + assert.NoError(t, equalFilesystems(defaultFS(), actualFSPut)) + + t.Log("Get v1 content from cache") + actualFSGet, err = c.Get(catalogName, resolvedRef1) + assert.NoError(t, err) + require.NotNil(t, actualFSGet) + assert.NoError(t, equalFilesystems(defaultFS(), actualFSPut)) + assert.NoError(t, equalFilesystems(actualFSPut, actualFSGet)) + + t.Log("Put v1 error into cache") + actualFSPut, err = c.Put(catalogName, resolvedRef1, nil, errors.New("fake put error")) + // Errors do not override previously successfully populated cache + assert.NoError(t, err) + require.NotNil(t, actualFSPut) + assert.NoError(t, equalFilesystems(defaultFS(), actualFSPut)) + assert.NoError(t, equalFilesystems(actualFSPut, actualFSGet)) + + t.Log("Put v2 error into cache") + actualFSPut, err = c.Put(catalogName, resolvedRef2, nil, errors.New("fake v2 put error")) + assert.Equal(t, errors.New("fake v2 put error"), err) + assert.Nil(t, actualFSPut) + + t.Log("Get v2 error from cache") + actualFSGet, err = c.Get(catalogName, resolvedRef2) + assert.Equal(t, errors.New("fake v2 put error"), err) + assert.Nil(t, actualFSGet) + + t.Log("Put v2 content into cache") + actualFSPut, err = c.Put(catalogName, resolvedRef2, defaultContent(), nil) + assert.NoError(t, err) + require.NotNil(t, actualFSPut) + assert.NoError(t, equalFilesystems(defaultFS(), actualFSPut)) + + t.Log("Get v2 content from cache") + actualFSGet, err = c.Get(catalogName, resolvedRef2) + assert.NoError(t, err) + require.NotNil(t, actualFSGet) + assert.NoError(t, equalFilesystems(defaultFS(), actualFSPut)) + assert.NoError(t, equalFilesystems(actualFSPut, actualFSGet)) + + t.Log("Get empty v1 cache") + // Cache should be empty and no error because + // Put with a new version overrides the old version + actualFSGet, err = c.Get(catalogName, resolvedRef1) + assert.NoError(t, err) + assert.Nil(t, actualFSGet) } func TestFilesystemCacheRemove(t *testing.T) { - testCatalog := &catalogd.ClusterCatalog{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test-catalog", - }, - Status: catalogd.ClusterCatalogStatus{ - ResolvedSource: &catalogd.ResolvedCatalogSource{ - Type: catalogd.SourceTypeImage, - Image: &catalogd.ResolvedImageSource{ - ResolvedRef: "fake/catalog@sha256:fakesha", - }, - }, - }, - } + catalogName := "test-catalog" + resolvedRef := "fake/catalog@sha256:fakesha" - ctx := context.Background() cacheDir := t.TempDir() + c := cache.NewFilesystemCache(cacheDir) - tripper := &mockTripper{} - tripper.content = make(fstest.MapFS) - maps.Copy(tripper.content, defaultFS) - httpClient := &http.Client{ - Transport: tripper, - } - c := cache.NewFilesystemCache(cacheDir, func() (*http.Client, error) { - return httpClient, nil - }) - - catalogCachePath := filepath.Join(cacheDir, testCatalog.Name) + catalogCachePath := filepath.Join(cacheDir, catalogName) t.Log("Remove cache before it exists") require.NoDirExists(t, catalogCachePath) - err := c.Remove(testCatalog.Name) + err := c.Remove(catalogName) require.NoError(t, err) assert.NoDirExists(t, catalogCachePath) t.Log("Fetch contents to populate cache") - _, err = c.FetchCatalogContents(ctx, testCatalog) + _, err = c.Put(catalogName, resolvedRef, defaultContent(), nil) require.NoError(t, err) require.DirExists(t, catalogCachePath) @@ -292,7 +158,7 @@ func TestFilesystemCacheRemove(t *testing.T) { require.NoError(t, os.Chmod(catalogCachePath, 0000)) t.Log("Remove cache causes an error") - err = c.Remove(testCatalog.Name) + err = c.Remove(catalogName) require.ErrorContains(t, err, "error removing cache directory") require.DirExists(t, catalogCachePath) @@ -300,49 +166,11 @@ func TestFilesystemCacheRemove(t *testing.T) { require.NoError(t, os.Chmod(catalogCachePath, 0777)) t.Log("Remove cache") - err = c.Remove(testCatalog.Name) + err = c.Remove(catalogName) require.NoError(t, err) assert.NoDirExists(t, catalogCachePath) } -var _ http.RoundTripper = &mockTripper{} - -type mockTripper struct { - content fstest.MapFS - shouldError bool - serverError bool -} - -func (mt *mockTripper) RoundTrip(_ *http.Request) (*http.Response, error) { - if mt.shouldError { - return nil, errors.New("mock tripper error") - } - - if mt.serverError { - return &http.Response{ - StatusCode: http.StatusInternalServerError, - Body: http.NoBody, - }, nil - } - - pr, pw := io.Pipe() - - go func() { - _ = pw.CloseWithError(declcfg.WalkMetasFS(context.Background(), mt.content, func(_ string, meta *declcfg.Meta, err error) error { - if err != nil { - return err - } - _, err = pw.Write(meta.Blob) - return err - })) - }() - - return &http.Response{ - StatusCode: http.StatusOK, - Body: pr, - }, nil -} - func equalFilesystems(expected, actual fs.FS) error { normalizeJSON := func(data []byte) []byte { var v interface{} diff --git a/internal/catalogmetadata/client/client.go b/internal/catalogmetadata/client/client.go index 6a9efe285..50033fdb2 100644 --- a/internal/catalogmetadata/client/client.go +++ b/internal/catalogmetadata/client/client.go @@ -4,7 +4,9 @@ import ( "context" "errors" "fmt" + "io" "io/fs" + "net/http" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -13,39 +15,62 @@ import ( "github.com/operator-framework/operator-registry/alpha/declcfg" ) -// Fetcher is an interface to facilitate fetching -// catalog contents from catalogd. -type Fetcher interface { - // FetchCatalogContents fetches contents from the catalogd HTTP - // server for the catalog provided. It returns a fs.FS containing the FBC contents. - // Each sub directory contains FBC for a single package - // and the directory name is package name. - // Returns an error if any occur. - FetchCatalogContents(ctx context.Context, catalog *catalogd.ClusterCatalog) (fs.FS, error) +type Cache interface { + // Get returns cache for a specified catalog name and version (resolvedRef). + // + // Method behaviour is as follows: + // - If cache exists, it returns a non-nil fs.FS and nil error + // - If cache doesn't exist, it returns nil fs.FS and nil error + // - If there was an error during cache population, + // it returns nil fs.FS and the error from the cache population. + // In other words - cache population errors are also cached. + Get(catalogName, resolvedRef string) (fs.FS, error) + + // Put writes content from source or from errToCache in the cache backend + // for a specified catalog name and version (resolvedRef). + // + // Method behaviour is as follows: + // - If successfully populated cache for catalogName and resolvedRef exists, + // errToCache is ignored and existing cache returned with nil error + // - If existing cache for catalogName and resolvedRef exists but + // is populated with an error, update the cache with either + // new content from source or errToCache. + // - If cache doesn't exist, populate it with either new content + // from source or errToCache. + Put(catalogName, resolvedRef string, source io.Reader, errToCache error) (fs.FS, error) } -func New(fetcher Fetcher) *Client { +func New(cache Cache, httpClient func() (*http.Client, error)) *Client { return &Client{ - fetcher: fetcher, + cache: cache, + httpClient: httpClient, } } // Client is reading catalog metadata type Client struct { - // fetcher is the Fetcher to use for fetching catalog contents - fetcher Fetcher + cache Cache + httpClient func() (*http.Client, error) } func (c *Client) GetPackage(ctx context.Context, catalog *catalogd.ClusterCatalog, pkgName string) (*declcfg.DeclarativeConfig, error) { - // if the catalog has not been successfully unpacked, report an error. This ensures that our - // reconciles are deterministic and wait for all desired catalogs to be ready. - if !meta.IsStatusConditionPresentAndEqual(catalog.Status.Conditions, catalogd.TypeServing, metav1.ConditionTrue) { - return nil, fmt.Errorf("catalog %q is not being served", catalog.Name) + if err := validateCatalog(catalog); err != nil { + return nil, err } - catalogFsys, err := c.fetcher.FetchCatalogContents(ctx, catalog) + catalogFsys, err := c.cache.Get(catalog.Name, catalog.Status.ResolvedSource.Image.ResolvedRef) if err != nil { - return nil, fmt.Errorf("error fetching catalog contents: %v", err) + return nil, fmt.Errorf("error retrieving catalog cache: %v", err) + } + if catalogFsys == nil { + // TODO: https://github.com/operator-framework/operator-controller/pull/1284 + // For now we are still populating cache (if absent) on-demand, + // but we might end up just returning a "cache not found" error here + // once we implement cache population in the controller. + catalogFsys, err = c.PopulateCache(ctx, catalog) + if err != nil { + return nil, fmt.Errorf("error fetching catalog contents: %v", err) + } } pkgFsys, err := fs.Sub(catalogFsys, pkgName) @@ -65,3 +90,65 @@ func (c *Client) GetPackage(ctx context.Context, catalog *catalogd.ClusterCatalo } return pkgFBC, nil } + +func (c *Client) PopulateCache(ctx context.Context, catalog *catalogd.ClusterCatalog) (fs.FS, error) { + if err := validateCatalog(catalog); err != nil { + return nil, err + } + + resp, err := c.doRequest(ctx, catalog) + if err != nil { + // Any errors from the http request we want to cache + // so later on cache get they can be bubbled up to the user. + return c.cache.Put(catalog.Name, catalog.Status.ResolvedSource.Image.ResolvedRef, nil, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + errToCache := fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode) + return c.cache.Put(catalog.Name, catalog.Status.ResolvedSource.Image.ResolvedRef, nil, errToCache) + } + + return c.cache.Put(catalog.Name, catalog.Status.ResolvedSource.Image.ResolvedRef, resp.Body, nil) +} + +func (c *Client) doRequest(ctx context.Context, catalog *catalogd.ClusterCatalog) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, catalog.Status.ContentURL, nil) + if err != nil { + return nil, fmt.Errorf("error forming request: %v", err) + } + + client, err := c.httpClient() + if err != nil { + return nil, fmt.Errorf("error getting HTTP client: %v", err) + } + + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("error performing request: %v", err) + } + + return resp, nil +} + +func validateCatalog(catalog *catalogd.ClusterCatalog) error { + if catalog == nil { + return fmt.Errorf("error: provided catalog must be non-nil") + } + + // if the catalog has not been successfully unpacked, report an error. This ensures that our + // reconciles are deterministic and wait for all desired catalogs to be ready. + if !meta.IsStatusConditionPresentAndEqual(catalog.Status.Conditions, catalogd.TypeServing, metav1.ConditionTrue) { + return fmt.Errorf("catalog %q is not being served", catalog.Name) + } + + if catalog.Status.ResolvedSource == nil { + return fmt.Errorf("error: catalog %q has a nil status.resolvedSource value", catalog.Name) + } + + if catalog.Status.ResolvedSource.Image == nil { + return fmt.Errorf("error: catalog %q has a nil status.resolvedSource.image value", catalog.Name) + } + + return nil +} diff --git a/internal/catalogmetadata/client/client_test.go b/internal/catalogmetadata/client/client_test.go index 989b66acc..a1cec5d4a 100644 --- a/internal/catalogmetadata/client/client_test.go +++ b/internal/catalogmetadata/client/client_test.go @@ -3,7 +3,10 @@ package client_test import ( "context" "errors" + "io" "io/fs" + "net/http" + "strings" "testing" "testing/fstest" @@ -16,99 +19,289 @@ import ( catalogClient "github.com/operator-framework/operator-controller/internal/catalogmetadata/client" ) -func TestClientNew(t *testing.T) { +func defaultCatalog() *catalogd.ClusterCatalog { + return &catalogd.ClusterCatalog{ + ObjectMeta: metav1.ObjectMeta{Name: "catalog-1"}, + Status: catalogd.ClusterCatalogStatus{ + Conditions: []metav1.Condition{{Type: catalogd.TypeServing, Status: metav1.ConditionTrue}}, + ResolvedSource: &catalogd.ResolvedCatalogSource{Image: &catalogd.ResolvedImageSource{ + ResolvedRef: "fake/catalog@sha256:fakesha", + }}, + ContentURL: "https://fake-url.svc.local/all.json", + }, + } +} + +func TestClientGetPackage(t *testing.T) { testFS := fstest.MapFS{ "pkg-present/olm.package/pkg-present.json": &fstest.MapFile{Data: []byte(`{"schema": "olm.package","name": "pkg-present"}`)}, } type testCase struct { name string - catalog *catalogd.ClusterCatalog + catalog func() *catalogd.ClusterCatalog pkgName string - fetcher catalogClient.Fetcher + cache catalogClient.Cache assert func(*testing.T, *declcfg.DeclarativeConfig, error) } for _, tc := range []testCase{ { - name: "not unpacked", - catalog: &catalogd.ClusterCatalog{ObjectMeta: metav1.ObjectMeta{Name: "catalog-1"}}, - fetcher: fetcherFunc(func(context.Context, *catalogd.ClusterCatalog) (fs.FS, error) { return testFS, nil }), + name: "not unpacked", + catalog: func() *catalogd.ClusterCatalog { + return &catalogd.ClusterCatalog{ObjectMeta: metav1.ObjectMeta{Name: "catalog-1"}} + }, assert: func(t *testing.T, dc *declcfg.DeclarativeConfig, err error) { assert.ErrorContains(t, err, `catalog "catalog-1" is not being served`) }, }, { - name: "unpacked, fetcher returns error", - catalog: &catalogd.ClusterCatalog{ - Status: catalogd.ClusterCatalogStatus{Conditions: []metav1.Condition{{Type: catalogd.TypeServing, Status: metav1.ConditionTrue}}}, - }, - fetcher: fetcherFunc(func(context.Context, *catalogd.ClusterCatalog) (fs.FS, error) { return nil, errors.New("fetch error") }), + name: "unpacked, cache returns error", + catalog: defaultCatalog, + cache: &fakeCache{getErr: errors.New("fetch error")}, assert: func(t *testing.T, dc *declcfg.DeclarativeConfig, err error) { - assert.ErrorContains(t, err, `error fetching catalog contents: fetch error`) + assert.ErrorContains(t, err, `error retrieving catalog cache`) }, }, { - name: "unpacked, invalid package path", - catalog: &catalogd.ClusterCatalog{ - Status: catalogd.ClusterCatalogStatus{Conditions: []metav1.Condition{{Type: catalogd.TypeServing, Status: metav1.ConditionTrue}}}, - }, - fetcher: fetcherFunc(func(context.Context, *catalogd.ClusterCatalog) (fs.FS, error) { return testFS, nil }), + name: "unpacked, invalid package path", + catalog: defaultCatalog, + cache: &fakeCache{getFS: testFS}, pkgName: "/", assert: func(t *testing.T, dc *declcfg.DeclarativeConfig, err error) { assert.ErrorContains(t, err, `error getting package "/"`) }, }, { - name: "unpacked, package missing", - catalog: &catalogd.ClusterCatalog{ - Status: catalogd.ClusterCatalogStatus{Conditions: []metav1.Condition{{Type: catalogd.TypeServing, Status: metav1.ConditionTrue}}}, - }, + name: "unpacked, package missing", + catalog: defaultCatalog, pkgName: "pkg-missing", - fetcher: fetcherFunc(func(context.Context, *catalogd.ClusterCatalog) (fs.FS, error) { return testFS, nil }), + cache: &fakeCache{getFS: testFS}, assert: func(t *testing.T, fbc *declcfg.DeclarativeConfig, err error) { assert.NoError(t, err) assert.Equal(t, &declcfg.DeclarativeConfig{}, fbc) }, }, { - name: "unpacked, invalid package present", - catalog: &catalogd.ClusterCatalog{ - Status: catalogd.ClusterCatalogStatus{Conditions: []metav1.Condition{{Type: catalogd.TypeServing, Status: metav1.ConditionTrue}}}, - }, + name: "unpacked, invalid package present", + catalog: defaultCatalog, pkgName: "invalid-pkg-present", - fetcher: fetcherFunc(func(context.Context, *catalogd.ClusterCatalog) (fs.FS, error) { - return fstest.MapFS{ - "invalid-pkg-present/olm.package/invalid-pkg-present.json": &fstest.MapFile{Data: []byte(`{"schema": "olm.package","name": 12345}`)}, - }, nil - }), + cache: &fakeCache{getFS: fstest.MapFS{ + "invalid-pkg-present/olm.package/invalid-pkg-present.json": &fstest.MapFile{Data: []byte(`{"schema": "olm.package","name": 12345}`)}, + }}, assert: func(t *testing.T, fbc *declcfg.DeclarativeConfig, err error) { assert.ErrorContains(t, err, `error loading package "invalid-pkg-present"`) assert.Nil(t, fbc) }, }, { - name: "unpacked, package present", - catalog: &catalogd.ClusterCatalog{ - Status: catalogd.ClusterCatalogStatus{Conditions: []metav1.Condition{{Type: catalogd.TypeServing, Status: metav1.ConditionTrue}}}, + name: "unpacked, package present", + catalog: defaultCatalog, + pkgName: "pkg-present", + cache: &fakeCache{getFS: testFS}, + assert: func(t *testing.T, fbc *declcfg.DeclarativeConfig, err error) { + assert.NoError(t, err) + assert.Equal(t, &declcfg.DeclarativeConfig{Packages: []declcfg.Package{{Schema: declcfg.SchemaPackage, Name: "pkg-present"}}}, fbc) }, + }, + { + name: "cache unpopulated", + catalog: defaultCatalog, pkgName: "pkg-present", - fetcher: fetcherFunc(func(context.Context, *catalogd.ClusterCatalog) (fs.FS, error) { return testFS, nil }), + cache: &fakeCache{putFunc: func(source string, errToCache error) (fs.FS, error) { + return testFS, nil + }}, assert: func(t *testing.T, fbc *declcfg.DeclarativeConfig, err error) { assert.NoError(t, err) assert.Equal(t, &declcfg.DeclarativeConfig{Packages: []declcfg.Package{{Schema: declcfg.SchemaPackage, Name: "pkg-present"}}}, fbc) }, }, + { + name: "cache unpopulated and fails to populate", + catalog: defaultCatalog, + pkgName: "pkg-present", + cache: &fakeCache{putErr: errors.New("fake cache put error")}, + assert: func(t *testing.T, fbc *declcfg.DeclarativeConfig, err error) { + assert.Nil(t, fbc) + assert.ErrorContains(t, err, "error fetching catalog contents") + }, + }, } { t.Run(tc.name, func(t *testing.T) { - c := catalogClient.New(tc.fetcher) - fbc, err := c.GetPackage(context.Background(), tc.catalog, tc.pkgName) + ctx := context.Background() + + c := catalogClient.New(tc.cache, func() (*http.Client, error) { + return &http.Client{ + // This is to prevent actual network calls + Transport: &fakeTripper{resp: &http.Response{ + StatusCode: http.StatusOK, + Body: http.NoBody, + }}, + }, nil + }) + fbc, err := c.GetPackage(ctx, tc.catalog(), tc.pkgName) tc.assert(t, fbc, err) }) } } -type fetcherFunc func(context.Context, *catalogd.ClusterCatalog) (fs.FS, error) +func TestClientPopulateCache(t *testing.T) { + testFS := fstest.MapFS{ + "pkg-present/olm.package/pkg-present.json": &fstest.MapFile{Data: []byte(`{"schema": "olm.package","name": "pkg-present"}`)}, + } + + type testCase struct { + name string + catalog func() *catalogd.ClusterCatalog + httpClient func() (*http.Client, error) + putFuncConstructor func(t *testing.T) func(source string, errToCache error) (fs.FS, error) + assert func(t *testing.T, fs fs.FS, err error) + } + for _, tt := range []testCase{ + { + name: "cache unpopulated, successful http request", + catalog: defaultCatalog, + httpClient: func() (*http.Client, error) { + return &http.Client{ + // This is to prevent actual network calls + Transport: &fakeTripper{resp: &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader("fake-success-response-body")), + }}, + }, nil + }, + assert: func(t *testing.T, fs fs.FS, err error) { + assert.NoError(t, err) + assert.Equal(t, testFS, fs) + }, + putFuncConstructor: func(t *testing.T) func(source string, errToCache error) (fs.FS, error) { + return func(source string, errToCache error) (fs.FS, error) { + assert.Equal(t, "fake-success-response-body", source) + assert.NoError(t, errToCache) + return testFS, errToCache + } + }, + }, + { + name: "not unpacked", + catalog: func() *catalogd.ClusterCatalog { + return &catalogd.ClusterCatalog{ObjectMeta: metav1.ObjectMeta{Name: "catalog-1"}} + }, + assert: func(t *testing.T, fs fs.FS, err error) { + assert.Nil(t, fs) + assert.ErrorContains(t, err, `catalog "catalog-1" is not being served`) + }, + }, + { + name: "cache unpopulated, error on getting a http client", + catalog: defaultCatalog, + httpClient: func() (*http.Client, error) { + return nil, errors.New("fake error getting a http client") + }, + putFuncConstructor: func(t *testing.T) func(source string, errToCache error) (fs.FS, error) { + return func(source string, errToCache error) (fs.FS, error) { + assert.Empty(t, source) + assert.Error(t, errToCache) + return nil, errToCache + } + }, + assert: func(t *testing.T, fs fs.FS, err error) { + assert.Nil(t, fs) + assert.ErrorContains(t, err, "error getting HTTP client") + }, + }, + { + name: "cache unpopulated, error on http request", + catalog: defaultCatalog, + httpClient: func() (*http.Client, error) { + return &http.Client{ + // This is to prevent actual network calls + Transport: &fakeTripper{err: errors.New("fake error on a http request")}, + }, nil + }, + putFuncConstructor: func(t *testing.T) func(source string, errToCache error) (fs.FS, error) { + return func(source string, errToCache error) (fs.FS, error) { + assert.Empty(t, source) + assert.Error(t, errToCache) + return nil, errToCache + } + }, + assert: func(t *testing.T, fs fs.FS, err error) { + assert.Nil(t, fs) + assert.ErrorContains(t, err, "error performing request") + }, + }, + { + name: "cache unpopulated, unexpected http status", + catalog: defaultCatalog, + httpClient: func() (*http.Client, error) { + return &http.Client{ + // This is to prevent actual network calls + Transport: &fakeTripper{resp: &http.Response{ + StatusCode: http.StatusInternalServerError, + Body: io.NopCloser(strings.NewReader("fake-unexpected-code-response-body")), + }}, + }, nil + }, + putFuncConstructor: func(t *testing.T) func(source string, errToCache error) (fs.FS, error) { + return func(source string, errToCache error) (fs.FS, error) { + assert.Empty(t, source) + assert.Error(t, errToCache) + return nil, errToCache + } + }, + assert: func(t *testing.T, fs fs.FS, err error) { + assert.Nil(t, fs) + assert.ErrorContains(t, err, "received unexpected response status code 500") + }, + }, + } { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + + cache := &fakeCache{} + if tt.putFuncConstructor != nil { + cache.putFunc = tt.putFuncConstructor(t) + } + + c := catalogClient.New(cache, tt.httpClient) + fs, err := c.PopulateCache(ctx, tt.catalog()) + tt.assert(t, fs, err) + }) + } +} + +type fakeCache struct { + getFS fs.FS + getErr error + + putFunc func(source string, errToCache error) (fs.FS, error) + putErr error +} + +func (c *fakeCache) Get(catalogName, resolvedRef string) (fs.FS, error) { + return c.getFS, c.getErr +} + +func (c *fakeCache) Put(catalogName, resolvedRef string, source io.Reader, errToCache error) (fs.FS, error) { + if c.putFunc != nil { + buf := new(strings.Builder) + if source != nil { + io.Copy(buf, source) // nolint:errcheck + } + return c.putFunc(buf.String(), errToCache) + } + if c.putErr != nil { + return nil, c.putErr + } + + return nil, errors.New("unexpected error") +} + +type fakeTripper struct { + resp *http.Response + err error +} -func (f fetcherFunc) FetchCatalogContents(ctx context.Context, catalog *catalogd.ClusterCatalog) (fs.FS, error) { - return f(ctx, catalog) +func (ft *fakeTripper) RoundTrip(*http.Request) (*http.Response, error) { + return ft.resp, ft.err }