Skip to content

Commit 825c04c

Browse files
committed
db: rework logAndApply
This commit replaces `logAndApply` with an `UpdateVersionLocked` function that takes a closure and performs the log "locking" around that closure. This makes it impossible for a caller to mess up the locking.
1 parent dbc7e3b commit 825c04c

File tree

11 files changed

+387
-381
lines changed

11 files changed

+387
-381
lines changed

compaction.go

Lines changed: 46 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,10 +1100,10 @@ func (d *DB) clearCompactingState(c *compaction, rollback bool) {
11001100
// It is a bit peculiar that we are fiddling with th current version state
11011101
// in a separate critical section from when this version was installed.
11021102
// But this fiddling is necessary if the compaction failed. When the
1103-
// compaction succeeded, we've already done this in logAndApply, so this
1104-
// seems redundant. Anyway, we clear the pickedCompactionCache since we
1103+
// compaction succeeded, we've already done this in UpdateVersionLocked, so
1104+
// this seems redundant. Anyway, we clear the pickedCompactionCache since we
11051105
// may be able to pick a better compaction (though when this compaction
1106-
// succeeded we've also cleared the cache in logAndApply).
1106+
// succeeded we've also cleared the cache in UpdateVersionLocked).
11071107
defer d.mu.versions.logUnlockAndInvalidatePickedCompactionCache()
11081108
d.mu.versions.l0Organizer.InitCompactingFileInfo(l0InProgress)
11091109
}()
@@ -1467,44 +1467,38 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
14671467
d.addInProgressCompaction(c)
14681468

14691469
jobID := d.newJobIDLocked()
1470-
d.opts.EventListener.FlushBegin(FlushInfo{
1470+
info := FlushInfo{
14711471
JobID: int(jobID),
14721472
Input: inputs,
14731473
InputBytes: inputBytes,
14741474
Ingest: ingest,
1475-
})
1475+
}
1476+
d.opts.EventListener.FlushBegin(info)
1477+
14761478
startTime := d.timeNow()
14771479

14781480
var ve *manifest.VersionEdit
14791481
var stats compact.Stats
14801482
// To determine the target level of the files in the ingestedFlushable, we
1481-
// need to acquire the logLock, and not release it for that duration. Since,
1482-
// we need to acquire the logLock below to perform the logAndApply step
1483-
// anyway, we create the VersionEdit for ingestedFlushable outside of
1484-
// runCompaction. For all other flush cases, we construct the VersionEdit
1485-
// inside runCompaction.
1483+
// need to acquire the logLock, and not release it for that duration. Since
1484+
// UpdateVersionLocked acquires it anyway, we create the VersionEdit for
1485+
// ingestedFlushable outside runCompaction. For all other flush cases, we
1486+
// construct the VersionEdit inside runCompaction.
1487+
var compactionErr error
14861488
if c.kind != compactionKindIngestedFlushable {
1487-
ve, stats, err = d.runCompaction(jobID, c)
1489+
ve, stats, compactionErr = d.runCompaction(jobID, c)
14881490
}
14891491

1490-
// Acquire logLock. This will be released either on an error, by way of
1491-
// logUnlock, or through a call to logAndApply if there is no error.
1492-
d.mu.versions.logLock()
1493-
1494-
if c.kind == compactionKindIngestedFlushable {
1495-
ve, err = d.runIngestFlush(c)
1496-
}
1492+
err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
1493+
err := compactionErr
1494+
if c.kind == compactionKindIngestedFlushable {
1495+
ve, err = d.runIngestFlush(c)
1496+
}
1497+
info.Duration = d.timeNow().Sub(startTime)
1498+
if err != nil {
1499+
return versionUpdate{}, err
1500+
}
14971501

1498-
info := FlushInfo{
1499-
JobID: int(jobID),
1500-
Input: inputs,
1501-
InputBytes: inputBytes,
1502-
Duration: d.timeNow().Sub(startTime),
1503-
Done: true,
1504-
Ingest: ingest,
1505-
Err: err,
1506-
}
1507-
if err == nil {
15081502
validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
15091503
for i := range ve.NewTables {
15101504
e := &ve.NewTables[i]
@@ -1515,9 +1509,6 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
15151509
info.IngestLevels = append(info.IngestLevels, e.Level)
15161510
}
15171511
}
1518-
if len(ve.NewTables) == 0 {
1519-
info.Err = errEmptyTable
1520-
}
15211512

15221513
// The flush succeeded or it produced an empty sstable. In either case we
15231514
// want to bump the minimum unflushed log number to the log number of the
@@ -1550,7 +1541,6 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
15501541
// write a file overlapping with the excise span.
15511542
if ingestFlushable.exciseSpan.OverlapsInternalKeyRange(d.cmp, c2.smallest, c2.largest) {
15521543
c2.cancel.Store(true)
1553-
continue
15541544
}
15551545
}
15561546

