diff --git a/cmd/containerd-stargz-grpc/cfgwatcher.go b/cmd/containerd-stargz-grpc/cfgwatcher.go new file mode 100644 index 000000000..ed3250b99 --- /dev/null +++ b/cmd/containerd-stargz-grpc/cfgwatcher.go @@ -0,0 +1,152 @@ +/* + Copyright The containerd Authors. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package main + +import ( + "context" + "path/filepath" + "reflect" + "sync" + "time" + + "github.com/containerd/containerd/v2/core/snapshots" + "github.com/containerd/log" + fsconfig "github.com/containerd/stargz-snapshotter/fs/config" + "github.com/fsnotify/fsnotify" + "github.com/pelletier/go-toml" +) + +// WatchConfig monitors the specified configuration file for changes. +// It triggers the config reload when a change is detected. 
+func WatchConfig( + ctx context.Context, + filePath string, + rs snapshots.Snapshotter, + initialConfig *fsconfig.Config, +) error { + absFilePath, err := filepath.Abs(filePath) + if err != nil { + return err + } + + watchDir := filepath.Dir(absFilePath) + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + + if err := watcher.Add(watchDir); err != nil { + watcher.Close() + return err + } + + log.G(ctx).Infof("started monitoring config file: %s", absFilePath) + + cw := &configWatcher{ + lastConfig: initialConfig, + } + + go func() { + defer watcher.Close() + + var ( + debounceTimer *time.Timer + mu sync.Mutex + ) + + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + + if event.Name == absFilePath { + // Trigger on Write, Create, Rename, or Chmod events + // such as vim, nano, etc. + if event.Has(fsnotify.Write) || event.Has(fsnotify.Create) || + event.Has(fsnotify.Rename) || event.Has(fsnotify.Chmod) { + + mu.Lock() + if debounceTimer != nil { + debounceTimer.Stop() + } + // Debounce changes with a 50ms delay + debounceTimer = time.AfterFunc(50*time.Millisecond, func() { + log.G(ctx).Infof("config file modification detected: %s", absFilePath) + cw.reload(ctx, absFilePath, rs) + }) + mu.Unlock() + } + } + + case err, ok := <-watcher.Errors: + if !ok { + return + } + log.G(ctx).WithError(err).Error("config watcher encountered an error") + } + } + }() + + return nil +} + +type configWatcher struct { + lastConfig *fsconfig.Config + mu sync.Mutex +} + +func (w *configWatcher) reload(ctx context.Context, configPath string, rs snapshots.Snapshotter) { + log.G(ctx).Infof("Config file %s changed, reloading...", configPath) + var newConfig snapshotterConfig + tree, err := toml.LoadFile(configPath) + if err != nil { + log.G(ctx).WithError(err).Error("failed to reload config file") + return + } + if err := tree.Unmarshal(&newConfig); err != nil { + log.G(ctx).WithError(err).Error("failed to unmarshal config") + return + 
} + + newFsConfig := newConfig.Config.Config + + w.mu.Lock() + defer w.mu.Unlock() + + if w.lastConfig != nil && reflect.DeepEqual(*w.lastConfig, newFsConfig) { + log.G(ctx).Info("Config content unchanged, skipping update") + return + } + + if updater, ok := rs.(interface { + UpdateConfig(context.Context, fsconfig.Config) error + }); ok { + log.G(ctx).Debugf("applying new config: %+v", newFsConfig) + if err := updater.UpdateConfig(ctx, newFsConfig); err != nil { + log.G(ctx).WithError(err).Error("failed to update config") + } else { + log.G(ctx).Info("Config updated successfully") + cfgCopy := newFsConfig + w.lastConfig = &cfgCopy + } + } else { + log.G(ctx).Warn("snapshotter does not support config update") + } +} diff --git a/cmd/containerd-stargz-grpc/main.go b/cmd/containerd-stargz-grpc/main.go index 436dca938..b4cd23b23 100644 --- a/cmd/containerd-stargz-grpc/main.go +++ b/cmd/containerd-stargz-grpc/main.go @@ -260,6 +260,10 @@ func main() { } } + if err := WatchConfig(ctx, *configPath, rs, &config.Config.Config); err != nil { + log.G(ctx).WithError(err).Warn("failed to start config watcher") + } + cleanup, err := serve(ctx, rpc, *address, rs, config) if err != nil { log.G(ctx).WithError(err).Fatalf("failed to serve snapshotter") diff --git a/cmd/go.mod b/cmd/go.mod index c2e689e6a..7ee6ef1f0 100644 --- a/cmd/go.mod +++ b/cmd/go.mod @@ -14,6 +14,7 @@ require ( github.com/containerd/stargz-snapshotter/ipfs v0.18.1 github.com/coreos/go-systemd/v22 v22.6.0 github.com/docker/go-metrics v0.0.1 + github.com/fsnotify/fsnotify v1.9.0 github.com/goccy/go-json v0.10.5 github.com/klauspost/compress v1.18.2 github.com/opencontainers/go-digest v1.0.0 @@ -56,7 +57,6 @@ require ( github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.13.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/fxamacker/cbor/v2 v2.9.0 // indirect github.com/go-logr/logr v1.4.3 // indirect 
github.com/go-logr/stdr v1.2.2 // indirect diff --git a/docs/overview.md b/docs/overview.md index 60b051b44..ed3d20162 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -260,6 +260,14 @@ insecure = true The config file can be passed to stargz snapshotter using `containerd-stargz-grpc`'s `--config` option. +## Configuration hot reload + +[Fs configurations](/fs/config/config.go) support hot reloading. When the configuration file is modified, the snapshotter detects the change and applies the new configuration without restarting the process. +This enables instant performance tuning (e.g. concurrency, timeouts) without I/O suspension, and allows updating FUSE parameters that cannot be changed by simply restarting the main process when the FUSE manager is enabled. + +Note that other configurations (e.g. `proxy_plugins`, `fuse_manager`, `resolver`, `mount_options`) require a restart to take effect. +Also, some specific fields in the `[stargz]` section (e.g. `no_prometheus`) do not support hot reloading and changes to them will be ignored until restart. + ## Make your remote snapshotter It isn't difficult for you to implement your remote snapshotter using [our general snapshotter package](/snapshot) without considering the protocol between that and containerd.
diff --git a/fs/fs.go b/fs/fs.go index c55d57002..f4f5142f4 100644 --- a/fs/fs.go +++ b/fs/fs.go @@ -77,6 +77,18 @@ var ( metricsCtr *layermetrics.Controller ) +func applyDefaults(cfg *config.Config) { + if cfg.MaxConcurrency == 0 { + cfg.MaxConcurrency = defaultMaxConcurrency + } + if cfg.AttrTimeout == 0 { + cfg.AttrTimeout = int64(defaultFuseTimeout.Seconds()) + } + if cfg.EntryTimeout == 0 { + cfg.EntryTimeout = int64(defaultFuseTimeout.Seconds()) + } +} + type Option func(*options) type options struct { @@ -132,20 +144,11 @@ func NewFilesystem(root string, cfg config.Config, opts ...Option) (_ snapshot.F for _, o := range opts { o(&fsOpts) } - maxConcurrency := cfg.MaxConcurrency - if maxConcurrency == 0 { - maxConcurrency = defaultMaxConcurrency - } + applyDefaults(&cfg) + maxConcurrency := cfg.MaxConcurrency attrTimeout := time.Duration(cfg.AttrTimeout) * time.Second - if attrTimeout == 0 { - attrTimeout = defaultFuseTimeout - } - entryTimeout := time.Duration(cfg.EntryTimeout) * time.Second - if entryTimeout == 0 { - entryTimeout = defaultFuseTimeout - } metadataStore := fsOpts.metadataStore if metadataStore == nil { @@ -205,6 +208,7 @@ type filesystem struct { debug bool layer map[string]layer.Layer layerMu sync.Mutex + configMu sync.RWMutex backgroundTaskManager *task.BackgroundTaskManager allowNoVerification bool disableVerification bool @@ -214,6 +218,23 @@ type filesystem struct { entryTimeout time.Duration } +func (fs *filesystem) UpdateConfig(ctx context.Context, cfg config.Config) error { + applyDefaults(&cfg) + fs.configMu.Lock() + fs.prefetchSize = cfg.PrefetchSize + fs.noprefetch = cfg.NoPrefetch + fs.noBackgroundFetch = cfg.NoBackgroundFetch + fs.debug = cfg.Debug + fs.allowNoVerification = cfg.AllowNoVerification + fs.disableVerification = cfg.DisableVerification + fs.attrTimeout = time.Duration(cfg.AttrTimeout) * time.Second + fs.entryTimeout = time.Duration(cfg.EntryTimeout) * time.Second + fs.configMu.Unlock() + + 
fs.resolver.UpdateConfig(ctx, cfg) + return nil +} + func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[string]string) (retErr error) { // Setting the start time to measure the Mount operation duration. start := time.Now() @@ -233,7 +254,9 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s return fmt.Errorf("source must be passed") } + fs.configMu.RLock() defaultPrefetchSize := fs.prefetchSize + fs.configMu.RUnlock() if psStr, ok := labels[config.TargetPrefetchSizeLabel]; ok { if ps, err := strconv.ParseInt(psStr, 10, 64); err == nil { defaultPrefetchSize = ps @@ -297,7 +320,15 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s }() // Verify layer's content - if fs.disableVerification { + fs.configMu.RLock() + disableVerification := fs.disableVerification + allowNoVerification := fs.allowNoVerification + attrTimeout := fs.attrTimeout + entryTimeout := fs.entryTimeout + debug := fs.debug + fs.configMu.RUnlock() + + if disableVerification { // Skip if verification is disabled completely l.SkipVerify() log.G(ctx).Infof("Verification forcefully skipped") @@ -313,7 +344,7 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s return fmt.Errorf("invalid stargz layer: %w", err) } log.G(ctx).Debugf("verified") - } else if _, ok := labels[config.TargetSkipVerifyLabel]; ok && fs.allowNoVerification { + } else if _, ok := labels[config.TargetSkipVerifyLabel]; ok && allowNoVerification { // If unverified layer is allowed, use it with warning. // This mode is for legacy stargz archives which don't contain digests // necessary for layer verification. 
@@ -342,14 +373,14 @@ func (fs *filesystem) Mount(ctx context.Context, mountpoint string, labels map[s // mount the node to the specified mountpoint // TODO: bind mount the state directory as a read-only fs on snapshotter's side rawFS := fusefs.NewNodeFS(node, &fusefs.Options{ - AttrTimeout: &fs.attrTimeout, - EntryTimeout: &fs.entryTimeout, + AttrTimeout: &attrTimeout, + EntryTimeout: &entryTimeout, NullPermissions: true, }) mountOpts := &fuse.MountOptions{ AllowOther: true, // allow users other than root&mounter to access fs FsName: "stargz", // name this filesystem as "stargz" - Debug: fs.debug, + Debug: debug, DirectMount: true, } server, err := fuse.NewServer(rawFS, mountpoint, mountOpts) @@ -391,7 +422,10 @@ func (fs *filesystem) Check(ctx context.Context, mountpoint string, labels map[s } // Wait for prefetch compeletion - if !fs.noprefetch { + fs.configMu.RLock() + noprefetch := fs.noprefetch + fs.configMu.RUnlock() + if !noprefetch { if err := l.WaitForPrefetchCompletion(); err != nil { log.G(ctx).WithError(err).Warn("failed to sync with prefetch completion") } @@ -473,12 +507,17 @@ func unmount(target string, flags int) error { func (fs *filesystem) prefetch(ctx context.Context, l layer.Layer, defaultPrefetchSize int64, start time.Time) { // Prefetch a layer. The first Check() for this layer waits for the prefetch completion. - if !fs.noprefetch { + fs.configMu.RLock() + noprefetch := fs.noprefetch + noBackgroundFetch := fs.noBackgroundFetch + fs.configMu.RUnlock() + + if !noprefetch { go l.Prefetch(defaultPrefetchSize) } // Fetch whole layer aggressively in background. 
- if !fs.noBackgroundFetch { + if !noBackgroundFetch { go func() { if err := l.BackgroundFetch(); err == nil { // write log record for the latency between mount start and last on demand fetch diff --git a/fs/layer/layer.go b/fs/layer/layer.go index 0397f2e9e..333fcc76e 100644 --- a/fs/layer/layer.go +++ b/fs/layer/layer.go @@ -59,6 +59,24 @@ const ( memoryCacheType = "memory" ) +func applyDefaults(cfg *config.Config) { + if cfg.ResolveResultEntryTTLSec == 0 { + cfg.ResolveResultEntryTTLSec = defaultResolveResultEntryTTLSec + } + if cfg.PrefetchTimeoutSec == 0 { + cfg.PrefetchTimeoutSec = defaultPrefetchTimeoutSec + } +} + +func applyDirectoryCacheDefaults(dcc *config.DirectoryCacheConfig) { + if dcc.MaxLRUCacheEntry == 0 { + dcc.MaxLRUCacheEntry = defaultMaxLRUCacheEntry + } + if dcc.MaxCacheFds == 0 { + dcc.MaxCacheFds = defaultMaxCacheFds + } +} + // passThroughConfig contains configuration for FUSE passthrough mode type passThroughConfig struct { // enable indicates whether to enable FUSE passthrough mode @@ -138,6 +156,7 @@ type Resolver struct { backgroundTaskManager *task.BackgroundTaskManager resolveLock *namedmutex.NamedMutex config config.Config + configMu sync.RWMutex metadataStore metadata.Store overlayOpaqueType OverlayOpaqueType additionalDecompressors func(context.Context, source.RegistryHosts, reference.Spec, ocispec.Descriptor) []metadata.Decompressor @@ -145,14 +164,9 @@ type Resolver struct { // NewResolver returns a new layer resolver. 
func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager, cfg config.Config, resolveHandlers map[string]remote.Handler, metadataStore metadata.Store, overlayOpaqueType OverlayOpaqueType, additionalDecompressors func(context.Context, source.RegistryHosts, reference.Spec, ocispec.Descriptor) []metadata.Decompressor) (*Resolver, error) { + applyDefaults(&cfg) resolveResultEntryTTL := time.Duration(cfg.ResolveResultEntryTTLSec) * time.Second - if resolveResultEntryTTL == 0 { - resolveResultEntryTTL = defaultResolveResultEntryTTLSec * time.Second - } prefetchTimeout := time.Duration(cfg.PrefetchTimeoutSec) * time.Second - if prefetchTimeout == 0 { - prefetchTimeout = defaultPrefetchTimeoutSec * time.Second - } // layerCache caches resolved layers for future use. This is useful in a use-case where // the filesystem resolves and caches all layers in an image (not only queried one) in parallel, @@ -196,20 +210,24 @@ func NewResolver(root string, backgroundTaskManager *task.BackgroundTaskManager, }, nil } +func (r *Resolver) UpdateConfig(ctx context.Context, cfg config.Config) { + applyDefaults(&cfg) + r.configMu.Lock() + r.config = cfg + r.prefetchTimeout = time.Duration(cfg.PrefetchTimeoutSec) * time.Second + r.configMu.Unlock() + r.resolver.UpdateConfig(ctx, cfg.BlobConfig) +} + func newCache(root string, cacheType string, cfg config.Config) (cache.BlobCache, error) { if cacheType == memoryCacheType { return cache.NewMemoryCache(), nil } dcc := cfg.DirectoryCacheConfig + applyDirectoryCacheDefaults(&dcc) maxDataEntry := dcc.MaxLRUCacheEntry - if maxDataEntry == 0 { - maxDataEntry = defaultMaxLRUCacheEntry - } maxFdEntry := dcc.MaxCacheFds - if maxFdEntry == 0 { - maxFdEntry = defaultMaxCacheFds - } bufPool := &sync.Pool{ New: func() interface{} { @@ -285,7 +303,11 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs } }() - fsCache, err := newCache(filepath.Join(r.rootDir, "fscache"), r.config.FSCacheType, r.config) + 
r.configMu.RLock() + cfg := r.config + r.configMu.RUnlock() + + fsCache, err := newCache(filepath.Join(r.rootDir, "fscache"), cfg.FSCacheType, cfg) if err != nil { return nil, fmt.Errorf("failed to create fs cache: %w", err) } @@ -333,9 +355,9 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs // Combine layer information together and cache it. l := newLayer(r, desc, blobR, vr, passThroughConfig{ - enable: r.config.PassThrough, - mergeBufferSize: r.config.MergeBufferSize, - mergeWorkerCount: r.config.MergeWorkerCount, + enable: cfg.PassThrough, + mergeBufferSize: cfg.MergeBufferSize, + mergeWorkerCount: cfg.MergeWorkerCount, }) r.layerCacheMu.Lock() cachedL, done2, added := r.layerCache.Add(name, l) @@ -367,7 +389,11 @@ func (r *Resolver) resolveBlob(ctx context.Context, hosts source.RegistryHosts, r.blobCacheMu.Unlock() } - httpCache, err := newCache(filepath.Join(r.rootDir, "httpcache"), r.config.HTTPCacheType, r.config) + r.configMu.RLock() + cfg := r.config + r.configMu.RUnlock() + + httpCache, err := newCache(filepath.Join(r.rootDir, "httpcache"), cfg.HTTPCacheType, cfg) if err != nil { return nil, fmt.Errorf("failed to create http cache: %w", err) } diff --git a/fs/remote/resolver.go b/fs/remote/resolver.go index 392cda087..a93296db4 100644 --- a/fs/remote/resolver.go +++ b/fs/remote/resolver.go @@ -62,7 +62,7 @@ const ( defaultMaxWaitMSec = 300000 ) -func NewResolver(cfg config.BlobConfig, handlers map[string]Handler) *Resolver { +func applyDefaults(cfg *config.BlobConfig) { if cfg.ChunkSize == 0 { // zero means "use default chunk size" cfg.ChunkSize = defaultChunkSize } @@ -84,7 +84,10 @@ func NewResolver(cfg config.BlobConfig, handlers map[string]Handler) *Resolver { if cfg.MaxWaitMSec == 0 { cfg.MaxWaitMSec = defaultMaxWaitMSec } +} +func NewResolver(cfg config.BlobConfig, handlers map[string]Handler) *Resolver { + applyDefaults(&cfg) return &Resolver{ blobConfig: cfg, handlers: handlers, @@ -94,6 +97,14 @@ func 
NewResolver(cfg config.BlobConfig, handlers map[string]Handler) *Resolver { type Resolver struct { blobConfig config.BlobConfig handlers map[string]Handler + mu sync.RWMutex +} + +func (r *Resolver) UpdateConfig(ctx context.Context, cfg config.BlobConfig) { + applyDefaults(&cfg) + r.mu.Lock() + defer r.mu.Unlock() + r.blobConfig = cfg } type fetcher interface { @@ -107,7 +118,9 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs if err != nil { return nil, err } - blobConfig := &r.blobConfig + r.mu.RLock() + blobConfig := r.blobConfig + r.mu.RUnlock() return makeBlob(f, size, blobConfig.ChunkSize, @@ -120,7 +133,9 @@ func (r *Resolver) Resolve(ctx context.Context, hosts source.RegistryHosts, refs } func (r *Resolver) resolveFetcher(ctx context.Context, hosts source.RegistryHosts, refspec reference.Spec, desc ocispec.Descriptor) (f fetcher, size int64, err error) { - blobConfig := &r.blobConfig + r.mu.RLock() + blobConfig := r.blobConfig + r.mu.RUnlock() fc := &fetcherConfig{ hosts: hosts, refspec: refspec, diff --git a/snapshot/snapshot.go b/snapshot/snapshot.go index bbd73ad2c..83f6f47b8 100644 --- a/snapshot/snapshot.go +++ b/snapshot/snapshot.go @@ -31,6 +31,7 @@ import ( "github.com/containerd/continuity/fs" "github.com/containerd/errdefs" "github.com/containerd/log" + "github.com/containerd/stargz-snapshotter/fs/config" "github.com/moby/sys/mountinfo" "golang.org/x/sync/errgroup" ) @@ -166,6 +167,17 @@ func NewSnapshotter(ctx context.Context, root string, targetFs FileSystem, opts return o, nil } +type configUpdater interface { + UpdateConfig(ctx context.Context, config config.Config) error +} + +func (o *snapshotter) UpdateConfig(ctx context.Context, config config.Config) error { + if updater, ok := o.fs.(configUpdater); ok { + return updater.UpdateConfig(ctx, config) + } + return nil +} + // Stat returns the info for an active or committed snapshot by name or // key. //