Skip to content

Commit c41e1bd

Browse files
frncmxnonsense
authored and committed
swarm/storage: fix garbage collector index skew (#18080)
On file access, LDBStore's tryAccessIdx() function created a faulty GC Index Data entry, because it did not index the ikey correctly. That caused the chunk addresses/hashes to start with '00' and the last two digits to be dropped, resulting in an incorrect chunk address. Besides the fix, the commit also contains a schema change which will run the CleanGCIndex() function to clean the GC index of erroneous entries. Note: CleanGCIndex() rebuilds the index from scratch, which can take a really, really long time with a huge DB (possibly an hour).
1 parent 4fecc7a commit c41e1bd

File tree

4 files changed

+268
-54
lines changed

4 files changed

+268
-54
lines changed

swarm/storage/ldbstore.go

Lines changed: 84 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ var (
5757

5858
var (
5959
keyIndex = byte(0)
60-
keyOldData = byte(1)
6160
keyAccessCnt = []byte{2}
6261
keyEntryCnt = []byte{3}
6362
keyDataIdx = []byte{4}
@@ -285,6 +284,10 @@ func getGCIdxValue(index *dpaDBIndex, po uint8, addr Address) []byte {
285284
return val
286285
}
287286

287+
func parseGCIdxKey(key []byte) (byte, []byte) {
288+
return key[0], key[1:]
289+
}
290+
288291
func parseGCIdxEntry(accessCnt []byte, val []byte) (index *dpaDBIndex, po uint8, addr Address) {
289292
index = &dpaDBIndex{
290293
Idx: binary.BigEndian.Uint64(val[1:]),
@@ -504,7 +507,7 @@ func (s *LDBStore) Import(in io.Reader) (int64, error) {
504507
}
505508
}
506509

507-
//Cleanup iterates over the database and deletes chunks if they pass the `f` condition
510+
// Cleanup iterates over the database and deletes chunks if they pass the `f` condition
508511
func (s *LDBStore) Cleanup(f func(*chunk) bool) {
509512
var errorsFound, removed, total int
510513

@@ -569,47 +572,90 @@ func (s *LDBStore) Cleanup(f func(*chunk) bool) {
569572
log.Warn(fmt.Sprintf("Found %v errors out of %v entries. Removed %v chunks.", errorsFound, total, removed))
570573
}
571574

572-
func (s *LDBStore) ReIndex() {
573-
//Iterates over the database and checks that there are no faulty chunks
575+
// CleanGCIndex rebuilds the garbage collector index from scratch, while
576+
// removing inconsistent elements, e.g., indices with missing data chunks.
577+
// WARN: it's a pretty heavy, long running function.
578+
func (s *LDBStore) CleanGCIndex() error {
579+
s.lock.Lock()
580+
defer s.lock.Unlock()
581+
582+
batch := leveldb.Batch{}
583+
584+
var okEntryCount uint64
585+
var totalEntryCount uint64
586+
587+
// throw out all gc indices, we will rebuild from cleaned index
574588
it := s.db.NewIterator()
575-
startPosition := []byte{keyOldData}
576-
it.Seek(startPosition)
577-
var key []byte
578-
var errorsFound, total int
589+
it.Seek([]byte{keyGCIdx})
590+
var gcDeletes int
591+
for it.Valid() {
592+
rowType, _ := parseGCIdxKey(it.Key())
593+
if rowType != keyGCIdx {
594+
break
595+
}
596+
batch.Delete(it.Key())
597+
gcDeletes++
598+
it.Next()
599+
}
600+
log.Debug("gc", "deletes", gcDeletes)
601+
if err := s.db.Write(&batch); err != nil {
602+
return err
603+
}
604+
605+
it.Seek([]byte{keyIndex})
606+
var idx dpaDBIndex
607+
var poPtrs [256]uint64
579608
for it.Valid() {
580-
key = it.Key()
581-
if (key == nil) || (key[0] != keyOldData) {
609+
rowType, chunkHash := parseGCIdxKey(it.Key())
610+
if rowType != keyIndex {
582611
break
583612
}
584-
data := it.Value()
585-
hasher := s.hashfunc()
586-
hasher.Write(data)
587-
hash := hasher.Sum(nil)
588-
589-
newKey := make([]byte, 10)
590-
oldCntKey := make([]byte, 2)
591-
newCntKey := make([]byte, 2)
592-
oldCntKey[0] = keyDistanceCnt
593-
newCntKey[0] = keyDistanceCnt
594-
key[0] = keyData
595-
key[1] = s.po(Address(key[1:]))
596-
oldCntKey[1] = key[1]
597-
newCntKey[1] = s.po(Address(newKey[1:]))
598-
copy(newKey[2:], key[1:])
599-
newValue := append(hash, data...)
600-
601-
batch := new(leveldb.Batch)
602-
batch.Delete(key)
603-
s.bucketCnt[oldCntKey[1]]--
604-
batch.Put(oldCntKey, U64ToBytes(s.bucketCnt[oldCntKey[1]]))
605-
batch.Put(newKey, newValue)
606-
s.bucketCnt[newCntKey[1]]++
607-
batch.Put(newCntKey, U64ToBytes(s.bucketCnt[newCntKey[1]]))
608-
s.db.Write(batch)
613+
err := decodeIndex(it.Value(), &idx)
614+
if err != nil {
615+
return fmt.Errorf("corrupt index: %v", err)
616+
}
617+
po := s.po(chunkHash)
618+
619+
// if we don't find the data key, remove the entry
620+
dataKey := getDataKey(idx.Idx, po)
621+
_, err = s.db.Get(dataKey)
622+
if err != nil {
623+
log.Warn("deleting inconsistent index (missing data)", "key", chunkHash)
624+
batch.Delete(it.Key())
625+
} else {
626+
gcIdxKey := getGCIdxKey(&idx)
627+
gcIdxData := getGCIdxValue(&idx, po, chunkHash)
628+
batch.Put(gcIdxKey, gcIdxData)
629+
log.Trace("clean ok", "key", chunkHash, "gcKey", gcIdxKey, "gcData", gcIdxData)
630+
okEntryCount++
631+
if idx.Idx > poPtrs[po] {
632+
poPtrs[po] = idx.Idx
633+
}
634+
}
635+
totalEntryCount++
609636
it.Next()
610637
}
638+
611639
it.Release()
612-
log.Warn(fmt.Sprintf("Found %v errors out of %v entries", errorsFound, total))
640+
log.Debug("gc cleanup entries", "ok", okEntryCount, "total", totalEntryCount, "batchlen", batch.Len())
641+
642+
var entryCount [8]byte
643+
binary.BigEndian.PutUint64(entryCount[:], okEntryCount)
644+
batch.Put(keyEntryCnt, entryCount[:])
645+
var poKey [2]byte
646+
poKey[0] = keyDistanceCnt
647+
for i, poPtr := range poPtrs {
648+
poKey[1] = uint8(i)
649+
if poPtr == 0 {
650+
batch.Delete(poKey[:])
651+
} else {
652+
var idxCount [8]byte
653+
binary.BigEndian.PutUint64(idxCount[:], poPtr)
654+
batch.Put(poKey[:], idxCount[:])
655+
}
656+
}
657+
658+
return s.db.Write(&batch)
613659
}
614660

615661
// Delete is removes a chunk and updates indices.
@@ -826,7 +872,7 @@ func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) {
826872
s.accessCnt++
827873
s.batch.Put(ikey, idata)
828874
newGCIdxKey := getGCIdxKey(index)
829-
newGCIdxData := getGCIdxValue(index, po, ikey)
875+
newGCIdxData := getGCIdxValue(index, po, ikey[1:])
830876
s.batch.Delete(oldGCIdxKey)
831877
s.batch.Put(newGCIdxKey, newGCIdxData)
832878
select {
@@ -844,7 +890,7 @@ func (s *LDBStore) GetSchema() (string, error) {
844890
data, err := s.db.Get(keySchema)
845891
if err != nil {
846892
if err == leveldb.ErrNotFound {
847-
return "", nil
893+
return DbSchemaNone, nil
848894
}
849895
return "", err
850896
}

swarm/storage/ldbstore_test.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package storage
1919
import (
2020
"bytes"
2121
"context"
22+
"encoding/binary"
2223
"fmt"
2324
"io/ioutil"
2425
"os"
@@ -623,6 +624,145 @@ func TestLDBStoreCollectGarbageAccessUnlikeIndex(t *testing.T) {
623624
log.Info("ldbstore", "total", n, "missing", missing, "entrycnt", ldb.entryCnt, "accesscnt", ldb.accessCnt)
624625
}
625626

627+
func TestCleanIndex(t *testing.T) {
628+
capacity := 5000
629+
n := 3
630+
631+
ldb, cleanup := newLDBStore(t)
632+
ldb.setCapacity(uint64(capacity))
633+
defer cleanup()
634+
635+
chunks, err := mputRandomChunks(ldb, n, 4096)
636+
if err != nil {
637+
t.Fatal(err)
638+
}
639+
640+
// remove the data of the first chunk
641+
po := ldb.po(chunks[0].Address()[:])
642+
dataKey := make([]byte, 10)
643+
dataKey[0] = keyData
644+
dataKey[1] = byte(po)
645+
// dataKey[2:10] = first chunk has storageIdx 0 on [2:10]
646+
if _, err := ldb.db.Get(dataKey); err != nil {
647+
t.Fatal(err)
648+
}
649+
if err := ldb.db.Delete(dataKey); err != nil {
650+
t.Fatal(err)
651+
}
652+
653+
// remove the gc index row for the first chunk
654+
gcFirstCorrectKey := make([]byte, 9)
655+
gcFirstCorrectKey[0] = keyGCIdx
656+
if err := ldb.db.Delete(gcFirstCorrectKey); err != nil {
657+
t.Fatal(err)
658+
}
659+
660+
// warp the gc data of the second chunk
661+
// this data should be correct again after the clean
662+
gcSecondCorrectKey := make([]byte, 9)
663+
gcSecondCorrectKey[0] = keyGCIdx
664+
binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(1))
665+
gcSecondCorrectVal, err := ldb.db.Get(gcSecondCorrectKey)
666+
if err != nil {
667+
t.Fatal(err)
668+
}
669+
warpedGCVal := make([]byte, len(gcSecondCorrectVal)+1)
670+
copy(warpedGCVal[1:], gcSecondCorrectVal)
671+
if err := ldb.db.Delete(gcSecondCorrectKey); err != nil {
672+
t.Fatal(err)
673+
}
674+
if err := ldb.db.Put(gcSecondCorrectKey, warpedGCVal); err != nil {
675+
t.Fatal(err)
676+
}
677+
678+
if err := ldb.CleanGCIndex(); err != nil {
679+
t.Fatal(err)
680+
}
681+
682+
// the index without corresponding data should have been deleted
683+
idxKey := make([]byte, 33)
684+
idxKey[0] = keyIndex
685+
copy(idxKey[1:], chunks[0].Address())
686+
if _, err := ldb.db.Get(idxKey); err == nil {
687+
t.Fatalf("expected chunk 0 idx to be pruned: %v", idxKey)
688+
}
689+
690+
// the two other indices should be present
691+
copy(idxKey[1:], chunks[1].Address())
692+
if _, err := ldb.db.Get(idxKey); err != nil {
693+
t.Fatalf("expected chunk 1 idx to be present: %v", idxKey)
694+
}
695+
696+
copy(idxKey[1:], chunks[2].Address())
697+
if _, err := ldb.db.Get(idxKey); err != nil {
698+
t.Fatalf("expected chunk 2 idx to be present: %v", idxKey)
699+
}
700+
701+
// first gc index should still be gone
702+
if _, err := ldb.db.Get(gcFirstCorrectKey); err == nil {
703+
t.Fatalf("expected gc 0 idx to be pruned: %v", idxKey)
704+
}
705+
706+
// second gc index should still be fixed
707+
if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil {
708+
t.Fatalf("expected gc 1 idx to be present: %v", idxKey)
709+
}
710+
711+
// third gc index should be unchanged
712+
binary.BigEndian.PutUint64(gcSecondCorrectKey[1:], uint64(2))
713+
if _, err := ldb.db.Get(gcSecondCorrectKey); err != nil {
714+
t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
715+
}
716+
717+
c, err := ldb.db.Get(keyEntryCnt)
718+
if err != nil {
719+
t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
720+
}
721+
722+
// entrycount should now be one less
723+
entryCount := binary.BigEndian.Uint64(c)
724+
if entryCount != 2 {
725+
t.Fatalf("expected entrycnt to be 2, was %d", c)
726+
}
727+
728+
// the chunks might accidentally be in the same bin
729+
// if so that bin counter will now be 2 - the highest added index.
730+
// if not, the total of them will be 3
731+
poBins := []uint8{ldb.po(chunks[1].Address()), ldb.po(chunks[2].Address())}
732+
if poBins[0] == poBins[1] {
733+
poBins = poBins[:1]
734+
}
735+
736+
var binTotal uint64
737+
var currentBin [2]byte
738+
currentBin[0] = keyDistanceCnt
739+
if len(poBins) == 1 {
740+
currentBin[1] = poBins[0]
741+
c, err := ldb.db.Get(currentBin[:])
742+
if err != nil {
743+
t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
744+
}
745+
binCount := binary.BigEndian.Uint64(c)
746+
if binCount != 2 {
747+
t.Fatalf("expected entrycnt to be 2, was %d", binCount)
748+
}
749+
} else {
750+
for _, bin := range poBins {
751+
currentBin[1] = bin
752+
c, err := ldb.db.Get(currentBin[:])
753+
if err != nil {
754+
t.Fatalf("expected gc 2 idx to be present: %v", idxKey)
755+
}
756+
binCount := binary.BigEndian.Uint64(c)
757+
binTotal += binCount
758+
759+
}
760+
if binTotal != 3 {
761+
t.Fatalf("expected sum of bin indices to be 3, was %d", binTotal)
762+
}
763+
}
764+
}
765+
626766
func waitGc(ctx context.Context, ldb *LDBStore) {
627767
<-ldb.gc.runC
628768
ldb.gc.runC <- struct{}{}

swarm/storage/localstore.go

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -196,31 +196,48 @@ func (ls *LocalStore) Close() {
196196

197197
// Migrate checks the datastore schema vs the runtime schema, and runs migrations if they don't match
198198
func (ls *LocalStore) Migrate() error {
199-
schema, err := ls.DbStore.GetSchema()
199+
actualDbSchema, err := ls.DbStore.GetSchema()
200200
if err != nil {
201201
log.Error(err.Error())
202202
return err
203203
}
204204

205-
log.Debug("found schema", "schema", schema, "runtime-schema", CurrentDbSchema)
206-
if schema != CurrentDbSchema {
207-
// run migrations
205+
log.Debug("running migrations for", "schema", actualDbSchema, "runtime-schema", CurrentDbSchema)
208206

209-
if schema == "" {
210-
log.Debug("running migrations for", "schema", schema, "runtime-schema", CurrentDbSchema)
207+
if actualDbSchema == CurrentDbSchema {
208+
return nil
209+
}
210+
211+
if actualDbSchema == DbSchemaNone {
212+
ls.migrateFromNoneToPurity()
213+
actualDbSchema = DbSchemaPurity
214+
}
211215

212-
// delete chunks that are not valid, i.e. chunks that do not pass any of the ls.Validators
213-
ls.DbStore.Cleanup(func(c *chunk) bool {
214-
return !ls.isValid(c)
215-
})
216+
if err := ls.DbStore.PutSchema(actualDbSchema); err != nil {
217+
return err
218+
}
216219

217-
err := ls.DbStore.PutSchema(DbSchemaPurity)
218-
if err != nil {
219-
log.Error(err.Error())
220-
return err
221-
}
220+
if actualDbSchema == DbSchemaPurity {
221+
if err := ls.migrateFromPurityToHalloween(); err != nil {
222+
return err
222223
}
224+
actualDbSchema = DbSchemaHalloween
223225
}
224226

227+
if err := ls.DbStore.PutSchema(actualDbSchema); err != nil {
228+
return err
229+
}
225230
return nil
226231
}
232+
233+
func (ls *LocalStore) migrateFromNoneToPurity() {
234+
// delete chunks that are not valid, i.e. chunks that do not pass
235+
// any of the ls.Validators
236+
ls.DbStore.Cleanup(func(c *chunk) bool {
237+
return !ls.isValid(c)
238+
})
239+
}
240+
241+
func (ls *LocalStore) migrateFromPurityToHalloween() error {
242+
return ls.DbStore.CleanGCIndex()
243+
}

0 commit comments

Comments
 (0)