Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,20 @@ type flushableEntry struct {
// Holds the timestamp of when the flush will be issued.
delayedFlushForcedAt time.Time
// logNum corresponds to the WAL that contains the records present in the
// receiver.
// receiver. It is possible for multiple flushable entries to have the same
// logNum:
// 1. When a large batch is committed, it is not appended to the WAL.
// Instead, the memtable is rotated, and the large batch is appended to the
// flushable queue with a logNum that matches the newly immutable memtable.
// Care is taken in DB.flush1 to ensure that the large batch is always
// flushed alongside the memtable with the same logNum.
// 2. During recovery, we replay WAL entries into the mutable memtable. The
// memtable we allocate during recovery may be smaller than the memtable
// that corresponded to the original WAL at the time it was written. This
// means we may fill the memtable before finishing replaying the WAL,
// resulting in multiple consecutive flushable entries all with the same
// logNum. This is okay because we take care to flush all the flushables
// during Open.
logNum base.DiskFileNum
// logSize is the size in bytes of the associated WAL. Protected by DB.mu.
logSize uint64
Expand Down
79 changes: 44 additions & 35 deletions version_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,61 +296,70 @@ func TestVersionSetCheckpoint(t *testing.T) {
}

func TestVersionSetSeqNums(t *testing.T) {
mem := vfs.NewMem()
require.NoError(t, mem.MkdirAll("ext", 0755))
var buf struct {
sync.Mutex
bytes.Buffer
}
fs := vfs.WithLogging(vfs.NewMem(), func(format string, args ...interface{}) {
buf.Mutex.Lock()
defer buf.Mutex.Unlock()
fmt.Fprintf(&buf.Buffer, format, args...)
fmt.Fprintln(&buf)
})
defer func() {
if t.Failed() {
t.Logf("fs log:\n%s", buf.String())
}
}()
require.NoError(t, fs.MkdirAll("ext", 0755))

opts := &Options{
FS: mem,
FS: fs,
MaxManifestFileSize: 1,
Logger: testutils.Logger{T: t},
}
d, err := Open("", opts)
require.NoError(t, err)

// Snapshot has no files, so first edit will cause manifest rotation.
writeAndIngest(t, mem, d, base.MakeInternalKey([]byte("a"), 0, InternalKeyKindSet), []byte("b"), "a")
writeAndIngest(t, fs, d, base.MakeInternalKey([]byte("a"), 0, InternalKeyKindSet), []byte("b"), "a")
// Snapshot has no files, and manifest has an edit from the previous ingest,
// so this second ingest will cause manifest rotation.
writeAndIngest(t, mem, d, base.MakeInternalKey([]byte("c"), 0, InternalKeyKindSet), []byte("d"), "c")
writeAndIngest(t, fs, d, base.MakeInternalKey([]byte("c"), 0, InternalKeyKindSet), []byte("d"), "c")
require.NoError(t, d.Close())
d, err = Open("", opts)
require.NoError(t, err)
defer d.Close()
d.TestOnlyWaitForCleaning()

// Check that the manifest has the correct LastSeqNum, equalling the highest
// observed SeqNum.
filenames, err := mem.List("")
// Check that the current manifest has the correct LastSeqNum, equalling the
// highest observed SeqNum.
desc, err := Peek("", fs)
require.NoError(t, err)
var manifestFile vfs.File
for _, filename := range filenames {
fileType, _, ok := base.ParseFilename(mem, filename)
if ok && fileType == base.FileTypeManifest {
manifestFile, err = mem.Open(filename)
require.NoError(t, err)
}
}
require.NotNil(t, manifestFile)
defer manifestFile.Close()
rr := record.NewReader(manifestFile, 0 /* logNum */)
var lastSeqNum base.SeqNum
for {
r, err := rr.Next()
if err == io.EOF {
break
}
require.NoError(t, err)
var ve manifest.VersionEdit
err = ve.Decode(r)
require.True(t, desc.Exists)
func() {
manifestFile, err := fs.Open(desc.ManifestFilename)
require.NoError(t, err)
if ve.LastSeqNum != 0 {
lastSeqNum = ve.LastSeqNum
defer manifestFile.Close()
rr := record.NewReader(manifestFile, 0 /* logNum */)
var lastSeqNum base.SeqNum
for {
r, err := rr.Next()
if err == io.EOF {
break
}
require.NoError(t, err)
var ve manifest.VersionEdit
require.NoError(t, ve.Decode(r))
if ve.LastSeqNum != 0 {
lastSeqNum = ve.LastSeqNum
}
}
}
// 2 ingestions happened, so LastSeqNum should equal base.SeqNumStart + 1.
require.Equal(t, base.SeqNum(11), lastSeqNum)
// logSeqNum is always one greater than the last assigned sequence number.
require.Equal(t, d.mu.versions.logSeqNum.Load(), lastSeqNum+1)
// 2 ingestions happened, so LastSeqNum should equal base.SeqNumStart + 1.
require.Equal(t, base.SeqNum(11), lastSeqNum)
// logSeqNum is always one greater than the last assigned sequence number.
require.Equal(t, d.mu.versions.logSeqNum.Load(), lastSeqNum+1)
}()
}

// TestLargeKeys is a datadriven test that exercises large keys with shared
Expand Down
4 changes: 1 addition & 3 deletions wal/failover_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,9 +526,7 @@ func (wm *failoverManager) init(o Options, initial Logs) error {
}
wm.recycler.Init(o.MaxNumRecyclableLogs)
for _, ll := range initial {
if wm.recycler.MinRecycleLogNum() <= ll.Num {
wm.recycler.SetMinRecycleLogNum(ll.Num + 1)
}
wm.recycler.RatchetMinRecycleLogNum(ll.Num + 1)
var err error
wm.initialObsolete, err = appendDeletableLogs(wm.initialObsolete, ll)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion wal/failover_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func TestManagerFailover(t *testing.T) {
var b strings.Builder
fmt.Fprintf(&b, "%s\n", errorToStr(err))
if err == nil {
fmt.Fprintf(&b, "recycler min-log-num: %d\n", fm.recycler.MinRecycleLogNum())
fmt.Fprintf(&b, "recycler min-log-num: %d\n", fm.recycler.minRecycleLogNum)
}
return b.String()

Expand Down
16 changes: 6 additions & 10 deletions wal/log_recycler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,12 @@ func (r *LogRecycler) Init(maxNumLogFiles int) {
r.limit = maxNumLogFiles
}

// MinRecycleLogNum returns the current minimum log number that is allowed to
// be recycled.
func (r *LogRecycler) MinRecycleLogNum() NumWAL {
return NumWAL(r.minRecycleLogNum)
}

// SetMinRecycleLogNum sets the minimum log number that is allowed to be
// recycled.
func (r *LogRecycler) SetMinRecycleLogNum(n NumWAL) {
r.minRecycleLogNum = base.DiskFileNum(n)
// RatchetMinRecycleLogNum ratchets the minimum log number that is allowed to be
// recycled to be at least as high as the given log number.
func (r *LogRecycler) RatchetMinRecycleLogNum(n NumWAL) {
if num := base.DiskFileNum(n); num > r.minRecycleLogNum {
r.minRecycleLogNum = num
}
}

// Add attempts to recycle the log file specified by logInfo. Returns true if
Expand Down
4 changes: 1 addition & 3 deletions wal/standalone_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ func (m *StandaloneManager) init(o Options, initial Logs) error {
return err
}
for _, ll := range initial {
if m.recycler.MinRecycleLogNum() <= ll.Num {
m.recycler.SetMinRecycleLogNum(ll.Num + 1)
}
m.recycler.RatchetMinRecycleLogNum(ll.Num + 1)
m.initialObsolete, err = appendDeletableLogs(m.initialObsolete, ll)
if err != nil {
return closeAndReturnErr(err)
Expand Down