Skip to content

Commit 5579bea

Browse files
Corey Richardsonmergify[bot]
authored andcommitted
Hotfix/libp2p details (#3328)
* Fix coda advanced generate-libp2p-keypair There was a double deferred that wasn't being waited on correctly, too much logger spam, and the tmpdir was bad * Don't double-encode peer_id * fix CLI arg docs * configure before handling protocol, but split up listening * Fix DHT/discovery lifecycle * working discovery! * turn coda_net2 tests back off
1 parent 54945c1 commit 5579bea

File tree

2 files changed

+123
-100
lines changed

2 files changed

+123
-100
lines changed

src/codanet.go

Lines changed: 13 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,13 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7-
"log"
8-
"os"
9-
"path"
10-
"strconv"
11-
"time"
12-
137
dsb "github.com/ipfs/go-ds-badger"
148
logging "github.com/ipfs/go-log"
159
p2p "github.com/libp2p/go-libp2p"
1610
crypto "github.com/libp2p/go-libp2p-core/crypto"
1711
host "github.com/libp2p/go-libp2p-core/host"
12+
"github.com/libp2p/go-libp2p-core/network"
1813
"github.com/libp2p/go-libp2p-core/peer"
19-
"github.com/libp2p/go-libp2p-core/peerstore"
2014
routing "github.com/libp2p/go-libp2p-core/routing"
2115
discovery "github.com/libp2p/go-libp2p-discovery"
2216
kad "github.com/libp2p/go-libp2p-kad-dht"
@@ -32,12 +26,16 @@ import (
3226
ws "github.com/libp2p/go-ws-transport"
3327
"github.com/multiformats/go-multiaddr"
3428
"golang.org/x/crypto/blake2b"
29+
"log"
30+
"os"
31+
"path"
32+
"strconv"
3533
)
3634

3735
// Helper contains all the daemon state
3836
type Helper struct {
3937
Host host.Host
40-
Mdns mdns.Service
38+
Mdns *mdns.Service
4139
Dht *kad.IpfsDHT
4240
Ctx context.Context
4341
Pubsub *pubsub.PubSub
@@ -47,14 +45,6 @@ type Helper struct {
4745
Discovery *discovery.RoutingDiscovery
4846
}
4947

50-
type mdnsListener struct {
51-
FoundPeer chan peer.AddrInfo
52-
}
53-
54-
func (l *mdnsListener) HandlePeerFound(info peer.AddrInfo) {
55-
l.FoundPeer <- info
56-
}
57-
5848
type customValidator struct {
5949
Base record.Validator
6050
}
@@ -69,6 +59,8 @@ func (cv customValidator) Select(key string, values [][]byte) (int, error) {
6959
return cv.Base.Select(key, values)
7060
}
7161

62+
// TODO: just put this into main.go?
63+
7264
// MakeHelper does all the initialization to run one host
7365
func MakeHelper(ctx context.Context, listenOn []multiaddr.Multiaddr, statedir string, pk crypto.PrivKey, networkID string) (*Helper, error) {
7466
logger := logging.Logger("codanet.Helper")
@@ -103,7 +95,7 @@ func MakeHelper(ctx context.Context, listenOn []multiaddr.Multiaddr, statedir st
10395

10496
rv := customValidator{Base: record.NamespacedValidator{"pk": record.PublicKeyValidator{}}}
10597

106-
// gross hack to exfiltrate a channel from the side effect of option evaluation
98+
// gross hack to exfiltrate the DHT from the side effect of option evaluation
10799
kadch := make(chan *kad.IpfsDHT)
108100

109101
// Make sure this doesn't get too out of sync with the defaults,
@@ -129,75 +121,23 @@ func MakeHelper(ctx context.Context, listenOn []multiaddr.Multiaddr, statedir st
129121
return nil, err
130122
}
131123

132-
mdns, err := mdns.NewMdnsService(ctx, host, time.Minute, "_coda-discovery._udp.local")
133-
if err != nil {
134-
return nil, err
135-
}
136-
l := &mdnsListener{FoundPeer: make(chan peer.AddrInfo)}
137-
mdns.RegisterNotifee(l)
138-
139124
kad := <-kadch
140125

141-
if err = kad.Bootstrap(ctx); err != nil {
142-
return nil, err
143-
}
144-
routingDiscovery := discovery.NewRoutingDiscovery(kad)
145-
146-
log.Println("Announcing ourselves for", rendezvousString)
147-
148-
discovered := make(chan peer.AddrInfo)
149-
150-
foundPeer := func(info peer.AddrInfo, source string) {
151-
if info.ID != "" {
152-
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
153-
defer cancel()
154-
if err := host.Connect(ctx, info); err != nil {
155-
logger.Warning("couldn't connect to %s peer %v (maybe the network ID mismatched?): %v", source, info.Loggable(), err)
156-
} else {
157-
logger.Info("Found a %s peer: %s", source, info.Loggable())
158-
host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.ConnectedAddrTTL)
159-
discovered <- info
160-
}
161-
}
162-
}
163-
164-
// report local discovery peers
165-
go func() {
166-
for info := range l.FoundPeer {
167-
foundPeer(info, "local")
168-
}
169-
}()
170-
171-
// report dht peers
172-
go func() {
173-
for {
174-
// default is to yield only 100 peers at a time. for now, always be
175-
// looking... TODO: Is there a better way to use discovery? Should we only
176-
// have to explicitly search once?
177-
dhtpeers, err := routingDiscovery.FindPeers(ctx, rendezvousString)
178-
if err != nil {
179-
logger.Error("failed to find DHT peers: ", err)
180-
}
181-
for info := range dhtpeers {
182-
foundPeer(info, "dht")
183-
}
184-
}
185-
}()
186-
187126
pubsub, err := pubsub.NewFloodSub(ctx, host, pubsub.WithStrictSignatureVerification(true), pubsub.WithMessageSigning(true))
188127
if err != nil {
189128
return nil, err
190129
}
191130

131+
// nil fields are initialized by beginAdvertising
192132
return &Helper{
193133
Host: host,
194134
Ctx: ctx,
195-
Mdns: mdns,
135+
Mdns: nil,
196136
Dht: kad,
197137
Pubsub: pubsub,
198138
Logger: logger,
199-
DiscoveredPeers: discovered,
139+
DiscoveredPeers: nil,
200140
Rendezvous: rendezvousString,
201-
Discovery: routingDiscovery,
141+
Discovery: nil,
202142
}, nil
203143
}

src/libp2p_helper/main.go

Lines changed: 110 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,15 @@ import (
1313
"sync"
1414
"time"
1515

16+
mdns "github.com/libp2p/go-libp2p/p2p/discovery"
17+
1618
"github.com/go-errors/errors"
1719
logging "github.com/ipfs/go-log"
1820
logwriter "github.com/ipfs/go-log/writer"
1921
crypto "github.com/libp2p/go-libp2p-core/crypto"
2022
net "github.com/libp2p/go-libp2p-core/network"
2123
peer "github.com/libp2p/go-libp2p-core/peer"
24+
"github.com/libp2p/go-libp2p-core/peerstore"
2225
protocol "github.com/libp2p/go-libp2p-core/protocol"
2326
discovery "github.com/libp2p/go-libp2p-discovery"
2427
pubsub "github.com/libp2p/go-libp2p-pubsub"
@@ -130,6 +133,10 @@ func needsConfigure() error {
130133
return badRPC(errors.New("helper not yet configured"))
131134
}
132135

136+
func needsDHT() error {
137+
return badRPC(errors.New("helper not yet joined to pubsub"))
138+
}
139+
133140
type configureMsg struct {
134141
Statedir string `json:"statedir"`
135142
Privk string `json:"privk"`
@@ -161,19 +168,6 @@ func (m *configureMsg) run(app *app) (interface{}, error) {
161168
maddrs[i] = res
162169
}
163170
helper, err := codanet.MakeHelper(app.Ctx, maddrs, m.Statedir, privk, m.NetworkID)
164-
go func() {
165-
for info := range helper.DiscoveredPeers {
166-
addrStrings := make([]string, len(info.Addrs))
167-
for i, a := range info.Addrs {
168-
addrStrings[i] = a.String()
169-
}
170-
app.writeMsg(discoveredPeerUpcall{
171-
ID: peer.IDB58Encode(info.ID),
172-
Addrs: addrStrings,
173-
Upcall: "discoveredPeer",
174-
})
175-
}
176-
}()
177171
if err != nil {
178172
return nil, badHelper(err)
179173
}
@@ -219,6 +213,10 @@ func (t *publishMsg) run(app *app) (interface{}, error) {
219213
if app.P2p == nil {
220214
return nil, needsConfigure()
221215
}
216+
if app.P2p.Dht == nil {
217+
return nil, needsDHT()
218+
}
219+
222220
data, err := b58.Decode(t.Data)
223221
if err != nil {
224222
return nil, badRPC(err)
@@ -244,6 +242,9 @@ func (s *subscribeMsg) run(app *app) (interface{}, error) {
244242
if app.P2p == nil {
245243
return nil, needsConfigure()
246244
}
245+
if app.P2p.Dht == nil {
246+
return nil, needsDHT()
247+
}
247248
err := app.P2p.Pubsub.RegisterTopicValidator(s.Topic, func(ctx context.Context, id peer.ID, msg *pubsub.Message) bool {
248249
seqno := <-seqs
249250
ch := make(chan bool)
@@ -403,17 +404,9 @@ type incomingMsgUpcall struct {
403404

404405
func handleStreamReads(app *app, stream net.Stream, idx int) {
405406
go func() {
406-
buf := make([]byte, 512)
407+
buf := make([]byte, 4096)
407408
for {
408409
len, err := stream.Read(buf)
409-
if err != nil && err != io.EOF {
410-
app.writeMsg(streamLostUpcall{
411-
Upcall: "streamLost",
412-
StreamIdx: idx,
413-
Reason: fmt.Sprintf("read failure: %s", err.Error()),
414-
})
415-
stream.Reset()
416-
}
417410

418411
if len != 0 {
419412
app.writeMsg(incomingMsgUpcall{
@@ -423,6 +416,15 @@ func handleStreamReads(app *app, stream net.Stream, idx int) {
423416
})
424417
}
425418

419+
if err != nil && err != io.EOF {
420+
app.writeMsg(streamLostUpcall{
421+
Upcall: "streamLost",
422+
StreamIdx: idx,
423+
Reason: fmt.Sprintf("read failure: %s", err.Error()),
424+
})
425+
break
426+
}
427+
426428
if err == io.EOF {
427429
break
428430
}
@@ -459,7 +461,11 @@ func (o *openStreamMsg) run(app *app) (interface{}, error) {
459461
}
460462

461463
app.Streams[streamIdx] = stream
462-
handleStreamReads(app, stream, streamIdx)
464+
go func() {
465+
// FIXME HACK: allow time for the openStreamResult to get printed before we start inserting stream events
466+
time.Sleep(250 * time.Millisecond)
467+
handleStreamReads(app, stream, streamIdx)
468+
}()
463469
return openStreamResult{StreamIdx: streamIdx, RemoteAddr: stream.Conn().RemoteMultiaddr().String(), RemotePeerID: stream.Conn().RemotePeer().String()}, nil
464470
}
465471

@@ -490,7 +496,7 @@ func (cs *resetStreamMsg) run(app *app) (interface{}, error) {
490496
return nil, needsConfigure()
491497
}
492498
if stream, ok := app.Streams[cs.StreamIdx]; ok {
493-
err := stream.Close()
499+
err := stream.Reset()
494500
if err != nil {
495501
return nil, badp2p(err)
496502
}
@@ -604,12 +610,89 @@ func (ap *addPeerMsg) run(app *app) (interface{}, error) {
604610
type beginAdvertisingMsg struct {
605611
}
606612

613+
type mdnsListener struct {
614+
FoundPeer chan peer.AddrInfo
615+
}
616+
617+
func (l *mdnsListener) HandlePeerFound(info peer.AddrInfo) {
618+
l.FoundPeer <- info
619+
}
620+
607621
func (ap *beginAdvertisingMsg) run(app *app) (interface{}, error) {
608622
if app.P2p == nil {
609623
return nil, needsConfigure()
610624
}
611625

612-
discovery.Advertise(app.Ctx, app.P2p.Discovery, app.P2p.Rendezvous)
626+
mdns, err := mdns.NewMdnsService(app.Ctx, app.P2p.Host, time.Minute, "_coda-discovery._udp.local")
627+
if err != nil {
628+
return nil, err
629+
}
630+
app.P2p.Mdns = &mdns
631+
l := &mdnsListener{FoundPeer: make(chan peer.AddrInfo)}
632+
mdns.RegisterNotifee(l)
633+
634+
routingDiscovery := discovery.NewRoutingDiscovery(app.P2p.Dht)
635+
636+
if routingDiscovery == nil {
637+
return nil, errors.New("failed to create routing discovery")
638+
}
639+
640+
app.P2p.Discovery = routingDiscovery
641+
642+
discovered := make(chan peer.AddrInfo)
643+
app.P2p.DiscoveredPeers = discovered
644+
645+
foundPeer := func(info peer.AddrInfo, source string) {
646+
if info.ID != "" && len(info.Addrs) != 0 {
647+
ctx, cancel := context.WithTimeout(app.Ctx, 15*time.Second)
648+
defer cancel()
649+
if err := app.P2p.Host.Connect(ctx, info); err != nil {
650+
app.P2p.Logger.Warning("couldn't connect to %s peer %v (maybe the network ID mismatched?): %v", source, info.Loggable(), err)
651+
} else {
652+
app.P2p.Logger.Info("Found a %s peer: %s", source, info.Loggable())
653+
app.P2p.Host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.ConnectedAddrTTL)
654+
addrStrings := make([]string, len(info.Addrs))
655+
for i, a := range info.Addrs {
656+
addrStrings[i] = a.String()
657+
}
658+
app.writeMsg(discoveredPeerUpcall{
659+
ID: peer.IDB58Encode(info.ID),
660+
Addrs: addrStrings,
661+
Upcall: "discoveredPeer",
662+
})
663+
}
664+
}
665+
}
666+
667+
// report local discovery peers
668+
go func() {
669+
for info := range l.FoundPeer {
670+
foundPeer(info, "local")
671+
}
672+
}()
673+
674+
// report dht peers
675+
go func() {
676+
for {
677+
// default is to yield only 100 peers at a time. for now, always be
678+
// looking... TODO: Is there a better way to use discovery? Should we only
679+
// have to explicitly search once at startup?
680+
dhtpeers, err := routingDiscovery.FindPeers(app.Ctx, app.P2p.Rendezvous)
681+
if err != nil {
682+
app.P2p.Logger.Error("failed to find DHT peers: ", err)
683+
}
684+
for info := range dhtpeers {
685+
foundPeer(info, "dht")
686+
}
687+
time.Sleep(5 * time.Minute)
688+
}
689+
}()
690+
691+
if err := app.P2p.Dht.Bootstrap(app.Ctx); err != nil {
692+
return nil, badp2p(err)
693+
}
694+
695+
discovery.Advertise(app.Ctx, routingDiscovery, app.P2p.Rendezvous)
613696

614697
return "beginAdvertising success", nil
615698
}
@@ -647,7 +730,7 @@ func main() {
647730
logwriter.Configure(logwriter.Output(os.Stderr), logwriter.LdJSONFormatter)
648731
log.SetOutput(os.Stderr)
649732
logging.SetAllLoggers(logging2.INFO)
650-
helper_log := logging.Logger("helper top-level JSON handling")
733+
helperLog := logging.Logger("helper top-level JSON handling")
651734

652735
go func() {
653736
i := 0
@@ -688,7 +771,7 @@ func main() {
688771
}
689772
defer func() {
690773
if r := recover(); r != nil {
691-
helper_log.Error("While handling RPC:", line, "\nThe following panic occurred: ", r)
774+
helperLog.Error("While handling RPC:", line, "\nThe following panic occurred: ", r)
692775
}
693776
}()
694777
res, err := msg.run(app)

0 commit comments

Comments
 (0)