Skip to content

Commit 392f102

Browse files
committed
db: factor out recovery logic
Factor out logic to recover the current database state from Open. Over time Open has grown involved, interleaving initialization of pebble.DB state and recovery of the database. Teasing apart these functions help improve readability and will help in refactoring to clean up obsolete files before creating new ones during Open. Future work will pull loading of the version set into recoverState as well.
1 parent 26c861d commit 392f102

File tree

4 files changed

+146
-93
lines changed

4 files changed

+146
-93
lines changed

format_major_version.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -417,13 +417,13 @@ func lookupFormatMajorVersion(
417417
}
418418
vers := FormatMajorVersion(v)
419419
if vers == FormatDefault {
420-
return 0, nil, errors.Newf("pebble: default format major version should not persisted", vers)
420+
return 0, nil, errors.Newf("default format major version should not persisted", vers)
421421
}
422422
if vers > internalFormatNewest {
423-
return 0, nil, errors.Newf("pebble: database %q written in unknown format major version %d", dirname, vers)
423+
return 0, nil, errors.Newf("written in unknown format major version %d", vers)
424424
}
425425
if vers < FormatMinSupported {
426-
return 0, nil, errors.Newf("pebble: database %q written in format major version %d which is no longer supported", dirname, vers)
426+
return 0, nil, errors.Newf("format major version %d which is no longer supported", vers)
427427
}
428428
return vers, m, nil
429429
}

format_major_version_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func TestRatchetFormat(t *testing.T) {
9292
opts.WithFSDefaults()
9393
_, err = Open("", opts)
9494
require.Error(t, err)
95-
require.EqualError(t, err, `pebble: database "" written in unknown format major version 999999`)
95+
require.EqualError(t, err, `pebble: database "": written in unknown format major version 999999`)
9696
}
9797

9898
func testBasicDB(d *DB) error {

open.go

Lines changed: 141 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/cockroachdb/pebble/objstorage/remote"
3434
"github.com/cockroachdb/pebble/record"
3535
"github.com/cockroachdb/pebble/vfs"
36+
"github.com/cockroachdb/pebble/vfs/atomicfs"
3637
"github.com/cockroachdb/pebble/wal"
3738
"github.com/prometheus/client_golang/prometheus"
3839
)
@@ -117,22 +118,14 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
117118
}
118119
defer maybeCleanUp(fileLock.Close)
119120

120-
// List the directory contents. This also happens to include WAL log files, if
121-
// they are in the same dir, but we will ignore those below. The provider is
122-
// also given this list, but it ignores non sstable files.
123-
ls, err := opts.FS.List(dirname)
121+
rs, err := recoverState(opts, dirname)
124122
if err != nil {
125123
return nil, err
126124
}
125+
defer maybeCleanUp(rs.Close)
127126

128-
// Establish the format major version.
129-
formatVersion, formatVersionMarker, err := lookupFormatMajorVersion(opts.FS, dirname, ls)
130-
if err != nil {
131-
return nil, err
132-
}
133-
defer maybeCleanUp(formatVersionMarker.Close)
134-
135-
noFormatVersionMarker := formatVersion == FormatDefault
127+
formatVersion := rs.fmv
128+
noFormatVersionMarker := rs.fmv == FormatDefault
136129
if noFormatVersionMarker {
137130
// We will initialize the store at the minimum possible format, then upgrade
138131
// the format to the desired one. This helps test the format upgrade code.
@@ -159,20 +152,8 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
159152
}()
160153
}
161154

