Skip to content

Commit ca56afd

Browse files
authored
chunk, shed, storage: chunk.Store GetMulti method (ethersphere#1691)
1 parent 06a923e commit ca56afd

File tree

11 files changed

+375
-29
lines changed

11 files changed

+375
-29
lines changed

chunk/chunk.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ func (d *Descriptor) String() string {
243243

244244
type Store interface {
245245
Get(ctx context.Context, mode ModeGet, addr Address) (ch Chunk, err error)
246+
GetMulti(ctx context.Context, mode ModeGet, addrs ...Address) (ch []Chunk, err error)
246247
Put(ctx context.Context, mode ModePut, chs ...Chunk) (exist []bool, err error)
247248
Has(ctx context.Context, addr Address) (yes bool, err error)
248249
Set(ctx context.Context, mode ModeSet, addr Address) (err error)

network/stream/common_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,10 @@ func (rrs *roundRobinStore) Get(_ context.Context, _ chunk.ModeGet, _ storage.Ad
227227
return nil, errors.New("roundRobinStore doesn't support Get")
228228
}
229229

230+
func (rrs *roundRobinStore) GetMulti(_ context.Context, _ chunk.ModeGet, _ ...storage.Address) ([]storage.Chunk, error) {
231+
return nil, errors.New("roundRobinStore doesn't support GetMulti")
232+
}
233+
230234
func (rrs *roundRobinStore) Put(ctx context.Context, mode chunk.ModePut, chs ...storage.Chunk) ([]bool, error) {
231235
i := atomic.AddUint32(&rrs.index, 1)
232236
idx := int(i) % len(rrs.stores)

shed/index.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,37 @@ func (f Index) Get(keyFields Item) (out Item, err error) {
147147
return out.Merge(keyFields), nil
148148
}
149149

150+
// Fill populates fields on provided items that are part of the
151+
// encoded value by getting them based on information passed in their
152+
// fields. Every item must have all fields needed for encoding the
153+
// key set. The passed slice items will be changed so that they
154+
// contain data from the index values. No new slice is allocated.
155+
// This function uses a single leveldb snapshot.
156+
func (f Index) Fill(items []Item) (err error) {
157+
snapshot, err := f.db.ldb.GetSnapshot()
158+
if err != nil {
159+
return err
160+
}
161+
defer snapshot.Release()
162+
163+
for i, item := range items {
164+
key, err := f.encodeKeyFunc(item)
165+
if err != nil {
166+
return err
167+
}
168+
value, err := snapshot.Get(key, nil)
169+
if err != nil {
170+
return err
171+
}
172+
v, err := f.decodeValueFunc(item, value)
173+
if err != nil {
174+
return err
175+
}
176+
items[i] = v.Merge(item)
177+
}
178+
return nil
179+
}
180+
150181
// Has accepts key fields represented as Item to check
151182
// if there this Item's encoded key is stored in
152183
// the index.

shed/index_test.go

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ var retrievalIndexFuncs = IndexFuncs{
4949
},
5050
}
5151

52-
// TestIndex validates put, get, has and delete functions of the Index implementation.
52+
// TestIndex validates put, get, fill, has and delete functions of the Index implementation.
5353
func TestIndex(t *testing.T) {
5454
db, cleanupFunc := newTestDB(t)
5555
defer cleanupFunc()
@@ -283,6 +283,68 @@ func TestIndex(t *testing.T) {
283283
t.Fatalf("got error %v, want %v", err, wantErr)
284284
}
285285
})
286+
287+
t.Run("fill", func(t *testing.T) {
288+
want := []Item{
289+
{
290+
Address: []byte("put-hash-1"),
291+
Data: []byte("DATA123"),
292+
StoreTimestamp: time.Now().UTC().UnixNano(),
293+
},
294+
{
295+
Address: []byte("put-hash-32"),
296+
Data: []byte("DATA124"),
297+
StoreTimestamp: time.Now().UTC().UnixNano(),
298+
},
299+
{
300+
Address: []byte("put-hash-42"),
301+
Data: []byte("DATA125"),
302+
StoreTimestamp: time.Now().UTC().UnixNano(),
303+
},
304+
{
305+
Address: []byte("put-hash-71"),
306+
Data: []byte("DATA126"),
307+
StoreTimestamp: time.Now().UTC().UnixNano(),
308+
},
309+
}
310+
311+
for _, item := range want {
312+
err := index.Put(item)
313+
if err != nil {
314+
t.Fatal(err)
315+
}
316+
}
317+
items := make([]Item, len(want))
318+
for i, w := range want {
319+
items[i] = Item{
320+
Address: w.Address,
321+
}
322+
}
323+
err = index.Fill(items)
324+
if err != nil {
325+
t.Fatal(err)
326+
}
327+
for i := range items {
328+
checkItem(t, items[i], want[i])
329+
}
330+
331+
t.Run("not found", func(t *testing.T) {
332+
items := make([]Item, len(want))
333+
for i, w := range want {
334+
items[i] = Item{
335+
Address: w.Address,
336+
}
337+
}
338+
items = append(items, Item{
339+
Address: []byte("put-hash-missing"),
340+
})
341+
want := leveldb.ErrNotFound
342+
err := index.Fill(items)
343+
if err != want {
344+
t.Errorf("got error %v, want %v", err, want)
345+
}
346+
})
347+
})
286348
}
287349

288350
// TestIndex_Iterate validates index Iterate

storage/common_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,19 @@ func (m *MapChunkStore) Get(_ context.Context, _ chunk.ModeGet, ref Address) (Ch
248248
return chunk, nil
249249
}
250250

251+
func (m *MapChunkStore) GetMulti(_ context.Context, _ chunk.ModeGet, refs ...Address) (chunks []Chunk, err error) {
252+
m.mu.RLock()
253+
defer m.mu.RUnlock()
254+
for _, ref := range refs {
255+
chunk := m.chunks[ref.Hex()]
256+
if chunk == nil {
257+
return nil, ErrChunkNotFound
258+
}
259+
chunks = append(chunks, chunk)
260+
}
261+
return chunks, nil
262+
}
263+
251264
// Need to implement Has from SyncChunkStore
252265
func (m *MapChunkStore) Has(ctx context.Context, ref Address) (has bool, err error) {
253266
m.mu.RLock()

storage/localstore/localstore_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,15 @@ func generateTestRandomChunks(count int) []chunk.Chunk {
195195
return chunks
196196
}
197197

198+
// chunkAddresses return chunk addresses of provided chunks.
199+
func chunkAddresses(chunks []chunk.Chunk) []chunk.Address {
200+
addrs := make([]chunk.Address, len(chunks))
201+
for i, ch := range chunks {
202+
addrs[i] = ch.Address()
203+
}
204+
return addrs
205+
}
206+
198207
// TestGenerateTestRandomChunk validates that
199208
// generateTestRandomChunk returns random data by comparing
200209
// two generated chunks.

storage/localstore/mode_get.go

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -67,34 +67,7 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er
6767
switch mode {
6868
// update the access timestamp and gc index
6969
case chunk.ModeGetRequest:
70-
if db.updateGCSem != nil {
71-
// wait before creating new goroutines
72-
// if updateGCSem buffer id full
73-
db.updateGCSem <- struct{}{}
74-
}
75-
db.updateGCWG.Add(1)
76-
go func() {
77-
defer db.updateGCWG.Done()
78-
if db.updateGCSem != nil {
79-
// free a spot in updateGCSem buffer
80-
// for a new goroutine
81-
defer func() { <-db.updateGCSem }()
82-
}
83-
84-
metricName := "localstore.updateGC"
85-
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
86-
defer totalTimeMetric(metricName, time.Now())
87-
88-
err := db.updateGC(out)
89-
if err != nil {
90-
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
91-
log.Error("localstore update gc", "err", err)
92-
}
93-
// if gc update hook is defined, call it
94-
if testHookUpdateGC != nil {
95-
testHookUpdateGC()
96-
}
97-
}()
70+
db.updateGCItems(out)
9871

9972
case chunk.ModeGetPin:
10073
pinnedItem, err := db.pinIndex.Get(item)
@@ -112,6 +85,42 @@ func (db *DB) get(mode chunk.ModeGet, addr chunk.Address) (out shed.Item, err er
11285
return out, nil
11386
}
11487

88+
// updateGCItems is called when ModeGetRequest is used
89+
// for Get or GetMulti to update access time and gc indexes
90+
// for all returned chunks.
91+
func (db *DB) updateGCItems(items ...shed.Item) {
92+
if db.updateGCSem != nil {
93+
// wait before creating new goroutines
94+
// if updateGCSem buffer id full
95+
db.updateGCSem <- struct{}{}
96+
}
97+
db.updateGCWG.Add(1)
98+
go func() {
99+
defer db.updateGCWG.Done()
100+
if db.updateGCSem != nil {
101+
// free a spot in updateGCSem buffer
102+
// for a new goroutine
103+
defer func() { <-db.updateGCSem }()
104+
}
105+
106+
metricName := "localstore.updateGC"
107+
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
108+
defer totalTimeMetric(metricName, time.Now())
109+
110+
for _, item := range items {
111+
err := db.updateGC(item)
112+
if err != nil {
113+
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
114+
log.Error("localstore update gc", "err", err)
115+
}
116+
}
117+
// if gc update hook is defined, call it
118+
if testHookUpdateGC != nil {
119+
testHookUpdateGC()
120+
}
121+
}()
122+
}
123+
115124
// updateGC updates garbage collection index for
116125
// a single item. Provided item is expected to have
117126
// only Address and Data fields with non zero values,
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright 2019 The Swarm Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
package localstore
18+
19+
import (
20+
"context"
21+
"fmt"
22+
"time"
23+
24+
"github.com/ethereum/go-ethereum/metrics"
25+
"github.com/ethersphere/swarm/chunk"
26+
"github.com/ethersphere/swarm/shed"
27+
"github.com/syndtr/goleveldb/leveldb"
28+
)
29+
30+
// GetMulti returns chunks from the database. If one of the chunks is not found
31+
// chunk.ErrChunkNotFound will be returned. All required indexes will be updated
32+
// required by the Getter Mode. GetMulti is required to implement chunk.Store
33+
// interface.
34+
func (db *DB) GetMulti(ctx context.Context, mode chunk.ModeGet, addrs ...chunk.Address) (chunks []chunk.Chunk, err error) {
35+
metricName := fmt.Sprintf("localstore.GetMulti.%s", mode)
36+
37+
metrics.GetOrRegisterCounter(metricName, nil).Inc(1)
38+
defer totalTimeMetric(metricName, time.Now())
39+
40+
defer func() {
41+
if err != nil {
42+
metrics.GetOrRegisterCounter(metricName+".error", nil).Inc(1)
43+
}
44+
}()
45+
46+
out, err := db.getMulti(mode, addrs...)
47+
if err != nil {
48+
if err == leveldb.ErrNotFound {
49+
return nil, chunk.ErrChunkNotFound
50+
}
51+
return nil, err
52+
}
53+
chunks = make([]chunk.Chunk, len(out))
54+
for i, ch := range out {
55+
chunks[i] = chunk.NewChunk(ch.Address, ch.Data).WithPinCounter(ch.PinCounter)
56+
}
57+
return chunks, nil
58+
}
59+
60+
// getMulti returns Items from the retrieval index
61+
// and updates other indexes.
62+
func (db *DB) getMulti(mode chunk.ModeGet, addrs ...chunk.Address) (out []shed.Item, err error) {
63+
out = make([]shed.Item, len(addrs))
64+
for i, addr := range addrs {
65+
out[i].Address = addr
66+
}
67+
68+
err = db.retrievalDataIndex.Fill(out)
69+
if err != nil {
70+
return nil, err
71+
}
72+
73+
switch mode {
74+
// update the access timestamp and gc index
75+
case chunk.ModeGetRequest:
76+
db.updateGCItems(out...)
77+
78+
case chunk.ModeGetPin:
79+
err := db.pinIndex.Fill(out)
80+
if err != nil {
81+
return nil, err
82+
}
83+
84+
// no updates to indexes
85+
case chunk.ModeGetSync:
86+
case chunk.ModeGetLookup:
87+
default:
88+
return out, ErrInvalidMode
89+
}
90+
return out, nil
91+
}

0 commit comments

Comments
 (0)