Skip to content

Commit 3bac957

Browse files
author
abushwang
committed
add hot reloading for fs config
Signed-off-by: abushwang <abushwang@tencent.com>
1 parent 76ac719 commit 3bac957

File tree

8 files changed

+296
-40
lines changed

8 files changed

+296
-40
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
Copyright The containerd Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package main
18+
19+
import (
20+
"context"
21+
"path/filepath"
22+
"reflect"
23+
"sync"
24+
"time"
25+
26+
"github.com/containerd/containerd/v2/core/snapshots"
27+
"github.com/containerd/log"
28+
fsconfig "github.com/containerd/stargz-snapshotter/fs/config"
29+
"github.com/fsnotify/fsnotify"
30+
"github.com/pelletier/go-toml"
31+
)
32+
33+
// WatchConfig monitors the specified configuration file for changes.
34+
// It triggers the config reload when a change is detected.
35+
func WatchConfig(
36+
ctx context.Context,
37+
filePath string,
38+
rs snapshots.Snapshotter,
39+
initialConfig *fsconfig.Config,
40+
) error {
41+
absFilePath, err := filepath.Abs(filePath)
42+
if err != nil {
43+
return err
44+
}
45+
46+
watchDir := filepath.Dir(absFilePath)
47+
48+
watcher, err := fsnotify.NewWatcher()
49+
if err != nil {
50+
return err
51+
}
52+
53+
if err := watcher.Add(watchDir); err != nil {
54+
watcher.Close()
55+
return err
56+
}
57+
58+
log.G(ctx).Infof("started monitoring config file: %s", absFilePath)
59+
60+
cw := &configWatcher{
61+
lastConfig: initialConfig,
62+
}
63+
64+
go func() {
65+
defer watcher.Close()
66+
67+
var (
68+
debounceTimer *time.Timer
69+
mu sync.Mutex
70+
)
71+
72+
for {
73+
select {
74+
case event, ok := <-watcher.Events:
75+
if !ok {
76+
return
77+
}
78+
79+
if event.Name == absFilePath {
80+
// Trigger on Write, Create, Rename, or Chmod events
81+
// such as vim, nano, etc.
82+
if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) ||
83+
event.Has(fsnotify.Rename) || event.Has(fsnotify.Chmod) {
84+
85+
mu.Lock()
86+
if debounceTimer != nil {
87+
debounceTimer.Stop()
88+
}
89+
// Debounce changes with a 50ms delay
90+
debounceTimer = time.AfterFunc(50*time.Millisecond, func() {
91+
log.G(ctx).Infof("config file modification detected: %s", absFilePath)
92+
cw.reload(ctx, absFilePath, rs)
93+
})
94+
mu.Unlock()
95+
}
96+
}
97+
98+
case err, ok := <-watcher.Errors:
99+
if !ok {
100+
return
101+
}
102+
log.G(ctx).WithError(err).Error("config watcher encountered an error")
103+
}
104+
}
105+
}()
106+
107+
return nil
108+
}
109+
110+
type configWatcher struct {
111+
lastConfig *fsconfig.Config
112+
mu sync.Mutex
113+
}
114+
115+
func (w *configWatcher) reload(ctx context.Context, configPath string, rs snapshots.Snapshotter) {
116+
log.G(ctx).Infof("Config file %s changed, reloading...", configPath)
117+
var newConfig snapshotterConfig
118+
tree, err := toml.LoadFile(configPath)
119+
if err != nil {
120+
log.G(ctx).WithError(err).Error("failed to reload config file")
121+
return
122+
}
123+
if err := tree.Unmarshal(&newConfig); err != nil {
124+
log.G(ctx).WithError(err).Error("failed to unmarshal config")
125+
return
126+
}
127+
128+
newFsConfig := newConfig.Config.Config
129+
130+
w.mu.Lock()
131+
defer w.mu.Unlock()
132+
133+
if w.lastConfig != nil && reflect.DeepEqual(*w.lastConfig, newFsConfig) {
134+
log.G(ctx).Info("Config content unchanged, skipping update")
135+
return
136+
}
137+
138+
if updater, ok := rs.(interface {
139+
UpdateConfig(context.Context, fsconfig.Config) error
140+
}); ok {
141+
log.G(ctx).Debugf("applying new config: %+v", newFsConfig)
142+
if err := updater.UpdateConfig(ctx, newFsConfig); err != nil {
143+
log.G(ctx).WithError(err).Error("failed to update config")
144+
} else {
145+
log.G(ctx).Info("Config updated successfully")
146+
cfgCopy := newFsConfig
147+
w.lastConfig = &cfgCopy
148+
}
149+
} else {
150+
log.G(ctx).Warn("snapshotter does not support config update")
151+
}
152+
}

