
Commit 3214797

wal: clean up all segment files
When WAL failover is configured, a single logical WAL may be composed of multiple physical segment files. The creation of these physical segment files occurs asynchronously, and this asynchronous creation may race with the closing of the failover writer. Specifically, an outstanding attempt to create a segment file may block while writes persisting all the necessary records complete on the other device. If the logical WAL is now finished, the failover writer may be closed by higher-level code while the outstanding attempt to create a file remains. The system progresses, accumulating a list of obsolete files based on the set that existed at the time the writer was closed. Eventually, the outstanding file creation may complete, creating a new file.

Previously this race resulted in leaking the straggling file. Because we preallocate disk space for WAL files, this logically empty file could still consume significant disk space, and it could not be discovered and deleted until process restart.

This commit adjusts the FailoverWriter to invoke a callback on the FailoverManager, propagating information about these abandoned segment files so that the FailoverManager can pass them to higher levels for deletion.

Fix #5355.
1 parent a0c64af commit 3214797
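
The pattern the commit adds can be illustrated with a minimal, self-contained Go sketch. The names below (manager, writer, segmentClosed, switchToNewDir) are illustrative stand-ins, not Pebble's actual types; the sketch only shows the shape of the race and the callback that prevents the leak: an asynchronous segment-file creation that completes after Close reports itself to the manager instead of being silently dropped.

package main

import (
	"fmt"
	"sync"
	"time"
)

// manager stands in for the WAL manager; it records segment files that
// finished creation only after their writer was closed, so they can be
// handed to higher levels for deletion instead of being leaked.
type manager struct {
	mu       sync.Mutex
	obsolete []string
}

func (m *manager) segmentClosed(path string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.obsolete = append(m.obsolete, path)
}

// writer stands in for the failover writer. Segment-file creation happens
// asynchronously and may still be in flight when Close is called.
type writer struct {
	closed        chan struct{}
	segmentClosed func(path string)
	wg            sync.WaitGroup
}

func (w *writer) switchToNewDir(path string) {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		time.Sleep(10 * time.Millisecond) // simulate a slow (blocked) file creation
		select {
		case <-w.closed:
			// The writer was closed while this creation was outstanding:
			// report the straggler so the manager can delete it.
			w.segmentClosed(path)
		default:
			// Writer still open; the segment joins the logical WAL as usual.
		}
	}()
}

func (w *writer) Close() { close(w.closed) }

func main() {
	m := &manager{}
	w := &writer{closed: make(chan struct{}), segmentClosed: m.segmentClosed}
	w.switchToNewDir("secondary/000001-001.log") // creation races with Close
	w.Close()                                    // all records already durable elsewhere
	w.wg.Wait()
	fmt.Println("obsolete segments to delete:", m.obsolete)
}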

File tree

5 files changed: +184 −26 lines changed


wal/failover_manager.go

Lines changed: 38 additions & 16 deletions

@@ -5,6 +5,7 @@
 package wal

 import (
+	"cmp"
 	"fmt"
 	"io"
 	"math/rand/v2"
@@ -537,26 +538,22 @@ func (wm *failoverManager) init(o Options, initial Logs) error {
 func (wm *failoverManager) List() Logs {
 	wm.mu.Lock()
 	defer wm.mu.Unlock()
-	n := len(wm.mu.closedWALs)
-	if wm.mu.ww != nil {
-		n++
-	}
-	wals := make(Logs, n)
-	setLogicalLog := func(index int, llse logicalLogWithSizesEtc) {
+	wals := make(Logs, 0, len(wm.mu.closedWALs)+1)
+	setLogicalLog := func(llse logicalLogWithSizesEtc) {
 		segments := make([]segment, len(llse.segments))
 		for j := range llse.segments {
 			segments[j] = llse.segments[j].segment
 		}
-		wals[index] = LogicalLog{
+		wals = append(wals, LogicalLog{
 			Num:      llse.num,
 			segments: segments,
-		}
+		})
 	}
-	for i, llse := range wm.mu.closedWALs {
-		setLogicalLog(i, llse)
+	for _, llse := range wm.mu.closedWALs {
+		setLogicalLog(llse)
 	}
 	if wm.mu.ww != nil {
-		setLogicalLog(n-1, wm.mu.ww.getLog())
+		setLogicalLog(wm.mu.ww.getLog())
 	}
 	return wals
 }
@@ -637,6 +634,7 @@ func (wm *failoverManager) Create(wn NumWAL, jobID int) (Writer, error) {
 		stopper:                     wm.stopper,
 		failoverWriteAndSyncLatency: wm.opts.FailoverWriteAndSyncLatency,
 		writerClosed:                wm.writerClosed,
+		segmentClosed:               wm.segmentClosed,
 		writerCreatedForTest:        wm.opts.logWriterCreatedForTesting,
 		writeWALSyncOffsets:         wm.opts.WriteWALSyncOffsets,
 	}
@@ -671,6 +669,35 @@ func (wm *failoverManager) writerClosed(llse logicalLogWithSizesEtc) {
 	wm.mu.ww = nil
 }

+// segmentClosed is called by the failoverWriter; see
+// failoverWriterOpts.segmentClosed.
+func (wm *failoverManager) segmentClosed(num NumWAL, s segmentWithSizeEtc) {
+	wm.mu.Lock()
+	defer wm.mu.Unlock()
+	// Find the closed WAL matching the logical WAL num, if one exists. If we
+	// find one, we append the segment to the list of segments if it's not
+	// already there.
+	i, found := slices.BinarySearchFunc(wm.mu.closedWALs, num, func(llse logicalLogWithSizesEtc, num NumWAL) int {
+		return cmp.Compare(llse.num, num)
+	})
+	if found {
+		segmentIndex, segmentFound := slices.BinarySearchFunc(wm.mu.closedWALs[i].segments, s.segment.logNameIndex,
+			func(s segmentWithSizeEtc, logNameIndex LogNameIndex) int {
+				return cmp.Compare(s.segment.logNameIndex, logNameIndex)
+			})
+		if !segmentFound {
+			wm.mu.closedWALs[i].segments = slices.Insert(wm.mu.closedWALs[i].segments, segmentIndex, s)
+		}
+		return
+	}
+	// If we didn't find an existing entry in closedWALs for the provided
+	// NumWAL, append a new entry.
+	wm.mu.closedWALs = slices.Insert(wm.mu.closedWALs, i, logicalLogWithSizesEtc{
+		num:      num,
+		segments: []segmentWithSizeEtc{s},
+	})
+}
+
 // Stats implements Manager.
 func (wm *failoverManager) Stats() Stats {
 	obsoleteLogsCount, obsoleteLogSize := wm.recycler.Stats()
@@ -889,8 +916,3 @@ func (t *defaultTicker) stop() {
 func (t *defaultTicker) ch() <-chan time.Time {
 	return (*time.Ticker)(t).C
 }
-
-// Make lint happy.
-var _ = (*failoverMonitor).noWriter
-var _ = (*failoverManager).writerClosed
-var _ = (&stopper{}).shouldQuiesce
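
The segmentClosed handler above keeps wm.mu.closedWALs sorted by WAL number, with each entry's segments sorted by logNameIndex, so a straggling segment can be merged in with a binary search followed by an insert. A standalone sketch of that sorted-insert idiom, using simplified stand-in types rather than Pebble's, might look like the following:

package main

import (
	"cmp"
	"fmt"
	"slices"
)

// wal stands in for logicalLogWithSizesEtc: a logical WAL number plus the
// indexes of the physical segment files known for it.
type wal struct {
	num      int
	segments []int
}

// recordSegment merges (num, seg) into the sorted slice, creating a new entry
// if the WAL is not present and skipping segments that are already recorded.
func recordSegment(wals []wal, num, seg int) []wal {
	i, found := slices.BinarySearchFunc(wals, num, func(w wal, num int) int {
		return cmp.Compare(w.num, num)
	})
	if found {
		j, segFound := slices.BinarySearchFunc(wals[i].segments, seg, func(a, b int) int {
			return cmp.Compare(a, b)
		})
		if !segFound {
			wals[i].segments = slices.Insert(wals[i].segments, j, seg)
		}
		return wals
	}
	return slices.Insert(wals, i, wal{num: num, segments: []int{seg}})
}

func main() {
	wals := []wal{{num: 1, segments: []int{2}}}
	wals = recordSegment(wals, 1, 1) // straggler segment for an existing WAL
	wals = recordSegment(wals, 3, 0) // segment for a WAL not yet recorded
	fmt.Println(wals)                // [{1 [1 2]} {3 [0]}]
}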

wal/failover_manager_test.go

Lines changed: 111 additions & 0 deletions

@@ -5,8 +5,10 @@
 package wal

 import (
+	"bytes"
 	"container/list"
 	"fmt"
+	"math/rand/v2"
 	"os"
 	"slices"
 	"strings"
@@ -15,6 +17,7 @@ import (
 	"time"

 	"github.com/cockroachdb/datadriven"
+	"github.com/cockroachdb/pebble/internal/testutils"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/cockroachdb/pebble/vfs/errorfs"
 	"github.com/prometheus/client_golang/prometheus"
@@ -617,3 +620,111 @@ func TestFailoverManager_SecondaryIsWritable(t *testing.T) {
 // are complete. Currently this is done by waiting on various channels etc.
 // which exposes implementation detail. See concurrency_test.monitor, in
 // CockroachDB, for an alternative.
+
+// TestFailoverManager_AllFilesDeletable is a randomized test that validates
+// that all the files that are created by the manager are eventually returned by
+// FailoverManager.Obsolete for deletion.
+func TestFailoverManager_AllFilesDeletable(t *testing.T) {
+	seed := time.Now().UnixNano()
+	memFS := vfs.NewMem()
+	require.NoError(t, memFS.MkdirAll("primary", os.ModePerm))
+	require.NoError(t, memFS.MkdirAll("secondary", os.ModePerm))
+	pcg := rand.NewPCG(0, uint64(seed))
+	rng := rand.New(pcg)
+	latencySeed := rng.Int64()
+	fs := errorfs.Wrap(memFS, errorfs.RandomLatency(
+		errorfs.Randomly(0.50, latencySeed), 10*time.Millisecond, latencySeed, 0 /* no limit */))
+
+	var m failoverManager
+	require.NoError(t, m.init(Options{
+		Primary:              Dir{FS: fs, Dirname: "primary"},
+		Secondary:            Dir{FS: fs, Dirname: "secondary"},
+		Logger:               testutils.Logger{T: t},
+		MaxNumRecyclableLogs: 0,
+		PreallocateSize:      func() int { return 4 },
+		FailoverOptions: FailoverOptions{
+			PrimaryDirProbeInterval:            250 * time.Microsecond,
+			HealthyProbeLatencyThreshold:       time.Millisecond,
+			HealthyInterval:                    3 * time.Millisecond,
+			UnhealthySamplingInterval:          250 * time.Microsecond,
+			UnhealthyOperationLatencyThreshold: func() (time.Duration, bool) { return time.Millisecond, true },
+		},
+		FailoverWriteAndSyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
+		WriteWALSyncOffsets:         func() bool { return false },
+	}, nil /* initial logs */))
+
+	// Repeatedly create a new log file, write some random data to it, and then
+	// maybe ratchet the minUnflushed log number, collecting the set of obsolete
+	// log files if we do.
+	const numIters = 20
+	minUnflushed := int64(0)
+	var allDeletableLogs []DeletableLog
+	for i := 1; i <= numIters; i++ {
+		// Create a new log file and write random data to it.
+		w, err := m.Create(NumWAL(i), i)
+		require.NoError(t, err)
+		for j := 0; j < testutils.RandIntInRange(rng, 1, 4); j++ {
+			data := testutils.RandBytes(rng, testutils.RandIntInRange(rng, 1, 1024))
+			_, err = w.WriteRecord(data, SyncOptions{}, nil)
+			require.NoError(t, err)
+		}
+		_, err = w.Close()
+		require.NoError(t, err)
+
+		// Ratchet the minUnflushed log number randomly, and call Obsolete to
+		// collect the set of deletable logs.
+		minUnflushed = rng.Int64N(int64(i)-minUnflushed+1) + minUnflushed
+		toDelete, err := m.Obsolete(NumWAL(minUnflushed), false)
+		require.NoError(t, err)
+		allDeletableLogs = append(allDeletableLogs, toDelete...)
+	}
+
+	var buf bytes.Buffer
+	defer func() {
+		if t.Failed() {
+			t.Log(buf.String())
+		}
+	}()
+
+	require.Eventually(t, func() bool {
+		toDelete, err := m.Obsolete(NumWAL(minUnflushed), false)
+		require.NoError(t, err)
+		allDeletableLogs = append(allDeletableLogs, toDelete...)
+		// Delete any of the obsolete log files that we collected.
+		for _, dl := range allDeletableLogs {
+			require.NoError(t, dl.FS.Remove(dl.Path))
+		}
+		allDeletableLogs = allDeletableLogs[:0]
+
+		// Find all the remaining log files in both the primary and secondary
+		// directories.
+		var fa FileAccumulator
+		ls, err := memFS.List(m.opts.Primary.Dirname)
+		require.NoError(t, err)
+		secondaryLS, err := memFS.List(m.opts.Secondary.Dirname)
+		require.NoError(t, err)
+		for _, f := range ls {
+			_, err := fa.MaybeAccumulate(memFS, memFS.PathJoin(m.opts.Primary.Dirname, f))
+			require.NoError(t, err)
+		}
+		for _, f := range secondaryLS {
+			_, err := fa.MaybeAccumulate(memFS, memFS.PathJoin(m.opts.Secondary.Dirname, f))
+			require.NoError(t, err)
		}
+		logs := fa.Finish()
+
+		// Remove any logs that are above the minUnflushed log. These are to be
+		// expected.
+		logs = slices.DeleteFunc(logs, func(log LogicalLog) bool {
+			return log.Num >= NumWAL(minUnflushed)
+		})
+		if len(logs) > 0 {
+			buf.Reset()
+			fmt.Fprintf(&buf, "logs with numbers beneath %d remain on the filesystem:\n", minUnflushed)
+			for _, log := range logs {
+				fmt.Fprintf(&buf, "%s remains\n", log)
+			}
+		}
+		return len(logs) == 0
+	}, time.Second, 25*time.Millisecond)
+}

wal/failover_writer.go

Lines changed: 29 additions & 9 deletions

@@ -460,7 +460,23 @@ type failoverWriterOpts struct {
 	stopper *stopper

 	failoverWriteAndSyncLatency prometheus.Histogram
-	writerClosed                func(logicalLogWithSizesEtc)
+	// writerClosed is a callback invoked by the FailoverWriter when it's
+	// closed. It notifies the FailoverManager that the writer is now closed and
+	// propagates information about the various physical segment files that have
+	// been created.
+	//
+	// Note that the asynchronous creation of physical segment files means that
+	// the writerClosed invocation is not guaranteed to include all physical
+	// segment files that will ultimately be created for this logical WAL. If a
+	// new segment file is created after writerClosed is invoked, it will be
+	// propagated to the FailoverManager via the segmentClosed callback.
+	writerClosed func(logicalLogWithSizesEtc)
+	// segmentClosed is a callback invoked by the FailoverWriter when a segment
+	// file creation completes but the writerClosed callback has already been
+	// invoked. It's used to ensure that we reclaim all physical segment files,
+	// including ones that did not complete creation before the Writer was
+	// closed.
+	segmentClosed func(NumWAL, segmentWithSizeEtc)

 	writerCreatedForTest chan<- struct{}

@@ -684,14 +700,18 @@ func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
 			// returned error.
 			ww.opts.stopper.runAsync(func() {
 				_ = w.Close()
-				// TODO(sumeer): consider deleting this file too, since
-				// failoverWriter.Close may not wait for it. This is going to be
-				// extremely rare, so the risk of garbage empty files piling up is
-				// extremely low. Say failover happens daily and of those cases we
-				// have to be very unlucky and the close happens while a failover was
-				// ongoing and the previous LogWriter successfully wrote everything
-				// (say 1% probability if we want to be pessimistic). A garbage file
-				// every 100 days. Restarts will delete that garbage.
+				// Invoke the segmentClosed callback to propagate knowledge that
+				// there's an obsolete segment file we should clean up. Note
+				// that the file may be occupying non-negligible disk space even
+				// though we never wrote to it due to preallocation.
+				ww.opts.segmentClosed(ww.opts.wn, segmentWithSizeEtc{
+					segment: segment{
+						logNameIndex: LogNameIndex(writerIndex),
+						dir:          dir.Dir,
+					},
+					approxFileSize:      initialFileSize,
+					synchronouslyClosed: false,
+				})
 			})
 		}
 	})

wal/failover_writer_test.go

Lines changed: 3 additions & 0 deletions

@@ -285,6 +285,7 @@ func TestFailoverWriter(t *testing.T) {
 			stopper:                     stopper,
 			failoverWriteAndSyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 			writerClosed:                func(_ logicalLogWithSizesEtc) {},
+			segmentClosed:               func(_ NumWAL, _ segmentWithSizeEtc) {},
 			writerCreatedForTest:        logWriterCreated,
 			writeWALSyncOffsets:         func() bool { return false },
 		}, testDirs[dirIndex])
@@ -650,6 +651,7 @@ func TestConcurrentWritersWithManyRecords(t *testing.T) {
 			stopper:                     stopper,
 			failoverWriteAndSyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 			writerClosed:                func(_ logicalLogWithSizesEtc) {},
+			segmentClosed:               func(_ NumWAL, _ segmentWithSizeEtc) {},
 			writerCreatedForTest:        logWriterCreated,
 			writeWALSyncOffsets:         func() bool { return false },
 		}, dirs[dirIndex])
@@ -753,6 +755,7 @@ func TestFailoverWriterManyRecords(t *testing.T) {
 		stopper:                     stopper,
 		failoverWriteAndSyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		writerClosed:                func(_ logicalLogWithSizesEtc) {},
+		segmentClosed:               func(_ NumWAL, _ segmentWithSizeEtc) {},
 		writeWALSyncOffsets:         func() bool { return false },
 	}, dir)
 	require.NoError(t, err)

wal/testdata/manager_failover

Lines changed: 3 additions & 1 deletion

@@ -290,10 +290,11 @@ ok
 list-and-stats
 ----
 logs:
+  000001: {(sec,001)}
   000001: {(pri,002)}
 stats:
   obsolete: count 0 size 0
-  live: count 1 size 18
+  live: count 2 size 18
 failover: switches 2 pri-dur 77ms sec-dur 80ms

 advance-time dur=1s
@@ -318,6 +319,7 @@ obsolete min-unflushed=2
 ok
 recycler empty
 to delete:
+  wal 1: path: sec/000001-001.log size: 0
   wal 1: path: pri/000001-002.log size: 18

 create-writer wal-num=2
