Skip to content

Commit f0f601d

Browse files
committed
larger refactor of catalogd's structure
Signed-off-by: Joe Lanford <[email protected]>
1 parent 12c2ca9 commit f0f601d

File tree

28 files changed

+1015
-1787
lines changed

28 files changed

+1015
-1787
lines changed

cmd/catalogd/main.go

Lines changed: 80 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -46,17 +46,16 @@ import (
4646
"sigs.k8s.io/controller-runtime/pkg/client"
4747
"sigs.k8s.io/controller-runtime/pkg/healthz"
4848
"sigs.k8s.io/controller-runtime/pkg/log"
49-
"sigs.k8s.io/controller-runtime/pkg/metrics"
5049
"sigs.k8s.io/controller-runtime/pkg/metrics/filters"
5150
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
5251
crwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
5352

5453
ocv1 "github.com/operator-framework/operator-controller/api/v1"
55-
corecontrollers "github.com/operator-framework/operator-controller/internal/catalogd/controllers/core"
54+
"github.com/operator-framework/operator-controller/internal/catalogd/controllers"
5655
"github.com/operator-framework/operator-controller/internal/catalogd/features"
5756
"github.com/operator-framework/operator-controller/internal/catalogd/garbagecollection"
58-
"github.com/operator-framework/operator-controller/internal/catalogd/handlers"
59-
catalogdmetrics "github.com/operator-framework/operator-controller/internal/catalogd/metrics"
57+
v1 "github.com/operator-framework/operator-controller/internal/catalogd/handlers/api/v1"
58+
"github.com/operator-framework/operator-controller/internal/catalogd/handlers/middleware"
6059
"github.com/operator-framework/operator-controller/internal/catalogd/serverutil"
6160
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
6261
"github.com/operator-framework/operator-controller/internal/catalogd/webhook"
@@ -328,61 +327,55 @@ func run(ctx context.Context) error {
328327
},
329328
}
330329

331-
var localStorage storage.Instance
332-
metrics.Registry.MustRegister(catalogdmetrics.RequestDurationMetric)
333-
334330
storeDir := filepath.Join(cfg.cacheDir, storageDir)
335331
if err := os.MkdirAll(storeDir, 0700); err != nil {
336332
setupLog.Error(err, "unable to create storage directory for catalogs")
337333
return err
338334
}
339335

340-
baseStorageURL, err := url.Parse(fmt.Sprintf("%s/catalogs/", cfg.externalAddr))
336+
const catalogsSubPath = "catalogs"
337+
baseCatalogsURL, err := url.Parse(fmt.Sprintf("%s/%s", cfg.externalAddr, catalogsSubPath))
341338
if err != nil {
342339
setupLog.Error(err, "unable to create base storage URL")
343340
return err
344341
}
345342

346-
indexer := storage.NewIndexer()
347-
handlersMap := map[string]http.Handler{
348-
"/all": handlers.V1AllHandler(indexer),
349-
}
350-
351-
if features.CatalogdFeatureGate.Enabled(features.APIV1MetasHandler) {
352-
handlersMap["/metas"] = handlers.V1MetasHandler(indexer)
353-
}
354-
355-
if features.CatalogdFeatureGate.Enabled(features.APIV1GraphQLHandler) {
356-
handlersMap["/graphql"] = handlers.V1GraphQLHandler()
357-
}
358-
359-
localStorage = &storage.LocalDirV1{
360-
Indexer: indexer,
361-
Handlers: handlersMap,
362-
RootDir: storeDir,
363-
RootURL: baseStorageURL,
364-
}
343+
storageInstances := configureStorage(storeDir)
344+
handler := configureHandler(configAPIV1Handler(catalogsSubPath, storageInstances))
365345

