
Commit 11d0ff6

holisticodezelig authored and committed
Fix retrieval tests and simulation backends (#17723)
* swarm/network/stream: introduced visualized snapshot sync test
* swarm/network/stream: non-existing hash visualization sim
* swarm/network/stream: fixed retrieval tests; new backend for visualization
* swarm/network/stream: cleanup of visualized_snapshot_sync_sim_test.go
* swarm/network/stream: rebased PR on master
* swarm/network/stream: fixed loop logic in retrieval tests
* swarm/network/stream: fixed iterations for snapshot tests
* swarm/network/stream: address PR comments
* swarm/network/stream: addressed PR comments
1 parent 72a0768 commit 11d0ff6

File tree

5 files changed: +391 −221 lines changed


p2p/simulations/events.go

Lines changed: 3 additions & 0 deletions
@@ -58,6 +58,9 @@ type Event struct {
 
 	// Msg is set if the type is EventTypeMsg
 	Msg *Msg `json:"msg,omitempty"`
+
+	//Optionally provide data (currently for simulation frontends only)
+	Data interface{} `json:"data"`
 }
 
 // NewEvent creates a new event for the given object which should be either a
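
The added Data field lets an event carry an arbitrary, JSON-serializable payload intended for simulation frontends. A minimal sketch of setting it when constructing an event (the payload map and its values are hypothetical; only Event, EventTypeMsg and the Data field come from this package, and the go-ethereum module is assumed to be on the import path):

// Sketch: attach frontend-only data to a simulation event.
package main

import (
	"encoding/json"
	"fmt"

	"github.com/ethereum/go-ethereum/p2p/simulations"
)

func main() {
	ev := &simulations.Event{
		Type: simulations.EventTypeMsg,
		// Data is serialized under the "data" key; the contents here are made up.
		Data: map[string]interface{}{"phase": "sync", "progress": 0.75},
	}
	b, _ := json.Marshal(ev)
	fmt.Println(string(b))
}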

swarm/network/stream/snapshot_retrieval_test.go

Lines changed: 79 additions & 153 deletions
@@ -104,8 +104,47 @@ func TestRetrieval(t *testing.T) {
 	}
 }
 
-/*
+var retrievalSimServiceMap = map[string]simulation.ServiceFunc{
+	"streamer": retrievalStreamerFunc,
+}
+
+func retrievalStreamerFunc(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
+	n := ctx.Config.Node()
+	addr := network.NewAddr(n)
+	store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
+	if err != nil {
+		return nil, nil, err
+	}
+	bucket.Store(bucketKeyStore, store)
+
+	localStore := store.(*storage.LocalStore)
+	netStore, err := storage.NewNetStore(localStore, nil)
+	if err != nil {
+		return nil, nil, err
+	}
+	kad := network.NewKademlia(addr.Over(), network.NewKadParams())
+	delivery := NewDelivery(kad, netStore)
+	netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
+
+	r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
+		DoSync:          true,
+		SyncUpdateDelay: 3 * time.Second,
+		DoRetrieve:      true,
+	})
+
+	fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
+	bucket.Store(bucketKeyFileStore, fileStore)
+
+	cleanup = func() {
+		os.RemoveAll(datadir)
+		netStore.Close()
+		r.Close()
+	}
+
+	return r, cleanup, nil
+}
 
+/*
 The test loads a snapshot file to construct the swarm network,
 assuming that the snapshot file identifies a healthy
 kademlia network. Nevertheless a health check runs in the
@@ -114,43 +153,7 @@ simulation's `action` function.
 The snapshot should have 'streamer' in its service list.
 */
 func runFileRetrievalTest(nodeCount int) error {
-	sim := simulation.New(map[string]simulation.ServiceFunc{
-		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			node := ctx.Config.Node()
-			addr := network.NewAddr(node)
-			store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
-			if err != nil {
-				return nil, nil, err
-			}
-			bucket.Store(bucketKeyStore, store)
-
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
-			if err != nil {
-				return nil, nil, err
-			}
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
-			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-				DoSync:          true,
-				SyncUpdateDelay: 3 * time.Second,
-			})
-
-			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
-			bucket.Store(bucketKeyFileStore, fileStore)
-
-			cleanup = func() {
-				os.RemoveAll(datadir)
-				netStore.Close()
-				r.Close()
-			}
-
-			return r, cleanup, nil
-
-		},
-	})
+	sim := simulation.New(retrievalSimServiceMap)
 	defer sim.Close()
 
 	log.Info("Initializing test config")
@@ -200,49 +203,29 @@ func runFileRetrievalTest(nodeCount int) error {
 
 		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
 		// or until the timeout is reached.
-		allSuccess := false
-		for !allSuccess {
+	REPEAT:
+		for {
 			for _, id := range nodeIDs {
-				//for each expected chunk, check if it is in the local store
-				localChunks := conf.idToChunksMap[id]
-				localSuccess := true
-				for _, ch := range localChunks {
-					//get the real chunk by the index in the index array
-					chunk := conf.hashes[ch]
-					log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
-					//check if the expected chunk is indeed in the localstore
-					var err error
-					//check on the node's FileStore (netstore)
-					item, ok := sim.NodeItem(id, bucketKeyFileStore)
-					if !ok {
-						return fmt.Errorf("No registry")
-					}
-					fileStore := item.(*storage.FileStore)
-					//check all chunks
-					for i, hash := range conf.hashes {
-						reader, _ := fileStore.Retrieve(context.TODO(), hash)
-						//check that we can read the file size and that it corresponds to the generated file size
-						if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
-							allSuccess = false
-							log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
-						} else {
-							log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
-						}
-					}
-					if err != nil {
-						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
-						localSuccess = false
-					} else {
-						log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
+				//for each expected file, check if it is in the local store
+				item, ok := sim.NodeItem(id, bucketKeyFileStore)
+				if !ok {
+					return fmt.Errorf("No filestore")
+				}
+				fileStore := item.(*storage.FileStore)
+				//check all chunks
+				for i, hash := range conf.hashes {
+					reader, _ := fileStore.Retrieve(context.TODO(), hash)
+					//check that we can read the file size and that it corresponds to the generated file size
+					if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
+						log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
+						time.Sleep(500 * time.Millisecond)
+						continue REPEAT
 					}
+					log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
 				}
-				allSuccess = localSuccess
 			}
+			return nil
 		}
-		if !allSuccess {
-			return fmt.Errorf("Not all chunks succeeded!")
-		}
-		return nil
 	})
 
 	if result.Error != nil {
@@ -263,44 +246,7 @@ simulation's `action` function.
 The snapshot should have 'streamer' in its service list.
 */
 func runRetrievalTest(chunkCount int, nodeCount int) error {
-	sim := simulation.New(map[string]simulation.ServiceFunc{
-		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			node := ctx.Config.Node()
-			addr := network.NewAddr(node)
-			store, datadir, err := createTestLocalStorageForID(node.ID(), addr)
-			if err != nil {
-				return nil, nil, err
-			}
-			bucket.Store(bucketKeyStore, store)
-
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
-			if err != nil {
-				return nil, nil, err
-			}
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(delivery.RequestFromPeers, true).New
-
-			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-				DoSync:          true,
-				SyncUpdateDelay: 0,
-			})
-
-			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
-			bucketKeyFileStore = simulation.BucketKey("filestore")
-			bucket.Store(bucketKeyFileStore, fileStore)
-
-			cleanup = func() {
-				os.RemoveAll(datadir)
-				netStore.Close()
-				r.Close()
-			}
-
-			return r, cleanup, nil
-
-		},
-	})
+	sim := simulation.New(retrievalSimServiceMap)
 	defer sim.Close()
 
 	conf := &synctestConfig{}
@@ -330,8 +276,6 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
 			conf.addrToIDMap[string(a)] = n
 		}
 
-		//an array for the random files
-		var randomFiles []string
 		//this is the node selected for upload
 		node := sim.RandomUpNode()
 		item, ok := sim.NodeItem(node.ID, bucketKeyStore)
@@ -349,49 +293,31 @@ func runRetrievalTest(chunkCount int, nodeCount int) error {
 
 		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
 		// or until the timeout is reached.
-		allSuccess := false
-		for !allSuccess {
+	REPEAT:
+		for {
 			for _, id := range nodeIDs {
 				//for each expected chunk, check if it is in the local store
-				localChunks := conf.idToChunksMap[id]
-				localSuccess := true
-				for _, ch := range localChunks {
-					//get the real chunk by the index in the index array
-					chunk := conf.hashes[ch]
-					log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
-					//check if the expected chunk is indeed in the localstore
-					var err error
-					//check on the node's FileStore (netstore)
-					item, ok := sim.NodeItem(id, bucketKeyFileStore)
-					if !ok {
-						return fmt.Errorf("No registry")
-					}
-					fileStore := item.(*storage.FileStore)
-					//check all chunks
-					for i, hash := range conf.hashes {
-						reader, _ := fileStore.Retrieve(context.TODO(), hash)
-						//check that we can read the file size and that it corresponds to the generated file size
-						if s, err := reader.Size(ctx, nil); err != nil || s != int64(len(randomFiles[i])) {
-							allSuccess = false
-							log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id)
-						} else {
-							log.Debug(fmt.Sprintf("File with root hash %x successfully retrieved", hash))
-						}
-					}
-					if err != nil {
-						log.Warn(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
-						localSuccess = false
-					} else {
-						log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
+				//check on the node's FileStore (netstore)
+				item, ok := sim.NodeItem(id, bucketKeyFileStore)
+				if !ok {
+					return fmt.Errorf("No filestore")
+				}
+				fileStore := item.(*storage.FileStore)
+				//check all chunks
+				for _, hash := range conf.hashes {
+					reader, _ := fileStore.Retrieve(context.TODO(), hash)
+					//check that we can read the chunk size and that it corresponds to the generated chunk size
+					if s, err := reader.Size(ctx, nil); err != nil || s != int64(chunkSize) {
+						log.Warn("Retrieve error", "err", err, "hash", hash, "nodeId", id, "size", s)
+						time.Sleep(500 * time.Millisecond)
+						continue REPEAT
 					}
+					log.Debug(fmt.Sprintf("Chunk with root hash %x successfully retrieved", hash))
 				}
-				allSuccess = localSuccess
 			}
+			// all nodes and files found, exit loop and return without error
+			return nil
 		}
-		if !allSuccess {
-			return fmt.Errorf("Not all chunks succeeded!")
-		}
-		return nil
 	})
 
 	if result.Error != nil {
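
The core change in both retrieval loops above is dropping the allSuccess/localSuccess bookkeeping in favour of a labeled outer loop: as soon as any check fails, the test sleeps briefly and restarts the whole verification pass with continue REPEAT, while the simulation's context timeout bounds the retries. A minimal, self-contained sketch of that pattern (checkItem and the explicit deadline are stand-ins for the fileStore.Retrieve/Size check and the simulation context, not Swarm APIs):

// Sketch of the labeled retry loop adopted by the retrieval tests.
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"time"
)

// checkItem is a hypothetical stand-in for "retrieve hash i and verify its size".
func checkItem(i int) error {
	if rand.Intn(4) == 0 { // simulate an item that is not retrievable yet
		return errors.New("not found")
	}
	return nil
}

func waitForAll(items []int, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
REPEAT:
	for {
		if time.Now().After(deadline) {
			return errors.New("timed out waiting for all items")
		}
		for _, it := range items {
			if err := checkItem(it); err != nil {
				// one failure restarts the whole pass after a short pause,
				// mirroring the time.Sleep + continue REPEAT in the diff
				time.Sleep(50 * time.Millisecond)
				continue REPEAT
			}
		}
		// every item passed in a single sweep
		return nil
	}
}

func main() {
	fmt.Println(waitForAll([]int{1, 2, 3, 4}, 2*time.Second))
}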
