Commit ae3bfad

wal: deflake TestManagerFailover

The invocation of the new segmentClosed callback introduced in #5388 occurs
asynchronously with respect to the manager and the progression through
logical log numbers. This test was flaky in two ways:

If the segmentClosed callback was invoked /before/ the writerClosed callback
for the same WAL, writerClosed would append a second record for the same
logical log containing the set of WALs other than the one inserted by the
segmentClosed callback. Conversely, if the segmentClosed callback was
sufficiently delayed relative to the closing of the writer, it might not be
invoked until after the test listed the set of obsolete logs.

The first race is fixed by a refactoring of the segmentClosed and
writerClosed callbacks, adapting them to share the same logic for merging
logs. The second race is fixed through use of (datadriven.TestData).Retry to
account for the nondeterminism.

Fix #5401.
1 parent 32dc26d · commit ae3bfad
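
For readers skimming the diffs below, here is a minimal, self-contained sketch of the merge-and-dedupe idea behind the shared callback logic. This is not the Pebble code: the seg type and the values in main are hypothetical stand-ins, while the real merge operates on segmentWithSizeEtc inside recordClosedWALLocked in wal/failover_manager.go.

package main

import (
	"cmp"
	"fmt"
	"slices"
)

// seg is a hypothetical stand-in for a closed WAL segment, keyed by the
// index used in its log file name.
type seg struct {
	logNameIndex int
	dir          string
}

// mergeSegments combines the segments already recorded for a logical WAL
// with newly reported ones, then sorts and compacts so that a segment
// reported by both the segmentClosed and writerClosed paths appears once.
func mergeSegments(existing, incoming []seg) []seg {
	merged := append(slices.Clone(existing), incoming...)
	slices.SortFunc(merged, func(a, b seg) int {
		return cmp.Compare(a.logNameIndex, b.logNameIndex)
	})
	return slices.CompactFunc(merged, func(a, b seg) bool {
		return a.logNameIndex == b.logNameIndex
	})
}

func main() {
	// Segment 1 arrives via both callbacks; after merging it is listed once.
	fmt.Println(mergeSegments(
		[]seg{{logNameIndex: 1, dir: "sec"}},
		[]seg{{logNameIndex: 1, dir: "sec"}, {logNameIndex: 2, dir: "pri"}},
	))
	// Output: [{1 sec} {2 pri}]
}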

File tree: 5 files changed, +63 -53 lines changed

  wal/failover_manager.go
  wal/failover_manager_test.go
  wal/failover_writer.go
  wal/failover_writer_test.go
  wal/testdata/manager_failover

wal/failover_manager.go

Lines changed: 25 additions & 21 deletions
@@ -440,6 +440,10 @@ type segmentWithSizeEtc struct {
 	synchronouslyClosed bool
 }

+func cmpSegmentWithSizeEtc(a, b segmentWithSizeEtc) int {
+	return cmp.Compare(a.segment.logNameIndex, b.segment.logNameIndex)
+}
+
 type failoverManager struct {
 	opts Options
 	// initialObsolete holds the set of DeletableLogs that formed the logs
@@ -661,41 +665,41 @@ func (wm *failoverManager) ElevateWriteStallThresholdForFailover() bool {
 	return wm.monitor.elevateWriteStallThresholdForFailover()
 }

+// writerClosed is called by the failoverWriter; see
+// failoverWriterOpts.writerClosed.
 func (wm *failoverManager) writerClosed(llse logicalLogWithSizesEtc) {
 	wm.monitor.noWriter()
 	wm.mu.Lock()
 	defer wm.mu.Unlock()
-	wm.mu.closedWALs = append(wm.mu.closedWALs, llse)
+	wm.recordClosedWALLocked(llse)
 	wm.mu.ww = nil
 }

 // segmentClosed is called by the failoverWriter; see
 // failoverWriterOpts.segmentClosed.
-func (wm *failoverManager) segmentClosed(num NumWAL, s segmentWithSizeEtc) {
+func (wm *failoverManager) segmentClosed(llse logicalLogWithSizesEtc) {
 	wm.mu.Lock()
 	defer wm.mu.Unlock()
+	wm.recordClosedWALLocked(llse)
+}
+
+func (wm *failoverManager) recordClosedWALLocked(llse logicalLogWithSizesEtc) {
 	// Find the closed WAL matching the logical WAL num, if one exists. If we
-	// find one, we append the segment to the list of segments if it's not
-	// already there.
-	i, found := slices.BinarySearchFunc(wm.mu.closedWALs, num, func(llse logicalLogWithSizesEtc, num NumWAL) int {
-		return cmp.Compare(llse.num, num)
-	})
-	if found {
-		segmentIndex, segmentFound := slices.BinarySearchFunc(wm.mu.closedWALs[i].segments, s.segment.logNameIndex,
-			func(s segmentWithSizeEtc, logNameIndex LogNameIndex) int {
-				return cmp.Compare(s.segment.logNameIndex, logNameIndex)
-			})
-		if !segmentFound {
-			wm.mu.closedWALs[i].segments = slices.Insert(wm.mu.closedWALs[i].segments, segmentIndex, s)
-		}
+	// find one, we merge the segments into the existing list.
+	i, found := slices.BinarySearchFunc(wm.mu.closedWALs, llse.num,
+		func(llse logicalLogWithSizesEtc, num NumWAL) int {
+			return cmp.Compare(llse.num, num)
+		})
+	if !found {
+		// If we didn't find an existing entry in closedWALs for the provided
+		// NumWAL, append a new entry.
+		wm.mu.closedWALs = slices.Insert(wm.mu.closedWALs, i, llse)
 		return
 	}
-	// If we didn't find an existing entry in closedWALs for the provided
-	// NumWAL, append a new entry.
-	wm.mu.closedWALs = slices.Insert(wm.mu.closedWALs, i, logicalLogWithSizesEtc{
-		num:      num,
-		segments: []segmentWithSizeEtc{s},
-	})
+	wm.mu.closedWALs[i].segments = append(wm.mu.closedWALs[i].segments, llse.segments...)
+	slices.SortFunc(wm.mu.closedWALs[i].segments, cmpSegmentWithSizeEtc)
+	wm.mu.closedWALs[i].segments = slices.CompactFunc(wm.mu.closedWALs[i].segments,
+		func(a, b segmentWithSizeEtc) bool { return a.logNameIndex == b.logNameIndex })
 }

 // Stats implements Manager.

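As a companion sketch to the diff above: the closedWALs slice is kept ordered by WAL number via binary search plus insert. Below is a simplified, self-contained illustration of that pattern; the walEntry type and the numbers in main are hypothetical, not Pebble types.

package main

import (
	"cmp"
	"fmt"
	"slices"
)

// walEntry is a hypothetical, simplified analogue of logicalLogWithSizesEtc:
// one logical WAL number plus the names of its closed segments.
type walEntry struct {
	num      int
	segments []string
}

// record keeps the slice sorted by num: binary-search for the WAL, insert a
// new entry if it is absent, otherwise merge the reported segments into the
// existing entry (mirroring the shape of recordClosedWALLocked above).
func record(closed []walEntry, e walEntry) []walEntry {
	i, found := slices.BinarySearchFunc(closed, e.num, func(w walEntry, num int) int {
		return cmp.Compare(w.num, num)
	})
	if !found {
		return slices.Insert(closed, i, e)
	}
	closed[i].segments = append(closed[i].segments, e.segments...)
	return closed
}

func main() {
	var closed []walEntry
	closed = record(closed, walEntry{num: 1, segments: []string{"001"}})
	closed = record(closed, walEntry{num: 3, segments: []string{"001"}})
	closed = record(closed, walEntry{num: 1, segments: []string{"002"}})
	fmt.Println(closed) // [{1 [001 002]} {3 [001]}]
}
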
wal/failover_manager_test.go

Lines changed: 22 additions & 20 deletions
@@ -428,27 +428,29 @@ func TestManagerFailover(t *testing.T) {
 			return b.String()

 		case "list-and-stats":
-			logs := fm.List()
-			stats := fm.Stats()
-			var b strings.Builder
-			if len(logs) > 0 {
-				fmt.Fprintf(&b, "logs:\n")
-				for _, f := range logs {
-					fmt.Fprintf(&b, "  %s\n", f.String())
+			return td.Retry(t, func() string {
+				logs := fm.List()
+				stats := fm.Stats()
+				var b strings.Builder
+				if len(logs) > 0 {
+					fmt.Fprintf(&b, "logs:\n")
+					for _, f := range logs {
+						fmt.Fprintf(&b, "  %s\n", f.String())
+					}
 				}
-			}
-			fmt.Fprintf(&b, "stats:\n")
-			fmt.Fprintf(&b, "  obsolete: count %d size %d\n", stats.ObsoleteFileCount, stats.ObsoleteFileSize)
-			fmt.Fprintf(&b, "  live: count %d size %d\n", stats.LiveFileCount, stats.LiveFileSize)
-			fmt.Fprintf(&b, "  failover: switches %d pri-dur %s sec-dur %s\n", stats.Failover.DirSwitchCount,
-				stats.Failover.PrimaryWriteDuration.String(), stats.Failover.SecondaryWriteDuration.String())
-			var latencyProto io_prometheus_client.Metric
-			stats.Failover.FailoverWriteAndSyncLatency.Write(&latencyProto)
-			latencySampleCount := *latencyProto.Histogram.SampleCount
-			if latencySampleCount > 0 {
-				fmt.Fprintf(&b, "  latency sample count: %d\n", latencySampleCount)
-			}
-			return b.String()
+				fmt.Fprintf(&b, "stats:\n")
+				fmt.Fprintf(&b, "  obsolete: count %d size %d\n", stats.ObsoleteFileCount, stats.ObsoleteFileSize)
+				fmt.Fprintf(&b, "  live: count %d size %d\n", stats.LiveFileCount, stats.LiveFileSize)
+				fmt.Fprintf(&b, "  failover: switches %d pri-dur %s sec-dur %s\n", stats.Failover.DirSwitchCount,
+					stats.Failover.PrimaryWriteDuration.String(), stats.Failover.SecondaryWriteDuration.String())
+				var latencyProto io_prometheus_client.Metric
+				stats.Failover.FailoverWriteAndSyncLatency.Write(&latencyProto)
+				latencySampleCount := *latencyProto.Histogram.SampleCount
+				if latencySampleCount > 0 {
+					fmt.Fprintf(&b, "  latency sample count: %d\n", latencySampleCount)
+				}
+				return b.String()
+			})

 		case "write-record":
 			var value string

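The test-side fix wraps the output building in (datadriven.TestData).Retry so the expectation is re-evaluated until the asynchronous segmentClosed callback has landed. A hypothetical sketch of the pattern follows; only the td.Retry call shape is taken from the diff above, while the testdata path, the "poll" command, and the asyncValue helper are invented for illustration.

package wal_test

import (
	"fmt"
	"testing"

	"github.com/cockroachdb/datadriven"
)

// TestRetrySketch is a hypothetical datadriven test showing the retry shape;
// it assumes a testdata file at testdata/retry_sketch with a "poll" command.
func TestRetrySketch(t *testing.T) {
	// asyncValue stands in for state that an asynchronous callback updates.
	asyncValue := func() int { return 42 }

	datadriven.RunTest(t, "testdata/retry_sketch", func(t *testing.T, td *datadriven.TestData) string {
		switch td.Cmd {
		case "poll":
			// Retry re-runs the closure so that output produced before the
			// asynchronous callback has fired does not fail the test.
			return td.Retry(t, func() string {
				return fmt.Sprintf("value: %d\n", asyncValue())
			})
		default:
			return fmt.Sprintf("unknown command: %s", td.Cmd)
		}
	})
}
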
wal/failover_writer.go

Lines changed: 12 additions & 7 deletions
@@ -476,7 +476,7 @@ type failoverWriterOpts struct {
 	// invoked. It's used to ensure that we reclaim all physical segment files,
 	// including ones that did not complete creation before the Writer was
 	// closed.
-	segmentClosed func(NumWAL, segmentWithSizeEtc)
+	segmentClosed func(logicalLogWithSizesEtc)

 	writerCreatedForTest chan<- struct{}

@@ -704,13 +704,18 @@ func (ww *failoverWriter) switchToNewDir(dir dirAndFileHandle) error {
 			// there's an obsolete segment file we should clean up. Note
 			// that the file may be occupying non-negligible disk space even
 			// though we never wrote to it due to preallocation.
-			ww.opts.segmentClosed(ww.opts.wn, segmentWithSizeEtc{
-				segment: segment{
-					logNameIndex: LogNameIndex(writerIndex),
-					dir:          dir.Dir,
+			ww.opts.segmentClosed(logicalLogWithSizesEtc{
+				num: ww.opts.wn,
+				segments: []segmentWithSizeEtc{
+					{
+						segment: segment{
+							logNameIndex: LogNameIndex(writerIndex),
+							dir:          dir.Dir,
+						},
+						approxFileSize:      initialFileSize,
+						synchronouslyClosed: false,
+					},
 				},
-				approxFileSize:      initialFileSize,
-				synchronouslyClosed: false,
 			})
 		})
 	}

wal/failover_writer_test.go

Lines changed: 3 additions & 3 deletions
@@ -285,7 +285,7 @@ func TestFailoverWriter(t *testing.T) {
 			stopper:                     stopper,
 			failoverWriteAndSyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 			writerClosed:                func(_ logicalLogWithSizesEtc) {},
-			segmentClosed:               func(_ NumWAL, _ segmentWithSizeEtc) {},
+			segmentClosed:               func(_ logicalLogWithSizesEtc) {},
 			writerCreatedForTest:        logWriterCreated,
 			writeWALSyncOffsets:         func() bool { return false },
 		}, testDirs[dirIndex])
@@ -651,7 +651,7 @@ func TestConcurrentWritersWithManyRecords(t *testing.T) {
 			stopper:                     stopper,
 			failoverWriteAndSyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 			writerClosed:                func(_ logicalLogWithSizesEtc) {},
-			segmentClosed:               func(_ NumWAL, _ segmentWithSizeEtc) {},
+			segmentClosed:               func(_ logicalLogWithSizesEtc) {},
 			writerCreatedForTest:        logWriterCreated,
 			writeWALSyncOffsets:         func() bool { return false },
 		}, dirs[dirIndex])
@@ -755,7 +755,7 @@ func TestFailoverWriterManyRecords(t *testing.T) {
 		stopper:                     stopper,
 		failoverWriteAndSyncLatency: prometheus.NewHistogram(prometheus.HistogramOpts{}),
 		writerClosed:                func(_ logicalLogWithSizesEtc) {},
-		segmentClosed:               func(_ NumWAL, _ segmentWithSizeEtc) {},
+		segmentClosed:               func(_ logicalLogWithSizesEtc) {},
 		writeWALSyncOffsets:         func() bool { return false },
 	}, dir)
 	require.NoError(t, err)

wal/testdata/manager_failover

Lines changed: 1 addition & 2 deletions
@@ -290,8 +290,7 @@ ok
 list-and-stats
 ----
 logs:
-  000001: {(sec,001)}
-  000001: {(pri,002)}
+  000001: {(sec,001), (pri,002)}
 stats:
   obsolete: count 0 size 0
   live: count 2 size 18
