Skip to content

Commit b2d14c9

Browse files
authored
feat(catalog): load YAML catalogs into the database (kubeflow#1697)
* feat(catalog): load YAML catalogs into the database

  - Add new ModelProviderFunc interface for pluggable model providers
  - Implement SourceCollection for managing catalog sources and origins
  - Refactor catalog loading to support multiple source formats
  - Add comprehensive test coverage for new loading functionality
  - Include database migrations for foreign key constraints
  - Extend catalog models to support new artifact types

  This introduces a more flexible catalog loading architecture that supports loading models from YAML sources while maintaining backward compatibility.

  Signed-off-by: Paul Boyd <[email protected]>

* fix(catalog): use the correct types for logo and readme fields

  Signed-off-by: Paul Boyd <[email protected]>

* fix(catalog): silence missed event warnings

  Signed-off-by: Paul Boyd <[email protected]>

* fix(catalog): default language and tasks to empty slices

  Signed-off-by: Paul Boyd <[email protected]>

* fix(catalog): pause after receiving fsnotify events

  We see multiple change events for files that are partially written, so pause a bit to give time for the write to finish before we process it.

  Signed-off-by: Paul Boyd <[email protected]>

---------

Signed-off-by: Paul Boyd <[email protected]>
1 parent b8ea543 commit b2d14c9

34 files changed

+2535
-889
lines changed

catalog/README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ type CatalogSourceProvider interface {
122122

123123
2. Register your provider:
124124
```go
125-
catalog.RegisterCatalogType("my-catalog", func(source *CatalogSourceConfig) (CatalogSourceProvider, error) {
125+
catalog.RegisterCatalogType("my-catalog", func(source *Source) (CatalogSourceProvider, error) {
126126
return NewMyCatalogProvider(source)
127127
})
128128
```
@@ -145,4 +145,4 @@ The catalog service is designed to complement the main Model Registry service by
145145
- Unified metadata aggregation
146146
- Read-only access to distributed model catalogs
147147

148-
For complete Model Registry documentation, see the [main README](../README.md).
148+
For complete Model Registry documentation, see the [main README](../README.md).

catalog/cmd/catalog.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package cmd
22

33
import (
4+
"context"
45
"fmt"
56
"net/http"
67
"reflect"
@@ -53,7 +54,14 @@ func runCatalogServer(cmd *cobra.Command, args []string) error {
5354
return fmt.Errorf("error initializing datastore: %v", err)
5455
}
5556

56-
sources, err := catalog.LoadCatalogSources(catalogCfg.ConfigPath)
57+
services := service.NewServices(
58+
getRepo[models.CatalogModelRepository](repoSet),
59+
getRepo[models.CatalogArtifactRepository](repoSet),
60+
getRepo[models.CatalogModelArtifactRepository](repoSet),
61+
getRepo[models.CatalogMetricsArtifactRepository](repoSet),
62+
)
63+
64+
sources, err := catalog.LoadCatalogSources(context.Background(), services, catalogCfg.ConfigPath)
5765
if err != nil {
5866
return fmt.Errorf("error loading catalog sources: %v", err)
5967
}

catalog/internal/catalog/catalog.go

Lines changed: 2 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,6 @@ package catalog
22

33
import (
44
"context"
5-
"fmt"
6-
"os"
7-
"path/filepath"
8-
"sync"
9-
10-
"github.com/golang/glog"
11-
"k8s.io/apimachinery/pkg/util/yaml"
125

136
model "github.com/kubeflow/model-registry/catalog/pkg/openapi"
147
)
@@ -29,8 +22,8 @@ type ListArtifactsParams struct {
2922
NextPageToken *string
3023
}
3124

32-
// CatalogSourceProvider is implemented by catalog source types, e.g. YamlCatalog
33-
type CatalogSourceProvider interface {
25+
// APIProvider implements the API endpoints.
26+
type APIProvider interface {
3427
// GetModel returns model metadata for a single model by its name. If
3528
// nothing is found with the name provided it returns nil, without an
3629
// error.
@@ -46,162 +39,3 @@ type CatalogSourceProvider interface {
4639
// found, but has no artifacts, an empty list is returned.
4740
GetArtifacts(ctx context.Context, modelName string, sourceID string, params ListArtifactsParams) (model.CatalogArtifactList, error)
4841
}
49-
50-
// CatalogSourceConfig is a single entry from the catalog sources YAML file.
51-
type CatalogSourceConfig struct {
52-
model.CatalogSource `json:",inline"`
53-
54-
// Catalog type to use, must match one of the registered types
55-
Type string `json:"type"`
56-
57-
// Properties used for configuring the catalog connection based on catalog implementation
58-
Properties map[string]any `json:"properties,omitempty"`
59-
}
60-
61-
// sourceConfig is the structure for the catalog sources YAML file.
62-
type sourceConfig struct {
63-
Catalogs []CatalogSourceConfig `json:"catalogs"`
64-
}
65-
66-
type CatalogTypeRegisterFunc func(source *CatalogSourceConfig, reldir string) (CatalogSourceProvider, error)
67-
68-
var registeredCatalogTypes = make(map[string]CatalogTypeRegisterFunc, 0)
69-
70-
func RegisterCatalogType(catalogType string, callback CatalogTypeRegisterFunc) error {
71-
if _, exists := registeredCatalogTypes[catalogType]; exists {
72-
return fmt.Errorf("catalog type %s already exists", catalogType)
73-
}
74-
registeredCatalogTypes[catalogType] = callback
75-
return nil
76-
}
77-
78-
type CatalogSource struct {
79-
Provider CatalogSourceProvider
80-
Metadata model.CatalogSource
81-
}
82-
83-
type SourceCollection struct {
84-
sourcesMu sync.RWMutex
85-
sources map[string]CatalogSource
86-
}
87-
88-
func NewSourceCollection(sources map[string]CatalogSource) *SourceCollection {
89-
return &SourceCollection{sources: sources}
90-
}
91-
92-
func (sc *SourceCollection) All() map[string]CatalogSource {
93-
sc.sourcesMu.RLock()
94-
defer sc.sourcesMu.RUnlock()
95-
96-
return sc.sources
97-
}
98-
99-
func (sc *SourceCollection) Get(name string) (src CatalogSource, ok bool) {
100-
sc.sourcesMu.RLock()
101-
defer sc.sourcesMu.RUnlock()
102-
103-
src, ok = sc.sources[name]
104-
return
105-
}
106-
107-
func (sc *SourceCollection) load(path string) error {
108-
// Get absolute path of the catalog config file
109-
absConfigPath, err := filepath.Abs(path)
110-
if err != nil {
111-
return fmt.Errorf("failed to get absolute path for %s: %v", path, err)
112-
}
113-
114-
// Get the directory of the config file to resolve relative paths
115-
configDir := filepath.Dir(absConfigPath)
116-
117-
config := sourceConfig{}
118-
bytes, err := os.ReadFile(absConfigPath)
119-
if err != nil {
120-
return err
121-
}
122-
123-
if err = yaml.UnmarshalStrict(bytes, &config); err != nil {
124-
return err
125-
}
126-
127-
sources := make(map[string]CatalogSource, len(config.Catalogs))
128-
for _, catalogConfig := range config.Catalogs {
129-
// If enabled is explicitly set to false, skip
130-
hasEnabled := catalogConfig.HasEnabled()
131-
if hasEnabled && *catalogConfig.Enabled == false {
132-
continue
133-
}
134-
// If not explicitly set, default to enabled
135-
if !hasEnabled {
136-
t := true
137-
catalogConfig.CatalogSource.Enabled = &t
138-
}
139-
140-
catalogType := catalogConfig.Type
141-
glog.Infof("reading config type %s...", catalogType)
142-
registerFunc, ok := registeredCatalogTypes[catalogType]
143-
if !ok {
144-
return fmt.Errorf("catalog type %s not registered", catalogType)
145-
}
146-
id := catalogConfig.GetId()
147-
if len(id) == 0 {
148-
return fmt.Errorf("invalid catalog id %s", id)
149-
}
150-
if _, exists := sources[id]; exists {
151-
return fmt.Errorf("duplicate catalog id %s", id)
152-
}
153-
154-
labels := make([]string, 0)
155-
if catalogConfig.GetLabels() != nil {
156-
labels = catalogConfig.GetLabels()
157-
}
158-
catalogConfig.CatalogSource.Labels = labels
159-
provider, err := registerFunc(&catalogConfig, configDir)
160-
if err != nil {
161-
return fmt.Errorf("error reading catalog type %s with id %s: %v", catalogType, id, err)
162-
}
163-
164-
sources[id] = CatalogSource{
165-
Provider: provider,
166-
Metadata: catalogConfig.CatalogSource,
167-
}
168-
169-
glog.Infof("loaded config %s of type %s", id, catalogType)
170-
}
171-
172-
sc.sourcesMu.Lock()
173-
defer sc.sourcesMu.Unlock()
174-
sc.sources = sources
175-
176-
return nil
177-
}
178-
179-
func LoadCatalogSources(paths []string) (*SourceCollection, error) {
180-
sc := &SourceCollection{}
181-
182-
for _, path := range paths {
183-
err := sc.load(path)
184-
if err != nil {
185-
return nil, err
186-
}
187-
188-
go func(path string) {
189-
changes, err := getMonitor().Path(path)
190-
if err != nil {
191-
glog.Errorf("unable to watch sources file (%s): %v", path, err)
192-
// Not fatal, we just won't get automatic updates.
193-
}
194-
195-
for range changes {
196-
glog.Infof("Reloading sources %s", path)
197-
198-
err = sc.load(path)
199-
if err != nil {
200-
glog.Errorf("unable to load sources: %v", err)
201-
}
202-
}
203-
}(path)
204-
}
205-
206-
return sc, nil
207-
}

0 commit comments

Comments
 (0)