Skip to content

Commit 8080265

Browse files
frncmx (Ferenc Szabo)
authored and committed
swarm/storage: fix access count on dbstore after cache hit (#17978)
Access count was not incremented when chunk was retrieved from cache. So the garbage collector might have deleted the most frequently accessed chunk from disk. Co-authored-by: Ferenc Szabo <[email protected]>
1 parent 1212c7b commit 8080265

File tree

4 files changed

+138
-17
lines changed

4 files changed

+138
-17
lines changed

swarm/storage/ldbstore.go

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,20 @@ func NewLDBStore(params *LDBStoreParams) (s *LDBStore, err error) {
186186
return s, nil
187187
}
188188

189+
// MarkAccessed increments the access counter as a best effort for a chunk, so
190+
// the chunk won't get garbage collected.
191+
func (s *LDBStore) MarkAccessed(addr Address) {
192+
s.lock.Lock()
193+
defer s.lock.Unlock()
194+
195+
if s.closed {
196+
return
197+
}
198+
199+
proximity := s.po(addr)
200+
s.tryAccessIdx(addr, proximity)
201+
}
202+
189203
// initialize and set values for processing of gc round
190204
func (s *LDBStore) startGC(c int) {
191205

@@ -349,6 +363,7 @@ func (s *LDBStore) collectGarbage() error {
349363
s.delete(s.gc.batch.Batch, index, keyIdx, po)
350364
singleIterationCount++
351365
s.gc.count++
366+
log.Trace("garbage collect enqueued chunk for deletion", "key", hash)
352367

353368
// break if target is not on max garbage batch boundary
354369
if s.gc.count >= s.gc.target {
@@ -685,12 +700,7 @@ func (s *LDBStore) Put(ctx context.Context, chunk Chunk) error {
685700
idata, err := s.db.Get(ikey)
686701
if err != nil {
687702
s.doPut(chunk, &index, po)
688-
} else {
689-
log.Debug("ldbstore.put: chunk already exists, only update access", "key", chunk.Address(), "po", po)
690-
decodeIndex(idata, &index)
691703
}
692-
index.Access = s.accessCnt
693-
s.accessCnt++
694704
idata = encodeIndex(&index)
695705
s.batch.Put(ikey, idata)
696706

@@ -723,7 +733,8 @@ func (s *LDBStore) doPut(chunk Chunk, index *dpaDBIndex, po uint8) {
723733
s.entryCnt++
724734
dbEntryCount.Inc(1)
725735
s.dataIdx++
726-
736+
index.Access = s.accessCnt
737+
s.accessCnt++
727738
cntKey := make([]byte, 2)
728739
cntKey[0] = keyDistanceCnt
729740
cntKey[1] = po
@@ -796,18 +807,23 @@ func newMockEncodeDataFunc(mockStore *mock.NodeStore) func(chunk Chunk) []byte {
796807
}
797808
}
798809

799-
// try to find index; if found, update access cnt and return true
800-
func (s *LDBStore) tryAccessIdx(ikey []byte, po uint8, index *dpaDBIndex) bool {
810+
// tryAccessIdx tries to find index entry. If found then increments the access
811+
// count for garbage collection and returns the index entry and true for found,
812+
// otherwise returns nil and false.
813+
func (s *LDBStore) tryAccessIdx(addr Address, po uint8) (*dpaDBIndex, bool) {
814+
ikey := getIndexKey(addr)
801815
idata, err := s.db.Get(ikey)
802816
if err != nil {
803-
return false
817+
return nil, false
804818
}
819+
820+
index := new(dpaDBIndex)
805821
decodeIndex(idata, index)
806822
oldGCIdxKey := getGCIdxKey(index)
807823
s.batch.Put(keyAccessCnt, U64ToBytes(s.accessCnt))
808-
s.accessCnt++
809824
index.Access = s.accessCnt
810825
idata = encodeIndex(index)
826+
s.accessCnt++
811827
s.batch.Put(ikey, idata)
812828
newGCIdxKey := getGCIdxKey(index)
813829
newGCIdxData := getGCIdxValue(index, po, ikey)
@@ -817,7 +833,7 @@ func (s *LDBStore) tryAccessIdx(ikey []byte, po uint8, index *dpaDBIndex) bool {
817833
case s.batchesC <- struct{}{}:
818834
default:
819835
}
820-
return true
836+
return index, true
821837
}
822838

823839
// GetSchema is returning the current named schema of the datastore as read from LevelDB
@@ -858,12 +874,12 @@ func (s *LDBStore) Get(_ context.Context, addr Address) (chunk Chunk, err error)
858874

859875
// TODO: To conform with other private methods of this object indices should not be updated
860876
func (s *LDBStore) get(addr Address) (chunk *chunk, err error) {
861-
var indx dpaDBIndex
862877
if s.closed {
863878
return nil, ErrDBClosed
864879
}
865880
proximity := s.po(addr)
866-
if s.tryAccessIdx(getIndexKey(addr), proximity, &indx) {
881+
index, found := s.tryAccessIdx(addr, proximity)
882+
if found {
867883
var data []byte
868884
if s.getDataFunc != nil {
869885
// if getDataFunc is defined, use it to retrieve the chunk data
@@ -874,12 +890,12 @@ func (s *LDBStore) get(addr Address) (chunk *chunk, err error) {
874890
}
875891
} else {
876892
// default DbStore functionality to retrieve chunk data
877-
datakey := getDataKey(indx.Idx, proximity)
893+
datakey := getDataKey(index.Idx, proximity)
878894
data, err = s.db.Get(datakey)
879-
log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", indx.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
895+
log.Trace("ldbstore.get retrieve", "key", addr, "indexkey", index.Idx, "datakey", fmt.Sprintf("%x", datakey), "proximity", proximity)
880896
if err != nil {
881897
log.Trace("ldbstore.get chunk found but could not be accessed", "key", addr, "err", err)
882-
s.deleteNow(&indx, getIndexKey(addr), s.po(addr))
898+
s.deleteNow(index, getIndexKey(addr), s.po(addr))
883899
return
884900
}
885901
}

swarm/storage/ldbstore_test.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
ch "github.com/ethereum/go-ethereum/swarm/chunk"
3232
"github.com/ethereum/go-ethereum/swarm/log"
3333
"github.com/ethereum/go-ethereum/swarm/storage/mock/mem"
34-
3534
ldberrors "github.com/syndtr/goleveldb/leveldb/errors"
3635
)
3736

@@ -105,6 +104,46 @@ func testDbStoreCorrect(n int, chunksize int64, mock bool, t *testing.T) {
105104
testStoreCorrect(db, n, chunksize, t)
106105
}
107106

107+
func TestMarkAccessed(t *testing.T) {
108+
db, cleanup, err := newTestDbStore(false, true)
109+
defer cleanup()
110+
if err != nil {
111+
t.Fatalf("init dbStore failed: %v", err)
112+
}
113+
114+
h := GenerateRandomChunk(ch.DefaultSize)
115+
116+
db.Put(context.Background(), h)
117+
118+
var index dpaDBIndex
119+
addr := h.Address()
120+
idxk := getIndexKey(addr)
121+
122+
idata, err := db.db.Get(idxk)
123+
if err != nil {
124+
t.Fatal(err)
125+
}
126+
decodeIndex(idata, &index)
127+
128+
if index.Access != 0 {
129+
t.Fatalf("Expected the access index to be %d, but it is %d", 0, index.Access)
130+
}
131+
132+
db.MarkAccessed(addr)
133+
db.writeCurrentBatch()
134+
135+
idata, err = db.db.Get(idxk)
136+
if err != nil {
137+
t.Fatal(err)
138+
}
139+
decodeIndex(idata, &index)
140+
141+
if index.Access != 1 {
142+
t.Fatalf("Expected the access index to be %d, but it is %d", 1, index.Access)
143+
}
144+
145+
}
146+
108147
func TestDbStoreRandom_1(t *testing.T) {
109148
testDbStoreRandom(1, 0, false, t)
110149
}

swarm/storage/localstore.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ func (ls *LocalStore) get(ctx context.Context, addr Address) (chunk Chunk, err e
153153

154154
if err == nil {
155155
metrics.GetOrRegisterCounter("localstore.get.cachehit", nil).Inc(1)
156+
go ls.DbStore.MarkAccessed(addr)
156157
return chunk, nil
157158
}
158159

swarm/storage/localstore_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"io/ioutil"
2222
"os"
2323
"testing"
24+
"time"
2425

2526
ch "github.com/ethereum/go-ethereum/swarm/chunk"
2627
)
@@ -144,3 +145,67 @@ func put(store *LocalStore, n int, f func(i int64) Chunk) (hs []Address, errs []
144145
}
145146
return hs, errs
146147
}
148+
149+
// TestGetFrequentlyAccessedChunkWontGetGarbageCollected tests that the most
150+
// frequently accessed chunk is not garbage collected from LDBStore, i.e.,
151+
// from disk when we are at the capacity and garbage collector runs. For that
152+
// we start putting random chunks into the DB while continuously accessing the
153+
// chunk we care about then check if we can still retrieve it from disk.
154+
func TestGetFrequentlyAccessedChunkWontGetGarbageCollected(t *testing.T) {
155+
ldbCap := defaultGCRatio
156+
store, cleanup := setupLocalStore(t, ldbCap)
157+
defer cleanup()
158+
159+
var chunks []Chunk
160+
for i := 0; i < ldbCap; i++ {
161+
chunks = append(chunks, GenerateRandomChunk(ch.DefaultSize))
162+
}
163+
164+
mostAccessed := chunks[0].Address()
165+
for _, chunk := range chunks {
166+
if err := store.Put(context.Background(), chunk); err != nil {
167+
t.Fatal(err)
168+
}
169+
170+
if _, err := store.Get(context.Background(), mostAccessed); err != nil {
171+
t.Fatal(err)
172+
}
173+
// Add time for MarkAccessed() to be able to finish in a separate Goroutine
174+
time.Sleep(1 * time.Millisecond)
175+
}
176+
177+
store.DbStore.collectGarbage()
178+
if _, err := store.DbStore.Get(context.Background(), mostAccessed); err != nil {
179+
t.Logf("most frequntly accessed chunk not found on disk (key: %v)", mostAccessed)
180+
t.Fatal(err)
181+
}
182+
183+
}
184+
185+
func setupLocalStore(t *testing.T, ldbCap int) (ls *LocalStore, cleanup func()) {
186+
t.Helper()
187+
188+
var err error
189+
datadir, err := ioutil.TempDir("", "storage")
190+
if err != nil {
191+
t.Fatal(err)
192+
}
193+
194+
params := &LocalStoreParams{
195+
StoreParams: NewStoreParams(uint64(ldbCap), uint(ldbCap), nil, nil),
196+
}
197+
params.Init(datadir)
198+
199+
store, err := NewLocalStore(params, nil)
200+
if err != nil {
201+
_ = os.RemoveAll(datadir)
202+
t.Fatal(err)
203+
}
204+
205+
cleanup = func() {
206+
store.Close()
207+
_ = os.RemoveAll(datadir)
208+
}
209+
210+
return store, cleanup
211+
}

0 commit comments

Comments
 (0)