Skip to content

Commit 26ee296

Browse files
committed
metamorphic: add op for crash during Open
1 parent 6665d61 commit 26ee296

File tree

7 files changed

+161
-27
lines changed

7 files changed

+161
-27
lines changed

metamorphic/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ const (
2828
OpDBFlush
2929
OpDBRatchetFormatMajorVersion
3030
OpDBRestart
31+
OpDBCrashDuringOpen
3132
OpDBEstimateDiskUsage
3233
OpIterClose
3334
OpIterFirst
@@ -159,6 +160,7 @@ func DefaultOpConfig() OpConfig {
159160
OpDBFlush: 2,
160161
OpDBRatchetFormatMajorVersion: 1,
161162
OpDBRestart: 2,
163+
OpDBCrashDuringOpen: 1,
162164
OpDBEstimateDiskUsage: 1,
163165
OpIterClose: 5,
164166
OpIterFirst: 100,
@@ -221,6 +223,7 @@ func ReadOpConfig() OpConfig {
221223
OpDBFlush: 0,
222224
OpDBRatchetFormatMajorVersion: 0,
223225
OpDBRestart: 0,
226+
OpDBCrashDuringOpen: 0,
224227
OpDBEstimateDiskUsage: 0,
225228
OpIterClose: 5,
226229
OpIterFirst: 100,
@@ -280,6 +283,7 @@ func WriteOpConfig() OpConfig {
280283
OpDBFlush: 2,
281284
OpDBRatchetFormatMajorVersion: 1,
282285
OpDBRestart: 2,
286+
OpDBCrashDuringOpen: 1,
283287
OpDBEstimateDiskUsage: 1,
284288
OpIterClose: 0,
285289
OpIterFirst: 0,

metamorphic/generator.go

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ func (g *generator) generate(count uint64) []op {
167167
OpDBDownload: g.dbDownload,
168168
OpDBFlush: g.dbFlush,
169169
OpDBRatchetFormatMajorVersion: g.dbRatchetFormatMajorVersion,
170-
OpDBRestart: g.dbRestart,
170+
OpDBRestart: g.dbRestart(false /* shouldCrashDuringOpen */),
171+
OpDBCrashDuringOpen: g.dbRestart(true /* shouldCrashDuringOpen */),
171172
OpDBEstimateDiskUsage: g.dbEstimateDiskUsage,
172173
OpIterClose: g.randIter(g.iterClose),
173174
OpIterFirst: g.randIter(g.iterFirst),
@@ -465,27 +466,33 @@ func (g *generator) dbRatchetFormatMajorVersion() {
465466
g.add(&dbRatchetFormatMajorVersionOp{dbID: dbID, vers: vers})
466467
}
467468

468-
func (g *generator) dbRestart() {
469-
// Close any live iterators and snapshots, so that we can close the DB
470-
// cleanly.
471-
dbID := g.dbs.rand(g.rng)
472-
for len(g.liveIters) > 0 {
473-
g.randIter(g.iterClose)()
474-
}
475-
for len(g.liveSnapshots) > 0 {
476-
g.snapshotClose()
477-
}
478-
// Close the batches.
479-
for len(g.liveBatches) > 0 {
480-
batchID := g.liveBatches[0]
481-
g.removeBatchFromGenerator(batchID)
482-
g.add(&closeOp{objID: batchID})
483-
}
484-
if len(g.liveReaders) != len(g.dbs) || len(g.liveWriters) != len(g.dbs) {
485-
panic(fmt.Sprintf("unexpected counts: liveReaders %d, liveWriters: %d",
486-
len(g.liveReaders), len(g.liveWriters)))
469+
func (g *generator) dbRestart(shouldCrashDuringOpen bool) func() {
470+
return func() {
471+
// Close any live iterators and snapshots, so that we can close the DB
472+
// cleanly.
473+
dbID := g.dbs.rand(g.rng)
474+
for len(g.liveIters) > 0 {
475+
g.randIter(g.iterClose)()
476+
}
477+
for len(g.liveSnapshots) > 0 {
478+
g.snapshotClose()
479+
}
480+
// Close the batches.
481+
for len(g.liveBatches) > 0 {
482+
batchID := g.liveBatches[0]
483+
g.removeBatchFromGenerator(batchID)
484+
g.add(&closeOp{objID: batchID})
485+
}
486+
if len(g.liveReaders) != len(g.dbs) || len(g.liveWriters) != len(g.dbs) {
487+
panic(fmt.Sprintf("unexpected counts: liveReaders %d, liveWriters: %d",
488+
len(g.liveReaders), len(g.liveWriters)))
489+
}
490+
if shouldCrashDuringOpen {
491+
g.add(&dbUncleanRestartOp{dbID: dbID})
492+
} else {
493+
g.add(&dbRestartOp{dbID: dbID})
494+
}
487495
}
488-
g.add(&dbRestartOp{dbID: dbID})
489496
}
490497

491498
// maybeSetSnapshotIterBounds must be called whenever creating a new iterator or

metamorphic/key_manager.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -883,6 +883,7 @@ func opWrittenKeys(untypedOp op) [][]byte {
883883
case *closeOp:
884884
case *compactOp:
885885
case *dbRestartOp:
886+
case *dbUncleanRestartOp:
886887
case *deleteOp:
887888
return [][]byte{t.key}
888889
case *deleteRangeOp:

metamorphic/ops.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1965,7 +1965,7 @@ type dbRestartOp struct {
19651965
}
19661966

19671967
func (o *dbRestartOp) run(t *Test, h historyRecorder) {
1968-
if err := t.restartDB(o.dbID); err != nil {
1968+
if err := t.restartDB(o.dbID, false /* shouldCrashDuringOpen */); err != nil {
19691969
h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
19701970
h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
19711971
} else {
@@ -1980,6 +1980,37 @@ func (o *dbRestartOp) syncObjs() objIDSlice { return o.affectedObjec
19801980
func (o *dbRestartOp) rewriteKeys(func(UserKey) UserKey) {}
19811981
func (o *dbRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
19821982

1983+
// dbUncleanRestartOp performs an unclean restart like dbRestartOp, but also
1984+
// starts a concurrent goroutine that calls CrashClone during the Open and uses
1985+
// that clone to do a second Open. This tests crashing during Open with
1986+
// concurrent operations.
1987+
type dbUncleanRestartOp struct {
1988+
dbID objID
1989+
1990+
// affectedObjects is the list of additional objects that are affected by this
1991+
// operation, and which syncObjs() must return so that we don't perform the
1992+
// restart in parallel with other operations to affected objects.
1993+
affectedObjects []objID
1994+
}
1995+
1996+
func (o *dbUncleanRestartOp) run(t *Test, h historyRecorder) {
1997+
if err := t.restartDB(o.dbID, true /* shouldCrashDuringOpen */); err != nil {
1998+
h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
1999+
// Tag the error with this op's type name, mirroring dbRestartOp.run.
h.history.err.Store(errors.Wrap(err, "dbUncleanRestartOp"))
2000+
} else {
2001+
h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
2002+
}
2003+
}
2004+
2005+
func (o *dbUncleanRestartOp) formattedString(KeyFormat) string {
2006+
return fmt.Sprintf("%s.RestartWithCrashClone()", o.dbID)
2007+
}
2008+
func (o *dbUncleanRestartOp) receiver() objID { return o.dbID }
2009+
func (o *dbUncleanRestartOp) syncObjs() objIDSlice { return o.affectedObjects }
2010+
2011+
func (o *dbUncleanRestartOp) rewriteKeys(func(UserKey) UserKey) {}
2012+
func (o *dbUncleanRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
2013+
19832014
func formatOps(kf KeyFormat, ops []op) string {
19842015
var buf strings.Builder
19852016
for _, op := range ops {

metamorphic/parser.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ func opArgs(op op) (receiverID *objID, targetID *objID, args []interface{}) {
6262
return &t.dbID, nil, []interface{}{&t.vers}
6363
case *dbRestartOp:
6464
return &t.dbID, nil, nil
65+
case *dbUncleanRestartOp:
66+
return &t.dbID, nil, nil
6567
case *deleteOp:
6668
return &t.writerID, nil, []interface{}{&t.key}
6769
case *deleteRangeOp:
@@ -177,6 +179,7 @@ var methods = map[string]*methodInfo{
177179
"RatchetFormatMajorVersion": makeMethod(dbRatchetFormatMajorVersionOp{}, dbTag),
178180
"Replicate": makeMethod(replicateOp{}, dbTag),
179181
"Restart": makeMethod(dbRestartOp{}, dbTag),
182+
"RestartWithCrashClone": makeMethod(dbUncleanRestartOp{}, dbTag),
180183
"SeekGE": makeMethod(iterSeekGEOp{}, iterTag),
181184
"SeekLT": makeMethod(iterSeekLTOp{}, iterTag),
182185
"SeekPrefixGE": makeMethod(iterSeekPrefixGEOp{}, iterTag),
@@ -749,6 +752,16 @@ func computeDerivedFields(ops []op) {
749752
}
750753
// Sort so the output is deterministic.
751754
slices.Sort(v.affectedObjects)
755+
case *dbUncleanRestartOp:
756+
// Find all objects that use this db.
757+
v.affectedObjects = nil
758+
for obj, db := range objToDB {
759+
if db == v.dbID {
760+
v.affectedObjects = append(v.affectedObjects, obj)
761+
}
762+
}
763+
// Sort so the output is deterministic.
764+
slices.Sort(v.affectedObjects)
752765
case *ingestOp:
753766
v.derivedDBIDs = make([]objID, len(v.batchIDs))
754767
for i := range v.batchIDs {

metamorphic/test.go

Lines changed: 74 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package metamorphic
77
import (
88
"fmt"
99
"io"
10+
"math/rand/v2"
1011
"os"
1112
"path"
1213
"runtime/debug"
@@ -321,10 +322,11 @@ func (t *Test) minFMV() pebble.FormatMajorVersion {
321322
return minVersion
322323
}
323324

324-
func (t *Test) restartDB(dbID objID) error {
325+
func (t *Test) restartDB(dbID objID, shouldCrashDuringOpen bool) error {
325326
db := t.getDB(dbID)
326-
// If strictFS is not used, we use pebble.NoSync for writeOpts, so we can't
327-
// restart the database (even if we don't revert to synced data).
327+
// If strictFS is not used, no-op since we end up using pebble.NoSync for
328+
// writeOpts. In the case of pebble.NoSync, we can't restart the database
329+
// even if we don't revert to synced data.
328330
if !t.testOpts.strictFS {
329331
return nil
330332
}
@@ -348,6 +350,19 @@ func (t *Test) restartDB(dbID objID) error {
348350
}
349351
}
350352
t.opts.FS = crashFS
353+
var slowFS *errorfs.FS
354+
// If we should crash during Open, inject some latency into the filesystem
355+
// so that the first Open is slow enough for us to capture some arbitrary
356+
// intermediate state.
357+
if shouldCrashDuringOpen {
358+
seed := time.Now().UnixNano()
359+
t.opts.Logger.Infof("seed %d", seed)
360+
// Pick a mean latency in [10ms, 30ms). The random component is added in
// units of milliseconds before scaling; adding it to an already-scaled
// nanosecond count would only contribute 0-19ns of jitter.
mean := time.Duration(rand.IntN(20)+10) * time.Millisecond
361+
t.opts.Logger.Infof("Injecting mean %s of latency with p=%.3f", mean, 1.0)
362+
slowFS = errorfs.Wrap(crashFS,
363+
errorfs.RandomLatency(errorfs.Randomly(1.0, seed), mean, seed, time.Second))
364+
t.opts.FS = slowFS
365+
}
351366
t.opts.WithFSDefaults()
352367
// We want to set the new FS in testOpts too, so they are propagated to the
353368
// TestOptions that were used with metamorphic.New().
@@ -357,6 +372,9 @@ func (t *Test) restartDB(dbID objID) error {
357372
t.testOpts.Opts.WALFailover.Secondary.FS = t.opts.FS
358373
}
359374

375+
secondOpenDone := make(chan struct{})
376+
// Buffered so that signalling completion of the first Open cannot block
// forever if the crash goroutine has already returned early (for example
// because CrashClone failed) without receiving from this channel.
firstOpenDone := make(chan struct{}, 1)
377+
// Buffered so the crash goroutine can report an error and then still run
// its deferred secondOpenDone signal. errChan is only drained after
// secondOpenDone has been received, so an unbuffered send would deadlock.
errChan := make(chan error, 1)
360378
// TODO(jackson): Audit errorRate and ensure custom options' hooks semantics
361379
// are well defined within the context of retries.
362380
err := t.withRetries(func() (err error) {
@@ -373,15 +391,66 @@ func (t *Test) restartDB(dbID objID) error {
373391
dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
374392
}
375393
o := t.finalizeOptions()
394+
if shouldCrashDuringOpen {
395+
go t.simulateCrashDuringOpen(dbID, slowFS, secondOpenDone, firstOpenDone, errChan)
396+
}
376397
t.dbs[dbID.slot()-1], err = pebble.Open(dir, &o)
377-
if err != nil {
378-
return err
398+
if shouldCrashDuringOpen {
399+
firstOpenDone <- struct{}{}
379400
}
380401
return err
381402
})
403+
if shouldCrashDuringOpen {
404+
<-secondOpenDone
405+
select {
406+
case err = <-errChan:
407+
if err != nil {
408+
return err
409+
}
410+
default:
411+
}
412+
}
382413
return err
383414
}
384415

416+
func (t *Test) simulateCrashDuringOpen(
417+
dbID objID, slowFS *errorfs.FS, secondOpenDone, firstOpenDone chan struct{}, errChan chan error,
418+
) {
419+
defer func() { secondOpenDone <- struct{}{} }()
420+
421+
// Wait a bit for the first Open to make some progress.
422+
time.Sleep(30 * time.Millisecond)
423+
424+
// Create a crash clone of the current filesystem state.
425+
dir := t.dir
426+
if len(t.dbs) > 1 {
427+
dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
428+
}
429+
crashCloneFS, err := slowFS.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 0})
430+
if err != nil {
431+
errChan <- err
432+
return
433+
}
434+
t.opts.FS = crashCloneFS
435+
436+
// Create a copy of options for the second DB.
437+
o := t.finalizeOptions()
438+
439+
// After the first Open has completed, close the resulting DB and open the
440+
// second DB.
441+
<-firstOpenDone
442+
err = t.dbs[dbID.slot()-1].Close()
443+
if err != nil {
444+
errChan <- err
445+
return
446+
}
447+
t.dbs[dbID.slot()-1], err = pebble.Open(dir, &o)
448+
if err != nil {
449+
errChan <- err
450+
return
451+
}
452+
}
453+
385454
func (t *Test) saveInMemoryDataInternal() error {
386455
if rootFS := vfs.Root(t.opts.FS); rootFS != vfs.Default {
387456
// t.opts.FS is an in-memory system; copy it to disk.

vfs/errorfs/errorfs.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,15 @@ func (fs *FS) Stat(name string) (vfs.FileInfo, error) {
457457
return fs.fs.Stat(name)
458458
}
459459

460+
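// CrashClone delegates to the CrashClone method of the wrapped *vfs.MemFS. It
// returns an error if the wrapped filesystem is not a *vfs.MemFS.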
461+
func (fs *FS) CrashClone(cfg vfs.CrashCloneCfg) (*vfs.MemFS, error) {
462+
memFs, ok := fs.fs.(*vfs.MemFS)
463+
if !ok {
464+
return nil, errors.New("not a MemFS")
465+
}
466+
return memFs.CrashClone(cfg), nil
467+
}
468+
460469
// errorFile implements vfs.File. The interface is implemented on the pointer
461470
// type to allow pointer equality comparisons.
462471
type errorFile struct {

0 commit comments

Comments
 (0)