Skip to content

Commit b11ccb0

Browse files
committed
larger refactor of catalogd's structure
Signed-off-by: Joe Lanford <[email protected]>
1 parent 12c2ca9 commit b11ccb0

File tree

22 files changed

+791
-1682
lines changed

22 files changed

+791
-1682
lines changed

cmd/catalogd/main.go

Lines changed: 46 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,10 @@ import (
5252
crwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
5353

5454
ocv1 "github.com/operator-framework/operator-controller/api/v1"
55-
corecontrollers "github.com/operator-framework/operator-controller/internal/catalogd/controllers/core"
55+
"github.com/operator-framework/operator-controller/internal/catalogd/controllers"
5656
"github.com/operator-framework/operator-controller/internal/catalogd/features"
5757
"github.com/operator-framework/operator-controller/internal/catalogd/garbagecollection"
58-
"github.com/operator-framework/operator-controller/internal/catalogd/handlers"
58+
handlers2 "github.com/operator-framework/operator-controller/internal/catalogd/handlers"
5959
catalogdmetrics "github.com/operator-framework/operator-controller/internal/catalogd/metrics"
6060
"github.com/operator-framework/operator-controller/internal/catalogd/serverutil"
6161
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
@@ -328,7 +328,6 @@ func run(ctx context.Context) error {
328328
},
329329
}
330330

331-
var localStorage storage.Instance
332331
metrics.Registry.MustRegister(catalogdmetrics.RequestDurationMetric)
333332

334333
storeDir := filepath.Join(cfg.cacheDir, storageDir)
@@ -337,39 +336,21 @@ func run(ctx context.Context) error {
337336
return err
338337
}
339338

340-
baseStorageURL, err := url.Parse(fmt.Sprintf("%s/catalogs/", cfg.externalAddr))
339+
const catalogsSubPath = "catalogs"
340+
baseCatalogsURL, err := url.Parse(fmt.Sprintf("%s/%s", cfg.externalAddr, catalogsSubPath))
341341
if err != nil {
342342
setupLog.Error(err, "unable to create base storage URL")
343343
return err
344344
}
345345

346-
indexer := storage.NewIndexer()
347-
handlersMap := map[string]http.Handler{
348-
"/all": handlers.V1AllHandler(indexer),
349-
}
350-
351-
if features.CatalogdFeatureGate.Enabled(features.APIV1MetasHandler) {
352-
handlersMap["/metas"] = handlers.V1MetasHandler(indexer)
353-
}
354-
355-
if features.CatalogdFeatureGate.Enabled(features.APIV1GraphQLHandler) {
356-
handlersMap["/graphql"] = handlers.V1GraphQLHandler()
357-
}
358-
359-
localStorage = &storage.LocalDirV1{
360-
Indexer: indexer,
361-
Handlers: handlersMap,
362-
RootDir: storeDir,
363-
RootURL: baseStorageURL,
364-
}
346+
storageInstances, handlers := configureStorageAndHandlers(storeDir, catalogsSubPath)
365347

366348
// Config for the catalogd web server
367349
catalogServerConfig := serverutil.CatalogServerConfig{
368-
ExternalAddr: cfg.externalAddr,
369-
CatalogAddr: cfg.catalogServerAddr,
370-
CertFile: cfg.certFile,
371-
KeyFile: cfg.keyFile,
372-
LocalStorage: localStorage,
350+
CatalogAddr: cfg.catalogServerAddr,
351+
CertFile: cfg.certFile,
352+
KeyFile: cfg.keyFile,
353+
Handlers: handlers,
373354
}
374355

375356
err = serverutil.AddCatalogServerToManager(mgr, catalogServerConfig, cw)
@@ -378,11 +359,14 @@ func run(ctx context.Context) error {
378359
return err
379360
}
380361

