Skip to content

Commit 3bf656a

Browse files
authored
feat(catalog): reload sources and static catalogs (kubeflow#1266)
Reload sources and static catalogs after changes are made to the files they were loaded from. Signed-off-by: Paul Boyd <[email protected]>
1 parent c7aed7f commit 3bf656a

File tree

8 files changed

+549
-46
lines changed

8 files changed

+549
-46
lines changed

catalog/internal/catalog/catalog.go

Lines changed: 71 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"os"
77
"path/filepath"
8+
"sync"
89

910
"github.com/golang/glog"
1011
"k8s.io/apimachinery/pkg/util/yaml"
@@ -75,11 +76,35 @@ type CatalogSource struct {
7576
Metadata model.CatalogSource
7677
}
7778

78-
func LoadCatalogSources(catalogsPath string) (map[string]CatalogSource, error) {
79+
type SourceCollection struct {
80+
sourcesMu sync.RWMutex
81+
sources map[string]CatalogSource
82+
}
83+
84+
func NewSourceCollection(sources map[string]CatalogSource) *SourceCollection {
85+
return &SourceCollection{sources: sources}
86+
}
87+
88+
func (sc *SourceCollection) All() map[string]CatalogSource {
89+
sc.sourcesMu.RLock()
90+
defer sc.sourcesMu.RUnlock()
91+
92+
return sc.sources
93+
}
94+
95+
func (sc *SourceCollection) Get(name string) (src CatalogSource, ok bool) {
96+
sc.sourcesMu.RLock()
97+
defer sc.sourcesMu.RUnlock()
98+
99+
src, ok = sc.sources[name]
100+
return
101+
}
102+
103+
func (sc *SourceCollection) load(path string) error {
79104
// Get absolute path of the catalog config file
80-
absConfigPath, err := filepath.Abs(catalogsPath)
105+
absConfigPath, err := filepath.Abs(path)
81106
if err != nil {
82-
return nil, fmt.Errorf("failed to get absolute path for %s: %v", catalogsPath, err)
107+
return fmt.Errorf("failed to get absolute path for %s: %v", path, err)
83108
}
84109

85110
// Get the directory of the config file to resolve relative paths
@@ -88,12 +113,12 @@ func LoadCatalogSources(catalogsPath string) (map[string]CatalogSource, error) {
88113
// Save current working directory
89114
originalWd, err := os.Getwd()
90115
if err != nil {
91-
return nil, fmt.Errorf("failed to get current working directory: %v", err)
116+
return fmt.Errorf("failed to get current working directory: %v", err)
92117
}
93118

94119
// Change to the config directory to make relative paths work
95120
if err := os.Chdir(configDir); err != nil {
96-
return nil, fmt.Errorf("failed to change to config directory %s: %v", configDir, err)
121+
return fmt.Errorf("failed to change to config directory %s: %v", configDir, err)
97122
}
98123

99124
// Ensure we restore the original working directory when we're done
@@ -106,40 +131,71 @@ func LoadCatalogSources(catalogsPath string) (map[string]CatalogSource, error) {
106131
config := sourceConfig{}
107132
bytes, err := os.ReadFile(absConfigPath)
108133
if err != nil {
109-
return nil, err
134+
return err
110135
}
111136

112137
if err = yaml.UnmarshalStrict(bytes, &config); err != nil {
113-
return nil, err
138+
return err
114139
}
115140

116-
catalogs := make(map[string]CatalogSource, len(config.Catalogs))
141+
sources := make(map[string]CatalogSource, len(config.Catalogs))
117142
for _, catalogConfig := range config.Catalogs {
118143
catalogType := catalogConfig.Type
119144
glog.Infof("reading config type %s...", catalogType)
120145
registerFunc, ok := registeredCatalogTypes[catalogType]
121146
if !ok {
122-
return nil, fmt.Errorf("catalog type %s not registered", catalogType)
147+
return fmt.Errorf("catalog type %s not registered", catalogType)
123148
}
124149
id := catalogConfig.GetId()
125150
if len(id) == 0 {
126-
return nil, fmt.Errorf("invalid catalog id %s", id)
151+
return fmt.Errorf("invalid catalog id %s", id)
127152
}
128-
if _, exists := catalogs[id]; exists {
129-
return nil, fmt.Errorf("duplicate catalog id %s", id)
153+
if _, exists := sources[id]; exists {
154+
return fmt.Errorf("duplicate catalog id %s", id)
130155
}
131156
provider, err := registerFunc(&catalogConfig)
132157
if err != nil {
133-
return nil, fmt.Errorf("error reading catalog type %s with id %s: %v", catalogType, id, err)
158+
return fmt.Errorf("error reading catalog type %s with id %s: %v", catalogType, id, err)
134159
}
135160

136-
catalogs[id] = CatalogSource{
161+
sources[id] = CatalogSource{
137162
Provider: provider,
138163
Metadata: catalogConfig.CatalogSource,
139164
}
140165

141166
glog.Infof("loaded config %s of type %s", id, catalogType)
142167
}
143168

144-
return catalogs, nil
169+
sc.sourcesMu.Lock()
170+
defer sc.sourcesMu.Unlock()
171+
sc.sources = sources
172+
173+
return nil
174+
}
175+
176+
func LoadCatalogSources(path string) (*SourceCollection, error) {
177+
sc := &SourceCollection{}
178+
err := sc.load(path)
179+
if err != nil {
180+
return nil, err
181+
}
182+
183+
go func() {
184+
changes, err := getMonitor().Path(path)
185+
if err != nil {
186+
glog.Errorf("unable to watch sources file: %v", err)
187+
// Not fatal, we just won't get automatic updates.
188+
}
189+
190+
for range changes {
191+
glog.Infof("Reloading sources %s", path)
192+
193+
err = sc.load(path)
194+
if err != nil {
195+
glog.Errorf("unable to load sources: %v", err)
196+
}
197+
}
198+
}()
199+
200+
return sc, nil
145201
}

catalog/internal/catalog/catalog_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ func TestLoadCatalogSources(t *testing.T) {
3030
t.Errorf("LoadCatalogSources() error = %v, wantErr %v", err, tt.wantErr)
3131
return
3232
}
33-
gotKeys := make([]string, 0, len(got))
34-
for k := range got {
33+
gotKeys := make([]string, 0, len(got.All()))
34+
for k := range got.All() {
3535
gotKeys = append(gotKeys, k)
3636
}
3737
sort.Strings(gotKeys)
Lines changed: 238 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,238 @@
1+
package catalog
2+
3+
import (
4+
"fmt"
5+
"hash/crc32"
6+
"io"
7+
"os"
8+
"path/filepath"
9+
"sync"
10+
"sync/atomic"
11+
12+
"github.com/fsnotify/fsnotify"
13+
"github.com/golang/glog"
14+
)
15+
16+
// monitor sends events when the contents of a file have changed.
17+
//
18+
// Unfortunately, simply watching the file misses events for our primary case
19+
// of k8s mounted configmaps because the files we're watching are actually
20+
// symlinks which aren't modified:
21+
//
22+
// drwxrwxrwx 1 root root 138 Jul 2 15:45 .
23+
// drwxr-xr-x 1 root root 116 Jul 2 15:52 ..
24+
// drwxr-xr-x 1 root root 62 Jul 2 15:45 ..2025_07_02_15_45_09.2837733502
25+
// lrwxrwxrwx 1 root root 32 Jul 2 15:45 ..data -> ..2025_07_02_15_45_09.2837733502
26+
// lrwxrwxrwx 1 root root 26 Jul 2 13:18 sample-catalog.yaml -> ..data/sample-catalog.yaml
27+
// lrwxrwxrwx 1 root root 19 Jul 2 13:18 sources.yaml -> ..data/sources.yaml
28+
//
29+
// Updates are written to a new directory and the ..data symlink is updated. No
30+
// fsnotify events will ever be triggered for the YAML files.
31+
//
32+
// The approach taken here is to watch the directory containing the file for
33+
// any change and then hash the contents of the file to avoid false-positives.
34+
type monitor struct {
35+
watcher *fsnotify.Watcher
36+
closed <-chan struct{}
37+
38+
recordsMu sync.RWMutex
39+
records map[string]map[string]*monitorRecord
40+
}
41+
42+
var _monitor *monitor
43+
var initMonitor sync.Once
44+
45+
// getMonitor returns a singleton monitor instance. Panics on failure.
46+
func getMonitor() *monitor {
47+
initMonitor.Do(func() {
48+
var err error
49+
_monitor, err = newMonitor()
50+
if err != nil {
51+
panic(fmt.Sprintf("Unable to create file monitor: %v", err))
52+
}
53+
})
54+
if _monitor == nil {
55+
// Panic in case someone traps the panic that occurred during
56+
// initialization and tries to call this again.
57+
panic("Unable to get file monitor")
58+
}
59+
60+
return _monitor
61+
}
62+
63+
func newMonitor() (*monitor, error) {
64+
watcher, err := fsnotify.NewWatcher()
65+
if err != nil {
66+
return nil, err
67+
}
68+
69+
m := &monitor{
70+
watcher: watcher,
71+
records: map[string]map[string]*monitorRecord{},
72+
}
73+
74+
go m.monitor()
75+
return m, nil
76+
}
77+
78+
// Close stops the monitor and waits for the background goroutine to exit.
79+
//
80+
// All channels returned by Path() will be closed.
81+
func (m *monitor) Close() {
82+
select {
83+
case <-m.closed:
84+
// Already closed, nothing to do.
85+
return
86+
default:
87+
// Fallthrough
88+
}
89+
90+
m.watcher.Close()
91+
<-m.closed
92+
93+
m.recordsMu.Lock()
94+
defer m.recordsMu.Unlock()
95+
96+
uniqCh := make(map[chan<- struct{}]struct{})
97+
for dir := range m.records {
98+
for file := range m.records[dir] {
99+
record, ok := m.records[dir][file]
100+
if !ok {
101+
continue
102+
}
103+
for _, ch := range record.channels {
104+
uniqCh[ch] = struct{}{}
105+
}
106+
}
107+
}
108+
for ch := range uniqCh {
109+
close(ch)
110+
}
111+
m.records = nil
112+
}
113+
114+
// Path returns a channel that receives an event when the contents of a file
115+
// change. The file does not need to exist before calling this method, however
116+
// the provided path should only be a file or a symlink (not a directory,
117+
// device, etc.). The returned channel will be closed when the monitor is
118+
// closed.
119+
func (m *monitor) Path(p string) (<-chan struct{}, error) {
120+
absPath, err := filepath.Abs(p)
121+
if err != nil {
122+
return nil, fmt.Errorf("abs: %w", err)
123+
}
124+
125+
m.recordsMu.Lock()
126+
defer m.recordsMu.Unlock()
127+
128+
dir, base := filepath.Split(absPath)
129+
dir = filepath.Clean(dir)
130+
131+
err = m.watcher.Add(dir)
132+
if err != nil {
133+
return nil, fmt.Errorf("unable to watch directory %q: %w", dir, err)
134+
}
135+
136+
if _, exists := m.records[dir]; !exists {
137+
m.records[dir] = make(map[string]*monitorRecord, 1)
138+
}
139+
140+
ch := make(chan struct{}, 1)
141+
142+
if _, exists := m.records[dir][base]; !exists {
143+
m.records[dir][base] = &monitorRecord{
144+
channels: []chan<- struct{}{ch},
145+
}
146+
} else {
147+
r := m.records[dir][base]
148+
r.channels = append(r.channels, ch)
149+
}
150+
m.records[dir][base].updateHash(filepath.Join(dir, base))
151+
152+
return ch, nil
153+
}
154+
155+
func (m *monitor) monitor() {
156+
closed := make(chan struct{})
157+
m.closed = closed
158+
defer close(closed)
159+
160+
for {
161+
select {
162+
case err, ok := <-m.watcher.Errors:
163+
if !ok {
164+
return
165+
}
166+
167+
glog.Errorf("fsnotify error: %v", err)
168+
case e, ok := <-m.watcher.Events:
169+
if !ok {
170+
return
171+
}
172+
173+
glog.V(2).Infof("fsnotify.Event: %v", e)
174+
175+
switch e.Op {
176+
case fsnotify.Create, fsnotify.Write:
177+
// Fallthrough
178+
default:
179+
// Ignore fsnotify.Remove, fsnotify.Rename and fsnotify.Chmod
180+
continue
181+
}
182+
183+
func() {
184+
m.recordsMu.RLock()
185+
defer m.recordsMu.RUnlock()
186+
187+
dir := filepath.Dir(e.Name)
188+
189+
dc := m.records[dir]
190+
if dc == nil {
191+
return
192+
}
193+
194+
for base, record := range dc {
195+
path := filepath.Join(dir, base)
196+
if !record.updateHash(path) {
197+
continue
198+
}
199+
for _, ch := range record.channels {
200+
// Send the event, ignore any that would block.
201+
select {
202+
case ch <- struct{}{}:
203+
default:
204+
glog.Errorf("monitor: missed event for path %s", path)
205+
}
206+
}
207+
}
208+
}()
209+
}
210+
}
211+
}
212+
213+
// monitorRecord tracks one watched file: who is subscribed to it and the
// hash of its contents as of the last check.
type monitorRecord struct {
	// channels receive a value each time the file's hash changes.
	channels []chan<- struct{}
	// hash is the last computed CRC-32 of the file; it is updated atomically.
	hash uint32
}
217+
218+
// updateHash recalculates the hash and returns true if it has changed.
219+
func (mr *monitorRecord) updateHash(path string) bool {
220+
newHash := mr.calculateHash(path)
221+
oldHash := atomic.SwapUint32(&mr.hash, newHash)
222+
return oldHash != newHash
223+
}
224+
225+
func (monitorRecord) calculateHash(path string) uint32 {
226+
fh, err := os.Open(path)
227+
if err != nil {
228+
return 0
229+
}
230+
defer fh.Close()
231+
232+
h := crc32.NewIEEE()
233+
_, err = io.Copy(h, fh)
234+
if err != nil {
235+
return 0
236+
}
237+
return h.Sum32()
238+
}

0 commit comments

Comments
 (0)