diff --git a/metamorphic/config.go b/metamorphic/config.go
index cfb37f7bec..43617b2082 100644
--- a/metamorphic/config.go
+++ b/metamorphic/config.go
@@ -28,6 +28,7 @@ const (
 	OpDBFlush
 	OpDBRatchetFormatMajorVersion
 	OpDBRestart
+	OpDBCrashDuringOpen
 	OpDBEstimateDiskUsage
 	OpIterClose
 	OpIterFirst
@@ -159,6 +160,7 @@ func DefaultOpConfig() OpConfig {
 			OpDBFlush:                     2,
 			OpDBRatchetFormatMajorVersion: 1,
 			OpDBRestart:                   2,
+			OpDBCrashDuringOpen:           1,
 			OpDBEstimateDiskUsage:         1,
 			OpIterClose:                   5,
 			OpIterFirst:                   100,
@@ -221,6 +223,7 @@ func ReadOpConfig() OpConfig {
 			OpDBFlush:                     0,
 			OpDBRatchetFormatMajorVersion: 0,
 			OpDBRestart:                   0,
+			OpDBCrashDuringOpen:           0,
 			OpDBEstimateDiskUsage:         0,
 			OpIterClose:                   5,
 			OpIterFirst:                   100,
@@ -280,6 +283,7 @@ func WriteOpConfig() OpConfig {
 			OpDBFlush:                     2,
 			OpDBRatchetFormatMajorVersion: 1,
 			OpDBRestart:                   2,
+			OpDBCrashDuringOpen:           1,
 			OpDBEstimateDiskUsage:         1,
 			OpIterClose:                   0,
 			OpIterFirst:                   0,
diff --git a/metamorphic/generator.go b/metamorphic/generator.go
index 4f50e37f87..b6335cdb92 100644
--- a/metamorphic/generator.go
+++ b/metamorphic/generator.go
@@ -167,7 +167,8 @@ func (g *generator) generate(count uint64) []op {
 		OpDBDownload:                  g.dbDownload,
 		OpDBFlush:                     g.dbFlush,
 		OpDBRatchetFormatMajorVersion: g.dbRatchetFormatMajorVersion,
-		OpDBRestart:                   g.dbRestart,
+		OpDBRestart:                   g.dbRestart(false /* shouldCrashDuringOpen */),
+		OpDBCrashDuringOpen:           g.dbRestart(true /* shouldCrashDuringOpen */),
 		OpDBEstimateDiskUsage:         g.dbEstimateDiskUsage,
 		OpIterClose:                   g.randIter(g.iterClose),
 		OpIterFirst:                   g.randIter(g.iterFirst),
@@ -465,27 +466,33 @@ func (g *generator) dbRatchetFormatMajorVersion() {
 	g.add(&dbRatchetFormatMajorVersionOp{dbID: dbID, vers: vers})
 }
 
-func (g *generator) dbRestart() {
-	// Close any live iterators and snapshots, so that we can close the DB
-	// cleanly.
-	dbID := g.dbs.rand(g.rng)
-	for len(g.liveIters) > 0 {
-		g.randIter(g.iterClose)()
-	}
-	for len(g.liveSnapshots) > 0 {
-		g.snapshotClose()
-	}
-	// Close the batches.
-	for len(g.liveBatches) > 0 {
-		batchID := g.liveBatches[0]
-		g.removeBatchFromGenerator(batchID)
-		g.add(&closeOp{objID: batchID})
-	}
-	if len(g.liveReaders) != len(g.dbs) || len(g.liveWriters) != len(g.dbs) {
-		panic(fmt.Sprintf("unexpected counts: liveReaders %d, liveWriters: %d",
-			len(g.liveReaders), len(g.liveWriters)))
+func (g *generator) dbRestart(shouldCrashDuringOpen bool) func() {
+	return func() {
+		// Close any live iterators and snapshots, so that we can close the DB
+		// cleanly.
+		dbID := g.dbs.rand(g.rng)
+		for len(g.liveIters) > 0 {
+			g.randIter(g.iterClose)()
+		}
+		for len(g.liveSnapshots) > 0 {
+			g.snapshotClose()
+		}
+		// Close the batches.
+		for len(g.liveBatches) > 0 {
+			batchID := g.liveBatches[0]
+			g.removeBatchFromGenerator(batchID)
+			g.add(&closeOp{objID: batchID})
+		}
+		if len(g.liveReaders) != len(g.dbs) || len(g.liveWriters) != len(g.dbs) {
+			panic(fmt.Sprintf("unexpected counts: liveReaders %d, liveWriters: %d",
+				len(g.liveReaders), len(g.liveWriters)))
+		}
+		if shouldCrashDuringOpen {
+			g.add(&dbUncleanRestartOp{dbID: dbID})
+		} else {
+			g.add(&dbRestartOp{dbID: dbID})
+		}
 	}
-	g.add(&dbRestartOp{dbID: dbID})
 }
 
 // maybeSetSnapshotIterBounds must be called whenever creating a new iterator or
diff --git a/metamorphic/key_manager.go b/metamorphic/key_manager.go
index 0cc754dda8..2a4ae99b70 100644
--- a/metamorphic/key_manager.go
+++ b/metamorphic/key_manager.go
@@ -883,6 +883,7 @@ func opWrittenKeys(untypedOp op) [][]byte {
 	case *closeOp:
 	case *compactOp:
 	case *dbRestartOp:
+	case *dbUncleanRestartOp:
 	case *deleteOp:
 		return [][]byte{t.key}
 	case *deleteRangeOp:
diff --git a/metamorphic/meta.go b/metamorphic/meta.go
index 414dab1554..6b8278f614 100644
--- a/metamorphic/meta.go
+++ b/metamorphic/meta.go
@@ -560,7 +560,7 @@ func RunOnce(t TestingT, runDir string, seed uint64, historyPath string, rOpts .
 			// multi-instance mode.
 			testOpts.Opts.WALFailover = nil
 		} else {
-			testOpts.Opts.WALFailover.Secondary.FS = opts.FS
+			testOpts.Opts.WALFailover.Secondary.FS = vfs.NewCrashableMem()
 		}
 	}
 
diff --git a/metamorphic/ops.go b/metamorphic/ops.go
index a094092b07..183e1dac4b 100644
--- a/metamorphic/ops.go
+++ b/metamorphic/ops.go
@@ -1965,7 +1965,7 @@ type dbRestartOp struct {
 }
 
 func (o *dbRestartOp) run(t *Test, h historyRecorder) {
-	if err := t.restartDB(o.dbID); err != nil {
+	if err := t.restartDB(o.dbID, false /* shouldCrashDuringOpen */); err != nil {
 		h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
 		h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
 	} else {
@@ -1980,6 +1980,37 @@ func (o *dbRestartOp) syncObjs() objIDSlice { return o.affectedObjec
 func (o *dbRestartOp) rewriteKeys(func(UserKey) UserKey) {}
 func (o *dbRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
 
+// dbUncleanRestartOp performs an unclean restart like dbRestartOp, but also
+// starts a concurrent goroutine that calls CrashClone during the Open and uses
+// that clone to do a second Open. This tests crashing during Open with
+// concurrent operations.
+type dbUncleanRestartOp struct {
+	dbID objID
+
+	// affectedObjects is the list of additional objects that are affected by this
+	// operation, and which syncObjs() must return so that we don't perform the
+	// restart in parallel with other operations to affected objects.
+	affectedObjects []objID
+}
+
+func (o *dbUncleanRestartOp) run(t *Test, h historyRecorder) {
+	if err := t.restartDB(o.dbID, true /* shouldCrashDuringOpen */); err != nil {
+		h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
+		h.history.err.Store(errors.Wrap(err, "dbCrashDuringOpenOp"))
+	} else {
+		h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
+	}
+}
+
+func (o *dbUncleanRestartOp) formattedString(KeyFormat) string {
+	return fmt.Sprintf("%s.RestartWithCrashClone()", o.dbID)
+}
+func (o *dbUncleanRestartOp) receiver() objID      { return o.dbID }
+func (o *dbUncleanRestartOp) syncObjs() objIDSlice { return o.affectedObjects }
+
+func (o *dbUncleanRestartOp) rewriteKeys(func(UserKey) UserKey) {}
+func (o *dbUncleanRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }
+
 func formatOps(kf KeyFormat, ops []op) string {
 	var buf strings.Builder
 	for _, op := range ops {
diff --git a/metamorphic/options.go b/metamorphic/options.go
index 8af5c1f0b5..032f90376a 100644
--- a/metamorphic/options.go
+++ b/metamorphic/options.go
@@ -741,8 +741,9 @@ func RandomOptions(rng *rand.Rand, kf KeyFormat, cfg RandomOptionsCfg) *TestOpti
 		// maintains a maximum history 120 entries, so the healthy interval
 		// must not exceed 119x the probe interval.
 		healthyInterval := scaleDuration(probeInterval, 1.0, 119.0)
+		newMem := vfs.NewCrashableMem()
 		opts.WALFailover = &pebble.WALFailoverOptions{
-			Secondary: wal.Dir{FS: vfs.Default, Dirname: pebble.MakeStoreRelativePath(vfs.Default, "wal_secondary")},
+			Secondary: wal.Dir{FS: newMem, Dirname: pebble.MakeStoreRelativePath(newMem, "wal_secondary")},
 			FailoverOptions: wal.FailoverOptions{
 				PrimaryDirProbeInterval:      probeInterval,
 				HealthyProbeLatencyThreshold: healthyThreshold,
diff --git a/metamorphic/parser.go b/metamorphic/parser.go
index 549aeb8b11..53b058c6e7 100644
--- a/metamorphic/parser.go
+++ b/metamorphic/parser.go
@@ -62,6 +62,8 @@ func opArgs(op op) (receiverID *objID, targetID *objID, args []interface{}) {
 		return &t.dbID, nil, []interface{}{&t.vers}
 	case *dbRestartOp:
 		return &t.dbID, nil, nil
+	case *dbUncleanRestartOp:
+		return &t.dbID, nil, nil
 	case *deleteOp:
 		return &t.writerID, nil, []interface{}{&t.key}
 	case *deleteRangeOp:
@@ -177,6 +179,7 @@ var methods = map[string]*methodInfo{
 	"RatchetFormatMajorVersion": makeMethod(dbRatchetFormatMajorVersionOp{}, dbTag),
 	"Replicate":                 makeMethod(replicateOp{}, dbTag),
 	"Restart":                   makeMethod(dbRestartOp{}, dbTag),
+	"RestartWithCrashClone":     makeMethod(dbUncleanRestartOp{}, dbTag),
 	"SeekGE":                    makeMethod(iterSeekGEOp{}, iterTag),
 	"SeekLT":                    makeMethod(iterSeekLTOp{}, iterTag),
 	"SeekPrefixGE":              makeMethod(iterSeekPrefixGEOp{}, iterTag),
@@ -749,6 +752,16 @@ func computeDerivedFields(ops []op) {
 		}
 		// Sort so the output is deterministic.
 		slices.Sort(v.affectedObjects)
+	case *dbUncleanRestartOp:
+		// Find all objects that use this db.
+		v.affectedObjects = nil
+		for obj, db := range objToDB {
+			if db == v.dbID {
+				v.affectedObjects = append(v.affectedObjects, obj)
+			}
+		}
+		// Sort so the output is deterministic.
+		slices.Sort(v.affectedObjects)
 	case *ingestOp:
 		v.derivedDBIDs = make([]objID, len(v.batchIDs))
 		for i := range v.batchIDs {
diff --git a/metamorphic/test.go b/metamorphic/test.go
index 2a94c96d5e..5c4d4cb377 100644
--- a/metamorphic/test.go
+++ b/metamorphic/test.go
@@ -7,6 +7,7 @@ package metamorphic
 import (
 	"fmt"
 	"io"
+	"math/rand/v2"
 	"os"
 	"path"
 	"runtime/debug"
@@ -321,10 +322,11 @@ func (t *Test) minFMV() pebble.FormatMajorVersion {
 	return minVersion
 }
 
-func (t *Test) restartDB(dbID objID) error {
+func (t *Test) restartDB(dbID objID, shouldCrashDuringOpen bool) error {
 	db := t.getDB(dbID)
-	// If strictFS is not used, we use pebble.NoSync for writeOpts, so we can't
-	// restart the database (even if we don't revert to synced data).
+	// If strictFS is not used, this is a no-op: writeOpts ends up using
+	// pebble.NoSync, in which case we can't restart the database even if we
+	// don't revert to synced data.
 	if !t.testOpts.strictFS {
 		return nil
 	}
@@ -348,15 +350,26 @@ func (t *Test) restartDB(dbID objID) error {
 		}
 	}
 	t.opts.FS = crashFS
+	var slowFS *errorfs.FS
+	// If we should crash during Open, inject some latency into the filesystem
+	// so that the first Open is slow enough for us to capture some arbitrary
+	// intermediate state.
+	if shouldCrashDuringOpen {
+		seed := time.Now().UnixNano()
+		t.opts.Logger.Infof("seed %d", seed)
+		mean := time.Duration(rand.IntN(20)+10) * time.Millisecond
+		t.opts.Logger.Infof("Injecting mean %s of latency with p=%.3f", mean, 1.0)
+		slowFS = errorfs.Wrap(crashFS,
+			errorfs.RandomLatency(errorfs.Randomly(1.0, seed), mean, seed, time.Second))
+		t.opts.FS = slowFS
+	}
 	t.opts.WithFSDefaults()
 	// We want to set the new FS in testOpts too, so they are propagated to the
 	// TestOptions that were used with metamorphic.New().
 	t.testOpts.Opts.FS = t.opts.FS
 
-	if t.opts.WALFailover != nil {
-		t.opts.WALFailover.Secondary.FS = t.opts.FS
-		t.testOpts.Opts.WALFailover.Secondary.FS = t.opts.FS
-	}
+	firstOpenDone := make(chan struct{})
+	secondOpenDone := make(chan struct{})
 	// TODO(jackson): Audit errorRate and ensure custom options' hooks semantics
 	// are well defined within the context of retries.
 	err := t.withRetries(func() (err error) {
@@ -373,15 +386,90 @@ func (t *Test) restartDB(dbID objID) error {
 			dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
 		}
 		o := t.finalizeOptions()
+		if shouldCrashDuringOpen {
+			go func() {
+				err = t.simulateCrashDuringOpen(dbID, slowFS, secondOpenDone, firstOpenDone)
+			}()
+			if err != nil {
+				return err
+			}
+		}
 		t.dbs[dbID.slot()-1], err = pebble.Open(dir, &o)
-		if err != nil {
-			return err
+		if shouldCrashDuringOpen {
+			firstOpenDone <- struct{}{}
 		}
 		return err
 	})
+	if shouldCrashDuringOpen {
+		<-secondOpenDone
+	}
 	return err
 }
 
+func (t *Test) simulateCrashDuringOpen(
+	dbID objID, slowFS *errorfs.FS, secondOpenDone, firstOpenDone chan struct{},
+) error {
+	defer func() { secondOpenDone <- struct{}{} }()
+
+	// Wait a bit for the first Open to make some progress.
+	time.Sleep(30 * time.Millisecond)
+
+	// Create a crash clone of the current filesystem state.
+	rng := rand.New(rand.NewPCG(0, uint64(time.Now().UnixNano())))
+	crashCloneFS, err := slowFS.CrashClone(vfs.CrashCloneCfg{
+		UnsyncedDataPercent: rng.IntN(101),
+		RNG:                 rng,
+	})
+	if err != nil {
+		return err
+	}
+
+	// After the first Open has completed, close the resulting DB and open the
+	// second DB.
+	<-firstOpenDone
+	err = t.dbs[dbID.slot()-1].Close()
+	if err != nil {
+		return err
+	}
+	// Release any resources held by custom options. This may be used, for
+	// example, by the encryption-at-rest custom option (within the Cockroach
+	// repository) to close the file registry.
+	for i := range t.testOpts.CustomOpts {
+		if err := t.testOpts.CustomOpts[i].Close(t.opts); err != nil {
+			return err
+		}
+	}
+	t.opts.FS = crashCloneFS
+	if t.opts.WALFailover != nil {
+		ccsmemFS := t.opts.WALFailover.Secondary.FS.(*vfs.MemFS)
+		crashCloneSecondaryFS := ccsmemFS.CrashClone(vfs.CrashCloneCfg{
+			UnsyncedDataPercent: rng.IntN(101),
+			RNG:                 rng,
+		})
+		t.testOpts.Opts.WALFailover.Secondary.FS = crashCloneSecondaryFS
+		t.opts.WALFailover.Secondary.FS = crashCloneSecondaryFS
+	}
+	// Reacquire any resources required by custom options. This may be used, for
+	// example, by the encryption-at-rest custom option (within the Cockroach
+	// repository) to reopen the file registry.
+	for i := range t.testOpts.CustomOpts {
+		if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
+			return err
+		}
+	}
+	// Create a copy of options for the second DB.
+	dir := t.dir
+	if len(t.dbs) > 1 {
+		dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
+	}
+	o := t.finalizeOptions()
+	t.dbs[dbID.slot()-1], err = pebble.Open(dir, &o)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
 func (t *Test) saveInMemoryDataInternal() error {
 	if rootFS := vfs.Root(t.opts.FS); rootFS != vfs.Default {
 		// t.opts.FS is an in-memory system; copy it to disk.
diff --git a/vfs/errorfs/errorfs.go b/vfs/errorfs/errorfs.go
index 31862095e8..b4b1e0cc92 100644
--- a/vfs/errorfs/errorfs.go
+++ b/vfs/errorfs/errorfs.go
@@ -457,6 +457,15 @@ func (fs *FS) Stat(name string) (vfs.FileInfo, error) {
 	return fs.fs.Stat(name)
 }
 
+// CrashClone implements MemFS.CrashClone.
+func (fs *FS) CrashClone(cfg vfs.CrashCloneCfg) (*vfs.MemFS, error) {
+	memFs, ok := fs.fs.(*vfs.MemFS)
+	if !ok {
+		return nil, errors.New("not a MemFS")
+	}
+	return memFs.CrashClone(cfg), nil
+}
+
 // errorFile implements vfs.File. The interface is implemented on the pointer
 // type to allow pointer equality comparisons.
 type errorFile struct {
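
Illustrative sketch (not part of the patch above): the new RestartWithCrashClone op is built on vfs.NewCrashableMem and MemFS.CrashClone, which the diff threads through the metamorphic test's options and restart path. The standalone program below is a minimal sketch of that mechanism; the "demo" directory name, the keys, and the RNG seeds are made up for illustration, and it takes the clone from a quiesced store, whereas dbUncleanRestartOp captures the clone concurrently while the first pebble.Open is still in flight (slowed down via errorfs.RandomLatency).

package main

import (
	"log"
	"math/rand/v2"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	// A crashable in-memory filesystem, as the metamorphic test uses when
	// strictFS is enabled.
	mem := vfs.NewCrashableMem()

	db, err := pebble.Open("demo", &pebble.Options{FS: mem})
	if err != nil {
		log.Fatal(err)
	}
	// "a" is synced and must survive the simulated crash; "b" is unsynced and
	// survives only with probability UnsyncedDataPercent/100.
	if err := db.Set([]byte("a"), []byte("1"), pebble.Sync); err != nil {
		log.Fatal(err)
	}
	if err := db.Set([]byte("b"), []byte("2"), pebble.NoSync); err != nil {
		log.Fatal(err)
	}

	// Capture a crash-consistent clone of the store while the DB is still
	// open. The clone is an independent MemFS; the live DB is unaffected.
	rng := rand.New(rand.NewPCG(0, 1))
	cloneFS := mem.CrashClone(vfs.CrashCloneCfg{UnsyncedDataPercent: 50, RNG: rng})

	if err := db.Close(); err != nil {
		log.Fatal(err)
	}

	// Reopening from the clone replays whatever state survived the simulated
	// crash, which is the recovery path the new op exercises.
	db2, err := pebble.Open("demo", &pebble.Options{FS: cloneFS})
	if err != nil {
		log.Fatal(err)
	}
	defer db2.Close()

	if v, closer, err := db2.Get([]byte("a")); err == nil {
		log.Printf("a=%s", v)
		closer.Close()
	}
	if _, closer, err := db2.Get([]byte("b")); err == nil {
		log.Printf("b survived the crash")
		closer.Close()
	} else {
		log.Printf("b was lost in the crash")
	}
}

In the patched test, the clone is taken from a goroutine while the first Open is still running, so the cloned state reflects an arbitrary point partway through recovery rather than a quiesced store; the second Open then has to recover from exactly that state.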