Skip to content

Commit 8486693

Browse files
author
abushwang
committed
add hot reloading for fs config
Signed-off-by: abushwang <[email protected]>
1 parent 60de78b commit 8486693

File tree

8 files changed

+318
-40
lines changed

8 files changed

+318
-40
lines changed
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"path/filepath"
22+
"reflect"
23+
"sync"
24+
"time"
25+
26+
"github.com/containerd/containerd/v2/core/snapshots"
27+
"github.com/containerd/log"
28+
fsconfig "github.com/containerd/stargz-snapshotter/fs/config"
29+
"github.com/fsnotify/fsnotify"
30+
"github.com/pelletier/go-toml"
31+
)
32+
33+
// hotReloadBlacklist contains the list of configuration fields that do not support hot reloading.
34+
var hotReloadBlacklist = []string{
35+
"NoPrometheus",
36+
}
37+
38+
// WatchConfig monitors the specified configuration file for changes.
39+
// It triggers the config reload when a change is detected.
40+
// The monitoring runs in a separate goroutine.
41+
func WatchConfig(ctx context.Context, filePath string, rs snapshots.Snapshotter) error {
42+
absFilePath, err := filepath.Abs(filePath)
43+
if err != nil {
44+
return err
45+
}
46+
47+
watchDir := filepath.Dir(absFilePath)
48+
49+
watcher, err := fsnotify.NewWatcher()
50+
if err != nil {
51+
return err
52+
}
53+
54+
if err := watcher.Add(watchDir); err != nil {
55+
watcher.Close()
56+
return err
57+
}
58+
59+
log.G(ctx).Infof("started monitoring config file: %s", absFilePath)
60+
61+
cw := &configWatcher{}
62+
63+
go func() {
64+
defer watcher.Close()
65+
66+
var (
67+
debounceTimer *time.Timer
68+
mu sync.Mutex
69+
)
70+
71+
for {
72+
select {
73+
case event, ok := <-watcher.Events:
74+
if !ok {
75+
return
76+
}
77+
78+
if event.Name == absFilePath {
79+
// Trigger on Write, Create, Rename, or Chmod events
80+
// such as vim, nano, etc.
81+
if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) ||
82+
event.Has(fsnotify.Rename) || event.Has(fsnotify.Chmod) {
83+
84+
mu.Lock()
85+
if debounceTimer != nil {
86+
debounceTimer.Stop()
87+
}
88+
// Debounce changes with a 50ms delay
89+
debounceTimer = time.AfterFunc(50*time.Millisecond, func() {
90+
log.G(ctx).Infof("config file modification detected: %s", absFilePath)
91+
cw.reload(ctx, absFilePath, rs)
92+
})
93+
mu.Unlock()
94+
}
95+
}
96+
97+
case err, ok := <-watcher.Errors:
98+
if !ok {
99+
return
100+
}
101+
log.G(ctx).WithError(err).Error("config watcher encountered an error")
102+
}
103+
}
104+
}()
105+
106+
return nil
107+
}
108+
109+
type configWatcher struct {
110+
lastConfig *fsconfig.Config
111+
mu sync.Mutex
112+
}
113+
114+
func (w *configWatcher) reload(ctx context.Context, configPath string, rs snapshots.Snapshotter) {
115+
log.G(ctx).Infof("Config file %s changed, reloading...", configPath)
116+
var newConfig snapshotterConfig
117+
tree, err := toml.LoadFile(configPath)
118+
if err != nil {
119+
log.G(ctx).WithError(err).Error("failed to reload config file")
120+
return
121+
}
122+
if err := tree.Unmarshal(&newConfig); err != nil {
123+
log.G(ctx).WithError(err).Error("failed to unmarshal config")
124+
return
125+
}
126+
127+
newFsConfig := newConfig.Config.Config
128+
129+
w.mu.Lock()
130+
defer w.mu.Unlock()
131+
132+
if w.lastConfig != nil {
133+
revertBlacklistedChanges(ctx, w.lastConfig, &newFsConfig)
134+
}
135+
136+
if w.lastConfig != nil && reflect.DeepEqual(*w.lastConfig, newFsConfig) {
137+
log.G(ctx).Info("Config content unchanged, skipping update")
138+
return
139+
}
140+
141+
if updater, ok := rs.(interface {
142+
UpdateConfig(context.Context, fsconfig.Config) error
143+
}); ok {
144+
log.G(ctx).Debugf("applying new config: %+v", newFsConfig)
145+
if err := updater.UpdateConfig(ctx, newFsConfig); err != nil {
146+
log.G(ctx).WithError(err).Error("failed to update config")
147+
} else {
148+
log.G(ctx).Info("Config updated successfully")
149+
cfgCopy := newFsConfig
150+
w.lastConfig = &cfgCopy
151+
}
152+
} else {
153+
log.G(ctx).Warn("snapshotter does not support config update")
154+
}
155+
}
156+
157+
func revertBlacklistedChanges(ctx context.Context, oldConfig, newConfig *fsconfig.Config) {
158+
oldVal := reflect.ValueOf(oldConfig).Elem()
159+
newVal := reflect.ValueOf(newConfig).Elem()
160+
161+
for _, fieldName := range hotReloadBlacklist {
162+
fOld := oldVal.FieldByName(fieldName)
163+
fNew := newVal.FieldByName(fieldName)
164+
165+
if !fOld.IsValid() || !fNew.IsValid() {
166+
log.G(ctx).Warnf("Field %s not found in config struct", fieldName)
167+
continue
168+
}
169+
170+
if !reflect.DeepEqual(fOld.Interface(), fNew.Interface()) {
171+
log.G(ctx).Warnf("Ignoring update for '%s' as it does not support hot reloading. Keeping old value: %v", fieldName, fOld.Interface())
172+
fNew.Set(fOld)
173+
}
174+
}
175+
}

