Commit eff0bed
core/rawdb: freezer index repair (ethereum#29792)
This pull request removes the `fsync` of index files in freezer.ModifyAncients for a performance gain. Originally, an fsync was issued after each freezer write operation to ensure the written data had truly been transferred to disk. Unfortunately, `fsync` turns out to be relatively slow, especially on macOS (see ethereum#28754 for more information). This pull request removes the fsync for the index file, since the index file can be recovered even after an unclean shutdown. The fsync for the data file is kept, as we have no meaningful way to validate the data's correctness after an unclean shutdown.

---

**But why do we need the `fsync` in the first place?**

It is necessary for the freezer to survive and recover from a machine crash (e.g. a power failure). In Linux, when a file write is performed, the file metadata update and the data update are not necessarily performed at the same time. Typically, the metadata is flushed/journalled ahead of the file data. Therefore, we make the pessimistic assumption that the file is first extended with invalid "garbage" data (normally zero bytes) and that afterwards the correct data replaces the garbage. We have observed that the index file of the freezer often contains garbage entries with zero values (filenumber = 0, offset = 0) after a machine power failure. This proves that the index file is extended without the data being flushed, and this corruption can eventually destroy the whole freezer. Performing an fsync after each write operation narrows the window during which data has not yet reached the disk, ensuring the correctness of the on-disk data to the greatest possible extent.

---

**How can we maintain this guarantee without relying on fsync?**

Because the items in the index file are strictly ordered, we can leverage this characteristic to detect corruption and truncate the corrupted entries when the freezer is opened.

Specifically, the following validation rules are applied to each index file. For two consecutive index items:

- If their file numbers are the same, the offset of the latter MUST NOT be less than that of the former.
- If the file number of the latter equals that of the former plus one, the offset of the latter MUST NOT be 0.
- If their file numbers are neither equal nor consecutive (the latter's file number is not the former's plus one), the latter item is invalid.

In addition, the first non-head item must refer to the earliest data file, or to the next file if the earliest file is not large enough to hold the first item (a very special case, only theoretically possible in tests).

With these validation rules, we can detect invalid items in the index file with high probability.

---

Unfortunately, the following scenarios are not covered and could still lead to freezer corruption if they occur:

**All items in the index file are zero**

It is impossible to distinguish whether they are genuinely zero (e.g. all data entries maintained in the freezer are zero-sized) or merely garbage left behind by the OS. In this case, the index items are kept and the entire data file is truncated, i.e. the freezer is corrupted. However, the probability of this situation occurring is quite low, and even if it occurs, the freezer is then close to an empty state; re-running the state sync should be acceptable.

**The index file is intact while the corresponding data file is corrupted**

It is possible for the data file to be corrupted in a way where its size was extended correctly but filled with garbage (e.g. zero bytes). In this case, the corruption cannot be detected by index validation. We can either `fsync` the data file, or blindly trust that if the index file is intact then the data file is very likely intact as well. This pull request takes the first option.
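The pairwise validation rules above can be sketched as a small standalone Go program. The `indexEntry` struct below mirrors the shape of the freezer's index items (a data-file number plus the offset of the item's end within that file); the field names and the simplified error messages are illustrative assumptions, not the exact go-ethereum code:

```go
package main

import "fmt"

// indexEntry models one freezer index item: which data file the entry
// lives in, and the offset of the entry's end within that file.
type indexEntry struct {
	filenum uint32
	offset  uint32
}

// checkIndexItems applies the three pairwise rules to consecutive
// index items a (former) and b (latter).
func checkIndexItems(a, b indexEntry) error {
	// File numbers must be equal or increase by exactly one.
	if b.filenum != a.filenum && b.filenum != a.filenum+1 {
		return fmt.Errorf("inconsistent file number, prev: %d, next: %d", a.filenum, b.filenum)
	}
	// Within the same file, offsets must be non-decreasing.
	if b.filenum == a.filenum && b.offset < a.offset {
		return fmt.Errorf("unordered offset, prev: %d, next: %d", a.offset, b.offset)
	}
	// The first item of a new data file must not sit at offset zero.
	if b.filenum == a.filenum+1 && b.offset == 0 {
		return fmt.Errorf("zero offset, file number: %d", b.filenum)
	}
	return nil
}

func main() {
	// A valid pair: same file, growing offset.
	fmt.Println(checkIndexItems(indexEntry{1, 100}, indexEntry{1, 150}))
	// A zeroed "garbage" entry after a real one is rejected.
	fmt.Println(checkIndexItems(indexEntry{1, 100}, indexEntry{0, 0}))
}
```

In the real repair loop, the first failing pair determines the truncation offset: the latter item and everything after it are discarded.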
1 parent 90970ed commit eff0bed

File tree

5 files changed: +217 -14 lines changed


core/rawdb/freezer_batch.go

Lines changed: 3 additions & 8 deletions

```diff
@@ -180,10 +180,10 @@ func (batch *freezerTableBatch) maybeCommit() error {
 	return nil
 }
 
-// commit writes the batched items to the backing freezerTable.
+// commit writes the batched items to the backing freezerTable. Note index
+// file isn't fsync'd after the file write, the recent write can be lost
+// after the power failure.
 func (batch *freezerTableBatch) commit() error {
-	// Write data. The head file is fsync'd after write to ensure the
-	// data is truly transferred to disk.
 	_, err := batch.t.head.Write(batch.dataBuffer)
 	if err != nil {
 		return err
@@ -194,15 +194,10 @@ func (batch *freezerTableBatch) commit() error {
 	dataSize := int64(len(batch.dataBuffer))
 	batch.dataBuffer = batch.dataBuffer[:0]
 
-	// Write indices. The index file is fsync'd after write to ensure the
-	// data indexes are truly transferred to disk.
 	_, err = batch.t.index.Write(batch.indexBuffer)
 	if err != nil {
 		return err
 	}
-	if err := batch.t.index.Sync(); err != nil {
-		return err
-	}
 	indexSize := int64(len(batch.indexBuffer))
 	batch.indexBuffer = batch.indexBuffer[:0]
```
core/rawdb/freezer_table.go

Lines changed: 136 additions & 1 deletion

```diff
@@ -17,6 +17,7 @@
 package rawdb
 
 import (
+	"bufio"
 	"bytes"
 	"encoding/binary"
 	"errors"
@@ -26,6 +27,7 @@ import (
 	"path/filepath"
 	"sync"
 	"sync/atomic"
+	"time"
 
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/log"
@@ -219,7 +221,13 @@ func (t *freezerTable) repair() error {
 			return err
 		} // New file can't trigger this path
 	}
-	// Retrieve the file sizes and prepare for truncation
+	// Validate the index file as it might contain some garbage data after the
+	// power failures.
+	if err := t.repairIndex(); err != nil {
+		return err
+	}
+	// Retrieve the file sizes and prepare for truncation. Note the file size
+	// might be changed after index validation.
 	if stat, err = t.index.Stat(); err != nil {
 		return err
 	}
@@ -364,6 +372,133 @@ func (t *freezerTable) repair() error {
 	return nil
 }
 
+// repairIndex validates the integrity of the index file. According to the design,
+// the initial entry in the file denotes the earliest data file along with the
+// count of deleted items. Following this, all subsequent entries in the file must
+// be in order. This function identifies any corrupted entries and truncates items
+// occurring after the corruption point.
+//
+// Corruption can occur because of a power failure. In the Linux kernel, the
+// file metadata update and data update are not necessarily performed at the
+// same time. Typically, the metadata will be flushed/journalled ahead of the file
+// data. Therefore, we make the pessimistic assumption that the file is first
+// extended with invalid "garbage" data (normally zero bytes) and that afterwards
+// the correct data replaces the garbage. As all the items in index file are
+// supposed to be in-order, the leftover garbage must be truncated before the
+// index data is utilized.
+//
+// It's important to note an exception that's unfortunately undetectable: when
+// all index entries in the file are zero. Distinguishing whether they represent
+// leftover garbage or if all items in the table have zero size is impossible.
+// In such instances, the file will remain unchanged to prevent potential data
+// loss or misinterpretation.
+func (t *freezerTable) repairIndex() error {
+	// Retrieve the file sizes and prepare for validation
+	stat, err := t.index.Stat()
+	if err != nil {
+		return err
+	}
+	size := stat.Size()
+
+	// Move the read cursor to the beginning of the file
+	_, err = t.index.Seek(0, io.SeekStart)
+	if err != nil {
+		return err
+	}
+	fr := bufio.NewReader(t.index)
+
+	var (
+		start = time.Now()
+		buff  = make([]byte, indexEntrySize)
+		prev  indexEntry
+		head  indexEntry
+
+		read = func() (indexEntry, error) {
+			n, err := io.ReadFull(fr, buff)
+			if err != nil {
+				return indexEntry{}, err
+			}
+			if n != indexEntrySize {
+				return indexEntry{}, fmt.Errorf("failed to read from index, n: %d", n)
+			}
+			var entry indexEntry
+			entry.unmarshalBinary(buff)
+			return entry, nil
+		}
+		truncate = func(offset int64) error {
+			if t.readonly {
+				return fmt.Errorf("index file is corrupted at %d, size: %d", offset, size)
+			}
+			if err := truncateFreezerFile(t.index, offset); err != nil {
+				return err
+			}
+			log.Warn("Truncated index file", "offset", offset, "truncated", size-offset)
+			return nil
+		}
+	)
+	for offset := int64(0); offset < size; offset += indexEntrySize {
+		entry, err := read()
+		if err != nil {
+			return err
+		}
+		if offset == 0 {
+			head = entry
+			continue
+		}
+		// Ensure that the first non-head index refers to the earliest file,
+		// or the next file if the earliest file has no space to place the
+		// first item.
+		if offset == indexEntrySize {
+			if entry.filenum != head.filenum && entry.filenum != head.filenum+1 {
+				log.Error("Corrupted index item detected", "earliest", head.filenum, "filenumber", entry.filenum)
+				return truncate(offset)
+			}
+			prev = entry
+			continue
+		}
+		// ensure two consecutive index items are in order
+		if err := t.checkIndexItems(prev, entry); err != nil {
+			log.Error("Corrupted index item detected", "err", err)
+			return truncate(offset)
+		}
+		prev = entry
+	}
+	// Move the read cursor to the end of the file. While theoretically, the
+	// cursor should reach the end by reading all the items in the file, perform
+	// the seek operation anyway as a precaution.
+	_, err = t.index.Seek(0, io.SeekEnd)
+	if err != nil {
+		return err
+	}
+	log.Debug("Verified index file", "items", size/indexEntrySize, "elapsed", common.PrettyDuration(time.Since(start)))
+	return nil
+}
+
+// checkIndexItems validates the correctness of two consecutive index items based
+// on the following rules:
+//
+//   - The file number of two consecutive index items must either be the same or
+//     increase monotonically. If the file number decreases or skips in a
+//     non-sequential manner, the index item is considered invalid.
+//
+//   - For index items with the same file number, the data offset must be in
+//     non-decreasing order. Note: Two index items with the same file number
+//     and the same data offset are permitted if the entry size is zero.
+//
+//   - The first index item in a new data file must not have a zero data offset.
+func (t *freezerTable) checkIndexItems(a, b indexEntry) error {
+	if b.filenum != a.filenum && b.filenum != a.filenum+1 {
+		return fmt.Errorf("index items with inconsistent file number, prev: %d, next: %d", a.filenum, b.filenum)
+	}
+	if b.filenum == a.filenum && b.offset < a.offset {
+		return fmt.Errorf("index items with unordered offset, prev: %d, next: %d", a.offset, b.offset)
+	}
+	if b.filenum == a.filenum+1 && b.offset == 0 {
+		return fmt.Errorf("index items with zero offset, file number: %d", b.filenum)
+	}
+	return nil
+}
+
 // preopen opens all files that the freezer will need. This method should be called from an init-context,
 // since it assumes that it doesn't have to bother with locking
 // The rationale for doing preopen is to not have to do it from within Retrieve, thus not needing to ever
```

core/rawdb/freezer_table_test.go

Lines changed: 66 additions & 0 deletions

```diff
@@ -1367,3 +1367,69 @@ func TestRandom(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+func TestIndexValidation(t *testing.T) {
+	const (
+		items    = 30
+		dataSize = 10
+	)
+	garbage := indexEntry{
+		filenum: 100,
+		offset:  200,
+	}
+	var cases = []struct {
+		offset   int64
+		data     []byte
+		expItems int
+	}{
+		// extend index file with zero bytes at the end
+		{
+			offset:   (items + 1) * indexEntrySize,
+			data:     make([]byte, indexEntrySize),
+			expItems: 30,
+		},
+		// write garbage in the first non-head item
+		{
+			offset:   indexEntrySize,
+			data:     garbage.append(nil),
+			expItems: 0,
+		},
+		// write garbage in a middle item
+		{
+			offset:   (items/2 + 1) * indexEntrySize,
+			data:     garbage.append(nil),
+			expItems: items / 2,
+		},
+	}
+	for _, c := range cases {
+		fn := fmt.Sprintf("t-%d", rand.Uint64())
+		f, err := newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
+		if err != nil {
+			t.Fatal(err)
+		}
+		writeChunks(t, f, items, dataSize)
+
+		// write corrupted data
+		f.index.WriteAt(c.data, c.offset)
+		f.Close()
+
+		// reopen the table, corruption should be truncated
+		f, err = newTable(os.TempDir(), fn, metrics.NewMeter(), metrics.NewMeter(), metrics.NewGauge(), 100, true, false)
+		if err != nil {
+			t.Fatal(err)
+		}
+		for i := 0; i < c.expItems; i++ {
+			exp := getChunk(10, i)
+			got, err := f.Retrieve(uint64(i))
+			if err != nil {
+				t.Fatalf("Failed to read from table, %v", err)
+			}
+			if !bytes.Equal(exp, got) {
+				t.Fatalf("Unexpected item data, want: %v, got: %v", exp, got)
+			}
+		}
+		if f.items.Load() != uint64(c.expItems) {
+			t.Fatalf("Unexpected item number, want: %d, got: %d", c.expItems, f.items.Load())
+		}
+	}
+}
```

triedb/pathdb/disklayer.go

Lines changed: 2 additions & 2 deletions

```diff
@@ -202,7 +202,7 @@ func (dl *diskLayer) commit(bottom *diffLayer, force bool) (*diskLayer, error) {
 	if !force && rawdb.ReadPersistentStateID(dl.db.diskdb) < oldest {
 		force = true
 	}
-	if err := ndl.buffer.flush(ndl.db.diskdb, ndl.cleans, ndl.id, force); err != nil {
+	if err := ndl.buffer.flush(ndl.db.diskdb, ndl.db.freezer, ndl.cleans, ndl.id, force); err != nil {
 		return nil, err
 	}
 	// To remove outdated history objects from the end, we set the 'tail' parameter
@@ -267,7 +267,7 @@ func (dl *diskLayer) setBufferSize(size int) error {
 	if dl.stale {
 		return errSnapshotStale
 	}
-	return dl.buffer.setSize(size, dl.db.diskdb, dl.cleans, dl.id)
+	return dl.buffer.setSize(size, dl.db.diskdb, dl.db.freezer, dl.cleans, dl.id)
 }
 
 // size returns the approximate size of cached nodes in the disk layer.
```

triedb/pathdb/nodebuffer.go

Lines changed: 10 additions & 3 deletions

```diff
@@ -194,9 +194,9 @@ func (b *nodebuffer) empty() bool {
 
 // setSize sets the buffer size to the provided number, and invokes a flush
 // operation if the current memory usage exceeds the new limit.
-func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64) error {
+func (b *nodebuffer) setSize(size int, db ethdb.KeyValueStore, freezer ethdb.AncientStore, clean *fastcache.Cache, id uint64) error {
 	b.limit = uint64(size)
-	return b.flush(db, clean, id, false)
+	return b.flush(db, freezer, clean, id, false)
 }
 
 // allocBatch returns a database batch with pre-allocated buffer.
@@ -214,7 +214,7 @@ func (b *nodebuffer) allocBatch(db ethdb.KeyValueStore) ethdb.Batch {
 
 // flush persists the in-memory dirty trie node into the disk if the configured
 // memory threshold is reached. Note, all data must be written atomically.
-func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id uint64, force bool) error {
+func (b *nodebuffer) flush(db ethdb.KeyValueStore, freezer ethdb.AncientWriter, clean *fastcache.Cache, id uint64, force bool) error {
 	if b.size <= b.limit && !force {
 		return nil
 	}
@@ -227,6 +227,13 @@ func (b *nodebuffer) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, id ui
 		start = time.Now()
 		batch = b.allocBatch(db)
 	)
+	// Explicitly sync the state freezer, ensuring that all written
+	// data is transferred to disk before updating the key-value store.
+	if freezer != nil {
+		if err := freezer.Sync(); err != nil {
+			return err
+		}
+	}
 	nodes := writeNodes(batch, b.nodes, clean)
 	rawdb.WritePersistentStateID(batch, id)
```
