Skip to content

Commit 3fd9021

Browse files
acudjanos
authored andcommitted
network: new stream! protocol and pull syncer implementation (ethersphere#1538)
1 parent da6ccfd commit 3fd9021

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

71 files changed

+4472
-6825
lines changed

api/http/test_server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func NewTestSwarmServer(t *testing.T, serverFunc func(*api.API, *pin.API) TestSe
5757
}
5858

5959
tags := chunk.NewTags()
60-
fileStore := storage.NewFileStore(localStore, storage.NewFileStoreParams(), tags)
60+
fileStore := storage.NewFileStore(localStore, localStore, storage.NewFileStoreParams(), tags)
6161

6262
// Swarm feeds test setup
6363
feedsDir, err := ioutil.TempDir("", "swarm-feeds-test")

api/inspector.go

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,29 @@ package api
1818

1919
import (
2020
"context"
21+
"encoding/json"
2122
"fmt"
2223
"strings"
2324
"time"
2425

2526
"github.com/ethereum/go-ethereum/metrics"
2627
"github.com/ethersphere/swarm/log"
2728
"github.com/ethersphere/swarm/network"
29+
stream "github.com/ethersphere/swarm/network/stream/v2"
2830
"github.com/ethersphere/swarm/storage"
2931
)
3032

33+
const InspectorIsPullSyncingTolerance = 15 * time.Second
34+
3135
type Inspector struct {
3236
api *API
3337
hive *network.Hive
3438
netStore *storage.NetStore
39+
stream *stream.Registry
3540
}
3641

37-
func NewInspector(api *API, hive *network.Hive, netStore *storage.NetStore) *Inspector {
38-
return &Inspector{api, hive, netStore}
42+
func NewInspector(api *API, hive *network.Hive, netStore *storage.NetStore, pullSyncer *stream.Registry) *Inspector {
43+
return &Inspector{api, hive, netStore, pullSyncer}
3944
}
4045

4146
// Hive prints the kademlia table
@@ -49,15 +54,12 @@ func (i *Inspector) KademliaInfo() network.KademliaInfo {
4954
}
5055

5156
func (i *Inspector) IsPullSyncing() bool {
52-
lastReceivedChunksMsg := metrics.GetOrRegisterGauge("network.stream.received_chunks", nil)
53-
54-
// last received chunks msg time
55-
lrct := time.Unix(0, lastReceivedChunksMsg.Value())
57+
t := i.stream.LastReceivedChunkTime()
5658

5759
// if last received chunks msg time is after now-15sec. (i.e. within the last 15sec.) then we say that the node is still syncing
5860
// technically this is not correct, because this might have been a retrieve request, but for the time being it works for our purposes
5961
// because we know we are not making retrieve requests on the node while checking this
60-
return lrct.After(time.Now().Add(-15 * time.Second))
62+
return t.After(time.Now().Add(-InspectorIsPullSyncingTolerance))
6163
}
6264

6365
// DeliveriesPerPeer returns the sum of chunks we received from a given peer
@@ -96,3 +98,15 @@ func (i *Inspector) Has(chunkAddresses []storage.Address) string {
9698

9799
return strings.Join(hostChunks, "")
98100
}
101+
102+
func (i *Inspector) PeerStreams() (string, error) {
103+
peerInfo, err := i.stream.PeerInfo()
104+
if err != nil {
105+
return "", err
106+
}
107+
v, err := json.Marshal(peerInfo)
108+
if err != nil {
109+
return "", err
110+
}
111+
return string(v), nil
112+
}

api/inspector_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package api
2+
3+
import (
4+
"crypto/rand"
5+
"encoding/hex"
6+
"io/ioutil"
7+
"os"
8+
"strings"
9+
"testing"
10+
11+
"github.com/ethersphere/swarm/network"
12+
stream "github.com/ethersphere/swarm/network/stream/v2"
13+
"github.com/ethersphere/swarm/storage"
14+
"github.com/ethersphere/swarm/storage/localstore"
15+
16+
"github.com/ethereum/go-ethereum/p2p/enode"
17+
"github.com/ethereum/go-ethereum/rpc"
18+
"github.com/ethersphere/swarm/state"
19+
)
20+
21+
// TestInspectorPeerStreams validates that response from RPC peerStream has at
22+
// least some data.
23+
func TestInspectorPeerStreams(t *testing.T) {
24+
dir, err := ioutil.TempDir("", "swarm-")
25+
if err != nil {
26+
t.Fatal(err)
27+
}
28+
defer os.RemoveAll(dir)
29+
30+
baseKey := make([]byte, 32)
31+
_, err = rand.Read(baseKey)
32+
if err != nil {
33+
t.Fatal(err)
34+
}
35+
36+
localStore, err := localstore.New(dir, baseKey, &localstore.Options{})
37+
if err != nil {
38+
t.Fatal(err)
39+
}
40+
netStore := storage.NewNetStore(localStore, baseKey, enode.ID{})
41+
42+
i := NewInspector(nil, nil, netStore, stream.New(state.NewInmemoryStore(), baseKey, stream.NewSyncProvider(netStore, network.NewKademlia(
43+
baseKey,
44+
network.NewKadParams(),
45+
), false, false)))
46+
47+
server := rpc.NewServer()
48+
if err := server.RegisterName("inspector", i); err != nil {
49+
t.Fatal(err)
50+
}
51+
52+
client := rpc.DialInProc(server)
53+
54+
var peerInfo string
55+
56+
err = client.Call(&peerInfo, "inspector_peerStreams")
57+
if err != nil {
58+
t.Fatal(err)
59+
}
60+
61+
if !strings.Contains(peerInfo, `"base":"`+hex.EncodeToString(baseKey)[:16]+`"`) {
62+
t.Error("missing base key in response")
63+
}
64+
65+
t.Log(peerInfo)
66+
}

api/manifest_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func manifest(paths ...string) (manifestReader storage.LazySectionReader) {
4343

4444
func testGetEntry(t *testing.T, path, match string, multiple bool, paths ...string) *manifestTrie {
4545
quitC := make(chan bool)
46-
fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags())
46+
fileStore := storage.NewFileStore(nil, nil, storage.NewFileStoreParams(), chunk.NewTags())
4747
ref := make([]byte, fileStore.HashSize())
4848
trie, err := readManifest(manifest(paths...), ref, fileStore, false, quitC, NOOPDecrypt)
4949
if err != nil {
@@ -100,7 +100,7 @@ func TestGetEntry(t *testing.T) {
100100
func TestExactMatch(t *testing.T) {
101101
quitC := make(chan bool)
102102
mf := manifest("shouldBeExactMatch.css", "shouldBeExactMatch.css.map")
103-
fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags())
103+
fileStore := storage.NewFileStore(nil, nil, storage.NewFileStoreParams(), chunk.NewTags())
104104
ref := make([]byte, fileStore.HashSize())
105105
trie, err := readManifest(mf, ref, fileStore, false, quitC, nil)
106106
if err != nil {
@@ -133,7 +133,7 @@ func TestAddFileWithManifestPath(t *testing.T) {
133133
reader := &storage.LazyTestSectionReader{
134134
SectionReader: io.NewSectionReader(bytes.NewReader(manifest), 0, int64(len(manifest))),
135135
}
136-
fileStore := storage.NewFileStore(nil, storage.NewFileStoreParams(), chunk.NewTags())
136+
fileStore := storage.NewFileStore(nil, nil, storage.NewFileStoreParams(), chunk.NewTags())
137137
ref := make([]byte, fileStore.HashSize())
138138
trie, err := readManifest(reader, ref, fileStore, false, nil, NOOPDecrypt)
139139
if err != nil {

cmd/swarm-smoke/upload_and_sync.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ func trackChunks(testData []byte) error {
156156
wg.Wait()
157157

158158
checkChunksVsMostProxHosts(addrs, allHostChunks, bzzAddrs)
159+
metrics.GetOrRegisterGauge("deployment.nodes", nil).Update(int64(len(hosts)))
159160

160161
if !hasErr {
161162
// remove the chunks stored on the uploader node
@@ -207,11 +208,10 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin
207208
}
208209
}
209210

210-
log.Debug("sync mode", "sync mode", syncMode)
211-
212211
if syncMode == "pullsync" || syncMode == "both" {
213212
for _, maxProxHost := range maxProxHosts {
214213
if allHostChunks[maxProxHost][i] == '0' {
214+
metrics.GetOrRegisterCounter("upload-and-sync.pull-sync.chunk-not-max-prox", nil).Inc(1)
215215
log.Error("chunk not found at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
216216
} else {
217217
log.Trace("chunk present at max prox host", "ref", addrs[i], "host", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
@@ -220,6 +220,7 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin
220220

221221
// if chunk found at less than 2 hosts, which is actually less that the min size of a NN
222222
if foundAt < 2 {
223+
metrics.GetOrRegisterCounter("upload-and-sync.pull-sync.chunk-less-nn", nil).Inc(1)
223224
log.Error("chunk found at less than two hosts", "foundAt", foundAt, "ref", addrs[i])
224225
}
225226
}
@@ -235,6 +236,7 @@ func checkChunksVsMostProxHosts(addrs []storage.Address, allHostChunks map[strin
235236

236237
if !found {
237238
for _, maxProxHost := range maxProxHosts {
239+
metrics.GetOrRegisterCounter("upload-and-sync.push-sync.chunk-not-max-prox", nil).Inc(1)
238240
log.Error("chunk not found at any max prox host", "ref", addrs[i], "hosts", maxProxHost, "bzzAddr", bzzAddrs[maxProxHost])
239241
}
240242
}
@@ -270,6 +272,8 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error {
270272
}
271273
t2 := time.Since(t1)
272274
metrics.GetOrRegisterResettingTimer("upload-and-sync.upload-time", nil).Update(t2)
275+
uploadSpeed := float64(len(randomBytes)) / t2.Seconds() // bytes per second
276+
metrics.GetOrRegisterGauge("upload-and-sync.upload-speed", nil).Update(int64(uploadSpeed))
273277

274278
fhash, err := digest(bytes.NewReader(randomBytes))
275279
if err != nil {
@@ -315,6 +319,9 @@ func uploadAndSync(c *cli.Context, randomBytes []byte) error {
315319
ended := time.Since(start)
316320

317321
metrics.GetOrRegisterResettingTimer("upload-and-sync.single.fetch-time", nil).Update(ended)
322+
downloadSpeed := float64(len(randomBytes)) / ended.Seconds() // bytes per second
323+
metrics.GetOrRegisterGauge("upload-and-sync.download-speed", nil).Update(int64(downloadSpeed))
324+
318325
log.Info("fetch successful", "took", ended, "endpoint", httpEndpoint(hosts[randIndex]))
319326
break
320327
}

cmd/swarm-snapshot/create.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func createSnapshot(filename string, nodes int, services []string) (err error) {
7474
UnderlayAddr: addr.Under(),
7575
HiveParams: hp,
7676
}
77-
return network.NewBzz(config, kad, nil, nil, nil), nil, nil
77+
return network.NewBzz(config, kad, nil, nil, nil, nil, nil), nil, nil
7878
},
7979
})
8080
defer sim.Close()

cmd/swarm/config_test.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,11 @@ import (
3434
"github.com/ethereum/go-ethereum/rpc"
3535
"github.com/ethersphere/swarm"
3636
"github.com/ethersphere/swarm/api"
37+
"github.com/ethersphere/swarm/testutil"
3738
)
3839

3940
func TestConfigDump(t *testing.T) {
40-
swarm := runSwarm(t, "dumpconfig")
41+
swarm := runSwarm(t, "--verbosity", fmt.Sprintf("%d", *testutil.Loglevel), "dumpconfig")
4142
defaultConf := api.NewConfig()
4243
out, err := tomlSettings.Marshal(&defaultConf)
4344
if err != nil {
@@ -53,6 +54,7 @@ func TestConfigFailsSwapEnabledNoBackendURL(t *testing.T) {
5354
fmt.Sprintf("--%s", SwarmPortFlag.Name), "54545",
5455
fmt.Sprintf("--%s", utils.ListenPortFlag.Name), "0",
5556
fmt.Sprintf("--%s", SwarmSwapEnabledFlag.Name),
57+
"--verbosity", fmt.Sprintf("%d", *testutil.Loglevel),
5658
}
5759

5860
swarm := runSwarm(t, flags...)
@@ -89,6 +91,7 @@ func TestBzzKeyFlag(t *testing.T) {
8991
fmt.Sprintf("--%s", utils.DataDirFlag.Name), dir,
9092
fmt.Sprintf("--%s", utils.IPCPathFlag.Name), conf.IPCPath,
9193
fmt.Sprintf("--%s", SwarmBzzKeyHexFlag.Name), hexKey,
94+
"--verbosity", fmt.Sprintf("%d", *testutil.Loglevel),
9295
}
9396

9497
node.Cmd = runSwarm(t, flags...)
@@ -137,6 +140,7 @@ func TestEmptyBzzAccountFlagMultipleAccounts(t *testing.T) {
137140
fmt.Sprintf("--%s", SwarmPortFlag.Name), "0",
138141
fmt.Sprintf("--%s", utils.ListenPortFlag.Name), "0",
139142
fmt.Sprintf("--%s", utils.DataDirFlag.Name), dir,
143+
"--verbosity", fmt.Sprintf("%d", *testutil.Loglevel),
140144
}
141145

142146
node.Cmd = runSwarm(t, flags...)
@@ -160,6 +164,7 @@ func TestEmptyBzzAccountFlagSingleAccount(t *testing.T) {
160164
fmt.Sprintf("--%s", utils.ListenPortFlag.Name), "0",
161165
fmt.Sprintf("--%s", utils.DataDirFlag.Name), dir,
162166
fmt.Sprintf("--%s", utils.IPCPathFlag.Name), conf.IPCPath,
167+
"--verbosity", fmt.Sprintf("%d", *testutil.Loglevel),
163168
}
164169

165170
node.Cmd = runSwarm(t, flags...)
@@ -205,6 +210,7 @@ func TestEmptyBzzAccountFlagNoAccountWrongPassword(t *testing.T) {
205210
fmt.Sprintf("--%s", SwarmPortFlag.Name), "0",
206211
fmt.Sprintf("--%s", utils.ListenPortFlag.Name), "0",
207212
fmt.Sprintf("--%s", utils.DataDirFlag.Name), dir,
213+
"--verbosity", fmt.Sprintf("%d", *testutil.Loglevel),
208214
}
209215

210216
node.Cmd = runSwarm(t, flags...)
@@ -244,6 +250,7 @@ func TestConfigCmdLineOverrides(t *testing.T) {
244250
fmt.Sprintf("--%s", EnsAPIFlag.Name), "",
245251
fmt.Sprintf("--%s", utils.DataDirFlag.Name), dir,
246252
fmt.Sprintf("--%s", utils.IPCPathFlag.Name), conf.IPCPath,
253+
"--verbosity", fmt.Sprintf("%d", *testutil.Loglevel),
247254
}
248255
node.Cmd = runSwarm(t, flags...)
249256
node.Cmd.InputLine(testPassphrase)
@@ -342,6 +349,7 @@ func TestConfigFileOverrides(t *testing.T) {
342349
fmt.Sprintf("--%s", EnsAPIFlag.Name), "",
343350
fmt.Sprintf("--%s", utils.DataDirFlag.Name), dir,
344351
fmt.Sprintf("--%s", utils.IPCPathFlag.Name), conf.IPCPath,
352+
"--verbosity", fmt.Sprintf("%d", *testutil.Loglevel),
345353
}
346354
node.Cmd = runSwarm(t, flags...)
347355
node.Cmd.InputLine(testPassphrase)
@@ -420,6 +428,7 @@ func TestConfigEnvVars(t *testing.T) {
420428
"--ens-api", "",
421429
"--datadir", dir,
422430
"--ipcpath", conf.IPCPath,
431+
"--verbosity", fmt.Sprintf("%d", *testutil.Loglevel),
423432
}
424433

425434
//node.Cmd = runSwarm(t,flags...)
@@ -551,6 +560,7 @@ func TestConfigCmdLineOverridesFile(t *testing.T) {
551560
fmt.Sprintf("--%s", EnsAPIFlag.Name), "",
552561
fmt.Sprintf("--%s", utils.DataDirFlag.Name), dir,
553562
fmt.Sprintf("--%s", utils.IPCPathFlag.Name), conf.IPCPath,
563+
"--verbosity", fmt.Sprintf("%d", *testutil.Loglevel),
554564
}
555565
node.Cmd = runSwarm(t, flags...)
556566
node.Cmd.InputLine(testPassphrase)

cmd/swarm/explore.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func hashes(ctx *cli.Context) {
4848
}
4949
defer f.Close()
5050

51-
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags())
51+
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, &storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags())
5252
refs, err := fileStore.GetAllReferences(context.TODO(), f)
5353
if err != nil {
5454
utils.Fatalf("%v\n", err)

cmd/swarm/hash.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func hash(ctx *cli.Context) {
7878
defer f.Close()
7979

8080
stat, _ := f.Stat()
81-
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags())
81+
fileStore := storage.NewFileStore(&storage.FakeChunkStore{}, &storage.FakeChunkStore{}, storage.NewFileStoreParams(), chunk.NewTags())
8282

8383
addr, _, err := fileStore.Store(context.TODO(), f, stat.Size(), false)
8484
if err != nil {

network/networkid_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ func newServices() adapters.Services {
216216
HiveParams: hp,
217217
NetworkID: uint64(currentNetworkID),
218218
}
219-
return NewBzz(config, kademlia(ctx.Config.ID), nil, nil, nil), nil
219+
return NewBzz(config, kademlia(ctx.Config.ID), nil, nil, nil, nil, nil), nil
220220
},
221221
}
222222
}

0 commit comments

Comments
 (0)