381-
if err = (&corecontrollers.ClusterCatalogReconciler{
362+
if err = (&controllers.ClusterCatalogReconciler{
382363
Client: mgr.GetClient(),
383364
ImageCache: imageCache,
384365
ImagePuller: imagePuller,
385-
Storage: localStorage,
366+
Storage: storageInstances,
367+
GetBaseURL: func(catalogName string) string {
368+
return fmt.Sprintf("%s/%s", baseCatalogsURL, catalogName)
369+
},
386370
}).SetupWithManager(mgr); err != nil {
387371
setupLog.Error(err, "unable to create controller", "controller", "ClusterCatalog")
388372
return err
@@ -452,3 +436,35 @@ func podNamespace() string {
452436
}
453437
return string(namespace)
454438
}
439+
440+
func configureStorageAndHandlers(storeDir string, baseURLPath string) (storage.Instance, map[string]http.Handler) {
441+
files := storage.NewFiles(storeDir)
442+
storageInstances := storage.Instances{
443+
files,
444+
}
445+
446+
var indices *storage.Indices
447+
metasEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1MetasHandler)
448+
graphqlEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1GraphQLHandler)
449+
if metasEnabled || graphqlEnabled {
450+
indices = storage.NewIndices(storeDir)
451+
storageInstances = append(storageInstances, indices)
452+
}
453+
454+
v1APIPath := func(api string) string {
455+
return fmt.Sprintf("/%s/{catalog}/api/v1/%s", baseURLPath, api)
456+
}
457+
458+
handlersMap := map[string]http.Handler{
459+
v1APIPath("all"): handlers2.V1AllHandler(files),
460+
}
461+
if metasEnabled {
462+
handlersMap[v1APIPath("metas")] = handlers2.V1MetasHandler(files, indices)
463+
}
464+
if graphqlEnabled {
465+
schemas := storage.NewGraphQLSchemas()
466+
storageInstances = append(storageInstances, schemas)
467+
handlersMap[v1APIPath("graphql")] = handlers2.V1GraphQLHandler(files, indices, schemas)
468+
}
469+
return storageInstances, handlersMap
470+
}

internal/catalogd/controllers/core/clustercatalog_controller.go renamed to internal/catalogd/controllers/clustercatalog_controller.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package core
17+
package controllers
1818

1919
import (
2020
"context" // #nosec
2121
"errors"
2222
"fmt"
23+
"io/fs"
24+
"iter"
2325
"slices"
2426
"sync"
2527
"time"
@@ -38,6 +40,8 @@ import (
3840
"sigs.k8s.io/controller-runtime/pkg/log"
3941
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4042

43+
"github.com/operator-framework/operator-registry/alpha/declcfg"
44+
4145
ocv1 "github.com/operator-framework/operator-controller/api/v1"
4246
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
4347
imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image"
@@ -57,7 +61,8 @@ type ClusterCatalogReconciler struct {
5761
ImageCache imageutil.Cache
5862
ImagePuller imageutil.Puller
5963

60-
Storage storage.Instance
64+
Storage storage.Instance
65+
GetBaseURL func(catalogName string) string
6166

6267
finalizers crfinalizer.Finalizers
6368

@@ -224,7 +229,7 @@ func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *ocv1.
224229
case !hasStoredCatalog:
225230
l.Info("unpack required: no cached catalog metadata found for this catalog")
226231
needsUnpack = true
227-
case !r.Storage.ContentExists(catalog.Name):
232+
case !r.Storage.Exists(catalog.Name):
228233
l.Info("unpack required: no stored content found for this catalog")
229234
needsUnpack = true
230235
case !equality.Semantic.DeepEqual(catalog.Status, *expectedStatus):
@@ -265,12 +270,12 @@ func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *ocv1.
265270
// TODO: We should check to see if the unpacked result has the same content
266271
// as the already unpacked content. If it does, we should skip this rest
267272
// of the unpacking steps.
268-
if err := r.Storage.Store(ctx, catalog.Name, fsys); err != nil {
273+
if err := r.Storage.Store(ctx, catalog.Name, walkMetasFSIterator(ctx, fsys)); err != nil {
269274
storageErr := fmt.Errorf("error storing fbc: %v", err)
270275
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), storageErr)
271276
return ctrl.Result{}, storageErr
272277
}
273-
baseURL := r.Storage.BaseURL(catalog.Name)
278+
baseURL := r.GetBaseURL(catalog.Name)
274279

275280
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), nil)
276281
updateStatusServing(&catalog.Status, canonicalRef, unpackTime, baseURL, catalog.GetGeneration())
@@ -296,8 +301,8 @@ func (r *ClusterCatalogReconciler) getCurrentState(catalog *ocv1.ClusterCatalog)
296301