@@ -1570,17 +1560,13 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
15701560
}
15711561
}
15721562
}
1573-
err = d.mu.versions.logAndApply(jobID, ve, &c.metrics, false, /* forceRotation */
1574-
func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) })
1575-
if err != nil {
1576-
info.Err = err
1577-
}
1578-
} else {
1579-
// We won't be performing the logAndApply step because of the error, so
1580-
// logUnlock. We don't need to invalidate the pickedCompactionCache since
1581-
// the flush failed and so the latest version has not changed.
1582-
d.mu.versions.logUnlock()
1583-
}
1563+
return versionUpdate{
1564+
VE: ve,
1565+
JobID: jobID,
1566+
Metrics: c.metrics,
1567+
InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) },
1568+
}, nil
1569+
})
15841570

15851571
// If err != nil, then the flush will be retried, and we will recalculate
15861572
// these metrics.
@@ -1610,11 +1596,15 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
16101596
}
16111597
}
16121598
d.maybeTransitionSnapshotsToFileOnlyLocked()
1613-
16141599
}
16151600
// Signal FlushEnd after installing the new readState. This helps for unit
16161601
// tests that use the callback to trigger a read using an iterator with
16171602
// IterOptions.OnlyReadGuaranteedDurable.
1603+
info.Err = err
1604+
if info.Err == nil && len(ve.NewTables) == 0 {
1605+
info.Err = errEmptyTable
1606+
}
1607+
info.Done = true
16181608
info.TotalDuration = d.timeNow().Sub(startTime)
16191609
d.opts.EventListener.FlushEnd(info)
16201610

@@ -2468,9 +2458,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
24682458
info.Duration = d.timeNow().Sub(startTime)
24692459
if err == nil {
24702460
validateVersionEdit(ve, d.opts.Comparer.ValidateKey, d.opts.Comparer.FormatKey, d.opts.Logger)
2471-
err = func() error {
2472-
var err error
2473-
d.mu.versions.logLock()
2461+
err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
24742462
// Check if this compaction had a conflicting operation (eg. a d.excise())
24752463
// that necessitates it restarting from scratch. Note that since we hold
24762464
// the manifest lock, we don't expect this bool to change its value
@@ -2485,17 +2473,18 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
24852473
// would not have been true). We should delete any tables already
24862474
// created, as d.runCompaction did not do that.
24872475
d.cleanupVersionEdit(ve)
2488-
// logAndApply calls logUnlock. If we didn't call it, we need to call
2489-
// logUnlock ourselves. We also invalidate the pickedCompactionCache
2490-
// since this failed compaction may be the highest priority to run
2491-
// next.
2492-
d.mu.versions.logUnlockAndInvalidatePickedCompactionCache()
2493-
return err
2476+
// Note that UpdateVersionLocked invalidates the pickedCompactionCache
2477+
// when we return, which is relevant because this failed compaction
2478+
// may be the highest priority to run next.
2479+
return versionUpdate{}, err
24942480
}
2495-
return d.mu.versions.logAndApply(jobID, ve, &c.metrics, false /* forceRotation */, func() []compactionInfo {
2496-
return d.getInProgressCompactionInfoLocked(c)
2497-
})
2498-
}()
2481+
return versionUpdate{
2482+
VE: ve,
2483+
JobID: jobID,
2484+
Metrics: c.metrics,
2485+
InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(c) },
2486+
}, nil
2487+
})
24992488
}
25002489

25012490
info.Done = true

