diff --git a/flushable.go b/flushable.go index da005d799b..69c05dec71 100644 --- a/flushable.go +++ b/flushable.go @@ -79,7 +79,20 @@ type flushableEntry struct { // Holds the timestamp of when the flush will be issued. delayedFlushForcedAt time.Time // logNum corresponds to the WAL that contains the records present in the - // receiver. + // receiver. It is possible for multiple flushable entries to have the same + // logNum: + // 1. When a large batch is committed, it is not appended to the WAL. + // Instead, the memtable is rotated, and the large batch is appended to the + // flushable queue with a logNum that matches the newly immutable memtable. + // Care is taken in DB.flush1 to ensure that the large batch is always + // flushed alongside the memtable with the same logNum. + // 2. During recovery, we replay WAL entries into the mutable memtable. The + // memtable we allocate during recovery may be smaller than the memtable + // that corresponded to the original WAL at the time it was written. This + // means we may fill the memtable before finishing replaying the WAL, + // resulting in multiple consecutive flushable entries all with the same + // logNum. This is okay because we take care to flush all the flushables + // during Open. logNum base.DiskFileNum // logSize is the size in bytes of the associated WAL. Protected by DB.mu. logSize uint64 diff --git a/version_set_test.go b/version_set_test.go index 4bb3f86cce..301e515356 100644 --- a/version_set_test.go +++ b/version_set_test.go @@ -296,11 +296,25 @@ func TestVersionSetCheckpoint(t *testing.T) { } func TestVersionSetSeqNums(t *testing.T) { - mem := vfs.NewMem() - require.NoError(t, mem.MkdirAll("ext", 0755)) + var buf struct { + sync.Mutex + bytes.Buffer + } + fs := vfs.WithLogging(vfs.NewMem(), func(format string, args ...interface{}) { + buf.Mutex.Lock() + defer buf.Mutex.Unlock() + fmt.Fprintf(&buf.Buffer, format, args...) + fmt.Fprintln(&buf) + }) + defer func() { + if t.Failed() { + t.Logf("fs log:\n%s", buf.String()) + } + }() + require.NoError(t, fs.MkdirAll("ext", 0755)) opts := &Options{ - FS: mem, + FS: fs, MaxManifestFileSize: 1, Logger: testutils.Logger{T: t}, } @@ -308,49 +322,44 @@ func TestVersionSetSeqNums(t *testing.T) { require.NoError(t, err) // Snapshot has no files, so first edit will cause manifest rotation. - writeAndIngest(t, mem, d, base.MakeInternalKey([]byte("a"), 0, InternalKeyKindSet), []byte("b"), "a") + writeAndIngest(t, fs, d, base.MakeInternalKey([]byte("a"), 0, InternalKeyKindSet), []byte("b"), "a") // Snapshot has no files, and manifest has an edit from the previous ingest, // so this second ingest will cause manifest rotation. - writeAndIngest(t, mem, d, base.MakeInternalKey([]byte("c"), 0, InternalKeyKindSet), []byte("d"), "c") + writeAndIngest(t, fs, d, base.MakeInternalKey([]byte("c"), 0, InternalKeyKindSet), []byte("d"), "c") require.NoError(t, d.Close()) d, err = Open("", opts) require.NoError(t, err) defer d.Close() d.TestOnlyWaitForCleaning() - // Check that the manifest has the correct LastSeqNum, equalling the highest - // observed SeqNum. - filenames, err := mem.List("") + // Check that the current manifest has the correct LastSeqNum, equalling the + // highest observed SeqNum. + desc, err := Peek("", fs) require.NoError(t, err) - var manifestFile vfs.File - for _, filename := range filenames { - fileType, _, ok := base.ParseFilename(mem, filename) - if ok && fileType == base.FileTypeManifest { - manifestFile, err = mem.Open(filename) - require.NoError(t, err) - } - } - require.NotNil(t, manifestFile) - defer manifestFile.Close() - rr := record.NewReader(manifestFile, 0 /* logNum */) - var lastSeqNum base.SeqNum - for { - r, err := rr.Next() - if err == io.EOF { - break - } - require.NoError(t, err) - var ve manifest.VersionEdit - err = ve.Decode(r) + require.True(t, desc.Exists) + func() { + manifestFile, err := fs.Open(desc.ManifestFilename) require.NoError(t, err) - if ve.LastSeqNum != 0 { - lastSeqNum = ve.LastSeqNum + defer manifestFile.Close() + rr := record.NewReader(manifestFile, 0 /* logNum */) + var lastSeqNum base.SeqNum + for { + r, err := rr.Next() + if err == io.EOF { + break + } + require.NoError(t, err) + var ve manifest.VersionEdit + require.NoError(t, ve.Decode(r)) + if ve.LastSeqNum != 0 { + lastSeqNum = ve.LastSeqNum + } } - } - // 2 ingestions happened, so LastSeqNum should equal base.SeqNumStart + 1. - require.Equal(t, base.SeqNum(11), lastSeqNum) - // logSeqNum is always one greater than the last assigned sequence number. - require.Equal(t, d.mu.versions.logSeqNum.Load(), lastSeqNum+1) + // 2 ingestions happened, so LastSeqNum should equal base.SeqNumStart + 1. + require.Equal(t, base.SeqNum(11), lastSeqNum) + // logSeqNum is always one greater than the last assigned sequence number. + require.Equal(t, d.mu.versions.logSeqNum.Load(), lastSeqNum+1) + }() } // TestLargeKeys is a datadriven test that exercises large keys with shared diff --git a/wal/failover_manager.go b/wal/failover_manager.go index 36c1eb9764..3874bbb2e1 100644 --- a/wal/failover_manager.go +++ b/wal/failover_manager.go @@ -526,9 +526,7 @@ func (wm *failoverManager) init(o Options, initial Logs) error { } wm.recycler.Init(o.MaxNumRecyclableLogs) for _, ll := range initial { - if wm.recycler.MinRecycleLogNum() <= ll.Num { - wm.recycler.SetMinRecycleLogNum(ll.Num + 1) - } + wm.recycler.RatchetMinRecycleLogNum(ll.Num + 1) var err error wm.initialObsolete, err = appendDeletableLogs(wm.initialObsolete, ll) if err != nil { diff --git a/wal/failover_manager_test.go b/wal/failover_manager_test.go index af5b5ec8c3..e6275a079f 100644 --- a/wal/failover_manager_test.go +++ b/wal/failover_manager_test.go @@ -382,7 +382,7 @@ func TestManagerFailover(t *testing.T) { var b strings.Builder fmt.Fprintf(&b, "%s\n", errorToStr(err)) if err == nil { - fmt.Fprintf(&b, "recycler min-log-num: %d\n", fm.recycler.MinRecycleLogNum()) + fmt.Fprintf(&b, "recycler min-log-num: %d\n", fm.recycler.minRecycleLogNum) } return b.String() diff --git a/wal/log_recycler.go b/wal/log_recycler.go index cc327b5b98..68fd8ba0d6 100644 --- a/wal/log_recycler.go +++ b/wal/log_recycler.go @@ -40,16 +40,12 @@ func (r *LogRecycler) Init(maxNumLogFiles int) { r.limit = maxNumLogFiles } -// MinRecycleLogNum returns the current minimum log number that is allowed to -// be recycled. -func (r *LogRecycler) MinRecycleLogNum() NumWAL { - return NumWAL(r.minRecycleLogNum) -} - -// SetMinRecycleLogNum sets the minimum log number that is allowed to be -// recycled. -func (r *LogRecycler) SetMinRecycleLogNum(n NumWAL) { - r.minRecycleLogNum = base.DiskFileNum(n) +// RatchetMinRecycleLogNum ratchets the minimum log number that is allowed to be +// recycled to be at least as high as the given log number. +func (r *LogRecycler) RatchetMinRecycleLogNum(n NumWAL) { + if num := base.DiskFileNum(n); num > r.minRecycleLogNum { + r.minRecycleLogNum = num + } } // Add attempts to recycle the log file specified by logInfo. Returns true if diff --git a/wal/standalone_manager.go b/wal/standalone_manager.go index 7396550db4..925e08a430 100644 --- a/wal/standalone_manager.go +++ b/wal/standalone_manager.go @@ -67,9 +67,7 @@ func (m *StandaloneManager) init(o Options, initial Logs) error { return err } for _, ll := range initial { - if m.recycler.MinRecycleLogNum() <= ll.Num { - m.recycler.SetMinRecycleLogNum(ll.Num + 1) - } + m.recycler.RatchetMinRecycleLogNum(ll.Num + 1) m.initialObsolete, err = appendDeletableLogs(m.initialObsolete, ll) if err != nil { return closeAndReturnErr(err)