Merged
6 changes: 3 additions & 3 deletions cmd/manager/main.go
@@ -222,10 +222,10 @@ func main() {
 		setupLog.Error(err, "unable to create catalogs cache directory")
 		os.Exit(1)
 	}
-	cacheFetcher := cache.NewFilesystemCache(catalogsCachePath, func() (*http.Client, error) {
+	catalogClientBackend := cache.NewFilesystemCache(catalogsCachePath)
+	catalogClient := catalogclient.New(catalogClientBackend, func() (*http.Client, error) {
 		return httputil.BuildHTTPClient(certPoolWatcher)
 	})
-	catalogClient := catalogclient.New(cacheFetcher)
 
 	resolver := &resolve.CatalogResolver{
 		WalkCatalogsFunc: resolve.CatalogWalker(
@@ -284,7 +284,7 @@ func main() {

 	if err = (&controllers.ClusterCatalogReconciler{
 		Client: cl,
-		Cache:  cacheFetcher,
+		Cache:  catalogClientBackend,
 	}).SetupWithManager(mgr); err != nil {
 		setupLog.Error(err, "unable to create controller", "controller", "ClusterCatalog")
 		os.Exit(1)
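The net effect of the wiring change above: the filesystem cache no longer constructs HTTP clients and becomes a passive storage backend, while the catalog client owns fetching. Below is a minimal sketch of the client.Cache contract the backend now satisfies, inferred from the Get and Put signatures later in this diff; the authoritative definition lives in internal/catalogmetadata/client and is not shown in this PR excerpt:

package client

import (
	"io"
	"io/fs"
)

// Cache stores catalog contents keyed by catalog name and resolved image ref.
// Sketch only: inferred from this PR's filesystemCache methods; see the
// actual package for the authoritative definition.
type Cache interface {
	// Get returns the cached contents, nil when absent, or a cached
	// population error from an earlier failed Put.
	Get(catalogName, resolvedRef string) (fs.FS, error)
	// Put writes contents from source (or records errToCache) for the
	// given catalog name and resolved ref, returning the resulting FS.
	Put(catalogName, resolvedRef string, source io.Reader, errToCache error) (fs.FS, error)
}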
156 changes: 72 additions & 84 deletions internal/catalogmetadata/cache/cache.go
@@ -1,35 +1,24 @@
 package cache
 
 import (
-	"context"
 	"fmt"
+	"io"
 	"io/fs"
-	"net/http"
 	"os"
 	"path/filepath"
 	"sync"
 
-	catalogd "github.com/operator-framework/catalogd/api/core/v1alpha1"
 	"github.com/operator-framework/operator-registry/alpha/declcfg"
 
 	"github.com/operator-framework/operator-controller/internal/catalogmetadata/client"
 )
 
-var _ client.Fetcher = &filesystemCache{}
-
-// NewFilesystemCache returns a client.Fetcher implementation that uses a
-// local filesystem to cache Catalog contents. When fetching the Catalog contents
-// it will:
-//   - Check if the Catalog is cached
-//   - IF !cached it will fetch from the catalogd HTTP server and cache the response
-//   - IF cached it will verify the cache is up to date. If it is up to date it will return
-//     the cached contents, if not it will fetch the new contents from the catalogd HTTP
-//     server and update the cached contents.
-func NewFilesystemCache(cachePath string, clientFunc func() (*http.Client, error)) *filesystemCache {
+var _ client.Cache = &filesystemCache{}
+
+func NewFilesystemCache(cachePath string) *filesystemCache {
 	return &filesystemCache{
 		cachePath:              cachePath,
 		mutex:                  sync.RWMutex{},
-		getClient:              clientFunc,
 		cacheDataByCatalogName: map[string]cacheData{},
 	}
 }
@@ -40,75 +29,34 @@ func NewFilesystemCache(cachePath string, clientFunc func() (*http.Client, error
 // the cache.
 type cacheData struct {
 	ResolvedRef string
+	Error       error
 }
 
 // FilesystemCache is a cache that
 // uses the local filesystem for caching
-// catalog contents. It will fetch catalog
-// contents if the catalog does not already
-// exist in the cache.
+// catalog contents.
 type filesystemCache struct {
 	mutex                  sync.RWMutex
 	cachePath              string
-	getClient              func() (*http.Client, error)
 	cacheDataByCatalogName map[string]cacheData
 }

-// FetchCatalogContents implements the client.Fetcher interface and
-// will fetch the contents for the provided Catalog from the filesystem.
-// If the provided Catalog has not yet been cached, it will make a GET
-// request to the Catalogd HTTP server to get the Catalog contents and cache
-// them. The cache will be updated automatically if a Catalog is noticed to
-// have a different resolved image reference.
-// The Catalog provided to this function is expected to:
-//   - Be non-nil
-//   - Have a non-nil Catalog.Status.ResolvedSource.Image
-// This ensures that we are only attempting to fetch catalog contents for Catalog
-// resources that have been successfully reconciled, unpacked, and are being served.
-// These requirements help ensure that we can rely on status conditions to determine
-// when to issue a request to update the cached Catalog contents.
-func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *catalogd.ClusterCatalog) (fs.FS, error) {
-	if catalog == nil {
-		return nil, fmt.Errorf("error: provided catalog must be non-nil")
-	}
-
-	if catalog.Status.ResolvedSource == nil {
-		return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource value", catalog.Name)
-	}
-
-	if catalog.Status.ResolvedSource.Image == nil {
-		return nil, fmt.Errorf("error: catalog %q has a nil status.resolvedSource.image value", catalog.Name)
-	}
-
-	cacheDir := fsc.cacheDir(catalog.Name)
-	fsc.mutex.RLock()
-	if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok {
-		if catalog.Status.ResolvedSource.Image.ResolvedRef == data.ResolvedRef {
-			fsc.mutex.RUnlock()
-			return os.DirFS(cacheDir), nil
-		}
-	}
-	fsc.mutex.RUnlock()
-
-	req, err := http.NewRequestWithContext(ctx, http.MethodGet, catalog.Status.ContentURL, nil)
-	if err != nil {
-		return nil, fmt.Errorf("error forming request: %v", err)
-	}
-
-	client, err := fsc.getClient()
-	if err != nil {
-		return nil, fmt.Errorf("error getting HTTP client: %w", err)
-	}
-	resp, err := client.Do(req)
-	if err != nil {
-		return nil, fmt.Errorf("error performing request: %v", err)
-	}
-	defer resp.Body.Close()
-
-	if resp.StatusCode != http.StatusOK {
-		return nil, fmt.Errorf("error: received unexpected response status code %d", resp.StatusCode)
-	}
-
+// Put writes content from source to the filesystem and stores errToCache
+// for a specified catalog name and version (resolvedRef).
+//
+// Method behaviour is as follows:
+//   - If a successfully populated cache for catalogName and resolvedRef exists,
+//     errToCache is ignored and the existing cache is returned with a nil error.
+//   - If an existing cache for catalogName and resolvedRef exists but
+//     is populated with an error, the cache is updated with either
+//     new content from source or errToCache.
+//   - If the cache doesn't exist, it is populated with either new content
+//     from source or errToCache.
+//
+// This cache implementation tracks only one version of cache per catalog,
+// so Put will overwrite any existing cache on the filesystem for catalogName
+// if resolvedRef does not match the one that is already tracked.
+func (fsc *filesystemCache) Put(catalogName, resolvedRef string, source io.Reader, errToCache error) (fs.FS, error) {
 	fsc.mutex.Lock()
 	defer fsc.mutex.Unlock()
 
@@ -117,19 +65,35 @@ func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *c
 	// updating this, has no way to tell if the current ref is the
 	// newest possible ref. If another thread has already updated
 	// this to be the same value, skip the write logic and return
-	// the cached contents
-	if data, ok := fsc.cacheDataByCatalogName[catalog.Name]; ok {
-		if data.ResolvedRef == catalog.Status.ResolvedSource.Image.ResolvedRef {
-			return os.DirFS(cacheDir), nil
-		}
+	// the cached contents.
+	if cache, err := fsc.get(catalogName, resolvedRef); err == nil && cache != nil {
+		// We only return here if there was no error during
+		// the previous (likely concurrent) cache population attempt.
+		// If there was an error, we want to try and populate the cache again.
+		return cache, nil
 	}
Comment on lines +68 to +74

everettraven (Contributor):

Apologies for missing this the first time around, but do we actually still need this?

I think this was put in place with more complex thread-safety logic than we require now, since we are doing thread-safe "Put" and "Get" operations. Refreshing my memory on the previous logic, it was done for the specific scenario of:

  • We take a read-only lock to verify whether the content has been cached before.
  • We release the read-only lock once we have verified that we either have or have not cached this content before.
  • Some other thread could now potentially get a write lock and write the contents for this catalog to the cache.
  • When we go to write content for the catalog to the cache, we get a write lock and re-verify whether or not it was written by some other process. We let it win because we don't know whether our revision of the content is newer or not.

I don't think the same scenario is possible here, since we hold the lock for the entire duration of the Put method. As such, we can probably discard this initial get and ensure Put has the single responsibility of always writing the new cache.

everettraven (Contributor):

That being said, it may still be valid to say "we have successfully cached this revision before, so we will ignore a failed put for the same revision". I would lean more towards putting the responsibility for this verification on the caller, though, to simplify the responsibility of these cache functions.

m1kola (Member, Author), Oct 3, 2024:

@everettraven it is still possible for two callers to do the following concurrently:

  1. Get cache - it is missing - we lock for reading
  2. Do an HTTP request - we do not lock this
  3. Write into FS - we lock this

If this happens and we remove the get from Put, we end up with two redundant things: 1) a network request, 2) filesystem I/O.

So if we remove this check we risk two sequential writes to the filesystem, which will block reads (from all catalogs) as well. With this check we ensure that only one filesystem write happens even if there were multiple concurrent attempts to fetch the cache.

everettraven (Contributor):

Right. I get that it would still be possible for that scenario to occur, but I'm questioning whether it should be the responsibility of the Put method to handle that, or the caller's. I would think the caller should take that into consideration, as opposed to the Put method.

m1kola (Member, Author):

@everettraven if we make this the caller's responsibility, then the caller will also have to be responsible for locking the mutex. And we probably don't want to make locking the caller's responsibility.

Or am I still not getting the idea?
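To make the race in this thread concrete, here is a minimal, self-contained sketch (illustrative only; fakeCache, emptyFS, and the in-memory maps are invented stand-ins, not code from this PR) of two goroutines that both miss the cache, both "fetch", and both call Put. The re-check inside Put lets the first successful population win, so the filesystem write happens only once:

package main

import (
	"bytes"
	"fmt"
	"io"
	"io/fs"
	"sync"
)

// fakeCache is an in-memory stand-in for the filesystem cache.
type fakeCache struct {
	mu   sync.RWMutex
	refs map[string]string
	fsys map[string]fs.FS
}

// get must be called with the lock held; returns nil on a miss.
func (c *fakeCache) get(name, ref string) fs.FS {
	if c.refs[name] == ref {
		return c.fsys[name]
	}
	return nil
}

// Put mirrors the PR's behaviour: re-check under the write lock and let the
// first successful population win, skipping the redundant write.
func (c *fakeCache) Put(name, ref string, source io.Reader) (fs.FS, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cached := c.get(name, ref); cached != nil {
		return cached, nil // another goroutine already populated this ref
	}
	fmt.Println("writing to filesystem for", name) // happens only once
	data, _ := io.ReadAll(source)
	_ = data // a real implementation would unpack data into a cache dir
	c.refs[name], c.fsys[name] = ref, emptyFS{}
	return c.fsys[name], nil
}

type emptyFS struct{}

func (emptyFS) Open(string) (fs.File, error) { return nil, fs.ErrNotExist }

func main() {
	cache := &fakeCache{refs: map[string]string{}, fsys: map[string]fs.FS{}}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Both goroutines miss the cache, "fetch" over HTTP, then Put.
			body := bytes.NewBufferString(`{"schema":"olm.package"}`)
			_, _ = cache.Put("my-catalog", "sha256:abc", body)
		}()
	}
	wg.Wait()
}

Run as-is, the sketch prints the write line once even though both goroutines attempt a Put.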


+	var cacheFS fs.FS
+	if errToCache == nil {
+		cacheFS, errToCache = fsc.writeFS(catalogName, source)
+	}
+	fsc.cacheDataByCatalogName[catalogName] = cacheData{
+		ResolvedRef: resolvedRef,
+		Error:       errToCache,
+	}
+
+	return cacheFS, errToCache
+}
+
+func (fsc *filesystemCache) writeFS(catalogName string, source io.Reader) (fs.FS, error) {
+	cacheDir := fsc.cacheDir(catalogName)
+
-	tmpDir, err := os.MkdirTemp(fsc.cachePath, fmt.Sprintf(".%s-", catalog.Name))
+	tmpDir, err := os.MkdirTemp(fsc.cachePath, fmt.Sprintf(".%s-", catalogName))
 	if err != nil {
 		return nil, fmt.Errorf("error creating temporary directory to unpack catalog metadata: %v", err)
 	}
 
-	if err := declcfg.WalkMetasReader(resp.Body, func(meta *declcfg.Meta, err error) error {
+	if err := declcfg.WalkMetasReader(source, func(meta *declcfg.Meta, err error) error {
 		if err != nil {
 			return fmt.Errorf("error parsing catalog contents: %v", err)
 		}
@@ -160,11 +124,35 @@ func (fsc *filesystemCache) FetchCatalogContents(ctx context.Context, catalog *c
 		return nil, fmt.Errorf("error moving temporary directory to cache directory: %v", err)
 	}
 
-	fsc.cacheDataByCatalogName[catalog.Name] = cacheData{
-		ResolvedRef: catalog.Status.ResolvedSource.Image.ResolvedRef,
+	return os.DirFS(cacheDir), nil
 }
 
+// Get returns cache for a specified catalog name and version (resolvedRef).
+//
+// Method behaviour is as follows:
+//   - If the cache exists, it returns a non-nil fs.FS and a nil error.
+//   - If the cache doesn't exist, it returns a nil fs.FS and a nil error.
+//   - If there was an error during cache population,
+//     it returns a nil fs.FS and the error from the cache population.
+//     In other words, cache population errors are also cached.
+func (fsc *filesystemCache) Get(catalogName, resolvedRef string) (fs.FS, error) {
+	fsc.mutex.RLock()
+	defer fsc.mutex.RUnlock()
+	return fsc.get(catalogName, resolvedRef)
+}
+
+func (fsc *filesystemCache) get(catalogName, resolvedRef string) (fs.FS, error) {
+	cacheDir := fsc.cacheDir(catalogName)
+	if data, ok := fsc.cacheDataByCatalogName[catalogName]; ok {
+		if resolvedRef == data.ResolvedRef {
+			if data.Error != nil {
+				return nil, data.Error
+			}
+			return os.DirFS(cacheDir), nil
+		}
+	}
+
-	return os.DirFS(cacheDir), nil
+	return nil, nil
 }
 
 // Remove deletes cache directory for a given catalog from the filesystem
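Taken together, Put and Get give callers a check-then-populate flow in which failed population attempts are remembered per resolved ref. A hedged sketch of how a caller such as the catalog client might drive the cache; resolveFS and fetchCatalogContents are hypothetical names invented for illustration, and Cache is the interface sketched after the main.go diff above, not necessarily the exact type used in this PR:

package client

import (
	"io"
	"io/fs"
)

// fetchCatalogContents is a placeholder for the real HTTP GET against the
// catalogd content URL, which now lives in the catalog client.
func fetchCatalogContents(contentURL string) (io.ReadCloser, error) {
	return nil, nil
}

func resolveFS(cache Cache, name, ref, contentURL string) (fs.FS, error) {
	// Fast path: a cache hit, or a cached population error from a prior attempt.
	fsys, err := cache.Get(name, ref)
	if err != nil {
		return nil, err // population errors are cached and surfaced here
	}
	if fsys != nil {
		return fsys, nil
	}

	// Cache miss: fetch the contents, then hand both the body and any fetch
	// error to Put so that a failure is recorded for this ref as well.
	body, fetchErr := fetchCatalogContents(contentURL)
	var source io.Reader
	if body != nil {
		defer body.Close()
		source = body
	}
	return cache.Put(name, ref, source, fetchErr)
}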