152 changes: 152 additions & 0 deletions cmd/containerd-stargz-grpc/cfgwatcher.go
@@ -0,0 +1,152 @@
/*
Copyright The containerd Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
"context"
"path/filepath"
"reflect"
"sync"
"time"

"github.com/containerd/containerd/v2/core/snapshots"
"github.com/containerd/log"
fsconfig "github.com/containerd/stargz-snapshotter/fs/config"
"github.com/fsnotify/fsnotify"
"github.com/pelletier/go-toml"
)

// WatchConfig monitors the specified configuration file for changes.
// It triggers the config reload when a change is detected.
func WatchConfig(
Review comment (Member): You don't need to export this symbol.

ctx context.Context,
filePath string,
rs snapshots.Snapshotter,
initialConfig *fsconfig.Config,
) error {
absFilePath, err := filepath.Abs(filePath)
if err != nil {
return err
}

watchDir := filepath.Dir(absFilePath)

watcher, err := fsnotify.NewWatcher()
Review comment (Member): Does this work in rootless settings?

if err != nil {
return err
}

if err := watcher.Add(watchDir); err != nil {
watcher.Close()
return err
}

log.G(ctx).Infof("started monitoring config file: %s", absFilePath)

cw := &configWatcher{
lastConfig: initialConfig,
}

go func() {
defer watcher.Close()

var (
debounceTimer *time.Timer
mu sync.Mutex
Review comment (Member) on lines +67 to +69: Why is debounceTimer protected by the mutex? Is it accessed from multiple goroutines? I couldn't find any new goroutine creation inside this function.

)

for {
select {
case event, ok := <-watcher.Events:
if !ok {
return
}

if event.Name == absFilePath {
// Trigger on Write, Create, Rename, or Chmod events so that
// editors like vim and nano, which replace the file on save, are covered.
if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) ||
event.Has(fsnotify.Rename) || event.Has(fsnotify.Chmod) {
Review comment (Member) on lines +79 to +83, on `fsnotify.Rename`: If the file is renamed, the next change won't be detected, will it? Is this the expected behaviour?


mu.Lock()
if debounceTimer != nil {
debounceTimer.Stop()
}
// Debounce changes with a 50ms delay
debounceTimer = time.AfterFunc(50*time.Millisecond, func() {
log.G(ctx).Infof("config file modification detected: %s", absFilePath)
cw.reload(ctx, absFilePath, rs)
})
mu.Unlock()
}
}

case err, ok := <-watcher.Errors:
if !ok {
return
}
log.G(ctx).WithError(err).Error("config watcher encountered an error")
}
}
}()

return nil
}
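
The loop above debounces filesystem events: each event cancels any pending timer and arms a fresh 50ms one, so a burst of writes (an editor saving a file in several chunks) results in a single reload. A minimal, self-contained sketch of that pattern, with a plain channel standing in for `watcher.Events` (the channel and the burst loop are hypothetical, for illustration only):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	events := make(chan struct{}) // hypothetical stand-in for watcher.Events
	done := make(chan struct{})

	var debounceTimer *time.Timer
	go func() {
		for range events {
			if debounceTimer != nil {
				debounceTimer.Stop() // cancel the pending handler, if any
			}
			// Re-arm: only the last event of a burst survives the 50ms window.
			debounceTimer = time.AfterFunc(50*time.Millisecond, func() {
				fmt.Println("reload fired once for the whole burst")
				close(done)
			})
		}
	}()

	for i := 0; i < 5; i++ { // simulate an editor writing in several chunks
		events <- struct{}{}
		time.Sleep(5 * time.Millisecond)
	}
	<-done // observe the single debounced "reload"
}
```

Note that `time.AfterFunc` runs its callback on its own goroutine, while `debounceTimer` itself is only touched from the watcher loop.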

type configWatcher struct {
lastConfig *fsconfig.Config
mu sync.Mutex
}

func (w *configWatcher) reload(ctx context.Context, configPath string, rs snapshots.Snapshotter) {
log.G(ctx).Infof("Config file %s changed, reloading...", configPath)
var newConfig snapshotterConfig
tree, err := toml.LoadFile(configPath)
if err != nil {
log.G(ctx).WithError(err).Error("failed to reload config file")
return
}
if err := tree.Unmarshal(&newConfig); err != nil {
log.G(ctx).WithError(err).Error("failed to unmarshal config")
return
}

newFsConfig := newConfig.Config.Config

w.mu.Lock()
defer w.mu.Unlock()

if w.lastConfig != nil && reflect.DeepEqual(*w.lastConfig, newFsConfig) {
log.G(ctx).Info("Config content unchanged, skipping update")
return
}

if updater, ok := rs.(interface {
UpdateConfig(context.Context, fsconfig.Config) error
}); ok {
log.G(ctx).Debugf("applying new config: %+v", newFsConfig)
if err := updater.UpdateConfig(ctx, newFsConfig); err != nil {
log.G(ctx).WithError(err).Error("failed to update config")
} else {
log.G(ctx).Info("Config updated successfully")
cfgCopy := newFsConfig
w.lastConfig = &cfgCopy
}
} else {
log.G(ctx).Warn("snapshotter does not support config update")
}
}
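
`reload` applies the new configuration through a structural type assertion rather than a named interface, so a snapshotter opts in simply by implementing a method with the matching signature. A hedged sketch of an opting-in implementation (the `exampleSnapshotter` type and its body are hypothetical; the real implementation in this PR is `filesystem.UpdateConfig` in fs/fs.go):

```go
package sketch

import (
	"context"

	fsconfig "github.com/containerd/stargz-snapshotter/fs/config"
)

// hotReloadable mirrors the anonymous interface asserted in reload;
// declaring a named copy is optional but documents the contract.
type hotReloadable interface {
	UpdateConfig(context.Context, fsconfig.Config) error
}

// exampleSnapshotter is hypothetical. A real implementation would wrap
// snapshots.Snapshotter and swap its active settings under a lock.
type exampleSnapshotter struct{}

func (s *exampleSnapshotter) UpdateConfig(ctx context.Context, cfg fsconfig.Config) error {
	// Validate cfg, then atomically replace the active configuration.
	return nil
}

var _ hotReloadable = (*exampleSnapshotter)(nil) // compile-time check
```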
4 changes: 4 additions & 0 deletions cmd/containerd-stargz-grpc/main.go
@@ -260,6 +260,10 @@ func main() {
}
}

if err := WatchConfig(ctx, *configPath, rs, &config.Config.Config); err != nil {
log.G(ctx).WithError(err).Warn("failed to start config watcher")
}

cleanup, err := serve(ctx, rpc, *address, rs, config)
if err != nil {
log.G(ctx).WithError(err).Fatalf("failed to serve snapshotter")
2 changes: 1 addition & 1 deletion cmd/go.mod
@@ -14,6 +14,7 @@ require (
github.com/containerd/stargz-snapshotter/ipfs v0.18.1
github.com/coreos/go-systemd/v22 v22.6.0
github.com/docker/go-metrics v0.0.1
github.com/fsnotify/fsnotify v1.9.0
github.com/goccy/go-json v0.10.5
github.com/klauspost/compress v1.18.2
github.com/opencontainers/go-digest v1.0.0
@@ -56,7 +57,6 @@ require (
github.com/docker/go-units v0.5.0 // indirect
github.com/emicklei/go-restful/v3 v3.13.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/fxamacker/cbor/v2 v2.9.0 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
8 changes: 8 additions & 0 deletions docs/overview.md
@@ -260,6 +260,14 @@ insecure = true

The config file can be passed to stargz snapshotter using `containerd-stargz-grpc`'s `--config` option.

## Configuration hot reload
Review comment (Member): The benefit of this feature compared to the FUSE manager (which already supports safe restarts) should be documented.

Reply (Contributor, author): done


[Fs configurations](/fs/config/config.go) support hot reloading. When the configuration file is modified, the snapshotter detects the change and applies the new configuration without restarting the process.
This enables instant performance tuning (e.g. concurrency, timeouts) without suspending I/O, and allows updating FUSE parameters that cannot be changed by simply restarting the main process when the FUSE manager is enabled.

Note that other configurations (e.g. `proxy_plugins`, `fuse_manager`, `resolver`, `mount_options`) require a restart to take effect.
Also, some fields in the `[stargz]` section (e.g. `no_prometheus`) do not support hot reloading; changes to them are ignored until restart.
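
For illustration, a tuning change like the one below could be picked up on the fly just by saving the config file. The key names are assumptions based on the struct tags in [fs/config/config.go](/fs/config/config.go); consult that file for the authoritative list:

```toml
# Hot-reloadable filesystem tuning; key names assumed from fs/config/config.go.
max_concurrency = 4        # concurrency of background fetch tasks
attr_timeout = 2           # FUSE attribute cache timeout, in seconds
entry_timeout = 2          # FUSE entry cache timeout, in seconds
no_background_fetch = true # disable aggressive whole-layer background fetching
```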

## Make your remote snapshotter

It isn't difficult for you to implement your remote snapshotter using [our general snapshotter package](/snapshot) without considering the protocol between that and containerd.
77 changes: 58 additions & 19 deletions fs/fs.go
@@ -77,6 +77,18 @@ var (
metricsCtr *layermetrics.Controller
)

func applyDefaults(cfg *config.Config) {
if cfg.MaxConcurrency == 0 {
cfg.MaxConcurrency = defaultMaxConcurrency
}
if cfg.AttrTimeout == 0 {
cfg.AttrTimeout = int64(defaultFuseTimeout.Seconds())
}
if cfg.EntryTimeout == 0 {
cfg.EntryTimeout = int64(defaultFuseTimeout.Seconds())
}
}

type Option func(*options)

type options struct {
@@ -132,20 +144,11 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F
for _, o := range opts {
o(&fsOpts)
}
maxConcurrency := cfg.MaxConcurrency
if maxConcurrency == 0 {
maxConcurrency = defaultMaxConcurrency
}
applyDefaults(&cfg)

maxConcurrency := cfg.MaxConcurrency
attrTimeout := time.Duration(cfg.AttrTimeout) * time.Second
if attrTimeout == 0 {
attrTimeout = defaultFuseTimeout
}

entryTimeout := time.Duration(cfg.EntryTimeout) * time.Second
if entryTimeout == 0 {
entryTimeout = defaultFuseTimeout
}

metadataStore := fsOpts.metadataStore
if metadataStore == nil {
@@ -205,6 +208,7 @@ type filesystem struct {
debug bool
layer map[string]layer.Layer
layerMu sync.Mutex
configMu sync.RWMutex
backgroundTaskManager *task.BackgroundTaskManager
allowNoVerification bool
disableVerification bool
@@ -214,6 +218,23 @@
entryTimeout time.Duration
}

func (fs *filesystem) UpdateConfig(ctx context.Context, cfg config.Config) error {
applyDefaults(&cfg)
fs.configMu.Lock()
fs.prefetchSize = cfg.PrefetchSize
fs.noprefetch = cfg.NoPrefetch
fs.noBackgroundFetch = cfg.NoBackgroundFetch
fs.debug = cfg.Debug
fs.allowNoVerification = cfg.AllowNoVerification
fs.disableVerification = cfg.DisableVerification
fs.attrTimeout = time.Duration(cfg.AttrTimeout) * time.Second
fs.entryTimeout = time.Duration(cfg.EntryTimeout) * time.Second
fs.configMu.Unlock()

fs.resolver.UpdateConfig(ctx, cfg)
return nil
}

func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[string]string) (retErr error) {
// Setting the start time to measure the Mount operation duration.
start := time.Now()
@@ -233,7 +254,9 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
return fmt.Errorf("source must be passed")
}

fs.configMu.RLock()
defaultPrefetchSize := fs.prefetchSize
fs.configMu.RUnlock()
if psStr, ok := labels[config.TargetPrefetchSizeLabel]; ok {
if ps, err := strconv.ParseInt(psStr, 10, 64); err == nil {
defaultPrefetchSize = ps
@@ -297,7 +320,15 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
}()

// Verify layer's content
if fs.disableVerification {
fs.configMu.RLock()
disableVerification := fs.disableVerification
allowNoVerification := fs.allowNoVerification
attrTimeout := fs.attrTimeout
entryTimeout := fs.entryTimeout
debug := fs.debug
fs.configMu.RUnlock()

if disableVerification {
// Skip if verification is disabled completely
l.SkipVerify()
log.G(ctx).Infof("Verification forcefully skipped")
@@ -313,7 +344,7 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
return fmt.Errorf("invalid stargz layer: %w", err)
}
log.G(ctx).Debugf("verified")
} else if _, ok := labels[config.TargetSkipVerifyLabel]; ok && fs.allowNoVerification {
} else if _, ok := labels[config.TargetSkipVerifyLabel]; ok && allowNoVerification {
// If unverified layer is allowed, use it with warning.
// This mode is for legacy stargz archives which don't contain digests
// necessary for layer verification.
@@ -342,14 +373,14 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s
// mount the node to the specified mountpoint
// TODO: bind mount the state directory as a read-only fs on snapshotter's side
rawFS := fusefs.NewNodeFS(node, &fusefs.Options{
AttrTimeout: &fs.attrTimeout,
EntryTimeout: &fs.entryTimeout,
AttrTimeout: &attrTimeout,
EntryTimeout: &entryTimeout,
NullPermissions: true,
})
mountOpts := &fuse.MountOptions{
AllowOther: true, // allow users other than root&mounter to access fs
FsName: "stargz", // name this filesystem as "stargz"
Debug: fs.debug,
Debug: debug,
DirectMount: true,
}
server, err := fuse.NewServer(rawFS, mountpoint, mountOpts)
@@ -391,7 +422,10 @@ func (fs *filesystem) Check(ctx context.Context, mountpoint string, labels map[s
}

// Wait for prefetch completion
if !fs.noprefetch {
fs.configMu.RLock()
noprefetch := fs.noprefetch
fs.configMu.RUnlock()
if !noprefetch {
if err := l.WaitForPrefetchCompletion(); err != nil {
log.G(ctx).WithError(err).Warn("failed to sync with prefetch completion")
}
@@ -473,12 +507,17 @@ func unmount(target string, flags int) error {

func (fs *filesystem) prefetch(ctx context.Context, l layer.Layer, defaultPrefetchSize int64, start time.Time) {
// Prefetch a layer. The first Check() for this layer waits for the prefetch completion.
if !fs.noprefetch {
fs.configMu.RLock()
noprefetch := fs.noprefetch
noBackgroundFetch := fs.noBackgroundFetch
fs.configMu.RUnlock()

if !noprefetch {
go l.Prefetch(defaultPrefetchSize)
}

// Fetch whole layer aggressively in background.
if !fs.noBackgroundFetch {
if !noBackgroundFetch {
go func() {
if err := l.BackgroundFetch(); err == nil {
// write log record for the latency between mount start and last on demand fetch