162-
// Find the currently active manifest, if there is one.
163-
manifestMarker, manifestFileNum, manifestExists, err := findCurrentManifest(opts.FS, dirname, ls)
164-
if err != nil {
165-
return nil, errors.Wrapf(err, "pebble: database %q", dirname)
166-
}
167-
defer maybeCleanUp(manifestMarker.Close)
168-
169-
// Atomic markers may leave behind obsolete files if there's a crash
170-
// mid-update. Clean these up if we're not in read-only mode.
171155
if !opts.ReadOnly {
172-
if err := formatVersionMarker.RemoveObsolete(); err != nil {
173-
return nil, err
174-
}
175-
if err := manifestMarker.RemoveObsolete(); err != nil {
156+
if err := rs.RemoveObsolete(); err != nil {
176157
return nil, err
177158
}
178159
}
@@ -194,6 +175,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
194175
largeBatchThreshold: (opts.MemTableSize - uint64(memTableEmptySize)) / 2,
195176
dataDirLock: fileLock,
196177
dataDir: dataDir,
178+
objProvider: rs.objProvider,
197179
closed: new(atomic.Value),
198180
closedCh: make(chan struct{}),
199181
}
@@ -236,9 +218,6 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
236218
if d.cleanupManager != nil {
237219
d.cleanupManager.Close()
238220
}
239-
if d.objProvider != nil {
240-
_ = d.objProvider.Close()
241-
}
242221
if d.mu.versions.manifestFile != nil {
243222
_ = d.mu.versions.manifestFile.Close()
244223
}
@@ -259,10 +238,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
259238
write: d.commitWrite,
260239
})
261240
d.mu.nextJobID = 1
262-
d.mu.mem.nextSize = opts.MemTableSize
263-
if d.mu.mem.nextSize > initialMemTableSize {
264-
d.mu.mem.nextSize = initialMemTableSize
265-
}
241+
d.mu.mem.nextSize = min(opts.MemTableSize, initialMemTableSize)
266242
d.mu.compact.cond.L = &d.mu.Mutex
267243
d.mu.compact.inProgress = make(map[compaction]struct{})
268244
d.mu.compact.noOngoingFlushStartTime = crtime.NowMono()
@@ -273,7 +249,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
273249
// SeqNumStart).
274250
d.mu.versions.logSeqNum.Store(base.SeqNumStart)
275251
d.mu.formatVers.vers.Store(uint64(formatVersion))
276-
d.mu.formatVers.marker = formatVersionMarker
252+
d.mu.formatVers.marker = rs.fmvMarker
277253

278254
d.timeNow = time.Now
279255
d.openedAt = d.timeNow()
@@ -283,27 +259,20 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
283259

284260
jobID := d.newJobIDLocked()
285261

286-
providerSettings := opts.MakeObjStorageProviderSettings(dirname)
287-
providerSettings.FSDirInitialListing = ls
288-
d.objProvider, err = objstorageprovider.Open(providerSettings)
289-
if err != nil {
290-
return nil, err
291-
}
292-
293262
blobRewriteHeuristic := manifest.BlobRewriteHeuristic{
294263
CurrentTime: d.timeNow,
295264
MinimumAge: opts.Experimental.ValueSeparationPolicy().RewriteMinimumAge,
296265
}
297266

