Skip to content

Commit 7f04288

Browse files
authored
Make FUSE layer thinner (#215)
1 parent 409c190 commit 7f04288

File tree

4 files changed

+73
-33
lines changed

4 files changed

+73
-33
lines changed

db.go

Lines changed: 54 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -506,8 +506,40 @@ func (db *DB) OpenLTXFile(txID uint64) (*os.File, error) {
506506
return os.Open(db.LTXPath(txID, txID))
507507
}
508508

509-
// WriteDatabase writes data to the main database file.
510-
func (db *DB) WriteDatabase(f *os.File, data []byte, offset int64) error {
509+
// OpenDatabase returns a handle for the database file.
510+
func (db *DB) OpenDatabase(ctx context.Context) (*os.File, error) {
511+
return os.OpenFile(db.DatabasePath(), os.O_RDWR, 0666)
512+
}
513+
514+
// CloseDatabase closes a handle associated with the database file.
515+
func (db *DB) CloseDatabase(ctx context.Context, f *os.File) error {
516+
return f.Close()
517+
}
518+
519+
// TruncateDatabase sets the size of the database file.
520+
func (db *DB) TruncateDatabase(ctx context.Context, size int64) error {
521+
return os.Truncate(db.DatabasePath(), size)
522+
}
523+
524+
// SyncDatabase fsync's the database file.
525+
func (db *DB) SyncDatabase(ctx context.Context) error {
526+
f, err := os.Open(db.DatabasePath())
527+
if err != nil {
528+
return err
529+
} else if err := f.Sync(); err != nil {
530+
_ = f.Close()
531+
return err
532+
}
533+
return f.Close()
534+
}
535+
536+
// ReadDatabaseAt reads from the database at the specified index.
537+
func (db *DB) ReadDatabaseAt(f *os.File, data []byte, offset int64) (int, error) {
538+
return f.ReadAt(data, offset)
539+
}
540+
541+
// WriteDatabaseAt writes data to the main database file at the given index.
542+
func (db *DB) WriteDatabaseAt(f *os.File, data []byte, offset int64) error {
511543
// Return an error if the current process is not the leader.
512544
if !db.store.IsPrimary() {
513545
return ErrReadOnlyReplica
@@ -544,6 +576,11 @@ func (db *DB) WriteDatabase(f *os.File, data []byte, offset int64) error {
544576
return nil
545577
}
546578

579+
// UnlockDatabase unlocks all locks from the database file.
580+
func (db *DB) UnlockDatabase(ctx context.Context, guardSet *GuardSet) {
581+
guardSet.UnlockDatabase()
582+
}
583+
547584
// CreateJournal creates a new journal file on disk.
548585
func (db *DB) CreateJournal() (*os.File, error) {
549586
if !db.store.IsPrimary() {
@@ -552,11 +589,21 @@ func (db *DB) CreateJournal() (*os.File, error) {
552589
return os.OpenFile(db.JournalPath(), os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_TRUNC, 0666)
553590
}
554591

592+
// RemoveJournal deletes the journal file from disk.
593+
func (db *DB) RemoveJournal(ctx context.Context) error {
594+
return db.CommitJournal(JournalModeDelete)
595+
}
596+
555597
// CreateWAL creates a new WAL file on disk.
556598
func (db *DB) CreateWAL() (*os.File, error) {
557599
return os.OpenFile(db.WALPath(), os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_TRUNC, 0666)
558600
}
559601

602+
// RemoveWAL deletes the WAL file from disk.
603+
func (db *DB) RemoveWAL(ctx context.Context) error {
604+
return os.Remove(db.WALPath())
605+
}
606+
560607
// WriteWAL writes data to the WAL file. On final commit write, an LTX file is
561608
// generated for the transaction.
562609
func (db *DB) WriteWAL(f *os.File, data []byte, offset int64) error {
@@ -863,6 +910,11 @@ func (db *DB) CreateSHM() (*os.File, error) {
863910
return os.OpenFile(db.SHMPath(), os.O_RDWR|os.O_CREATE|os.O_EXCL|os.O_TRUNC, 0666)
864911
}
865912

913+
// RemoveSHM removes the SHM file from disk.
914+
func (db *DB) RemoveSHM(ctx context.Context) error {
915+
return os.Remove(db.SHMPath())
916+
}
917+
866918
// WriteSHM writes data to the SHM file.
867919
func (db *DB) WriteSHM(f *os.File, data []byte, offset int64) (int, error) {
868920
dbSHMWriteCountMetricVec.WithLabelValues(db.name).Inc()

db_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ func TestDB_WriteSnapshotTo(t *testing.T) {
2626
t.Fatal(err)
2727
}
2828

29-
if err := db.WriteDatabase(dbh, data[0:4096], 0); err != nil {
29+
if err := db.WriteDatabaseAt(dbh, data[0:4096], 0); err != nil {
3030
t.Fatal(err)
31-
} else if err := db.WriteDatabase(dbh, data[4096:8192], 4096); err != nil {
31+
} else if err := db.WriteDatabaseAt(dbh, data[4096:8192], 4096); err != nil {
3232
t.Fatal(err)
3333
}
3434

@@ -123,9 +123,9 @@ func TestDB_EnforceRetention(t *testing.T) {
123123
// Write first LTX file.
124124
if err := writeEmptyJournal(t, db); err != nil {
125125
t.Fatal(err)
126-
} else if err := db.WriteDatabase(dbh, data[0:4096], 0); err != nil {
126+
} else if err := db.WriteDatabaseAt(dbh, data[0:4096], 0); err != nil {
127127
t.Fatal(err)
128-
} else if err := db.WriteDatabase(dbh, data[4096:8192], 4096); err != nil {
128+
} else if err := db.WriteDatabaseAt(dbh, data[4096:8192], 4096); err != nil {
129129
t.Fatal(err)
130130
} else if err := db.CommitJournal(litefs.JournalModeDelete); err != nil {
131131
t.Fatal(err)
@@ -139,7 +139,7 @@ func TestDB_EnforceRetention(t *testing.T) {
139139
// Write a second LTX file.
140140
if err := writeEmptyJournal(t, db); err != nil {
141141
t.Fatal(err)
142-
} else if err := db.WriteDatabase(dbh, data[0:4096], 0); err != nil {
142+
} else if err := db.WriteDatabaseAt(dbh, data[0:4096], 0); err != nil {
143143
t.Fatal(err)
144144
} else if err := db.CommitJournal(litefs.JournalModeDelete); err != nil {
145145
t.Fatal(err)
@@ -148,7 +148,7 @@ func TestDB_EnforceRetention(t *testing.T) {
148148
// Write another LTX file.
149149
if err := writeEmptyJournal(t, db); err != nil {
150150
t.Fatal(err)
151-
} else if err := db.WriteDatabase(dbh, data[0:4096], 0); err != nil {
151+
} else if err := db.WriteDatabaseAt(dbh, data[0:4096], 0); err != nil {
152152
t.Fatal(err)
153153
} else if err := db.CommitJournal(litefs.JournalModeDelete); err != nil {
154154
t.Fatal(err)
@@ -180,9 +180,9 @@ func TestDB_EnforceRetention(t *testing.T) {
180180
// Write first LTX file.
181181
if err := writeEmptyJournal(t, db); err != nil {
182182
t.Fatal(err)
183-
} else if err := db.WriteDatabase(dbh, data[0:4096], 0); err != nil {
183+
} else if err := db.WriteDatabaseAt(dbh, data[0:4096], 0); err != nil {
184184
t.Fatal(err)
185-
} else if err := db.WriteDatabase(dbh, data[4096:8192], 4096); err != nil {
185+
} else if err := db.WriteDatabaseAt(dbh, data[4096:8192], 4096); err != nil {
186186
t.Fatal(err)
187187
} else if err := db.CommitJournal(litefs.JournalModeDelete); err != nil {
188188
t.Fatal(err)
@@ -191,7 +191,7 @@ func TestDB_EnforceRetention(t *testing.T) {
191191
// Write a second LTX file.
192192
if err := writeEmptyJournal(t, db); err != nil {
193193
t.Fatal(err)
194-
} else if err := db.WriteDatabase(dbh, data[0:4096], 0); err != nil {
194+
} else if err := db.WriteDatabaseAt(dbh, data[0:4096], 0); err != nil {
195195
t.Fatal(err)
196196
} else if err := db.CommitJournal(litefs.JournalModeDelete); err != nil {
197197
t.Fatal(err)

fuse/database_node.go

Lines changed: 7 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (n *DatabaseNode) Attr(ctx context.Context, attr *fuse.Attr) error {
5959

6060
func (n *DatabaseNode) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *fuse.SetattrResponse) error {
6161
if req.Valid.Size() {
62-
if err := os.Truncate(n.db.DatabasePath(), int64(req.Size)); err != nil {
62+
if err := n.db.TruncateDatabase(ctx, int64(req.Size)); err != nil {
6363
return err
6464
}
6565
}
@@ -69,27 +69,15 @@ func (n *DatabaseNode) Setattr(ctx context.Context, req *fuse.SetattrRequest, re
6969
func (n *DatabaseNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fs.Handle, error) {
7070
resp.Flags |= fuse.OpenKeepCache
7171

72-
f, err := os.OpenFile(n.db.DatabasePath(), os.O_RDWR, 0666)
72+
f, err := n.db.OpenDatabase(ctx)
7373
if err != nil {
7474
return nil, err
7575
}
7676
return newDatabaseHandle(n, f), nil
7777
}
7878

7979
func (n *DatabaseNode) Fsync(ctx context.Context, req *fuse.FsyncRequest) error {
80-
f, err := os.Open(n.db.DatabasePath())
81-
if err != nil {
82-
return err
83-
}
84-
defer func() { _ = f.Close() }()
85-
86-
if err := f.Sync(); err != nil {
87-
return err
88-
} else if err := f.Close(); err != nil {
89-
return err
90-
}
91-
92-
return nil
80+
return n.db.SyncDatabase(ctx)
9381
}
9482

9583
func (n *DatabaseNode) Forget() { n.fsys.root.ForgetNode(n) }
@@ -135,7 +123,7 @@ func newDatabaseHandle(node *DatabaseNode, file *os.File) *DatabaseHandle {
135123

136124
func (h *DatabaseHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
137125
buf := make([]byte, req.Size)
138-
n, err := h.file.ReadAt(buf, req.Offset)
126+
n, err := h.node.db.ReadDatabaseAt(h.file, buf, req.Offset)
139127
if err == io.EOF {
140128
err = nil
141129
}
@@ -144,7 +132,7 @@ func (h *DatabaseHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *
144132
}
145133

146134
func (h *DatabaseHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
147-
if err := h.node.db.WriteDatabase(h.file, req.Data, req.Offset); err != nil {
135+
if err := h.node.db.WriteDatabaseAt(h.file, req.Data, req.Offset); err != nil {
148136
log.Printf("fuse: write(): database error: %s", err)
149137
return err
150138
}
@@ -154,13 +142,13 @@ func (h *DatabaseHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp
154142

155143
func (h *DatabaseHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
156144
if gs := h.node.fsys.GuardSet(h.node.db, req.LockOwner); gs != nil {
157-
gs.UnlockDatabase()
145+
h.node.db.UnlockDatabase(ctx, gs)
158146
}
159147
return nil
160148
}
161149

162150
func (h *DatabaseHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) error {
163-
return h.file.Close()
151+
return h.node.db.CloseDatabase(ctx, h.file)
164152
}
165153

166154
func (h *DatabaseHandle) Lock(ctx context.Context, req *fuse.LockRequest) error {

fuse/root_node.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -265,17 +265,17 @@ func (n *RootNode) Remove(ctx context.Context, req *fuse.RemoveRequest) (err err
265265

266266
switch fileType {
267267
case litefs.FileTypeJournal:
268-
if err := db.CommitJournal(litefs.JournalModeDelete); err != nil {
268+
if err := db.RemoveJournal(ctx); err != nil {
269269
log.Printf("fuse: commit error: %s", err)
270270
return err
271271
}
272272
return nil
273273

274274
case litefs.FileTypeWAL:
275-
return os.Remove(db.WALPath())
275+
return db.RemoveWAL(ctx)
276276

277277
case litefs.FileTypeSHM:
278-
return os.Remove(db.SHMPath())
278+
return db.RemoveSHM(ctx)
279279

280280
default:
281281
return fuse.ToErrno(syscall.ENOSYS)

0 commit comments

Comments
 (0)