Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,8 +314,9 @@ func main() {
}

if err = (&controllers.ClusterCatalogReconciler{
Client: cl,
Cache: catalogClientBackend,
Client: cl,
CatalogCache: catalogClientBackend,
CatalogCachePopulator: catalogClient,
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "ClusterCatalog")
os.Exit(1)
Expand Down
11 changes: 2 additions & 9 deletions internal/catalogmetadata/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,10 @@ func (c *Client) GetPackage(ctx context.Context, catalog *catalogd.ClusterCatalo

catalogFsys, err := c.cache.Get(catalog.Name, catalog.Status.ResolvedSource.Image.Ref)
if err != nil {
return nil, fmt.Errorf("error retrieving catalog cache: %v", err)
return nil, fmt.Errorf("error retrieving cache for catalog %q: %v", catalog.Name, err)
}
if catalogFsys == nil {
// TODO: https://github.com/operator-framework/operator-controller/pull/1284
// For now we are still populating cache (if absent) on-demand,
// but we might end up just returning a "cache not found" error here
// once we implement cache population in the controller.
catalogFsys, err = c.PopulateCache(ctx, catalog)
if err != nil {
return nil, fmt.Errorf("error fetching catalog contents: %v", err)
}
return nil, fmt.Errorf("cache for catalog %q not found", catalog.Name)
}

pkgFsys, err := fs.Sub(catalogFsys, pkgName)
Expand Down
19 changes: 2 additions & 17 deletions internal/catalogmetadata/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func TestClientGetPackage(t *testing.T) {
catalog: defaultCatalog,
cache: &fakeCache{getErr: errors.New("fetch error")},
assert: func(t *testing.T, dc *declcfg.DeclarativeConfig, err error) {
assert.ErrorContains(t, err, `error retrieving catalog cache`)
assert.ErrorContains(t, err, `error retrieving cache for catalog "catalog-1"`)
},
},
{
Expand Down Expand Up @@ -114,18 +114,7 @@ func TestClientGetPackage(t *testing.T) {
return testFS, nil
}},
assert: func(t *testing.T, fbc *declcfg.DeclarativeConfig, err error) {
require.NoError(t, err)
assert.Equal(t, &declcfg.DeclarativeConfig{Packages: []declcfg.Package{{Schema: declcfg.SchemaPackage, Name: "pkg-present"}}}, fbc)
},
},
{
name: "cache unpopulated and fails to populate",
catalog: defaultCatalog,
pkgName: "pkg-present",
cache: &fakeCache{putErr: errors.New("fake cache put error")},
assert: func(t *testing.T, fbc *declcfg.DeclarativeConfig, err error) {
assert.Nil(t, fbc)
assert.ErrorContains(t, err, "error fetching catalog contents")
assert.ErrorContains(t, err, `cache for catalog "catalog-1" not found`)
},
},
} {
Expand Down Expand Up @@ -278,7 +267,6 @@ type fakeCache struct {
getErr error

putFunc func(source string, errToCache error) (fs.FS, error)
putErr error
}

func (c *fakeCache) Get(catalogName, resolvedRef string) (fs.FS, error) {
Expand All @@ -293,9 +281,6 @@ func (c *fakeCache) Put(catalogName, resolvedRef string, source io.Reader, errTo
}
return c.putFunc(buf.String(), errToCache)
}
if c.putErr != nil {
return nil, c.putErr
}

return nil, errors.New("unexpected error")
}
Expand Down
58 changes: 38 additions & 20 deletions internal/controllers/clustercatalog_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,30 @@ package controllers

import (
"context"
"fmt"
"io/fs"

apierrors "k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"
)

type CatalogCacheRemover interface {
type CatalogCache interface {
Get(catalogName, resolvedRef string) (fs.FS, error)
Remove(catalogName string) error
}

type CatalogCachePopulator interface {
PopulateCache(ctx context.Context, catalog *catalogd.ClusterCatalog) (fs.FS, error)
}

// ClusterCatalogReconciler reconciles a ClusterCatalog object
type ClusterCatalogReconciler struct {
client.Client
Cache CatalogCacheRemover
CatalogCache CatalogCache
CatalogCachePopulator CatalogCachePopulator
}

//+kubebuilder:rbac:groups=olm.operatorframework.io,resources=clustercatalogs,verbs=get;list;watch
Expand All @@ -45,31 +50,44 @@ func (r *ClusterCatalogReconciler) Reconcile(ctx context.Context, req ctrl.Reque
existingCatalog := &catalogd.ClusterCatalog{}
err := r.Client.Get(ctx, req.NamespacedName, existingCatalog)
if apierrors.IsNotFound(err) {
return ctrl.Result{}, r.Cache.Remove(req.Name)
if err := r.CatalogCache.Remove(req.Name); err != nil {
return ctrl.Result{}, fmt.Errorf("error removing cache for catalog %q: %v", req.Name, err)
}
return ctrl.Result{}, nil
}
if err != nil {
return ctrl.Result{}, err
}

if existingCatalog.Status.ResolvedSource == nil ||
existingCatalog.Status.ResolvedSource.Image == nil ||
existingCatalog.Status.ResolvedSource.Image.Ref == "" {
// Reference is not known yet - skip cache population with no error.
// Once the reference is resolved another reconcile cycle
// will be triggered and we will progress further.
return ctrl.Result{}, nil
}

catalogFsys, err := r.CatalogCache.Get(existingCatalog.Name, existingCatalog.Status.ResolvedSource.Image.Ref)
if err != nil {
return ctrl.Result{}, fmt.Errorf("error retrieving cache for catalog %q: %v", existingCatalog.Name, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

WDYT about propagating this error into the cache so that cache clients have more context about why the content is not available?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joelanford it is currently implemented how we discussed here: #1284 (comment)

Cache client caches content fetch errors and r.CatalogCache.Get(...) will return fetch error to the clients. So here we will have something like error retrieving cache for catalog "my-catalog": network request timeout or other error from actual network call

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see now. I didn't realize most of the plumbing had already been done! Beautiful!

}
if catalogFsys != nil {
// Cache already exists so we do not need to populate it
return ctrl.Result{}, nil
}

if _, err = r.CatalogCachePopulator.PopulateCache(ctx, existingCatalog); err != nil {
return ctrl.Result{}, fmt.Errorf("error populating cache for catalog %q: %v", existingCatalog.Name, err)
}

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *ClusterCatalogReconciler) SetupWithManager(mgr ctrl.Manager) error {
_, err := ctrl.NewControllerManagedBy(mgr).
For(&catalogd.ClusterCatalog{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
return false
},
DeleteFunc: func(e event.DeleteEvent) bool {
return true
},
GenericFunc: func(e event.GenericEvent) bool {
return false
},
})).
For(&catalogd.ClusterCatalog{}).
Build(r)

return err
Expand Down
Loading
Loading