compaction_picker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -695,8 +695,8 @@ type compactionPickerByScore struct {
695695
// This means that at some point in the future a compactionPickerByScore
696696
// created in the past will have mutually inconsistent state in vers and
697697
// l0Organizer. This is not a problem since (a) a new picker is created in
698-
// logAndApply when a new version is installed, and (b) only the latest picker
699-
// is used for picking compactions. This is ensured by holding
698+
// UpdateVersionLocked when a new version is installed, and (b) only the
699+
// latest picker is used for picking compactions. This is ensured by holding
700700
// versionSet.logLock for both (a) and (b).
701701
l0Organizer *manifest.L0Organizer
702702
virtualBackings *manifest.VirtualBackings

compaction_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2558,7 +2558,7 @@ func (i *createManifestErrorInjector) MaybeError(op errorfs.Op) error {
25582558
return nil
25592559
}
25602560
// This necessitates having a MaxManifestSize of 1, to reliably induce
2561-
// logAndApply errors.
2561+
// UpdateVersionLocked errors.
25622562
if strings.Contains(op.Path, "MANIFEST") && op.Kind == errorfs.OpCreate {
25632563
return errorfs.ErrInjected
25642564
}
@@ -2567,11 +2567,11 @@ func (i *createManifestErrorInjector) MaybeError(op errorfs.Op) error {
25672567

25682568
var _ errorfs.Injector = &createManifestErrorInjector{}
25692569

2570-
// TestCompaction_LogAndApplyFails exercises a flush or ingest encountering an
2571-
// unrecoverable error during logAndApply.
2570+
// TestCompaction_UpdateVersionFails exercises a flush or ingest encountering an
2571+
// unrecoverable error during UpdateVersionLocked.
25722572
//
25732573
// Regression test for #1669.
2574-
func TestCompaction_LogAndApplyFails(t *testing.T) {
2574+
func TestCompaction_UpdateVersionFails(t *testing.T) {
25752575
// flushKeys writes the given keys to the DB, flushing the resulting memtable.
25762576
var key = []byte("foo")
25772577
flushErrC := make(chan error)
@@ -2654,8 +2654,9 @@ func TestCompaction_LogAndApplyFails(t *testing.T) {
26542654
err = addFn(db)
26552655
require.True(t, errors.Is(err, errorfs.ErrInjected))
26562656

2657-
// Under normal circumstances, such an error in logAndApply would panic and
2658-
// cause the DB to terminate here. Assert that we captured the fatal error.
2657+
// Under normal circumstances, such an error in UpdateVersionLocked would
2658+
// panic and cause the DB to terminate here. Assert that we captured the
2659+
// fatal error.
26592660
require.True(t, errors.Is(logger.err, errorfs.ErrInjected))
26602661
}
26612662
for _, tc := range testCases {

data_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1102,10 +1102,15 @@ func runDBDefineCmdReuseFS(td *datadriven.TestData, opts *Options) (*DB, error)
11021102
ve.NewBlobFiles = slices.Collect(maps.Values(valueSeparator.metas))
11031103

11041104
jobID := d.newJobIDLocked()
1105-
d.mu.versions.logLock()
1106-
if err := d.mu.versions.logAndApply(jobID, ve, newFileMetrics(ve.NewTables), false, func() []compactionInfo {
1107-
return nil
1108-
}); err != nil {
1105+
err = d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
1106+
return versionUpdate{
1107+
VE: ve,
1108+
JobID: jobID,
1109+
Metrics: newFileMetrics(ve.NewTables),
1110+
InProgressCompactionsFn: func() []compactionInfo { return nil },
1111+
}, nil
1112+
})
1113+
if err != nil {
11091114
return nil, err
11101115
}
11111116
d.updateReadStateLocked(nil)

db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ type DB struct {
322322
//
323323
// Care is taken to avoid holding DB.mu during IO operations. Accomplishing
324324
// this sometimes requires releasing DB.mu in a method that was called with
325-
// it held. See versionSet.logAndApply() and DB.makeRoomForWrite() for
325+
// it held. See versionSet.UpdateVersionLocked() and DB.makeRoomForWrite() for
326326
// examples. This is a common pattern, so be careful about expectations that
327327
// DB.mu will be held continuously across a set of calls.
328328
mu struct {

file_cache_test.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,7 @@ func TestVirtualReadsWiring(t *testing.T) {
393393
d.checkVirtualBounds(v2)
394394

395395
// Write the version edit.
396-
fileMetrics := func(ve *versionEdit) *levelMetricsDelta {
396+
fileMetrics := func(ve *versionEdit) levelMetricsDelta {
397397
metrics := newFileMetrics(ve.NewTables)
398398
for de, f := range ve.DeletedTables {
399399
lm := metrics[de.Level]
@@ -408,11 +408,13 @@ func TestVirtualReadsWiring(t *testing.T) {
408408
}
409409

410410
applyVE := func(ve *versionEdit) error {
411-
d.mu.versions.logLock()
412-
jobID := d.newJobIDLocked()
413-
414-
err := d.mu.versions.logAndApply(jobID, ve, fileMetrics(ve), false, func() []compactionInfo {
415-
return d.getInProgressCompactionInfoLocked(nil)
411+
err := d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
412+
return versionUpdate{
413+
VE: ve,
414+
JobID: d.newJobIDLocked(),
415+
Metrics: fileMetrics(ve),
416+
InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) },
417+
}, nil
416418
})
417419
d.updateReadStateLocked(nil)
418420
return err

flushable_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -96,9 +96,9 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
9696
}
9797

9898
// Fsync the directory we added the tables to. We need to do this at some
99-
// point before we update the MANIFEST (via logAndApply), otherwise a crash
100-
// can have the tables referenced in the MANIFEST, but not present in the
101-
// directory.
99+
// point before we update the MANIFEST (via UpdateVersionLocked), otherwise
100+
// a crash can have the tables referenced in the MANIFEST, but not present
101+
// in the directory.
102102
if err := d.dataDir.Sync(); err != nil {
103103
t.Fatal(err)
104104
}

format_major_version.go

Lines changed: 38 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -566,47 +566,45 @@ func (d *DB) markFilesLocked(findFn findFilesFunc) error {
566566

567567
// Lock the manifest for a coherent view of the LSM. The database lock has
568568
// been re-acquired by the defer within the above anonymous function.
569-
d.mu.versions.logLock()
570-
vers := d.mu.versions.currentVersion()
571-
for l, filesToMark := range files {
572-
if len(filesToMark) == 0 {
573-
continue
574-
}
575-
for _, f := range filesToMark {
576-
// Ignore files to be marked that have already been compacted or marked.
577-
if f.CompactionState == manifest.CompactionStateCompacted ||
578-
f.MarkedForCompaction {
569+
return d.mu.versions.UpdateVersionLocked(func() (versionUpdate, error) {
570+
vers := d.mu.versions.currentVersion()
571+
for l, filesToMark := range files {
572+
if len(filesToMark) == 0 {
579573
continue
580574
}
581-
// Else, mark the file for compaction in this version.
582-
vers.Stats.MarkedForCompaction++
583-
f.MarkedForCompaction = true
575+
for _, f := range filesToMark {
576+
// Ignore files to be marked that have already been compacted or marked.
577+
if f.CompactionState == manifest.CompactionStateCompacted ||
578+
f.MarkedForCompaction {
579+
continue
580+
}
581+
// Else, mark the file for compaction in this version.
582+
vers.Stats.MarkedForCompaction++
583+
f.MarkedForCompaction = true
584+
}
585+
// The compaction picker uses the markedForCompactionAnnotator to
586+
// quickly find files marked for compaction, or to quickly determine
587+
// that there are no such files marked for compaction within a level.
588+
// A b-tree node may be annotated with an annotation recording that
589+
// there are no files marked for compaction within the node's subtree,
590+
// based on the assumption that it's static.
591+
//
592+
// Since we're marking files for compaction, these b-tree nodes'
593+
// annotations will be out of date. Clear the compaction-picking
594+
// annotation, so that it's recomputed the next time the compaction
595+
// picker looks for a file marked for compaction.
596+
markedForCompactionAnnotator.InvalidateLevelAnnotation(vers.Levels[l])
584597
}
585-
// The compaction picker uses the markedForCompactionAnnotator to
586-
// quickly find files marked for compaction, or to quickly determine
587-
// that there are no such files marked for compaction within a level.
588-
// A b-tree node may be annotated with an annotation recording that
589-
// there are no files marked for compaction within the node's subtree,
590-
// based on the assumption that it's static.
591-
//
592-
// Since we're marking files for compaction, these b-tree nodes'
593-
// annotations will be out of date. Clear the compaction-picking
594-
// annotation, so that it's recomputed the next time the compaction
595-
// picker looks for a file marked for compaction.
596-
markedForCompactionAnnotator.InvalidateLevelAnnotation(vers.Levels[l])
597-
}
598-
599-
// The 'marked-for-compaction' bit is persisted in the MANIFEST file
600-
// metadata. We've already modified the in-memory table metadata, but the
601-
// manifest hasn't been updated. Force rotation to a new MANIFEST file,
602-
// which will write every table metadata to the new manifest file and ensure
603-
// that the now marked-for-compaction table metadata are persisted as marked.
604-
// NB: This call to logAndApply will unlockthe MANIFEST, which we locked up
605-
// above before obtaining `vers`.
606-
return d.mu.versions.logAndApply(
607-
jobID,
608-
&manifest.VersionEdit{},
609-
nil, /* metrics */
610-
true, /* forceRotation */
611-
func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) })
598+
// The 'marked-for-compaction' bit is persisted in the MANIFEST file
599+
// metadata. We've already modified the in-memory table metadata, but the
600+
// manifest hasn't been updated. Force rotation to a new MANIFEST file,
601+
// which will write every table metadata to the new manifest file and ensure
602+
// that the now marked-for-compaction table metadata are persisted as marked.
603+
return versionUpdate{
604+
VE: &manifest.VersionEdit{},
605+
JobID: jobID,
606+
ForceManifestRotation: true,
607+
InProgressCompactionsFn: func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) },
608+
}, nil
609+
})
612610
}

0 commit comments

Comments
 (0)