Skip to content

Commit 4f25443

Browse files
committed
feat(local): add sync writes for distributed filesystem coherence
Add --local-sync-writes option to sync files and directories after writes, so that changes become visible promptly to other clients on CephFS, GlusterFS, and Lustre. File data is synced right after it is written; directory syncs are batched and deduplicated for efficiency, so new directory entries may take up to the batch interval to appear. Linux only.
Signed-off-by: Anagh Kumar Baranwal <6824881+darthShadow@users.noreply.github.com>
1 parent 398254f commit 4f25443

File tree

3 files changed

+609
-21
lines changed

backend/local/local.go

Lines changed: 143 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -213,6 +213,47 @@ cause deadlocks (especially on Ceph).`,
213213
Default: false,
214214
Advanced: true,
215215
},
216+
{
217+
Name: "sync_writes",
218+
Help: `Sync files and directories after writes for distributed filesystem coherence.
219+
220+
This option is primarily designed for CephFS on Linux. It may also help with
221+
other distributed filesystems like GlusterFS or Lustre, but CephFS is the
222+
main target.
223+
224+
Enable this when multiple clients need to see file changes immediately.
225+
Without this, other clients may experience significant delays (determined
226+
by the dirty page writeback interval) when listing or accessing newly
227+
created/modified files.
228+
229+
File data is synced immediately after each write. Directory metadata syncs
230+
are batched (see --local-sync-writes-interval) so other clients may not see
231+
new files in directory listings for up to the batch interval after the file
232+
write completes.
233+
234+
This option is only effective on Linux where distributed filesystem
235+
kernel clients are available. It will be ignored on other platforms.`,
236+
Default: false,
237+
Advanced: true,
238+
},
239+
{
240+
Name: "sync_writes_interval",
241+
Help: `Interval for batching directory syncs when using --local-sync-writes.
242+
243+
Directory syncs are batched and deduplicated for efficiency. This sets
244+
the maximum delay before a directory sync is performed after a file write.
245+
Other clients may not see new files in directory listings until this
246+
interval elapses.
247+
248+
Set to 0 for immediate (unbatched) directory syncs if you need other
249+
clients to see files immediately after write, at the cost of reduced
250+
throughput for bulk operations.
251+
252+
Default is 100ms which provides a good balance between responsiveness
253+
and efficiency.`,
254+
Default: fs.Duration(100 * time.Millisecond),
255+
Advanced: true,
256+
},
216257
{
217258
Name: "case_sensitive",
218259
Help: `Force the filesystem to report itself as case sensitive.
@@ -345,24 +386,26 @@ only useful for reading.
345386

346387
// Options defines the configuration for this backend
347388
type Options struct {
348-
FollowSymlinks bool `config:"copy_links"`
349-
TranslateSymlinks bool `config:"links"`
350-
SkipSymlinks bool `config:"skip_links"`
351-
SkipSpecials bool `config:"skip_specials"`
352-
UTFNorm bool `config:"unicode_normalization"`
353-
NoCheckUpdated bool `config:"no_check_updated"`
354-
NoUNC bool `config:"nounc"`
355-
OneFileSystem bool `config:"one_file_system"`
356-
CaseSensitive bool `config:"case_sensitive"`
357-
CaseInsensitive bool `config:"case_insensitive"`
358-
NoPreAllocate bool `config:"no_preallocate"`
359-
NoSparse bool `config:"no_sparse"`
360-
NoSetModTime bool `config:"no_set_modtime"`
361-
TimeType timeType `config:"time_type"`
362-
Hashes fs.CommaSepList `config:"hashes"`
363-
Enc encoder.MultiEncoder `config:"encoding"`
364-
NoClone bool `config:"no_clone"`
365-
SkipRecent bool `config:"skip_recent"`
389+
FollowSymlinks bool `config:"copy_links"`
390+
TranslateSymlinks bool `config:"links"`
391+
SkipSymlinks bool `config:"skip_links"`
392+
SkipSpecials bool `config:"skip_specials"`
393+
UTFNorm bool `config:"unicode_normalization"`
394+
NoCheckUpdated bool `config:"no_check_updated"`
395+
NoUNC bool `config:"nounc"`
396+
OneFileSystem bool `config:"one_file_system"`
397+
CaseSensitive bool `config:"case_sensitive"`
398+
CaseInsensitive bool `config:"case_insensitive"`
399+
NoPreAllocate bool `config:"no_preallocate"`
400+
NoSparse bool `config:"no_sparse"`
401+
NoSetModTime bool `config:"no_set_modtime"`
402+
TimeType timeType `config:"time_type"`
403+
Hashes fs.CommaSepList `config:"hashes"`
404+
Enc encoder.MultiEncoder `config:"encoding"`
405+
NoClone bool `config:"no_clone"`
406+
SkipRecent bool `config:"skip_recent"`
407+
SyncWrites bool `config:"sync_writes"`
408+
SyncWritesInterval fs.Duration `config:"sync_writes_interval"`
366409
}
367410

