Skip to content

Commit 422f754

Browse files
author
abushwang
committed
add hot reloading for fs config
Signed-off-by: abushwang <[email protected]>
1 parent 60de78b commit 422f754

File tree

8 files changed

+288
-40
lines changed

8 files changed

+288
-40
lines changed
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"path/filepath"
22+
"reflect"
23+
"sync"
24+
"time"
25+
26+
"github.com/containerd/containerd/v2/core/snapshots"
27+
"github.com/containerd/log"
28+
fsconfig "github.com/containerd/stargz-snapshotter/fs/config"
29+
"github.com/fsnotify/fsnotify"
30+
"github.com/pelletier/go-toml"
31+
)
32+
33+
// WatchConfig monitors the specified configuration file for changes.
34+
// It triggers the config reload when a change is detected.
35+
// The monitoring runs in a separate goroutine.
36+
func WatchConfig(ctx context.Context, filePath string, rs snapshots.Snapshotter) error {
37+
absFilePath, err := filepath.Abs(filePath)
38+
if err != nil {
39+
return err
40+
}
41+
42+
watchDir := filepath.Dir(absFilePath)
43+
44+
watcher, err := fsnotify.NewWatcher()
45+
if err != nil {
46+
return err
47+
}
48+
49+
if err := watcher.Add(watchDir); err != nil {
50+
watcher.Close()
51+
return err
52+
}
53+
54+
log.G(ctx).Infof("started monitoring config file: %s", absFilePath)
55+
56+
cw := &configWatcher{}
57+
58+
go func() {
59+
defer watcher.Close()
60+
61+
var (
62+
debounceTimer *time.Timer
63+
mu sync.Mutex
64+
)
65+
66+
for {
67+
select {
68+
case event, ok := <-watcher.Events:
69+
if !ok {
70+
return
71+
}
72+
73+
if event.Name == absFilePath {
74+
// Trigger on Write, Create, Rename, or Chmod events
75+
// such as vim, nano, etc.
76+
if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) ||
77+
event.Has(fsnotify.Rename) || event.Has(fsnotify.Chmod) {
78+
79+
mu.Lock()
80+
if debounceTimer != nil {
81+
debounceTimer.Stop()
82+
}
83+
// Debounce changes with a 50ms delay
84+
debounceTimer = time.AfterFunc(50*time.Millisecond, func() {
85+
log.G(ctx).Infof("config file modification detected: %s", absFilePath)
86+
cw.reload(ctx, absFilePath, rs)
87+
})
88+
mu.Unlock()
89+
}
90+
}
91+
92+
case err, ok := <-watcher.Errors:
93+
if !ok {
94+
return
95+
}
96+
log.G(ctx).WithError(err).Error("config watcher encountered an error")
97+
}
98+
}
99+
}()
100+
101+
return nil
102+
}
103+
104+
type configWatcher struct {
105+
lastConfig *fsconfig.Config
106+
mu sync.Mutex
107+
}
108+
109+
func (w *configWatcher) reload(ctx context.Context, configPath string, rs snapshots.Snapshotter) {
110+
log.G(ctx).Infof("Config file %s changed, reloading...", configPath)
111+
var newConfig snapshotterConfig
112+
tree, err := toml.LoadFile(configPath)
113+
if err != nil {
114+
log.G(ctx).WithError(err).Error("failed to reload config file")
115+
return
116+
}
117+
if err := tree.Unmarshal(&newConfig); err != nil {
118+
log.G(ctx).WithError(err).Error("failed to unmarshal config")
119+
return
120+
}
121+
122+
newFsConfig := newConfig.Config.Config
123+
124+
w.mu.Lock()
125+
defer w.mu.Unlock()
126+
127+
if w.lastConfig != nil && reflect.DeepEqual(*w.lastConfig, newFsConfig) {
128+
log.G(ctx).Info("Config content unchanged, skipping update")
129+
return
130+
}
131+
132+
if updater, ok := rs.(interface {
133+
UpdateConfig(context.Context, fsconfig.Config) error
134+
}); ok {
135+
log.G(ctx).Debugf("applying new config: %+v", newFsConfig)
136+
if err := updater.UpdateConfig(ctx, newFsConfig); err != nil {
137+
log.G(ctx).WithError(err).Error("failed to update config")
138+
} else {
139+
log.G(ctx).Info("Config updated successfully")
140+
cfgCopy := newFsConfig
141+
w.lastConfig = &cfgCopy
142+
}
143+
} else {
144+
log.G(ctx).Warn("snapshotter does not support config update")
145+
}
146+
}