cmd/containerd-stargz-grpc/main.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,10 @@ func main() {
260260
}
261261
}
262262

263+
if err := WatchConfig(ctx, *configPath, rs, &config.Config.Config); err != nil {
264+
log.G(ctx).WithError(err).Warn("failed to start config watcher")
265+
}
266+
263267
cleanup, err := serve(ctx, rpc, *address, rs, config)
264268
if err != nil {
265269
log.G(ctx).WithError(err).Fatalf("failed to serve snapshotter")

cmd/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ require (
1414
github.com/containerd/stargz-snapshotter/ipfs v0.18.1
1515
github.com/coreos/go-systemd/v22 v22.6.0
1616
github.com/docker/go-metrics v0.0.1
17+
github.com/fsnotify/fsnotify v1.9.0
1718
github.com/goccy/go-json v0.10.5
1819
github.com/klauspost/compress v1.18.2
1920
github.com/opencontainers/go-digest v1.0.0
@@ -56,7 +57,6 @@ require (
5657
github.com/docker/go-units v0.5.0 // indirect
5758
github.com/emicklei/go-restful/v3 v3.13.0 // indirect
5859
github.com/felixge/httpsnoop v1.0.4 // indirect
59-
github.com/fsnotify/fsnotify v1.9.0 // indirect
6060
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
6161
github.com/go-logr/logr v1.4.3 // indirect
6262
github.com/go-logr/stdr v1.2.2 // indirect

docs/overview.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,14 @@ insecure = true
260260
261261
The config file can be passed to stargz snapshotter using `containerd-stargz-grpc`'s `--config` option.
262262

263+
## Configuration hot reload
264+
265+
[Fs configurations](/fs/config/config.go) supports hot reloading. When the configuration file is modified, the snapshotter detects the change and applies the new configuration without restarting the process.
266+
This enables instant performance tuning (e.g. concurrency, timeouts) without I/O suspension, and allows updating FUSE parameters that cannot be changed by simply restarting the main process when FUSE manager is enabled.
267+
268+
Note that other configurations (e.g. `proxy_plugins`, `fuse_manager`, `resolver`, `mount_options`) require a restart to take effect.
269+
Also, some specific fields in `[stargz]` section (e.g. `no_prometheus`) do not support hot reloading and changes to them will be ignored until restart.
270+
263271
## Make your remote snapshotter
264272

265273
It isn't difficult for you to implement your remote snapshotter using [our general snapshotter package](/snapshot) without considering the protocol between that and containerd.

fs/fs.go

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,18 @@ var (
7777
metricsCtr *layermetrics.Controller
7878
)
7979

80+
func applyDefaults(cfg *config.Config) {
81+
if cfg.MaxConcurrency == 0 {
82+
cfg.MaxConcurrency = defaultMaxConcurrency
83+
}
84+
if cfg.AttrTimeout == 0 {
85+
cfg.AttrTimeout = int64(defaultFuseTimeout.Seconds())
86+
}
87+
if cfg.EntryTimeout == 0 {
88+
cfg.EntryTimeout = int64(defaultFuseTimeout.Seconds())
89+
}
90+
}
91+
8092
type Option func(*options)
8193

8294
type options struct {
@@ -132,20 +144,11 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
132144
for _, o := range opts {
133145
o(&fsOpts)
134146
}
135-
maxConcurrency := cfg.MaxConcurrency
136-
if maxConcurrency == 0 {
137-
maxConcurrency = defaultMaxConcurrency
138-
}
147+
applyDefaults(&cfg)
139148

149+
maxConcurrency := cfg.MaxConcurrency
140150
attrTimeout := time.Duration(cfg.AttrTimeout) * time.Second
141-
if attrTimeout == 0 {
142-
attrTimeout = defaultFuseTimeout
143-
}
144-
145151
entryTimeout := time.Duration(cfg.EntryTimeout) * time.Second
146-
if entryTimeout == 0 {
147-
entryTimeout = defaultFuseTimeout
148-
}
149152

150153
metadataStore := fsOpts.metadataStore
151154
if metadataStore == nil {
@@ -205,6 +208,7 @@ type filesystem struct {
205208
debug bool
206209
layer map[string]layer.Layer
207210
layerMu sync.Mutex
211+
configMu sync.RWMutex
208212
backgroundTaskManager *task.BackgroundTaskManager
209213
allowNoVerification bool
210214
disableVerification bool
@@ -214,6 +218,23 @@ type filesystem struct {
214218
entryTimeout time.Duration
215219
}
216220

221+
func (fs *filesystem) UpdateConfig(ctx context.Context, cfg config.Config) error {
222+
applyDefaults(&cfg)
223+
fs.configMu.Lock()
224+
fs.prefetchSize = cfg.PrefetchSize
225+
fs.noprefetch = cfg.NoPrefetch
226+
fs.noBackgroundFetch = cfg.NoBackgroundFetch
227+
fs.debug = cfg.Debug
228+
fs.allowNoVerification = cfg.AllowNoVerification
229+
fs.disableVerification = cfg.DisableVerification
230+
fs.attrTimeout = time.Duration(cfg.AttrTimeout) * time.Second
231+
fs.entryTimeout = time.Duration(cfg.EntryTimeout) * time.Second
232+
fs.configMu.Unlock()
233+
234+
fs.resolver.UpdateConfig(ctx, cfg)
235+
return nil
236+
}
237+
217238
func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[string]string) (retErr error) {
218239
// Setting the start time to measure the Mount operation duration.
219240
start := time.Now()
@@ -233,7 +254,9 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
233254
return fmt.Errorf("source must be passed")
234255
}
235256

257+
fs.configMu.RLock()
236258
defaultPrefetchSize := fs.prefetchSize
259+
fs.configMu.RUnlock()
237260
if psStr, ok := labels[config.TargetPrefetchSizeLabel]; ok {
238261
if ps, err := strconv.ParseInt(psStr, 10, 64); err == nil {
239262
defaultPrefetchSize = ps
@@ -297,7 +320,15 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
297320
}()
298321

299322
// Verify layer's content
300-
if fs.disableVerification {
323+
fs.configMu.RLock()
324+
disableVerification := fs.disableVerification
325+
allowNoVerification := fs.allowNoVerification
326+
attrTimeout := fs.attrTimeout
327+
entryTimeout := fs.entryTimeout
328+
debug := fs.debug
329+
fs.configMu.RUnlock()
330+
331+
if disableVerification {
301332
// Skip if verification is disabled completely
302333
l.SkipVerify()
303334
log.G(ctx).Infof("Verification forcefully skipped")
@@ -313,7 +344,7 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
313344
return fmt.Errorf("invalid stargz layer: %w", err)
314345
}
315346
log.G(ctx).Debugf("verified")
316-
} else if _, ok := labels[config.TargetSkipVerifyLabel]; ok && fs.allowNoVerification {
347+
} else if _, ok := labels[config.TargetSkipVerifyLabel]; ok && allowNoVerification {
317348
// If unverified layer is allowed, use it with warning.
318349
// This mode is for legacy stargz archives which don't contain digests
319350
// necessary for layer verification.
@@ -342,14 +373,14 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
342373
// mount the node to the specified mountpoint
343374
// TODO: bind mount the state directory as a read-only fs on snapshotter's side
344375
rawFS := fusefs.NewNodeFS(node, &fusefs.Options{
345-
AttrTimeout: &fs.attrTimeout,
346-
EntryTimeout: &fs.entryTimeout,
376+
AttrTimeout: &attrTimeout,
377+
EntryTimeout: &entryTimeout,
347378
NullPermissions: true,
348379
})
349380
mountOpts := &fuse.MountOptions{
350381
AllowOther: true, // allow users other than root&mounter to access fs
351382
FsName: "stargz", // name this filesystem as "stargz"
352-
Debug: fs.debug,
383+
Debug: debug,
353384
DirectMount: true,
354385
}
355386
server, err := fuse.NewServer(rawFS, mountpoint, mountOpts)
@@ -391,7 +422,10 @@ func (fs *filesystem) Check(ctx context.Context, mountpoint string, labels map[s
391422
}
392423

393424
// Wait for prefetch compeletion
394-
if !fs.noprefetch {
425+
fs.configMu.RLock()
426+
noprefetch := fs.noprefetch
427+
fs.configMu.RUnlock()
428+
if !noprefetch {
395429
if err := l.WaitForPrefetchCompletion(); err != nil {
396430
log.G(ctx).WithError(err).Warn("failed to sync with prefetch completion")
397431
}
@@ -473,12 +507,17 @@ func unmount(target string, flags int) error {
473507

474508
func (fs *filesystem) prefetch(ctx context.Context, l layer.Layer, defaultPrefetchSize int64, start time.Time) {
475509
// Prefetch a layer. The first Check() for this layer waits for the prefetch completion.
476-
if !fs.noprefetch {
510+
fs.configMu.RLock()
511+
noprefetch := fs.noprefetch
512+
noBackgroundFetch := fs.noBackgroundFetch
513+
fs.configMu.RUnlock()
514+
515+
if !noprefetch {
477516
go l.Prefetch(defaultPrefetchSize)
478517
}
479518

480519
// Fetch whole layer aggressively in background.
481-
if !fs.noBackgroundFetch {
520+
if !noBackgroundFetch {
482521
go func() {
483522
if err := l.BackgroundFetch(); err == nil {
484523
// write log record for the latency between mount start and last on demand fetch

0 commit comments

Comments
 (0)