298-
if !manifestExists {
267+
if !rs.manifestExists {
299268
// DB does not exist.
300269
if d.opts.ErrorIfNotExists || d.opts.ReadOnly {
301270
return nil, errors.Wrapf(ErrDBDoesNotExist, "dirname=%q", dirname)
302271
}
303272

304273
// Create the DB.
305274
if err := d.mu.versions.create(
306-
jobID, dirname, d.objProvider, opts, manifestMarker, d.FormatMajorVersion, blobRewriteHeuristic, &d.mu.Mutex); err != nil {
275+
jobID, dirname, d.objProvider, opts, rs.manifestMarker, d.FormatMajorVersion, blobRewriteHeuristic, &d.mu.Mutex); err != nil {
307276
return nil, err
308277
}
309278
} else {
@@ -312,7 +281,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
312281
}
313282
// Load the version set.
314283
if err := d.mu.versions.load(
315-
dirname, d.objProvider, opts, manifestFileNum, manifestMarker, d.FormatMajorVersion, blobRewriteHeuristic, &d.mu.Mutex); err != nil {
284+
dirname, d.objProvider, opts, rs.manifestFileNum, rs.manifestMarker, d.FormatMajorVersion, blobRewriteHeuristic, &d.mu.Mutex); err != nil {
316285
return nil, err
317286
}
318287
if opts.ErrorIfNotPristine {
@@ -464,7 +433,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
464433

465434
d.cleanupManager = openCleanupManager(opts, d.objProvider, d.getDeletionPacerInfo)
466435

467-
if manifestExists && !opts.DisableConsistencyCheck {
436+
if rs.manifestExists && !opts.DisableConsistencyCheck {
468437
curVersion := d.mu.versions.currentVersion()
469438
if err := checkConsistency(curVersion, d.objProvider); err != nil {
470439
return nil, err
@@ -482,39 +451,7 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
482451

483452
d.fileSizeAnnotator = d.makeFileSizeAnnotator()
484453

485-
var previousOptionsFileNum base.DiskFileNum
486-
var previousOptionsFilename string
487-
for _, filename := range ls {
488-
ft, fn, ok := base.ParseFilename(opts.FS, filename)
489-
if !ok {
490-
continue
491-
}
492-
493-
// Don't reuse any obsolete file numbers to avoid modifying an
494-
// ingested sstable's original external file.
495-
d.mu.versions.markFileNumUsed(fn)
496-
497-
switch ft {
498-
case base.FileTypeLog:
499-
// Ignore.
500-
case base.FileTypeOptions:
501-
if previousOptionsFileNum < fn {
502-
previousOptionsFileNum = fn
503-
previousOptionsFilename = filename
504-
}
505-
case base.FileTypeTemp, base.FileTypeOldTemp:
506-
if !d.opts.ReadOnly {
507-
// Some codepaths write to a temporary file and then
508-
// rename it to its final location when complete. A
509-
// temp file is leftover if a process exits before the
510-
// rename. Remove it.
511-
err := opts.FS.Remove(opts.FS.PathJoin(dirname, filename))
512-
if err != nil {
513-
return nil, err
514-
}
515-
}
516-
}
517-
}
454+
d.mu.versions.markFileNumUsed(rs.maxFilenumUsed)
518455
if n := len(wals); n > 0 {
519456
// Don't reuse any obsolete file numbers to avoid modifying an
520457
// ingested sstable's original external file.
@@ -529,8 +466,8 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
529466
}
530467

531468
// Validate the most-recent OPTIONS file, if there is one.
532-
if previousOptionsFilename != "" {
533-
path := opts.FS.PathJoin(dirname, previousOptionsFilename)
469+
if rs.previousOptionsFilename != "" {
470+
path := opts.FS.PathJoin(dirname, rs.previousOptionsFilename)
534471
previousOptions, err := readOptionsFile(opts, path)
535472
if err != nil {
536473
return nil, err
@@ -541,23 +478,19 @@ func Open(dirname string, opts *Options) (db *DB, err error) {
541478
}
542479

543480
// Replay any newer log files than the ones named in the manifest.
544-
var replayWALs wal.Logs
481+
var flushableIngests []*ingestedFlushable
545482
for i, w := range wals {
546-
if base.DiskFileNum(w.Num) >= d.mu.versions.minUnflushedLogNum {
547-
replayWALs = wals[i:]
548-
break
483+
if base.DiskFileNum(w.Num) < d.mu.versions.minUnflushedLogNum {
484+
continue
549485
}
550-
}
551-
var flushableIngests []*ingestedFlushable
552-
for i, lf := range replayWALs {
553486
// WALs other than the last one would have been closed cleanly.
554487
//
555488
// Note: we used to never require strict WAL tails when reading from older
556489
// versions: RocksDB 6.2.1 and the version of Pebble included in CockroachDB
557490
// 20.1 do not guarantee that closed WALs end cleanly. But the earliest
558491
// compatible Pebble format is newer and guarantees a clean EOF.
559-
strictWALTail := i < len(replayWALs)-1
560-
fi, maxSeqNum, err := d.replayWAL(jobID, lf, strictWALTail)
492+
strictWALTail := i < len(wals)-1
493+
fi, maxSeqNum, err := d.replayWAL(jobID, w, strictWALTail)
561494
if err != nil {
562495
return nil, err
563496
}
@@ -1288,3 +1221,123 @@ func (l walEventListenerAdaptor) LogCreated(ci wal.CreateInfo) {
12881221
}
12891222
l.l.WALCreated(wci)
12901223
}
1224+
1225+
// recoverState reads the named database directory and recovers the set of files
1226+
// encoding the database state at the moment the previous process exited.
1227+
// recoverState is read only and does not mutate the on-disk state.
1228+
func recoverState(opts *Options, dirname string) (s *recoveredState, err error) {
1229+
rs := &recoveredState{
1230+
dirname: dirname,
1231+
fs: opts.FS,
1232+
}
1233+
if err := rs.init(opts, dirname); err != nil {
1234+
return nil, errors.CombineErrors(err, rs.Close())
1235+
}
1236+
return rs, nil
1237+
}
1238+
1239+
func (rs *recoveredState) init(opts *Options, dirname string) error {
1240+
// List the directory contents. This also happens to include WAL log files,
1241+
// if they are in the same dir.
1242+
var err error
1243+
if rs.ls, err = opts.FS.List(dirname); err != nil {
1244+
return errors.Wrapf(err, "pebble: database %q", dirname)
1245+
}
1246+
// Find the currently format major version and active manifest.
1247+
rs.fmv, rs.fmvMarker, err = lookupFormatMajorVersion(opts.FS, dirname, rs.ls)
1248+
if err != nil {
1249+
return errors.Wrapf(err, "pebble: database %q", dirname)
1250+
}
1251+
rs.manifestMarker, rs.manifestFileNum, rs.manifestExists, err = findCurrentManifest(opts.FS, dirname, rs.ls)
1252+
if err != nil {
1253+
return errors.Wrapf(err, "pebble: database %q", dirname)
1254+
}
1255+
// Open the object storage provider.
1256+
providerSettings := opts.MakeObjStorageProviderSettings(dirname)
1257+
providerSettings.FSDirInitialListing = rs.ls
1258+
rs.objProvider, err = objstorageprovider.Open(providerSettings)
1259+
if err != nil {
1260+
return errors.Wrapf(err, "pebble: database %q", dirname)
1261+
}
1262+
1263+
// Identify the maximal file number in the directory. We do not want to
1264+
// reuse any existing file numbers even if they are obsolete file numbers to
1265+
// avoid modifying an ingested sstable's original external file.
1266+
//
1267+
// We also identify the most recent OPTIONS file, so we can validate our
1268+
// configured Options against the previous options, and we collect any
1269+
// orphaned temporary files that should be removed.
1270+
var previousOptionsFileNum base.DiskFileNum
1271+
for _, filename := range rs.ls {
1272+
ft, fn, ok := base.ParseFilename(opts.FS, filename)
1273+
if !ok {
1274+
continue
1275+
}
1276+
rs.maxFilenumUsed = max(rs.maxFilenumUsed, fn)
1277+
switch ft {
1278+
case base.FileTypeLog:
1279+
// Ignore.
1280+
case base.FileTypeOptions:
1281+
if previousOptionsFileNum < fn {
1282+
previousOptionsFileNum = fn
1283+
rs.previousOptionsFilename = filename
1284+
}
1285+
case base.FileTypeTemp, base.FileTypeOldTemp:
1286+
rs.obsoleteTempFilenames = append(rs.obsoleteTempFilenames, filename)
1287+
}
1288+
}
1289+
return nil
1290+
}
1291+
1292+
// recoveredState encapsulates state recovered from reading the database
1293+
// directory.
1294+
type recoveredState struct {
1295+
dirname string
1296+
fmv FormatMajorVersion
1297+
fmvMarker *atomicfs.Marker
1298+
fs vfs.FS
1299+
ls []string
1300+
manifestMarker *atomicfs.Marker
1301+
manifestFileNum base.DiskFileNum
1302+
manifestExists bool
1303+
maxFilenumUsed base.DiskFileNum
1304+
obsoleteTempFilenames []string
1305+
objProvider objstorage.Provider
1306+
previousOptionsFilename string
1307+
}
1308+
1309+
// RemoveObsolete removes obsolete files uncovered during recovery.
1310+
func (rs *recoveredState) RemoveObsolete() error {
1311+
var err error
1312+
// Atomic markers may leave behind obsolete files if there's a crash
1313+
// mid-update.
1314+
if rs.fmvMarker != nil {
1315+
err = errors.CombineErrors(err, rs.fmvMarker.RemoveObsolete())
1316+
}
1317+
if rs.manifestMarker != nil {
1318+
err = errors.CombineErrors(err, rs.manifestMarker.RemoveObsolete())
1319+
}
1320+
// Some codepaths write to a temporary file and then rename it to its final
1321+
// location when complete. A temp file is leftover if a process exits
1322+
// before the rename. Remove any that were found.
1323+
for _, filename := range rs.obsoleteTempFilenames {
1324+
err = errors.CombineErrors(err, rs.fs.Remove(rs.fs.PathJoin(rs.dirname, filename)))
1325+
}
1326+
return err
1327+
}
1328+
1329+
// Close closes resources held by the RecoveredState, including open file
1330+
// descriptors.
1331+
func (rs *recoveredState) Close() error {
1332+
var err error
1333+
if rs.fmvMarker != nil {
1334+
err = errors.CombineErrors(err, rs.fmvMarker.Close())
1335+
}
1336+
if rs.manifestMarker != nil {
1337+
err = errors.CombineErrors(err, rs.manifestMarker.Close())
1338+
}
1339+
if rs.objProvider != nil {
1340+
err = errors.CombineErrors(err, rs.objProvider.Close())
1341+
}
1342+
return err
1343+
}

open_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1492,7 +1492,7 @@ func TestOpen_ErrorIfUnknownFormatVersion(t *testing.T) {
14921492
FormatMajorVersion: FormatMinSupported,
14931493
})
14941494
require.Error(t, err)
1495-
require.EqualError(t, err, `pebble: database "" written in unknown format major version 999999`)
1495+
require.EqualError(t, err, `pebble: database "": written in unknown format major version 999999`)
14961496
}
14971497

14981498
// ensureFilesClosed updates the provided Options to wrap the filesystem. It

0 commit comments

Comments
 (0)