@@ -252,12 +252,7 @@ func (s *DbStore) collectGarbage(ratio float32) {
252
252
// actual gc
253
253
for i := 0 ; i < gcnt ; i ++ {
254
254
if s .gcArray [i ].value <= cutval {
255
- batch := new (leveldb.Batch )
256
- batch .Delete (s .gcArray [i ].idxKey )
257
- batch .Delete (getDataKey (s .gcArray [i ].idx ))
258
- s .entryCnt --
259
- batch .Put (keyEntryCnt , U64ToBytes (s .entryCnt ))
260
- s .db .Write (batch )
255
+ s .delete (s .gcArray [i ].idx , s .gcArray [i ].idxKey )
261
256
}
262
257
}
263
258
@@ -266,6 +261,52 @@ func (s *DbStore) collectGarbage(ratio float32) {
266
261
s .db .Put (keyGCPos , s .gcPos )
267
262
}
268
263
264
+ func (s * DbStore ) Cleanup () {
265
+ //Iterates over the database and checks that there are no faulty chunks
266
+ it := s .db .NewIterator ()
267
+ startPosition := []byte {kpIndex }
268
+ it .Seek (startPosition )
269
+ var key []byte
270
+ var errorsFound , total int
271
+ for it .Valid () {
272
+ key = it .Key ()
273
+ if (key == nil ) || (key [0 ] != kpIndex ) {
274
+ break
275
+ }
276
+ total ++
277
+ var index dpaDBIndex
278
+ decodeIndex (it .Value (), & index )
279
+
280
+ data , err := s .db .Get (getDataKey (index .Idx ))
281
+ if err != nil {
282
+ glog .V (logger .Warn ).Infof ("Chunk %x found but could not be accessed: %v" , key [:], err )
283
+ s .delete (index .Idx , getIndexKey (key [1 :]))
284
+ errorsFound ++
285
+ } else {
286
+ hasher := s .hashfunc ()
287
+ hasher .Write (data )
288
+ hash := hasher .Sum (nil )
289
+ if ! bytes .Equal (hash , key [1 :]) {
290
+ glog .V (logger .Warn ).Infof ("Found invalid chunk. Hash mismatch. hash=%x, key=%x" , hash , key [:])
291
+ s .delete (index .Idx , getIndexKey (key [1 :]))
292
+ errorsFound ++
293
+ }
294
+ }
295
+ it .Next ()
296
+ }
297
+ it .Release ()
298
+ glog .V (logger .Warn ).Infof ("Found %v errors out of %v entries" , errorsFound , total )
299
+ }
300
+
301
+ func (s * DbStore ) delete (idx uint64 , idxKey []byte ) {
302
+ batch := new (leveldb.Batch )
303
+ batch .Delete (idxKey )
304
+ batch .Delete (getDataKey (idx ))
305
+ s .entryCnt --
306
+ batch .Put (keyEntryCnt , U64ToBytes (s .entryCnt ))
307
+ s .db .Write (batch )
308
+ }
309
+
269
310
func (s * DbStore ) Counter () uint64 {
270
311
s .lock .Lock ()
271
312
defer s .lock .Unlock ()
@@ -283,6 +324,7 @@ func (s *DbStore) Put(chunk *Chunk) {
283
324
if chunk .dbStored != nil {
284
325
close (chunk .dbStored )
285
326
}
327
+ glog .V (logger .Detail ).Infof ("Storing to DB: chunk already exists, only update access" )
286
328
return // already exists, only update access
287
329
}
288
330
@@ -348,16 +390,17 @@ func (s *DbStore) Get(key Key) (chunk *Chunk, err error) {
348
390
var data []byte
349
391
data , err = s .db .Get (getDataKey (index .Idx ))
350
392
if err != nil {
393
+ glog .V (logger .Detail ).Infof ("DBStore: Chunk %v found but could not be accessed: %v" , key .Log (), err )
394
+ s .delete (index .Idx , getIndexKey (key ))
351
395
return
352
396
}
353
397
354
398
hasher := s .hashfunc ()
355
399
hasher .Write (data )
356
400
hash := hasher .Sum (nil )
357
401
if ! bytes .Equal (hash , key ) {
358
- s .db .Delete (getDataKey (index .Idx ))
359
- err = fmt .Errorf ("invalid chunk. hash=%x, key=%v" , hash , key [:])
360
- return
402
+ s .delete (index .Idx , getIndexKey (key ))
403
+ panic ("Invalid Chunk in Database. Please repair with command: 'swarm cleandb'" )
361
404
}
362
405
363
406
chunk = & Chunk {
0 commit comments