
Commit 88168ff

holisticode authored and zelig committed
Stream subscriptions (#18355)
* swarm/network: eachBin now starts at kaddepth for nn
* swarm/network: fix Kademlia.EachBin
* swarm/network: fix kademlia.EachBin
* swarm/network: correct EachBin implementation according to requirements
* swarm/network: less addresses simplified tests
* swarm: calc kad depth outside loop in EachBin test
* swarm/network: removed printResults
* swarm/network: cleanup imports
* swarm/network: remove kademlia.EachBin; fix RequestSubscriptions and add unit test
* swarm/network/stream: address PR comments
* swarm/network/stream: package-wide subscriptionFunc
* swarm/network/stream: refactor to kad.EachConn
1 parent d5cad48 commit 88168ff

5 files changed: +234 −320 lines changed


swarm/network/kademlia.go

Lines changed: 0 additions & 31 deletions
@@ -356,37 +356,6 @@ func (k *Kademlia) Off(p *Peer) {
 	}
 }
 
-// EachBin is a two level nested iterator
-// The outer iterator returns all bins that have known peers, in order from shallowest to deepest
-// The inner iterator returns all peers per bin returned by the outer iterator, in no defined order
-// TODO the po returned by the inner iterator is not reliable. However, it is not being used in this method
-func (k *Kademlia) EachBin(base []byte, pof pot.Pof, o int, eachBinFunc func(conn *Peer, po int) bool) {
-	k.lock.RLock()
-	defer k.lock.RUnlock()
-
-	var startPo int
-	var endPo int
-	kadDepth := depthForPot(k.conns, k.NeighbourhoodSize, k.base)
-
-	k.conns.EachBin(base, Pof, o, func(po, size int, f func(func(val pot.Val, i int) bool) bool) bool {
-		if startPo > 0 && endPo != k.MaxProxDisplay {
-			startPo = endPo + 1
-		}
-		if po < kadDepth {
-			endPo = po
-		} else {
-			endPo = k.MaxProxDisplay
-		}
-
-		for bin := startPo; bin <= endPo; bin++ {
-			f(func(val pot.Val, _ int) bool {
-				return eachBinFunc(val.(*Peer), bin)
-			})
-		}
-		return true
-	})
-}
-
 // EachConn is an iterator with args (base, po, f) applies f to each live peer
 // that has proximity order po or less as measured from the base
 // if base is nil, kademlia base address is used
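With EachBin gone, callers iterate live peers through EachConn, whose contract is the comment kept above: the callback is applied to each live peer whose proximity order, measured from base, is po or less. The sketch below is only a hedged illustration of that iterator shape, not code from this commit; the signature EachConn(base []byte, o int, f func(*Peer, int) bool) is assumed from that comment, and connsPerBin is a hypothetical helper name.

```go
// Sketch only, not part of this commit: count live connections per
// proximity-order bin using Kademlia.EachConn (the iterator that remains
// after EachBin is removed).
func connsPerBin(k *Kademlia, base []byte) map[int]int {
	bins := make(map[int]int)
	// 255 is assumed to be a permissive upper bound on the proximity order,
	// so every live connection is visited once with its po relative to base.
	k.EachConn(base, 255, func(p *Peer, po int) bool {
		bins[po]++
		return true // keep iterating
	})
	return bins
}
```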

swarm/network/kademlia_test.go

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
-// Copyright 2017 The go-ethereum Authors
+// Copyright 2018 The go-ethereum Authors
 // This file is part of the go-ethereum library.
 //
 // The go-ethereum library is free software: you can redistribute it and/or modify

swarm/network/stream/snapshot_sync_test.go

Lines changed: 0 additions & 266 deletions
@@ -106,43 +106,6 @@ func TestSyncingViaGlobalSync(t *testing.T) {
 	}
 }
 
-func TestSyncingViaDirectSubscribe(t *testing.T) {
-	if runtime.GOOS == "darwin" && os.Getenv("TRAVIS") == "true" {
-		t.Skip("Flaky on mac on travis")
-	}
-	//if nodes/chunks have been provided via commandline,
-	//run the tests with these values
-	if *nodes != 0 && *chunks != 0 {
-		log.Info(fmt.Sprintf("Running test with %d chunks and %d nodes...", *chunks, *nodes))
-		err := testSyncingViaDirectSubscribe(t, *chunks, *nodes)
-		if err != nil {
-			t.Fatal(err)
-		}
-	} else {
-		var nodeCnt []int
-		var chnkCnt []int
-		//if the `longrunning` flag has been provided
-		//run more test combinations
-		if *longrunning {
-			chnkCnt = []int{1, 8, 32, 256, 1024}
-			nodeCnt = []int{32, 16}
-		} else {
-			//default test
-			chnkCnt = []int{4, 32}
-			nodeCnt = []int{32, 16}
-		}
-		for _, chnk := range chnkCnt {
-			for _, n := range nodeCnt {
-				log.Info(fmt.Sprintf("Long running test with %d chunks and %d nodes...", chnk, n))
-				err := testSyncingViaDirectSubscribe(t, chnk, n)
-				if err != nil {
-					t.Fatal(err)
-				}
-			}
-		}
-	}
-}
-
 var simServiceMap = map[string]simulation.ServiceFunc{
 	"streamer": streamerFunc,
 }
@@ -323,235 +286,6 @@ func runSim(conf *synctestConfig, ctx context.Context, sim *simulation.Simulatio
 	})
 }
 
-/*
-The test generates the given number of chunks
-
-For every chunk generated, the nearest node addresses
-are identified, we verify that the nodes closer to the
-chunk addresses actually do have the chunks in their local stores.
-
-The test loads a snapshot file to construct the swarm network,
-assuming that the snapshot file identifies a healthy
-kademlia network. The snapshot should have 'streamer' in its service list.
-*/
-func testSyncingViaDirectSubscribe(t *testing.T, chunkCount int, nodeCount int) error {
-
-	sim := simulation.New(map[string]simulation.ServiceFunc{
-		"streamer": func(ctx *adapters.ServiceContext, bucket *sync.Map) (s node.Service, cleanup func(), err error) {
-			n := ctx.Config.Node()
-			addr := network.NewAddr(n)
-			store, datadir, err := createTestLocalStorageForID(n.ID(), addr)
-			if err != nil {
-				return nil, nil, err
-			}
-			bucket.Store(bucketKeyStore, store)
-			localStore := store.(*storage.LocalStore)
-			netStore, err := storage.NewNetStore(localStore, nil)
-			if err != nil {
-				return nil, nil, err
-			}
-			kad := network.NewKademlia(addr.Over(), network.NewKadParams())
-			delivery := NewDelivery(kad, netStore)
-			netStore.NewNetFetcherFunc = network.NewFetcherFactory(dummyRequestFromPeers, true).New
-
-			r := NewRegistry(addr.ID(), delivery, netStore, state.NewInmemoryStore(), &RegistryOptions{
-				Retrieval: RetrievalDisabled,
-				Syncing:   SyncingRegisterOnly,
-			}, nil)
-			bucket.Store(bucketKeyRegistry, r)
-
-			fileStore := storage.NewFileStore(netStore, storage.NewFileStoreParams())
-			bucket.Store(bucketKeyFileStore, fileStore)
-
-			cleanup = func() {
-				os.RemoveAll(datadir)
-				netStore.Close()
-				r.Close()
-			}
-
-			return r, cleanup, nil
-
-		},
-	})
-	defer sim.Close()
-
-	ctx, cancelSimRun := context.WithTimeout(context.Background(), 2*time.Minute)
-	defer cancelSimRun()
-
-	conf := &synctestConfig{}
-	//map of discover ID to indexes of chunks expected at that ID
-	conf.idToChunksMap = make(map[enode.ID][]int)
-	//map of overlay address to discover ID
-	conf.addrToIDMap = make(map[string]enode.ID)
-	//array where the generated chunk hashes will be stored
-	conf.hashes = make([]storage.Address, 0)
-
-	err := sim.UploadSnapshot(fmt.Sprintf("testing/snapshot_%d.json", nodeCount))
-	if err != nil {
-		return err
-	}
-
-	if _, err := sim.WaitTillHealthy(ctx); err != nil {
-		return err
-	}
-
-	disconnections := sim.PeerEvents(
-		context.Background(),
-		sim.NodeIDs(),
-		simulation.NewPeerEventsFilter().Drop(),
-	)
-
-	var disconnected atomic.Value
-	go func() {
-		for d := range disconnections {
-			if d.Error != nil {
-				log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
-				disconnected.Store(true)
-			}
-		}
-	}()
-
-	result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) error {
-		nodeIDs := sim.UpNodeIDs()
-		for _, n := range nodeIDs {
-			//get the kademlia overlay address from this ID
-			a := n.Bytes()
-			//append it to the array of all overlay addresses
-			conf.addrs = append(conf.addrs, a)
-			//the proximity calculation is on overlay addr,
-			//the p2p/simulations check func triggers on enode.ID,
-			//so we need to know which overlay addr maps to which nodeID
-			conf.addrToIDMap[string(a)] = n
-		}
-
-		var subscriptionCount int
-
-		filter := simulation.NewPeerEventsFilter().ReceivedMessages().Protocol("stream").MsgCode(4)
-		eventC := sim.PeerEvents(ctx, nodeIDs, filter)
-
-		for j, node := range nodeIDs {
-			log.Trace(fmt.Sprintf("Start syncing subscriptions: %d", j))
-			//start syncing!
-			item, ok := sim.NodeItem(node, bucketKeyRegistry)
-			if !ok {
-				return fmt.Errorf("No registry")
-			}
-			registry := item.(*Registry)
-
-			var cnt int
-			cnt, err = startSyncing(registry, conf)
-			if err != nil {
-				return err
-			}
-			//increment the number of subscriptions we need to wait for
-			//by the count returned from startSyncing (SYNC subscriptions)
-			subscriptionCount += cnt
-		}
-
-		for e := range eventC {
-			if e.Error != nil {
-				return e.Error
-			}
-			subscriptionCount--
-			if subscriptionCount == 0 {
-				break
-			}
-		}
-		//select a random node for upload
-		node := sim.Net.GetRandomUpNode()
-		item, ok := sim.NodeItem(node.ID(), bucketKeyStore)
-		if !ok {
-			return fmt.Errorf("No localstore")
-		}
-		lstore := item.(*storage.LocalStore)
-		hashes, err := uploadFileToSingleNodeStore(node.ID(), chunkCount, lstore)
-		if err != nil {
-			return err
-		}
-		conf.hashes = append(conf.hashes, hashes...)
-		mapKeysToNodes(conf)
-
-		if _, err := sim.WaitTillHealthy(ctx); err != nil {
-			return err
-		}
-
-		var globalStore mock.GlobalStorer
-		if *useMockStore {
-			globalStore = mockmem.NewGlobalStore()
-		}
-		// File retrieval check is repeated until all uploaded files are retrieved from all nodes
-		// or until the timeout is reached.
-	REPEAT:
-		for {
-			for _, id := range nodeIDs {
-				//for each expected chunk, check if it is in the local store
-				localChunks := conf.idToChunksMap[id]
-				for _, ch := range localChunks {
-					//get the real chunk by the index in the index array
-					chunk := conf.hashes[ch]
-					log.Trace(fmt.Sprintf("node has chunk: %s:", chunk))
-					//check if the expected chunk is indeed in the localstore
-					var err error
-					if *useMockStore {
-						//use the globalStore if the mockStore should be used; in that case,
-						//the complete localStore stack is bypassed for getting the chunk
-						_, err = globalStore.Get(common.BytesToAddress(id.Bytes()), chunk)
-					} else {
-						//use the actual localstore
-						item, ok := sim.NodeItem(id, bucketKeyStore)
-						if !ok {
-							return fmt.Errorf("Error accessing localstore")
-						}
-						lstore := item.(*storage.LocalStore)
-						_, err = lstore.Get(ctx, chunk)
-					}
-					if err != nil {
-						log.Debug(fmt.Sprintf("Chunk %s NOT found for id %s", chunk, id))
-						// Do not get crazy with logging the warn message
-						time.Sleep(500 * time.Millisecond)
-						continue REPEAT
-					}
-					log.Debug(fmt.Sprintf("Chunk %s IS FOUND for id %s", chunk, id))
-				}
-			}
-			return nil
-		}
-	})
-
-	if result.Error != nil {
-		return result.Error
-	}
-
-	if yes, ok := disconnected.Load().(bool); ok && yes {
-		t.Fatal("disconnect events received")
-	}
-	log.Info("Simulation ended")
-	return nil
-}
-
-//the server func to start syncing
-//issues `RequestSubscriptionMsg` to peers, based on po, by iterating over
-//the kademlia's `EachBin` function.
-//returns the number of subscriptions requested
-func startSyncing(r *Registry, conf *synctestConfig) (int, error) {
-	var err error
-	kad := r.delivery.kad
-	subCnt := 0
-	//iterate over each bin and solicit needed subscription to bins
-	kad.EachBin(r.addr[:], pof, 0, func(conn *network.Peer, po int) bool {
-		//identify begin and start index of the bin(s) we want to subscribe to
-		subCnt++
-		err = r.RequestSubscription(conf.addrToIDMap[string(conn.Address())], NewStream("SYNC", FormatSyncBinKey(uint8(po)), true), NewRange(0, 0), High)
-		if err != nil {
-			log.Error(fmt.Sprintf("Error in RequestSubsciption! %v", err))
-			return false
-		}
-		return true
-
-	})
-	return subCnt, nil
-}
-
 //map chunk keys to addresses which are responsible
 func mapKeysToNodes(conf *synctestConfig) {
 	nodemap := make(map[string][]int)
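The deleted startSyncing above issued RequestSubscription calls by walking kademlia.EachBin. Per the commit message ("remove kademlia.EachBin; fix RequestSubscriptions and add unit test", "refactor to kad.EachConn"), the same peers are now reached through EachConn, which hands each live connection to the callback once together with its proximity order. The sketch below is a hedged illustration of that pattern, not the code this commit adds: startSyncingViaEachConn is a hypothetical name, the o value of 256 and the one-SYNC-bin-per-peer policy are assumptions, and the remaining identifiers are taken from the removed code above.

```go
// Hypothetical sketch, not code from this commit: the removed startSyncing
// rewritten against kad.EachConn instead of the deleted kad.EachBin.
func startSyncingViaEachConn(r *Registry, conf *synctestConfig) (int, error) {
	var err error
	kad := r.delivery.kad
	subCnt := 0
	// EachConn visits each live connection once, with its proximity order
	// relative to the registry's own address; 256 is assumed as a permissive cap.
	kad.EachConn(r.addr[:], 256, func(p *network.Peer, po int) bool {
		subCnt++
		// Request a SYNC subscription for the peer's own bin; the registry in
		// this commit may request a range of bins depending on kademlia depth.
		s := NewStream("SYNC", FormatSyncBinKey(uint8(po)), true)
		err = r.RequestSubscription(conf.addrToIDMap[string(p.Address())], s, NewRange(0, 0), High)
		if err != nil {
			log.Error(fmt.Sprintf("Error in RequestSubscription! %v", err))
			return false // stop iterating on the first error
		}
		return true
	})
	return subCnt, err
}
```

As in the removed test, the returned count is what the simulation would wait on (via stream protocol events) before uploading chunks.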