cmd/containerd-stargz-grpc/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,10 @@ func main() {
260260
}
261261
}
262262

263+
if err := WatchConfig(ctx, *configPath, rs); err != nil {
264+
log.G(ctx).WithError(err).Warn("failed to start config watcher")
265+
}
266+
263267
cleanup, err := serve(ctx, rpc, *address, rs, config)
264268
if err != nil {
265269
log.G(ctx).WithError(err).Fatalf("failed to serve snapshotter")

cmd/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
github.com/containerd/stargz-snapshotter/ipfs v0.18.1
1717
github.com/coreos/go-systemd/v22 v22.6.0
1818
github.com/docker/go-metrics v0.0.1
19+
github.com/fsnotify/fsnotify v1.9.0
1920
github.com/goccy/go-json v0.10.5
2021
github.com/klauspost/compress v1.18.1
2122
github.com/opencontainers/go-digest v1.0.0
@@ -56,7 +57,6 @@ require (
5657
github.com/docker/go-units v0.5.0 // indirect
5758
github.com/emicklei/go-restful/v3 v3.12.2 // indirect
5859
github.com/felixge/httpsnoop v1.0.4 // indirect
59-
github.com/fsnotify/fsnotify v1.9.0 // indirect
6060
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
6161
github.com/go-logr/logr v1.4.3 // indirect
6262
github.com/go-logr/stdr v1.2.2 // indirect

docs/overview.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,13 @@ insecure = true
260260
261261
The config file can be passed to stargz snapshotter using `containerd-stargz-grpc`'s `--config` option.
262262

263+
## Configuration hot reload
264+
265+
[Fs configurations](/fs/config/config.go) supports hot reloading. When the configuration file is modified, the snapshotter detects the change and applies the new configuration without restarting the process.
266+
267+
Note that other configurations (e.g. `proxy_plugins`, `fuse_manager`, `resolver`, `mount_options`) require a restart to take effect.
268+
Also, some specific fields in `[stargz]` section (e.g. `no_prometheus`) do not support hot reloading and changes to them will be ignored until restart.
269+
263270
## Make your remote snapshotter
264271

265272
It isn't difficult for you to implement your remote snapshotter using [our general snapshotter package](/snapshot) without considering the protocol between that and containerd.

fs/fs.go

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,18 @@ var (
7777
metricsCtr *layermetrics.Controller
7878
)
7979

80+
func applyDefaults(cfg *config.Config) {
81+
if cfg.MaxConcurrency == 0 {
82+
cfg.MaxConcurrency = defaultMaxConcurrency
83+
}
84+
if cfg.AttrTimeout == 0 {
85+
cfg.AttrTimeout = int64(defaultFuseTimeout.Seconds())
86+
}
87+
if cfg.EntryTimeout == 0 {
88+
cfg.EntryTimeout = int64(defaultFuseTimeout.Seconds())
89+
}
90+
}
91+
8092
type Option func(*options)
8193

8294
type options struct {
@@ -132,20 +144,11 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
132144
for _, o := range opts {
133145
o(&fsOpts)
134146
}
135-
maxConcurrency := cfg.MaxConcurrency
136-
if maxConcurrency == 0 {
137-
maxConcurrency = defaultMaxConcurrency
138-
}
147+
applyDefaults(&cfg)
139148

149+
maxConcurrency := cfg.MaxConcurrency
140150
attrTimeout := time.Duration(cfg.AttrTimeout) * time.Second
141-
if attrTimeout == 0 {
142-
attrTimeout = defaultFuseTimeout
143-
}
144-
145151
entryTimeout := time.Duration(cfg.EntryTimeout) * time.Second
146-
if entryTimeout == 0 {
147-
entryTimeout = defaultFuseTimeout
148-
}
149152

150153
metadataStore := fsOpts.metadataStore
151154
if metadataStore == nil {
@@ -205,6 +208,7 @@ type filesystem struct {
205208
debug bool
206209
layer map[string]layer.Layer
207210
layerMu sync.Mutex
211+
configMu sync.RWMutex
208212
backgroundTaskManager *task.BackgroundTaskManager
209213
allowNoVerification bool
210214
disableVerification bool
@@ -214,6 +218,23 @@ type filesystem struct {
214218
entryTimeout time.Duration
215219
}
216220

221+
func (fs *filesystem) UpdateConfig(ctx context.Context, cfg config.Config) error {
222+
applyDefaults(&cfg)
223+
fs.configMu.Lock()
224+
fs.prefetchSize = cfg.PrefetchSize
225+
fs.noprefetch = cfg.NoPrefetch
226+
fs.noBackgroundFetch = cfg.NoBackgroundFetch
227+
fs.debug = cfg.Debug
228+
fs.allowNoVerification = cfg.AllowNoVerification
229+
fs.disableVerification = cfg.DisableVerification
230+
fs.attrTimeout = time.Duration(cfg.AttrTimeout) * time.Second
231+
fs.entryTimeout = time.Duration(cfg.EntryTimeout) * time.Second
232+
fs.configMu.Unlock()
233+
234+
fs.resolver.UpdateConfig(ctx, cfg)
235+
return nil
236+
}
237+
217238
func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[string]string) (retErr error) {
218239
// Setting the start time to measure the Mount operation duration.
219240
start := time.Now()
@@ -233,7 +254,9 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
233254
return fmt.Errorf("source must be passed")
234255
}
235256

257+
fs.configMu.RLock()
236258
defaultPrefetchSize := fs.prefetchSize
259+
fs.configMu.RUnlock()
237260
if psStr, ok := labels[config.TargetPrefetchSizeLabel]; ok {
238261
if ps, err := strconv.ParseInt(psStr, 10, 64); err == nil {
239262
defaultPrefetchSize = ps
@@ -297,7 +320,15 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
297320
}()
298321

299322
// Verify layer's content
300-
if fs.disableVerification {
323+
fs.configMu.RLock()
324+
disableVerification := fs.disableVerification
325+
allowNoVerification := fs.allowNoVerification
326+
attrTimeout := fs.attrTimeout
327+
entryTimeout := fs.entryTimeout
328+
debug := fs.debug
329+
fs.configMu.RUnlock()
330+
331+
if disableVerification {
301332
// Skip if verification is disabled completely
302333
l.SkipVerify()
303334
log.G(ctx).Infof("Verification forcefully skipped")
@@ -313,7 +344,7 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
313344
return fmt.Errorf("invalid stargz layer: %w", err)
314345
}
315346
log.G(ctx).Debugf("verified")
316-
} else if _, ok := labels[config.TargetSkipVerifyLabel]; ok && fs.allowNoVerification {
347+
} else if _, ok := labels[config.TargetSkipVerifyLabel]; ok && allowNoVerification {
317348
// If unverified layer is allowed, use it with warning.
318349
// This mode is for legacy stargz archives which don't contain digests
319350
// necessary for layer verification.
@@ -342,14 +373,14 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
342373
// mount the node to the specified mountpoint
343374
// TODO: bind mount the state directory as a read-only fs on snapshotter's side
344375
rawFS := fusefs.NewNodeFS(node, &fusefs.Options{
345-
AttrTimeout: &fs.attrTimeout,
346-
EntryTimeout: &fs.entryTimeout,
376+
AttrTimeout: &attrTimeout,
377+
EntryTimeout: &entryTimeout,
347378
NullPermissions: true,
348379
})
349380
mountOpts := &fuse.MountOptions{
350381
AllowOther: true, // allow users other than root&mounter to access fs
351382
FsName: "stargz", // name this filesystem as "stargz"
352-
Debug: fs.debug,
383+
Debug: debug,
353384
DirectMount: true,
354385
}
355386
server, err := fuse.NewServer(rawFS, mountpoint, mountOpts)
@@ -391,7 +422,10 @@ func (fs *filesystem) Check(ctx context.Context, mountpoint string, labels map[s
391422
}
392423

393424
// Wait for prefetch compeletion
394-
if !fs.noprefetch {
425+
fs.configMu.RLock()
426+
noprefetch := fs.noprefetch
427+
fs.configMu.RUnlock()
428+
if !noprefetch {
395429
if err := l.WaitForPrefetchCompletion(); err != nil {
396430
log.G(ctx).WithError(err).Warn("failed to sync with prefetch completion")
397431
}
@@ -473,12 +507,17 @@ func unmount(target string, flags int) error {
473507

474508
func (fs *filesystem) prefetch(ctx context.Context, l layer.Layer, defaultPrefetchSize int64, start time.Time) {
475509
// Prefetch a layer. The first Check() for this layer waits for the prefetch completion.
476-
if !fs.noprefetch {
510+
fs.configMu.RLock()
511+
noprefetch := fs.noprefetch
512+
noBackgroundFetch := fs.noBackgroundFetch
513+
fs.configMu.RUnlock()
514+
515+
if !noprefetch {
477516
go l.Prefetch(defaultPrefetchSize)
478517
}
479518

480519
// Fetch whole layer aggressively in background.
481-
if !fs.noBackgroundFetch {
520+
if !noBackgroundFetch {
482521
go func() {
483522
if err := l.BackgroundFetch(); err == nil {
484523
// write log record for the latency between mount start and last on demand fetch

0 commit comments

Comments
 (0)