366346
// Config for the catalogd web server
367-
catalogServerConfig := serverutil.CatalogServerConfig{
368-
ExternalAddr: cfg.externalAddr,
369-
CatalogAddr: cfg.catalogServerAddr,
370-
CertFile: cfg.certFile,
371-
KeyFile: cfg.keyFile,
372-
LocalStorage: localStorage,
347+
catalogServerConfig := serverutil.ServerConfig{
348+
Name: "catalogs",
349+
OnlyServeWhenLeader: true,
350+
ListenAddr: cfg.catalogServerAddr,
351+
Server: &http.Server{
352+
Handler: handler,
353+
ReadTimeout: 5 * time.Second,
354+
WriteTimeout: 5 * time.Minute,
355+
},
356+
}
357+
if cfg.certFile != "" && cfg.keyFile != "" {
358+
catalogServerConfig.GetCertificate = cw.GetCertificate
373359
}
374360

375-
err = serverutil.AddCatalogServerToManager(mgr, catalogServerConfig, cw)
361+
catalogServer, err := serverutil.NewManagerServer(catalogServerConfig)
376362
if err != nil {
377363
setupLog.Error(err, "unable to configure catalog server")
378364
return err
379365
}
366+
if err := mgr.Add(catalogServer); err != nil {
367+
setupLog.Error(err, "unable to add catalog server to manager")
368+
return err
369+
}
380370

381-
if err = (&corecontrollers.ClusterCatalogReconciler{
371+
if err = (&controllers.ClusterCatalogReconciler{
382372
Client: mgr.GetClient(),
383373
ImageCache: imageCache,
384374
ImagePuller: imagePuller,
385-
Storage: localStorage,
375+
Storage: storageInstances,
376+
GetBaseURL: func(catalogName string) string {
377+
return fmt.Sprintf("%s/%s", baseCatalogsURL, catalogName)
378+
},
386379
}).SetupWithManager(mgr); err != nil {
387380
setupLog.Error(err, "unable to create controller", "controller", "ClusterCatalog")
388381
return err
@@ -452,3 +445,54 @@ func podNamespace() string {
452445
}
453446
return string(namespace)
454447
}
448+
449+
func configureStorage(storeDir string) *storage.Instances {
450+
metasEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1MetasHandler)
451+
graphqlEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1GraphQLHandler)
452+
needsIndices := metasEnabled || graphqlEnabled
453+
454+
// Setup storage instances
455+
storageInstances := storage.Instances{}
456+
storageInstances.Files = storage.NewFiles(storeDir)
457+
458+
if needsIndices {
459+
storageInstances.Indices = storage.NewIndices(storeDir)
460+
}
461+
if graphqlEnabled {
462+
storageInstances.GraphQLSchemas = storage.NewGraphQLSchemas()
463+
}
464+
465+
return &storageInstances
466+
}
467+
468+
func configAPIV1Handler(baseURLPath string, si *storage.Instances) subPathedHandler {
469+
metasEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1MetasHandler)
470+
graphqlEnabled := features.CatalogdFeatureGate.Enabled(features.APIV1GraphQLHandler)
471+
472+
// Setup API v1 handler
473+
apiV1HandlerOpts := []v1.APIV1HandlerOption{}
474+
apiV1HandlerOpts = append(apiV1HandlerOpts, v1.WithAllHandler(si.Files))
475+
476+
if metasEnabled {
477+
apiV1HandlerOpts = append(apiV1HandlerOpts, v1.WithMetasHandler(si.Files, si.Indices))
478+
}
479+
480+
if graphqlEnabled {
481+
apiV1HandlerOpts = append(apiV1HandlerOpts, v1.WithGraphQLHandler(si.Files, si.Indices, si.GraphQLSchemas))
482+
}
483+
484+
return v1.NewAPIV1Handler(baseURLPath, apiV1HandlerOpts...)
485+
}
486+
487+
type subPathedHandler interface {
488+
SubPath() string
489+
http.Handler
490+
}
491+
492+
func configureHandler(handlers ...subPathedHandler) http.Handler {
493+
mux := http.NewServeMux()
494+
for _, h := range handlers {
495+
mux.Handle(h.SubPath(), h)
496+
}
497+
return middleware.Standard(mux)
498+
}

internal/catalogd/controllers/core/clustercatalog_controller.go renamed to internal/catalogd/controllers/clustercatalog_controller.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package core
17+
package controllers
1818