cmd/containerd-stargz-grpc/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,10 @@ func main() {
260260
}
261261
}
262262

263+
if err := WatchConfig(ctx, *configPath, rs); err != nil {
264+
log.G(ctx).WithError(err).Warn("failed to start config watcher")
265+
}
266+
263267
cleanup, err := serve(ctx, rpc, *address, rs, config)
264268
if err != nil {
265269
log.G(ctx).WithError(err).Fatalf("failed to serve snapshotter")

cmd/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
github.com/containerd/stargz-snapshotter/ipfs v0.18.1
1717
github.com/coreos/go-systemd/v22 v22.6.0
1818
github.com/docker/go-metrics v0.0.1
19+
github.com/fsnotify/fsnotify v1.9.0
1920
github.com/goccy/go-json v0.10.5
2021
github.com/klauspost/compress v1.18.1
2122
github.com/opencontainers/go-digest v1.0.0
@@ -56,7 +57,6 @@ require (
5657
github.com/docker/go-units v0.5.0 // indirect
5758
github.com/emicklei/go-restful/v3 v3.12.2 // indirect
5859
github.com/felixge/httpsnoop v1.0.4 // indirect
59-
github.com/fsnotify/fsnotify v1.9.0 // indirect
6060
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
6161
github.com/go-logr/logr v1.4.3 // indirect
6262
github.com/go-logr/stdr v1.2.2 // indirect

docs/overview.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,12 @@ insecure = true
260260
261261
The config file can be passed to stargz snapshotter using `containerd-stargz-grpc`'s `--config` option.
262262

263+
## Configuration hot reload
264+
265+
[Fs configurations](/fs/config/config.go) support hot reloading. When the configuration file is modified, the snapshotter detects the change and applies the new configuration without restarting the process.
266+
267+
Note that other configurations (e.g. `proxy_plugins`, `fuse_manager`, `resolver`, `mount_options`) require a restart to take effect.
268+
263269
## Make your remote snapshotter
264270

265271
It isn't difficult for you to implement your remote snapshotter using [our general snapshotter package](/snapshot) without considering the protocol between that and containerd.

fs/fs.go

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,18 @@ var (
7777
metricsCtr *layermetrics.Controller
7878
)
7979

80+
func applyDefaults(cfg *config.Config) {
81+
if cfg.MaxConcurrency == 0 {
82+
cfg.MaxConcurrency = defaultMaxConcurrency
83+
}
84+
if cfg.AttrTimeout == 0 {
85+
cfg.AttrTimeout = int64(defaultFuseTimeout.Seconds())
86+
}
87+
if cfg.EntryTimeout == 0 {
88+
cfg.EntryTimeout = int64(defaultFuseTimeout.Seconds())
89+
}
90+
}
91+
8092
type Option func(*options)
8193

8294
type options struct {
@@ -132,20 +144,11 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
132144
for _, o := range opts {
133145
o(&fsOpts)
134146
}
135-
maxConcurrency := cfg.MaxConcurrency
136-
if maxConcurrency == 0 {
137-
maxConcurrency = defaultMaxConcurrency
138-
}
147+
applyDefaults(&cfg)
139148

149+
maxConcurrency := cfg.MaxConcurrency
140150
attrTimeout := time.Duration(cfg.AttrTimeout) * time.Second
141-
if attrTimeout == 0 {
142-
attrTimeout = defaultFuseTimeout
143-
}
144-
145151
entryTimeout := time.Duration(cfg.EntryTimeout) * time.Second
146-
if entryTimeout == 0 {
147-
entryTimeout = defaultFuseTimeout
148-
}
149152

150153
metadataStore := fsOpts.metadataStore
151154
if metadataStore == nil {
@@ -205,6 +208,7 @@ type filesystem struct {
205208
debug bool
206209
layer map[string]layer.Layer
207210
layerMu sync.Mutex
211+
configMu sync.RWMutex
208212
backgroundTaskManager *task.BackgroundTaskManager
209213
allowNoVerification bool
210214
disableVerification bool
@@ -214,6 +218,23 @@ type filesystem struct {
214218
entryTimeout time.Duration
215219
}
216220

221+
func (fs *filesystem) UpdateConfig(ctx context.Context, cfg config.Config) error {
222+
applyDefaults(&cfg)
223+
fs.configMu.Lock()
224+
fs.prefetchSize = cfg.PrefetchSize
225+
fs.noprefetch = cfg.NoPrefetch
226+
fs.noBackgroundFetch = cfg.NoBackgroundFetch
227+
fs.debug = cfg.Debug
228+
fs.allowNoVerification = cfg.AllowNoVerification
229+
fs.disableVerification = cfg.DisableVerification
230+
fs.attrTimeout = time.Duration(cfg.AttrTimeout) * time.Second
231+
fs.entryTimeout = time.Duration(cfg.EntryTimeout) * time.Second
232+
fs.configMu.Unlock()
233+
234+
fs.resolver.UpdateConfig(ctx, cfg)
235+
return nil
236+
}
237+
217238
func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[string]string) (retErr error) {
218239
// Setting the start time to measure the Mount operation duration.
219240
start := time.Now()
@@ -233,7 +254,9 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
233254
return fmt.Errorf("source must be passed")
234255
}
235256

257+
fs.configMu.RLock()
236258
defaultPrefetchSize := fs.prefetchSize
259+
fs.configMu.RUnlock()
237260
if psStr, ok := labels[config.TargetPrefetchSizeLabel]; ok {
238261
if ps, err := strconv.ParseInt(psStr, 10, 64); err == nil {
239262
defaultPrefetchSize = ps
@@ -297,7 +320,15 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
297320
}()
298321

299322
// Verify layer's content
300-
if fs.disableVerification {
323+
fs.configMu.RLock()
324+
disableVerification := fs.disableVerification
325+
allowNoVerification := fs.allowNoVerification
326+
attrTimeout := fs.attrTimeout
327+
entryTimeout := fs.entryTimeout
328+
debug := fs.debug
329+
fs.configMu.RUnlock()
330+
331+
if disableVerification {
301332
// Skip if verification is disabled completely
302333
l.SkipVerify()
303334
log.G(ctx).Infof("Verification forcefully skipped")
@@ -313,7 +344,7 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
313344
return fmt.Errorf("invalid stargz layer: %w", err)
314345
}
315346
log.G(ctx).Debugf("verified")
316-
} else if _, ok := labels[config.TargetSkipVerifyLabel]; ok && fs.allowNoVerification {
347+
} else if _, ok := labels[config.TargetSkipVerifyLabel]; ok && allowNoVerification {
317348
// If unverified layer is allowed, use it with warning.
318349
// This mode is for legacy stargz archives which don't contain digests
319350
// necessary for layer verification.
@@ -342,14 +373,14 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
342373
// mount the node to the specified mountpoint
343374
// TODO: bind mount the state directory as a read-only fs on snapshotter's side
344375
rawFS := fusefs.NewNodeFS(node, &fusefs.Options{
345-
AttrTimeout: &fs.attrTimeout,
346-
EntryTimeout: &fs.entryTimeout,
376+
AttrTimeout: &attrTimeout,
377+
EntryTimeout: &entryTimeout,
347378
NullPermissions: true,
348379
})
349380
mountOpts := &fuse.MountOptions{
350381
AllowOther: true, // allow users other than root&mounter to access fs
351382
FsName: "stargz", // name this filesystem as "stargz"
352-
Debug: fs.debug,
383+
Debug: debug,
353384
DirectMount: true,
354385
}
355386
server, err := fuse.NewServer(rawFS, mountpoint, mountOpts)
@@ -391,7 +422,10 @@ func (fs *filesystem) Check(ctx context.Context, mountpoint string, labels map[s
391422
}
392423

393424
// Wait for prefetch completion
394-
if !fs.noprefetch {
425+
fs.configMu.RLock()
426+
noprefetch := fs.noprefetch
427+
fs.configMu.RUnlock()
428+
if !noprefetch {
395429
if err := l.WaitForPrefetchCompletion(); err != nil {
396430
log.G(ctx).WithError(err).Warn("failed to sync with prefetch completion")
397431
}
@@ -473,12 +507,17 @@ func unmount(target string, flags int) error {
473507

474508
func (fs *filesystem) prefetch(ctx context.Context, l layer.Layer, defaultPrefetchSize int64, start time.Time) {
475509
// Prefetch a layer. The first Check() for this layer waits for the prefetch completion.
476-
if !fs.noprefetch {
510+
fs.configMu.RLock()
511+
noprefetch := fs.noprefetch
512+
noBackgroundFetch := fs.noBackgroundFetch
513+
fs.configMu.RUnlock()
514+
515+
if !noprefetch {
477516
go l.Prefetch(defaultPrefetchSize)
478517
}
479518

480519
// Fetch whole layer aggressively in background.
481-
if !fs.noBackgroundFetch {
520+
if !noBackgroundFetch {
482521
go func() {
483522
if err := l.BackgroundFetch(); err == nil {
484523
// write log record for the latency between mount start and last on demand fetch

0 commit comments

Comments
 (0)