Commit f7ea518

metamorphic: add op for crash during Open
1 parent 8498142 commit f7ea518

9 files changed: +187 -33 lines changed

metamorphic/config.go

Lines changed: 4 additions & 0 deletions
@@ -28,6 +28,7 @@ const (
 	OpDBFlush
 	OpDBRatchetFormatMajorVersion
 	OpDBRestart
+	OpDBCrashDuringOpen
 	OpDBEstimateDiskUsage
 	OpIterClose
 	OpIterFirst
@@ -159,6 +160,7 @@ func DefaultOpConfig() OpConfig {
 		OpDBFlush:                     2,
 		OpDBRatchetFormatMajorVersion: 1,
 		OpDBRestart:                   2,
+		OpDBCrashDuringOpen:           1,
 		OpDBEstimateDiskUsage:         1,
 		OpIterClose:                   5,
 		OpIterFirst:                   100,
@@ -221,6 +223,7 @@ func ReadOpConfig() OpConfig {
 		OpDBFlush:                     0,
 		OpDBRatchetFormatMajorVersion: 0,
 		OpDBRestart:                   0,
+		OpDBCrashDuringOpen:           0,
 		OpDBEstimateDiskUsage:         0,
 		OpIterClose:                   5,
 		OpIterFirst:                   100,
@@ -280,6 +283,7 @@ func WriteOpConfig() OpConfig {
 		OpDBFlush:                     2,
 		OpDBRatchetFormatMajorVersion: 1,
 		OpDBRestart:                   2,
+		OpDBCrashDuringOpen:           1,
 		OpDBEstimateDiskUsage:         1,
 		OpIterClose:                   0,
 		OpIterFirst:                   0,

metamorphic/generator.go

Lines changed: 28 additions & 21 deletions
@@ -167,7 +167,8 @@ func (g *generator) generate(count uint64) []op {
 		OpDBDownload:                  g.dbDownload,
 		OpDBFlush:                     g.dbFlush,
 		OpDBRatchetFormatMajorVersion: g.dbRatchetFormatMajorVersion,
-		OpDBRestart:                   g.dbRestart,
+		OpDBRestart:                   g.dbRestart(false /* shouldCrashDuringOpen */),
+		OpDBCrashDuringOpen:           g.dbRestart(true /* shouldCrashDuringOpen */),
 		OpDBEstimateDiskUsage:         g.dbEstimateDiskUsage,
 		OpIterClose:                   g.randIter(g.iterClose),
 		OpIterFirst:                   g.randIter(g.iterFirst),
@@ -465,27 +466,33 @@ func (g *generator) dbRatchetFormatMajorVersion() {
 	g.add(&dbRatchetFormatMajorVersionOp{dbID: dbID, vers: vers})
 }
 
-func (g *generator) dbRestart() {
-	// Close any live iterators and snapshots, so that we can close the DB
-	// cleanly.
-	dbID := g.dbs.rand(g.rng)
-	for len(g.liveIters) > 0 {
-		g.randIter(g.iterClose)()
-	}
-	for len(g.liveSnapshots) > 0 {
-		g.snapshotClose()
-	}
-	// Close the batches.
-	for len(g.liveBatches) > 0 {
-		batchID := g.liveBatches[0]
-		g.removeBatchFromGenerator(batchID)
-		g.add(&closeOp{objID: batchID})
-	}
-	if len(g.liveReaders) != len(g.dbs) || len(g.liveWriters) != len(g.dbs) {
-		panic(fmt.Sprintf("unexpected counts: liveReaders %d, liveWriters: %d",
-			len(g.liveReaders), len(g.liveWriters)))
+func (g *generator) dbRestart(shouldCrashDuringOpen bool) func() {
+	return func() {
+		// Close any live iterators and snapshots, so that we can close the DB
+		// cleanly.
+		dbID := g.dbs.rand(g.rng)
+		for len(g.liveIters) > 0 {
+			g.randIter(g.iterClose)()
+		}
+		for len(g.liveSnapshots) > 0 {
+			g.snapshotClose()
+		}
+		// Close the batches.
+		for len(g.liveBatches) > 0 {
+			batchID := g.liveBatches[0]
+			g.removeBatchFromGenerator(batchID)
+			g.add(&closeOp{objID: batchID})
+		}
+		if len(g.liveReaders) != len(g.dbs) || len(g.liveWriters) != len(g.dbs) {
+			panic(fmt.Sprintf("unexpected counts: liveReaders %d, liveWriters: %d",
+				len(g.liveReaders), len(g.liveWriters)))
+		}
+		if shouldCrashDuringOpen {
+			g.add(&dbUncleanRestartOp{dbID: dbID})
+		} else {
+			g.add(&dbRestartOp{dbID: dbID})
+		}
 	}
-	g.add(&dbRestartOp{dbID: dbID})
 }
 
 // maybeSetSnapshotIterBounds must be called whenever creating a new iterator or

metamorphic/key_manager.go

Lines changed: 1 addition & 0 deletions
@@ -883,6 +883,7 @@ func opWrittenKeys(untypedOp op) [][]byte {
 	case *closeOp:
 	case *compactOp:
 	case *dbRestartOp:
+	case *dbUncleanRestartOp:
 	case *deleteOp:
 		return [][]byte{t.key}
 	case *deleteRangeOp:

metamorphic/meta.go

Lines changed: 1 addition & 1 deletion
@@ -560,7 +560,7 @@ func RunOnce(t TestingT, runDir string, seed uint64, historyPath string, rOpts .
 			// multi-instance mode.
 			testOpts.Opts.WALFailover = nil
 		} else {
-			testOpts.Opts.WALFailover.Secondary.FS = opts.FS
+			testOpts.Opts.WALFailover.Secondary.FS = vfs.NewCrashableMem()
 		}
 	}

metamorphic/ops.go

Lines changed: 32 additions & 1 deletion
@@ -1965,7 +1965,7 @@ type dbRestartOp struct {
 }
 
 func (o *dbRestartOp) run(t *Test, h historyRecorder) {
-	if err := t.restartDB(o.dbID); err != nil {
+	if err := t.restartDB(o.dbID, false /* shouldCrashDuringOpen */); err != nil {
 		h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
 		h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
 	} else {
@@ -1980,6 +1980,37 @@ func (o *dbRestartOp) syncObjs() objIDSlice { return o.affectedObjec
 func (o *dbRestartOp) rewriteKeys(func(UserKey) UserKey) {}
 func (o *dbRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
 
+// dbUncleanRestartOp performs an unclean restart like dbRestartOp, but also
+// starts a concurrent goroutine that calls CrashClone during the Open and uses
+// that clone to do a second Open. This tests crashing during Open with
+// concurrent operations.
+type dbUncleanRestartOp struct {
+	dbID objID
+
+	// affectedObjects is the list of additional objects that are affected by this
+	// operation, and which syncObjs() must return so that we don't perform the
+	// restart in parallel with other operations to affected objects.
+	affectedObjects []objID
+}
+
+func (o *dbUncleanRestartOp) run(t *Test, h historyRecorder) {
+	if err := t.restartDB(o.dbID, true /* shouldCrashDuringOpen */); err != nil {
+		h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
+		h.history.err.Store(errors.Wrap(err, "dbCrashDuringOpenOp"))
+	} else {
+		h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
+	}
+}
+
+func (o *dbUncleanRestartOp) formattedString(KeyFormat) string {
+	return fmt.Sprintf("%s.RestartWithCrashClone()", o.dbID)
+}
+func (o *dbUncleanRestartOp) receiver() objID      { return o.dbID }
+func (o *dbUncleanRestartOp) syncObjs() objIDSlice { return o.affectedObjects }
+
+func (o *dbUncleanRestartOp) rewriteKeys(func(UserKey) UserKey) {}
+func (o *dbUncleanRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
+
 func formatOps(kf KeyFormat, ops []op) string {
 	var buf strings.Builder
 	for _, op := range ops {
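
The doc comment above describes the mechanism in prose; as a minimal sketch, the distilled pattern in Go looks roughly like the following. This is not the test's actual code (that lives in restartDB/simulateCrashDuringOpen in metamorphic/test.go below); the function name, the directory argument, the buffered error channel, and the fixed latency and sleep values are illustrative assumptions.

package main

import (
	"math/rand/v2"
	"time"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/vfs/errorfs"
)

// crashDuringOpen sketches what dbUncleanRestartOp exercises: while one Open
// is in flight on a latency-injected crashable MemFS, take a crash clone of
// the filesystem and verify that a second Open recovers from that clone.
func crashDuringOpen(dir string) error {
	memFS := vfs.NewCrashableMem()
	seed := time.Now().UnixNano()
	// Slow every filesystem operation a little so the first Open is still in
	// flight when the crash clone is taken.
	slowFS := errorfs.Wrap(memFS,
		errorfs.RandomLatency(errorfs.Randomly(1.0, seed), 10*time.Millisecond, seed, time.Second))

	firstOpenDone := make(chan error, 1)
	go func() {
		db, err := pebble.Open(dir, &pebble.Options{FS: slowFS})
		if err == nil {
			err = db.Close()
		}
		firstOpenDone <- err
	}()

	// Give the first Open a moment to make progress, then capture an
	// arbitrary intermediate state of the filesystem.
	time.Sleep(30 * time.Millisecond)
	rng := rand.New(rand.NewPCG(0, uint64(seed)))
	cloneFS, err := slowFS.CrashClone(vfs.CrashCloneCfg{
		UnsyncedDataPercent: rng.IntN(101),
		RNG:                 rng,
	})
	if err != nil {
		return err
	}
	if err := <-firstOpenDone; err != nil {
		return err
	}

	// Opening from the clone simulates recovery after a crash mid-Open.
	db, err := pebble.Open(dir, &pebble.Options{FS: cloneFS})
	if err != nil {
		return err
	}
	return db.Close()
}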

metamorphic/options.go

Lines changed: 2 additions & 1 deletion
@@ -741,8 +741,9 @@ func RandomOptions(rng *rand.Rand, kf KeyFormat, cfg RandomOptionsCfg) *TestOpti
 		// maintains a maximum history 120 entries, so the healthy interval
 		// must not exceed 119x the probe interval.
 		healthyInterval := scaleDuration(probeInterval, 1.0, 119.0)
+		newMem := vfs.NewCrashableMem()
 		opts.WALFailover = &pebble.WALFailoverOptions{
-			Secondary: wal.Dir{FS: vfs.Default, Dirname: pebble.MakeStoreRelativePath(vfs.Default, "wal_secondary")},
+			Secondary: wal.Dir{FS: newMem, Dirname: pebble.MakeStoreRelativePath(newMem, "wal_secondary")},
 			FailoverOptions: wal.FailoverOptions{
 				PrimaryDirProbeInterval:      probeInterval,
 				HealthyProbeLatencyThreshold: healthyThreshold,
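
Both RandomOptions (above) and RunOnce (in metamorphic/meta.go) now give the WAL-failover secondary its own crashable MemFS, so simulateCrashDuringOpen can crash-clone it alongside the store's filesystem. A minimal sketch of the resulting shape, with an invented helper name and the failover probe settings left at their defaults rather than randomized as the test does:

package main

import (
	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/wal"
)

// walFailoverOnCrashableMem is illustrative only (not part of this commit):
// both the store and the WAL-failover secondary live on crashable in-memory
// filesystems, so a crash clone can capture both sides of the WAL.
func walFailoverOnCrashableMem() *pebble.Options {
	storeFS := vfs.NewCrashableMem()
	secondaryFS := vfs.NewCrashableMem()
	opts := &pebble.Options{FS: storeFS}
	opts.WALFailover = &pebble.WALFailoverOptions{
		Secondary: wal.Dir{
			FS:      secondaryFS,
			Dirname: pebble.MakeStoreRelativePath(secondaryFS, "wal_secondary"),
		},
	}
	return opts
}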

metamorphic/parser.go

Lines changed: 13 additions & 0 deletions
@@ -62,6 +62,8 @@ func opArgs(op op) (receiverID *objID, targetID *objID, args []interface{}) {
 		return &t.dbID, nil, []interface{}{&t.vers}
 	case *dbRestartOp:
 		return &t.dbID, nil, nil
+	case *dbUncleanRestartOp:
+		return &t.dbID, nil, nil
 	case *deleteOp:
 		return &t.writerID, nil, []interface{}{&t.key}
 	case *deleteRangeOp:
@@ -177,6 +179,7 @@ var methods = map[string]*methodInfo{
 	"RatchetFormatMajorVersion": makeMethod(dbRatchetFormatMajorVersionOp{}, dbTag),
 	"Replicate":                 makeMethod(replicateOp{}, dbTag),
 	"Restart":                   makeMethod(dbRestartOp{}, dbTag),
+	"RestartWithCrashClone":     makeMethod(dbUncleanRestartOp{}, dbTag),
 	"SeekGE":                    makeMethod(iterSeekGEOp{}, iterTag),
 	"SeekLT":                    makeMethod(iterSeekLTOp{}, iterTag),
 	"SeekPrefixGE":              makeMethod(iterSeekPrefixGEOp{}, iterTag),
@@ -749,6 +752,16 @@ func computeDerivedFields(ops []op) {
 			}
 			// Sort so the output is deterministic.
 			slices.Sort(v.affectedObjects)
+		case *dbUncleanRestartOp:
+			// Find all objects that use this db.
+			v.affectedObjects = nil
+			for obj, db := range objToDB {
+				if db == v.dbID {
+					v.affectedObjects = append(v.affectedObjects, obj)
+				}
+			}
+			// Sort so the output is deterministic.
+			slices.Sort(v.affectedObjects)
 		case *ingestOp:
 			v.derivedDBIDs = make([]objID, len(v.batchIDs))
 			for i := range v.batchIDs {

metamorphic/test.go

Lines changed: 97 additions & 9 deletions
@@ -7,6 +7,7 @@ package metamorphic
 import (
 	"fmt"
 	"io"
+	"math/rand/v2"
 	"os"
 	"path"
 	"runtime/debug"
@@ -321,10 +322,11 @@ func (t *Test) minFMV() pebble.FormatMajorVersion {
 	return minVersion
 }
 
-func (t *Test) restartDB(dbID objID) error {
+func (t *Test) restartDB(dbID objID, shouldCrashDuringOpen bool) error {
 	db := t.getDB(dbID)
-	// If strictFS is not used, we use pebble.NoSync for writeOpts, so we can't
-	// restart the database (even if we don't revert to synced data).
+	// If strictFS is not used, no-op since we end up using pebble.NoSync for
+	// writeOpts. In the case of pebble.NoSync, we can't restart the database
+	// even if we don't revert to synced data.
 	if !t.testOpts.strictFS {
 		return nil
 	}
@@ -348,15 +350,26 @@ func (t *Test) restartDB(dbID objID) error {
 		}
 	}
 	t.opts.FS = crashFS
+	var slowFS *errorfs.FS
+	// If we should crash during Open, inject some latency into the filesystem
+	// so that the first Open is slow enough for us to capture some arbitrary
+	// intermediate state.
+	if shouldCrashDuringOpen {
+		seed := time.Now().UnixNano()
+		t.opts.Logger.Infof("seed %d", seed)
+		mean := time.Duration(rand.IntN(20) + 10*int(time.Millisecond))
+		t.opts.Logger.Infof("Injecting mean %s of latency with p=%.3f", mean, 1.0)
+		slowFS = errorfs.Wrap(crashFS,
+			errorfs.RandomLatency(errorfs.Randomly(1.0, seed), mean, seed, time.Second))
+		t.opts.FS = slowFS
+	}
 	t.opts.WithFSDefaults()
 	// We want to set the new FS in testOpts too, so they are propagated to the
 	// TestOptions that were used with metamorphic.New().
 	t.testOpts.Opts.FS = t.opts.FS
-	if t.opts.WALFailover != nil {
-		t.opts.WALFailover.Secondary.FS = t.opts.FS
-		t.testOpts.Opts.WALFailover.Secondary.FS = t.opts.FS
-	}
 
+	firstOpenDone := make(chan struct{})
+	secondOpenDone := make(chan struct{})
 	// TODO(jackson): Audit errorRate and ensure custom options' hooks semantics
 	// are well defined within the context of retries.
 	err := t.withRetries(func() (err error) {
@@ -373,15 +386,90 @@ func (t *Test) restartDB(dbID objID) error {
 			dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
 		}
 		o := t.finalizeOptions()
+		if shouldCrashDuringOpen {
+			go func() {
+				err = t.simulateCrashDuringOpen(dbID, slowFS, secondOpenDone, firstOpenDone)
+			}()
+			if err != nil {
+				return err
+			}
+		}
 		t.dbs[dbID.slot()-1], err = pebble.Open(dir, &o)
-		if err != nil {
-			return err
+		if shouldCrashDuringOpen {
+			firstOpenDone <- struct{}{}
 		}
 		return err
 	})
+	if shouldCrashDuringOpen {
+		<-secondOpenDone
+	}
 	return err
 }
 
+func (t *Test) simulateCrashDuringOpen(
+	dbID objID, slowFS *errorfs.FS, secondOpenDone, firstOpenDone chan struct{},
+) error {
+	defer func() { secondOpenDone <- struct{}{} }()
+
+	// Wait a bit for the first Open to make some progress.
+	time.Sleep(30 * time.Millisecond)
+
+	// Create a crash clone of the current filesystem state.
+	rng := rand.New(rand.NewPCG(0, uint64(time.Now().UnixNano())))
+	crashCloneFS, err := slowFS.CrashClone(vfs.CrashCloneCfg{
+		UnsyncedDataPercent: rng.IntN(101),
+		RNG:                 rng,
+	})
+	if err != nil {
+		return err
+	}
+
+	// After the first Open has completed, close the resulting DB and open the
+	// second DB.
+	<-firstOpenDone
+	err = t.dbs[dbID.slot()-1].Close()
+	if err != nil {
+		return err
+	}
+	// Release any resources held by custom options. This may be used, for
+	// example, by the encryption-at-rest custom option (within the Cockroach
+	// repository) to close the file registry.
+	for i := range t.testOpts.CustomOpts {
+		if err := t.testOpts.CustomOpts[i].Close(t.opts); err != nil {
+			return err
+		}
+	}
+	t.opts.FS = crashCloneFS
+	if t.opts.WALFailover != nil {
+		ccsmemFS := t.opts.WALFailover.Secondary.FS.(*vfs.MemFS)
+		crashCloneSecondaryFS := ccsmemFS.CrashClone(vfs.CrashCloneCfg{
+			UnsyncedDataPercent: rng.IntN(101),
+			RNG:                 rng,
+		})
+		t.testOpts.Opts.WALFailover.Secondary.FS = crashCloneSecondaryFS
+		t.opts.WALFailover.Secondary.FS = crashCloneSecondaryFS
+	}
+	// Reacquire any resources required by custom options. This may be used, for
+	// example, by the encryption-at-rest custom option (within the Cockroach
+	// repository) to reopen the file registry.
+	for i := range t.testOpts.CustomOpts {
+		if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
+			return err
+		}
+	}
+	// Create a copy of options for the second DB.
+	dir := t.dir
+	if len(t.dbs) > 1 {
+		dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
+	}
+	o := t.finalizeOptions()
+	t.dbs[dbID.slot()-1], err = pebble.Open(dir, &o)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
 func (t *Test) saveInMemoryDataInternal() error {
 	if rootFS := vfs.Root(t.opts.FS); rootFS != vfs.Default {
 		// t.opts.FS is an in-memory system; copy it to disk.

vfs/errorfs/errorfs.go

Lines changed: 9 additions & 0 deletions
@@ -457,6 +457,15 @@ func (fs *FS) Stat(name string) (vfs.FileInfo, error) {
 	return fs.fs.Stat(name)
 }
 
+// CrashClone implements MemFS.CrashClone.
+func (fs *FS) CrashClone(cfg vfs.CrashCloneCfg) (*vfs.MemFS, error) {
+	memFs, ok := fs.fs.(*vfs.MemFS)
+	if !ok {
+		return nil, errors.New("not a MemFS")
+	}
+	return memFs.CrashClone(cfg), nil
+}
+
 // errorFile implements vfs.File. The interface is implemented on the pointer
 // type to allow pointer equality comparisons.
 type errorFile struct {
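
A short usage sketch of the new pass-through, assuming the latency-injecting wrapper sits on top of the crashable MemFS that the metamorphic test installs; if the wrapped filesystem is not a *vfs.MemFS, CrashClone returns the error above instead of a clone. The function name and latency values are placeholders.

package main

import (
	"math/rand/v2"
	"time"

	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/vfs/errorfs"
)

// cloneThroughErrorFS takes a crash clone of the underlying *vfs.MemFS even
// though the caller only holds the errorfs wrapper.
func cloneThroughErrorFS() (*vfs.MemFS, error) {
	seed := time.Now().UnixNano()
	slowFS := errorfs.Wrap(vfs.NewCrashableMem(),
		errorfs.RandomLatency(errorfs.Randomly(1.0, seed), 10*time.Millisecond, seed, time.Second))

	rng := rand.New(rand.NewPCG(0, uint64(seed)))
	// Keep a random fraction of unsynced writes in the clone, as the test does.
	return slowFS.CrashClone(vfs.CrashCloneCfg{
		UnsyncedDataPercent: rng.IntN(101),
		RNG:                 rng,
	})
}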