1919
import (
2020
"context" // #nosec
2121
"errors"
2222
"fmt"
23+
"io/fs"
24+
"iter"
2325
"slices"
2426
"sync"
2527
"time"
@@ -38,6 +40,8 @@ import (
3840
"sigs.k8s.io/controller-runtime/pkg/log"
3941
"sigs.k8s.io/controller-runtime/pkg/reconcile"
4042

43+
"github.com/operator-framework/operator-registry/alpha/declcfg"
44+
4145
ocv1 "github.com/operator-framework/operator-controller/api/v1"
4246
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
4347
imageutil "github.com/operator-framework/operator-controller/internal/shared/util/image"
@@ -57,7 +61,8 @@ type ClusterCatalogReconciler struct {
5761
ImageCache imageutil.Cache
5862
ImagePuller imageutil.Puller
5963

60-
Storage storage.Instance
64+
Storage storage.Instance
65+
GetBaseURL func(catalogName string) string
6166

6267
finalizers crfinalizer.Finalizers
6368

@@ -224,7 +229,7 @@ func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *ocv1.
224229
case !hasStoredCatalog:
225230
l.Info("unpack required: no cached catalog metadata found for this catalog")
226231
needsUnpack = true
227-
case !r.Storage.ContentExists(catalog.Name):
232+
case !r.Storage.Exists(catalog.Name):
228233
l.Info("unpack required: no stored content found for this catalog")
229234
needsUnpack = true
230235
case !equality.Semantic.DeepEqual(catalog.Status, *expectedStatus):
@@ -265,12 +270,12 @@ func (r *ClusterCatalogReconciler) reconcile(ctx context.Context, catalog *ocv1.
265270
// TODO: We should check to see if the unpacked result has the same content
266271
// as the already unpacked content. If it does, we should skip this rest
267272
// of the unpacking steps.
268-
if err := r.Storage.Store(ctx, catalog.Name, fsys); err != nil {
273+
if err := r.Storage.Store(ctx, catalog.Name, walkMetasFSIterator(ctx, fsys)); err != nil {
269274
storageErr := fmt.Errorf("error storing fbc: %v", err)
270275
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), storageErr)
271276
return ctrl.Result{}, storageErr
272277
}
273-
baseURL := r.Storage.BaseURL(catalog.Name)
278+
baseURL := r.GetBaseURL(catalog.Name)
274279

275280
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), nil)
276281
updateStatusServing(&catalog.Status, canonicalRef, unpackTime, baseURL, catalog.GetGeneration())
@@ -296,8 +301,8 @@ func (r *ClusterCatalogReconciler) getCurrentState(catalog *ocv1.ClusterCatalog)
296301

297302
// Set expected status based on what we see in the stored catalog
298303
clearUnknownConditions(expectedStatus)
299-
if hasStoredCatalog && r.Storage.ContentExists(catalog.Name) {
300-
updateStatusServing(expectedStatus, storedCatalog.ref, storedCatalog.lastUnpack, r.Storage.BaseURL(catalog.Name), storedCatalog.observedGeneration)
304+
if hasStoredCatalog && r.Storage.Exists(catalog.Name) {
305+
updateStatusServing(expectedStatus, storedCatalog.ref, storedCatalog.lastUnpack, r.GetBaseURL(catalog.Name), storedCatalog.observedGeneration)
301306
updateStatusProgressing(expectedStatus, storedCatalog.observedGeneration, nil)
302307
}
303308

@@ -458,7 +463,7 @@ func (r *ClusterCatalogReconciler) deleteStoredCatalog(catalogName string) {
458463
}
459464

460465
func (r *ClusterCatalogReconciler) deleteCatalogCache(ctx context.Context, catalog *ocv1.ClusterCatalog) error {
461-
if err := r.Storage.Delete(catalog.Name); err != nil {
466+
if err := r.Storage.Delete(ctx, catalog.Name); err != nil {
462467
updateStatusProgressing(&catalog.Status, catalog.GetGeneration(), err)
463468
return err
464469
}
@@ -470,3 +475,12 @@ func (r *ClusterCatalogReconciler) deleteCatalogCache(ctx context.Context, catal
470475
r.deleteStoredCatalog(catalog.Name)
471476
return nil
472477
}
478+
479+
func walkMetasFSIterator(ctx context.Context, fsys fs.FS) iter.Seq2[*declcfg.Meta, error] {
480+
return func(yield func(*declcfg.Meta, error) bool) {
481+
_ = declcfg.WalkMetasFS(ctx, fsys, func(path string, meta *declcfg.Meta, err error) error {
482+
yield(meta, err)
483+
return nil
484+
}, declcfg.WithConcurrency(1))
485+
}
486+
}

internal/catalogd/controllers/core/clustercatalog_controller_test.go renamed to internal/catalogd/controllers/clustercatalog_controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package core
1+
package controllers
22

33
import (
44
"context"
Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,35 @@
1-
package handlers
1+
package v1
22

33
import (
4-
"github.com/operator-framework/operator-controller/internal/catalogd/handlers/internal/handlerutil"
54
"net/http"
65

76
"k8s.io/klog/v2"
87

8+
"github.com/operator-framework/operator-controller/internal/catalogd/handlers/internal/handlerutil"
99
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
1010
)
1111

12-
func V1AllHandler(indexer *storage.Indexer) http.Handler {
13-
return handlerutil.AllowedMethodsHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
12+
func apiV1AllHandler(files *storage.Files) http.Handler {
13+
allHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
1414
catalog := r.PathValue("catalog")
1515
logger := klog.FromContext(r.Context()).WithValues("catalog", catalog)
1616

17-
idx, err := indexer.GetIndex(catalog)
17+
catalogFile, err := files.Get(catalog)
1818
if err != nil {
19-
logger.Error(err, "error getting index")
19+
logger.Error(err, "error getting catalog file")
2020
http.Error(w, "Not found", http.StatusNotFound)
2121
return
2222
}
2323

24-
catalogFile := idx.All()
25-
catalogStat, err := idx.Stat()
24+
catalogStat, err := catalogFile.Stat()
2625
if err != nil {
27-
logger.Error(err, "error stat-ing index")
26+
logger.Error(err, "error stat-ing catalog file")
2827
http.Error(w, "Internal server error", http.StatusInternalServerError)
2928
return
3029
}
3130

3231
w.Header().Add("Content-Type", "application/jsonl")
3332
http.ServeContent(w, r, "", catalogStat.ModTime(), catalogFile)
34-
}), http.MethodGet, http.MethodHead)
33+
})
34+
return handlerutil.AllowedMethodsHandler(allHandler, http.MethodGet, http.MethodHead)
3535
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package v1
2+
3+
import (
4+
"net/http"
5+
6+
"github.com/graphql-go/handler"
7+
"k8s.io/klog/v2"
8+
9+
"github.com/operator-framework/operator-controller/internal/catalogd/storage"
10+
)
11+
12+
func apiV1GraphQLHandler(files *storage.Files, indices *storage.Indices, schemas *storage.GraphQLSchemas) http.Handler {
13+
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
14+
catalog := r.PathValue("catalog")
15+
logger := klog.FromContext(r.Context()).WithValues("catalog", catalog)
16+
17+
file, err := files.Get(catalog)
18+
if err != nil {
19+
logger.Error(err, "error getting catalog file")
20+
http.Error(w, "Not found", http.StatusNotFound)
21+
}
22+
defer file.Close()
23+
24+
idx, err := indices.Get(catalog)
25+
if err != nil {
26+
logger.Error(err, "error getting catalog index")
27+
}
28+
29+
schema, err := schemas.Get(catalog)
30+
if err != nil {
31+
logger.Error(err, "error getting catalog graphql schema")
32+
http.Error(w, "Not found", http.StatusNotFound)
33+
}
34+
35+
r = r.WithContext(storage.ContextWithCatalogData(r.Context(), file, idx))
36+
h := handler.New(&handler.Config{
37+
Schema: schema,
38+
GraphiQL: true,
39+
})
40+
h.ServeHTTP(w, r)
41+
})
42+
}

0 commit comments

Comments
 (0)