297302
// Set expected status based on what we see in the stored catalog
298303
clearUnknownConditions(expectedStatus)
299-
if hasStoredCatalog && r.Storage.ContentExists(catalog.Name) {
300-
updateStatusServing(expectedStatus, storedCatalog.ref, storedCatalog.lastUnpack, r.Storage.BaseURL(catalog.Name), storedCatalog.observedGeneration)
304+
if hasStoredCatalog && r.Storage.Exists(catalog.Name) {
305+
updateStatusServing(expectedStatus, storedCatalog.ref, storedCatalog.lastUnpack, r.GetBaseURL(catalog.Name), storedCatalog.observedGeneration)
301306
updateStatusProgressing(expectedStatus, storedCatalog.observedGeneration, nil)
302307
}
303308

@@ -458,7 +463,7 @@ func (r *ClusterCatalogReconciler) deleteStoredCatalog(catalogName string) {
458463
}
459464

460465
func (r *ClusterCatalogReconciler) deleteCatalogCache(ctx context.Context, catalog *ocv1.ClusterCatalog) error {
461-
if err := r.Storage.Delete(catalog.Name); err != nil {
466+
if err := r.Storage.Delete(ctx, catalog.Name); err != nil {
462467
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
463468
return err
464469
}
@@ -470,3 +475,12 @@ func (r *ClusterCatalogReconciler) deleteCatalogCache(ctx context.Context, catal
470475
r.deleteStoredCatalog(catalog.Name)
471476
return nil
472477
}
478+
479+
func walkMetasFSIterator(ctx context.Context, fsys fs.FS) iter.Seq2[*declcfg.Meta, error] {
480+
return func(yield func(*declcfg.Meta, error) bool) {
481+
_ = declcfg.WalkMetasFS(ctx, fsys, func(path string, meta *declcfg.Meta, err error) error {
482+
yield(meta, err)
483+
return nil
484+
}, declcfg.WithConcurrency(1))
485+
}
486+
}

internal/catalogd/controllers/core/clustercatalog_controller_test.go renamed to internal/catalogd/controllers/clustercatalog_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package core
1+
package controllers
22

33
import (
44
"context"

internal/catalogd/handlers/all.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,29 @@
11
package handlers
22

33
import (
4-
"github.com/operator-framework/operator-controller/internal/catalogd/handlers/internal/handlerutil"
54
"net/http"
65

76
"k8s.io/klog/v2"
87

8+
"github.com/operator-framework/operator-controller/internal/catalogd/handlers/internal/handlerutil"
99
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
1010
)
1111

12-
func V1AllHandler(indexer *storage.Indexer) http.Handler {
12+
func V1AllHandler(files *storage.Files) http.Handler {
1313
return handlerutil.AllowedMethodsHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1414
catalog := r.PathValue("catalog")
1515
logger := klog.FromContext(r.Context()).WithValues("catalog", catalog)
1616

17-
idx, err := indexer.GetIndex(catalog)
17+
catalogFile, err := files.Get(catalog)
1818
if err != nil {
19-
logger.Error(err, "error getting index")
19+
logger.Error(err, "error getting catalog file")
2020
http.Error(w, "Not found", http.StatusNotFound)
2121
return
2222
}
2323

24-
catalogFile := idx.All()
25-
catalogStat, err := idx.Stat()
24+
catalogStat, err := catalogFile.Stat()
2625
if err != nil {
27-
logger.Error(err, "error stat-ing index")
26+
logger.Error(err, "error stat-ing catalog file")
2827
http.Error(w, "Internal server error", http.StatusInternalServerError)
2928
return
3029
}
Lines changed: 31 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,62 +1,42 @@
11
package handlers
22

33
import (
4-
"context"
5-
"errors"
6-
"fmt"
74
"net/http"
8-
"sync"
95

6+
"github.com/graphql-go/handler"
107
"k8s.io/klog/v2"
118

12-
"github.com/operator-framework/operator-registry/alpha/declcfg"
13-
14-
"github.com/operator-framework/operator-controller/internal/catalogd/handlers/internal/graphql"
159
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
1610
)
1711

18-
type GraphQLHandler struct {
19-
catalogHandlers map[string]http.Handler
20-
mu sync.RWMutex
21-
}
22-
23-
var (
24-
_ storage.MetaProcessor = (*GraphQLHandler)(nil)
25-
_ http.Handler = (*GraphQLHandler)(nil)
26-
)
27-
28-
func V1GraphQLHandler() *GraphQLHandler {
29-
return &GraphQLHandler{
30-
catalogHandlers: make(map[string]http.Handler),
31-
}
32-
}
33-
34-
func (h *GraphQLHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
35-
catalog := r.PathValue("catalog")
36-
logger := klog.FromContext(r.Context()).WithValues("catalog", catalog)
37-
38-
h.mu.RLock()
39-
defer h.mu.RUnlock()
40-
41-
handler, ok := h.catalogHandlers[catalog]
42-
if !ok {
43-
logger.Error(errors.New("no handler found for catalog"), "catalog not found")
44-
http.Error(w, "Not found", http.StatusNotFound)
45-
return
46-
}
47-
r = r.WithContext(graphql.NewRequestContext(r.Context()))
48-
handler.ServeHTTP(w, r)
49-
}
50-
51-
func (h *GraphQLHandler) ProcessMetas(ctx context.Context, catalog string, idx *storage.Index, metasChan <-chan *declcfg.Meta) error {
52-
handler, err := graphql.NewHandler(ctx, idx, metasChan)
53-
if err != nil {
54-
return fmt.Errorf("failed to create graphql handler: %w", err)
55-
}
56-
57-
h.mu.Lock()
58-
defer h.mu.Unlock()
59-
60-
h.catalogHandlers[catalog] = handler
61-
return nil
12+
func V1GraphQLHandler(files *storage.Files, indices *storage.Indices, schemas *storage.GraphQLSchemas) http.Handler {
13+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
14+
catalog := r.PathValue("catalog")
15+
logger := klog.FromContext(r.Context()).WithValues("catalog", catalog)
16+
17+
file, err := files.Get(catalog)
18+
if err != nil {
19+
logger.Error(err, "error getting catalog file")
20+
http.Error(w, "Not found", http.StatusNotFound)
21+
}
22+
defer file.Close()
23+
24+
idx, err := indices.Get(catalog)
25+
if err != nil {
26+
logger.Error(err, "error getting catalog index")
27+
}
28+
29+
schema, err := schemas.Get(catalog)
30+
if err != nil {
31+
logger.Error(err, "error getting catalog graphql schema")
32+
http.Error(w, "Not found", http.StatusNotFound)
33+
}
34+
35+
r = r.WithContext(storage.ContextWithCatalogData(r.Context(), file, idx))
36+
h := handler.New(&handler.Config{
37+
Schema: schema,
38+
GraphiQL: true,
39+
})
40+
h.ServeHTTP(w, r)
41+
})
6242
}

0 commit comments

Comments
 (0)