Skip to content

Commit 5e982b6

Browse files
authored
chunk, storage: chunk.Store multiple chunk Put (ethersphere#1681)
1 parent a0aefcf commit 5e982b6

File tree

10 files changed

+578
-310
lines changed

10 files changed

+578
-310
lines changed

chunk/chunk.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ func (d *Descriptor) String() string {
243243

244244
type Store interface {
245245
Get(ctx context.Context, mode ModeGet, addr Address) (ch Chunk, err error)
246-
Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error)
246+
Put(ctx context.Context, mode ModePut, chs ...Chunk) (exist []bool, err error)
247247
Has(ctx context.Context, addr Address) (yes bool, err error)
248248
Set(ctx context.Context, mode ModeSet, addr Address) (err error)
249249
LastPullSubscriptionBinID(bin uint8) (id uint64, err error)
@@ -256,7 +256,7 @@ type Validator interface {
256256
Validate(ch Chunk) bool
257257
}
258258

259-
// ValidatorStore encapsulates Store by decorting the Put method
259+
// ValidatorStore encapsulates Store by decorating the Put method
260260
// with validators check.
261261
type ValidatorStore struct {
262262
Store
@@ -272,14 +272,25 @@ func NewValidatorStore(store Store, validators ...Validator) (s *ValidatorStore)
272272
}
273273
}
274274

275-
// Put overrides Store put method with validators check. If one of the validators
276-
// return true, the chunk is considered valid and Store Put method is called.
277-
// If all validators return false, ErrChunkInvalid is returned.
278-
func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, ch Chunk) (exists bool, err error) {
275+
// Put overrides Store put method with validators check. For Put to succeed,
276+
// all provided chunks must be validated with true by one of the validators.
277+
func (s *ValidatorStore) Put(ctx context.Context, mode ModePut, chs ...Chunk) (exist []bool, err error) {
278+
for _, ch := range chs {
279+
if !s.validate(ch) {
280+
return nil, ErrChunkInvalid
281+
}
282+
}
283+
return s.Store.Put(ctx, mode, chs...)
284+
}
285+
286+
// validate returns true if one of the validators
287+
// return true. If all validators return false,
288+
// the chunk is considered invalid.
289+
func (s *ValidatorStore) validate(ch Chunk) bool {
279290
for _, v := range s.validators {
280291
if v.Validate(ch) {
281-
return s.Store.Put(ctx, mode, ch)
292+
return true
282293
}
283294
}
284-
return false, ErrChunkInvalid
295+
return false
285296
}

network/stream/common_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -227,10 +227,10 @@ func (rrs *roundRobinStore) Get(_ context.Context, _ chunk.ModeGet, _ storage.Ad
227227
return nil, errors.New("roundRobinStore doesn't support Get")
228228
}
229229

230-
func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, ch storage.Chunk) (bool, error) {
230+
func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, chs ...storage.Chunk) ([]bool, error) {
231231
i := atomic.AddUint32(&rrs.index, 1)
232232
idx := int(i) % len(rrs.stores)
233-
return rrs.stores[idx].Put(ctx, mode, ch)
233+
return rrs.stores[idx].Put(ctx, mode, chs...)
234234
}
235235

236236
func (rrs *roundRobinStore) Set(ctx context.Context, mode chunk.ModeSet, addr chunk.Address) (err error) {

storage/common_test.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,12 +225,17 @@ func NewMapChunkStore() *MapChunkStore {
225225
}
226226
}
227227