368411
// Fs represents a local filesystem rooted at root
@@ -379,8 +422,9 @@ type Fs struct {
379422
xattrSupported atomic.Int32 // whether xattrs are supported
380423

381424
// do os.Lstat or os.Stat
382-
lstat func(name string) (os.FileInfo, error)
383-
objectMetaMu sync.RWMutex // global lock for Object metadata
425+
lstat func(name string) (os.FileInfo, error)
426+
objectMetaMu sync.RWMutex // global lock for Object metadata
427+
dirSyncManager *dirSyncManager // batches directory syncs; nil if SyncWrites disabled or unsupported
384428
}
385429

386430
// Object represents a local filesystem object
@@ -461,6 +505,16 @@ func NewFs(ctx context.Context, name, root string, m configmap.Mapper) (fs.Fs, e
461505
f.features.Copy = nil
462506
}
463507

508+
// Initialize sync writes support (Linux only)
509+
if f.opt.SyncWrites {
510+
if syncWritesSupported() {
511+
f.dirSyncManager = newDirSyncManager(time.Duration(f.opt.SyncWritesInterval), f.root)
512+
} else {
513+
fs.Infof(nil, "local: --local-sync-writes is only supported on Linux, ignoring")
514+
f.opt.SyncWrites = false
515+
}
516+
}
517+
464518
// Check to see if this points to a file
465519
fi, err := f.lstat(f.root)
466520
if err == nil {
@@ -675,6 +729,11 @@ func (f *Fs) mkParentDir(ctx context.Context, remote string, metadata fs.Metadat
675729
if err != nil {
676730
return fmt.Errorf("mkParentDir: %s: failed to make directory: %s: %w", remote, localPath, err)
677731
}
732+
// Queue parent directory sync for distributed FS coherence.
733+
// This makes newly created directories visible to other clients.
734+
if f.dirSyncManager != nil {
735+
f.dirSyncManager.Queue(filepath.Dir(localDirPath))
736+
}
678737

679738
if metadata != nil && remoteDir != "" && remoteDir != "." && remoteDir != ".." && remoteDir != "/" {
680739
fi, err := f.lstat(localDirPath)
@@ -703,6 +762,11 @@ func (f *Fs) mkdir(ctx context.Context, dir string, metadata fs.Metadata) (os.Fi
703762
if err != nil {
704763
return nil, err
705764
}
765+
// Queue parent directory sync for distributed FS coherence.
766+
// mkParentDir has already queued the sync that exposes the parent directory; here we only queue the immediate parent of the new directory.
767+
if f.dirSyncManager != nil {
768+
f.dirSyncManager.Queue(filepath.Dir(localPath))
769+
}
706770
fi, err = f.lstat(localPath)
707771
if err != nil {
708772
return nil, fmt.Errorf("mkdir: %s: failed to re-read info: %s: %w", dir, localPath, err)
@@ -783,6 +847,10 @@ func (f *Fs) Rmdir(ctx context.Context, dir string) error {
783847
err = os.Remove(localPath)
784848
}
785849
}
850+
// Queue directory sync so removal is visible to other clients
851+
if err == nil && f.dirSyncManager != nil {
852+
f.dirSyncManager.Queue(filepath.Dir(localPath))
853+
}
786854
return err
787855
}
788856

@@ -910,6 +978,17 @@ func (f *Fs) Move(ctx context.Context, src fs.Object, remote string) (fs.Object,
910978
return nil, fs.ErrorCantMove
911979
}
912980

981+
// From this point on, the rename succeeded. Ensure directory sync happens
982+
// even if subsequent operations fail, so other clients can see the move.
983+
defer func() {
984+
if f.dirSyncManager != nil {
985+
f.dirSyncManager.QueueMultiple(
986+
filepath.Dir(srcObj.path),
987+
filepath.Dir(dstObj.path),
988+
)
989+
}
990+
}()
991+
913992
// Set metadata if --metadata is in use
914993
err = dstObj.writeMetadata(meta)
915994
if err != nil {
@@ -969,6 +1048,15 @@ func (f *Fs) DirMove(ctx context.Context, src fs.Fs, srcRemote, dstRemote string
9691048
fs.Debugf(src, "Can't move dir: %v: trying copy", err)
9701049
return fs.ErrorCantDirMove
9711050
}
1051+
1052+
// Queue directory syncs for distributed FS coherence (both src and dst parents)
1053+
if f.dirSyncManager != nil {
1054+
f.dirSyncManager.QueueMultiple(
1055+
filepath.Dir(srcPath),
1056+
filepath.Dir(dstPath),
1057+
)
1058+
}
1059+
9721060
return nil
9731061
}
9741062

@@ -1475,6 +1563,15 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
14751563
}
14761564

14771565
_, err = io.Copy(out, in)
1566+
1567+
// Sync file for distributed FS coherence (releases locks).
1568+
// Only sync on success - failed copies will be removed anyway.
1569+
if err == nil {
1570+
if f, ok := out.(*os.File); ok && o.fs.opt.SyncWrites {
1571+
syncFile(f, o.path)
1572+
}
1573+
}
1574+
14781575
closeErr := out.Close()
14791576
if err == nil {
14801577
err = closeErr
@@ -1504,6 +1601,10 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
15041601
if removeErr := os.Remove(o.path); removeErr != nil {
15051602
fs.Errorf(o, "Failed to remove partially written file: %v", removeErr)
15061603
}
1604+
// Queue directory sync so removal is visible to other clients
1605+
if o.fs.dirSyncManager != nil {
1606+
o.fs.dirSyncManager.Queue(filepath.Dir(o.path))
1607+
}
15071608
return err
15081609
}
15091610

@@ -1514,6 +1615,14 @@ func (o *Object) Update(ctx context.Context, in io.Reader, src fs.ObjectInfo, op
15141615
o.fs.objectMetaMu.Unlock()
15151616
}
15161617

1618+
// From this point on, the file exists. Ensure directory sync happens
1619+
// even if subsequent operations fail, so other clients can see the file.
1620+
defer func() {
1621+
if o.fs.dirSyncManager != nil {
1622+
o.fs.dirSyncManager.Queue(filepath.Dir(o.path))
1623+
}
1624+
}()
1625+
15171626
// Set the mtime
15181627
err = o.SetModTime(ctx, src.ModTime(ctx))
15191628
if err != nil {
@@ -1576,6 +1685,14 @@ func (f *Fs) OpenWriterAt(ctx context.Context, remote string, size int64) (fs.Wr
15761685
}
15771686
}
15781687

1688+
// Return wrapper for distributed FS coherence if enabled
1689+
if f.opt.SyncWrites {
1690+
return &syncingWriterAtCloser{
1691+
File: out,
1692+
path: o.path,
1693+
manager: f.dirSyncManager,
1694+
}, nil
1695+
}
15791696
return out, nil
15801697
}
15811698

@@ -1630,7 +1747,12 @@ func (o *Object) lstatWithContext(ctx context.Context) error {
16301747
// Remove an object
16311748
func (o *Object) Remove(ctx context.Context) error {
16321749
o.clearHashCache()
1633-
return remove(o.path)
1750+
err := remove(o.path)
1751+
// Queue directory sync for distributed FS coherence
1752+
if err == nil && o.fs.dirSyncManager != nil {
1753+
o.fs.dirSyncManager.Queue(filepath.Dir(o.path))
1754+
}
1755+
return err
16341756
}
16351757

16361758
// Metadata returns metadata for an object

0 commit comments

Comments
 (0)