228-
func (m *MapChunkStore) Put(_ context.Context, _ chunk.ModePut, ch Chunk) (bool, error) {
228+
func (m *MapChunkStore) Put(_ context.Context, _ chunk.ModePut, chs ...Chunk) ([]bool, error) {
229229
m.mu.Lock()
230230
defer m.mu.Unlock()
231-
_, exists := m.chunks[ch.Address().Hex()]
232-
m.chunks[ch.Address().Hex()] = ch
233-
return exists, nil
231+
232+
exist := make([]bool, len(chs))
233+
for i, ch := range chs {
234+
addr := ch.Address().Hex()
235+
_, exist[i] = m.chunks[addr]
236+
m.chunks[addr] = ch
237+
}
238+
return exist, nil
234239
}
235240

236241
func (m *MapChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (Chunk, error) {

storage/hasherstore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ func (h *hasherStore) storeChunk(ctx context.Context, ch Chunk) {
281281
}()
282282
seen, err := h.store.Put(ctx, chunk.ModePutUpload, ch)
283283
h.tag.Inc(chunk.StateStored)
284-
if seen {
284+
if err != nil && seen[0] {
285285
h.tag.Inc(chunk.StateSeen)
286286
}
287287
select {

storage/localstore/gc_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import (
3232
// TestDB_collectGarbageWorker tests garbage collection runs
3333
// by uploading and syncing a number of chunks.
3434
func TestDB_collectGarbageWorker(t *testing.T) {
35-
testDB_collectGarbageWorker(t)
35+
testDBCollectGarbageWorker(t)
3636
}
3737

3838
// TestDB_collectGarbageWorker_multipleBatches tests garbage
@@ -44,13 +44,12 @@ func TestDB_collectGarbageWorker_multipleBatches(t *testing.T) {
4444
defer func(s uint64) { gcBatchSize = s }(gcBatchSize)
4545
gcBatchSize = 2
4646

47-
testDB_collectGarbageWorker(t)
47+
testDBCollectGarbageWorker(t)
4848
}
4949

50-
// testDB_collectGarbageWorker is a helper test function to test
50+
// testDBCollectGarbageWorker is a helper test function to test
5151
// garbage collection runs by uploading and syncing a number of chunks.
52-
func testDB_collectGarbageWorker(t *testing.T) {
53-
t.Helper()
52+
func testDBCollectGarbageWorker(t *testing.T) {
5453

5554
chunkCount := 150
5655

storage/localstore/localstore_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,16 @@ func generateTestRandomChunk() chunk.Chunk {
185185
return chunk.NewChunk(key, data)
186186
}
187187

188+
// generateTestRandomChunks generates a slice of random
189+
// Chunks by using generateTestRandomChunk function.
190+
func generateTestRandomChunks(count int) []chunk.Chunk {
191+
chunks := make([]chunk.Chunk, count)
192+
for i := 0; i < count; i++ {
193+
chunks[i] = generateTestRandomChunk()
194+
}
195+
return chunks
196+
}
197+
188198
// TestGenerateTestRandomChunk validates that
189199
// generateTestRandomChunk returns random data by comparing
190200
// two generated chunks.
@@ -219,6 +229,8 @@ func TestGenerateTestRandomChunk(t *testing.T) {
219229
// chunk values are in the retrieval indexes.
220230
func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
221231
return func(t *testing.T) {
232+
t.Helper()
233+
222234
item, err := db.retrievalDataIndex.Get(addressToItem(chunk.Address()))
223235
if err != nil {
224236
t.Fatal(err)
@@ -238,6 +250,8 @@ func newRetrieveIndexesTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTim
238250
// chunk values are in the retrieval indexes when access time must be stored.
239251
func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, accessTimestamp int64) func(t *testing.T) {
240252
return func(t *testing.T) {
253+
t.Helper()
254+
241255
item, err := db.retrievalDataIndex.Get(addressToItem(ch.Address()))
242256
if err != nil {
243257
t.Fatal(err)
@@ -258,6 +272,8 @@ func newRetrieveIndexesTestWithAccess(db *DB, ch chunk.Chunk, storeTimestamp, ac
258272
// chunk values are in the pull index.
259273
func newPullIndexTest(db *DB, ch chunk.Chunk, binID uint64, wantError error) func(t *testing.T) {
260274
return func(t *testing.T) {
275+
t.Helper()
276+
261277
item, err := db.pullIndex.Get(shed.Item{
262278
Address: ch.Address(),
263279
BinID: binID,
@@ -275,6 +291,8 @@ func newPullIndexTest(db *DB, ch chunk.Chunk, binID uint64, wantError error) fun
275291
// chunk values are in the push index.
276292
func newPushIndexTest(db *DB, ch chunk.Chunk, storeTimestamp int64, wantError error) func(t *testing.T) {
277293
return func(t *testing.T) {
294+
t.Helper()
295+
278296
item, err := db.pushIndex.Get(shed.Item{
279297
Address: ch.Address(),
280298
StoreTimestamp: storeTimestamp,
@@ -292,6 +310,8 @@ func newPushIndexTest(db *DB, ch chunk.Chunk, storeTimestamp int64, wantError er
292310
// chunk values are in the push index.
293311
func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp int64, binID uint64) func(t *testing.T) {
294312
return func(t *testing.T) {
313+
t.Helper()
314+
295315
item, err := db.gcIndex.Get(shed.Item{
296316
Address: chunk.Address(),
297317
BinID: binID,
@@ -308,6 +328,8 @@ func newGCIndexTest(db *DB, chunk chunk.Chunk, storeTimestamp, accessTimestamp i
308328
// an index contains expected number of key/value pairs.
309329
func newItemsCountTest(i shed.Index, want int) func(t *testing.T) {
310330
return func(t *testing.T) {
331+
t.Helper()
332+
311333
var c int
312334
err := i.Iterate(func(item shed.Item) (stop bool, err error) {
313335
c++
@@ -326,6 +348,8 @@ func newItemsCountTest(i shed.Index, want int) func(t *testing.T) {
326348
// value is the same as the number of items in DB.gcIndex.
327349
func newIndexGCSizeTest(db *DB) func(t *testing.T) {
328350
return func(t *testing.T) {
351+
t.Helper()
352+
329353
var want uint64
330354
err := db.gcIndex.Iterate(func(item shed.Item) (stop bool, err error) {
331355
want++
@@ -354,6 +378,8 @@ type testIndexChunk struct {
354378
// testItemsOrder tests the order of chunks in the index. If sortFunc is not nil,
355379
// chunks will be sorted with it before validation.
356380
func testItemsOrder(t *testing.T, i shed.Index, chunks []testIndexChunk, sortFunc func(i, j int) (less bool)) {
381+
t.Helper()
382+
357383
newItemsCountTest(i, len(chunks))(t)
358384

359385
if sortFunc != nil {

0 commit comments

Comments
 (0)