Skip to content

Commit 54945c1

Browse files
Corey Richardsonbkase
authored andcommitted
Add optional libp2p discovery (#3323)
use -libp2p-discovery as daemon arg to enable. There is a -libp2p-port argument but it is IGNORED at the moment in favor of the hardcoded port 28675 (arbitrary). When we remove the Haskell Kademlia, we'll take over the existing port infrastructure. There's a -disable-old-discovery that turns off old discovery as well. Risky! * cli flags * Gossip_net: keep the whole Config.t * add libp2p_membership field * missing opam file * start threading it all through * big ole nasty commit that does a bunch * libp2p keypair + initial peers * inject new libp2p peers back into haskell membership
1 parent 18cd6bd commit 54945c1

File tree

4 files changed

+137
-36
lines changed

4 files changed

+137
-36
lines changed

src/codanet.go

Lines changed: 60 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -4,47 +4,54 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7+
"log"
8+
"os"
9+
"path"
10+
"strconv"
11+
"time"
12+
713
dsb "github.com/ipfs/go-ds-badger"
14+
logging "github.com/ipfs/go-log"
815
p2p "github.com/libp2p/go-libp2p"
916
crypto "github.com/libp2p/go-libp2p-core/crypto"
1017
host "github.com/libp2p/go-libp2p-core/host"
18+
"github.com/libp2p/go-libp2p-core/peer"
19+
"github.com/libp2p/go-libp2p-core/peerstore"
1120
routing "github.com/libp2p/go-libp2p-core/routing"
1221
discovery "github.com/libp2p/go-libp2p-discovery"
1322
kad "github.com/libp2p/go-libp2p-kad-dht"
1423
kadopts "github.com/libp2p/go-libp2p-kad-dht/opts"
15-
"github.com/libp2p/go-libp2p-peerstore"
1624
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
1725
pnet "github.com/libp2p/go-libp2p-pnet"
1826
pubsub "github.com/libp2p/go-libp2p-pubsub"
19-
"github.com/libp2p/go-libp2p-record"
27+
record "github.com/libp2p/go-libp2p-record"
2028
secio "github.com/libp2p/go-libp2p-secio"
2129
p2pconfig "github.com/libp2p/go-libp2p/config"
2230
mdns "github.com/libp2p/go-libp2p/p2p/discovery"
2331
tcp "github.com/libp2p/go-tcp-transport"
2432
ws "github.com/libp2p/go-ws-transport"
2533
"github.com/multiformats/go-multiaddr"
2634
"golang.org/x/crypto/blake2b"
27-
"log"
28-
"os"
29-
"path"
30-
"strconv"
31-
"time"
3235
)
3336

3437
// Helper contains all the daemon state
3538
type Helper struct {
36-
Host host.Host
37-
Mdns mdns.Service
38-
Dht *kad.IpfsDHT
39-
Ctx context.Context
40-
Pubsub *pubsub.PubSub
39+
Host host.Host
40+
Mdns mdns.Service
41+
Dht *kad.IpfsDHT
42+
Ctx context.Context
43+
Pubsub *pubsub.PubSub
44+
Logger logging.EventLogger
45+
DiscoveredPeers chan peer.AddrInfo
46+
Rendezvous string
47+
Discovery *discovery.RoutingDiscovery
4148
}
4249

4350
type mdnsListener struct {
44-
FoundPeer chan peerstore.PeerInfo
51+
FoundPeer chan peer.AddrInfo
4552
}
4653

47-
func (l *mdnsListener) HandlePeerFound(info peerstore.PeerInfo) {
54+
func (l *mdnsListener) HandlePeerFound(info peer.AddrInfo) {
4855
l.FoundPeer <- info
4956
}
5057

@@ -64,6 +71,7 @@ func (cv customValidator) Select(key string, values [][]byte) (int, error) {
6471

6572
// MakeHelper does all the initialization to run one host
6673
func MakeHelper(ctx context.Context, listenOn []multiaddr.Multiaddr, statedir string, pk crypto.PrivKey, networkID string) (*Helper, error) {
74+
logger := logging.Logger("codanet.Helper")
6775
dso := dsb.DefaultOptions
6876

6977
bp := path.Join(statedir, strconv.Itoa(os.Getpid()))
@@ -85,14 +93,17 @@ func MakeHelper(ctx context.Context, listenOn []multiaddr.Multiaddr, statedir st
8593
return nil, err
8694
}
8795

88-
pnetKey := blake2b.Sum256([]byte("/coda/0.0.1"))
96+
rendezvousString := fmt.Sprintf("/coda/0.0.1/%s", networkID)
97+
98+
pnetKey := blake2b.Sum256([]byte(rendezvousString))
8999
prot, err := pnet.NewV1ProtectorFromBytes(&pnetKey)
90100
if err != nil {
91101
return nil, err
92102
}
93103

94104
rv := customValidator{Base: record.NamespacedValidator{"pk": record.PublicKeyValidator{}}}
95105

106+
// gross hack to exfiltrate a channel from the side effect of option evaluation
96107
kadch := make(chan *kad.IpfsDHT)
97108

98109
// Make sure this doesn't get too out of sync with the defaults,
@@ -118,13 +129,11 @@ func MakeHelper(ctx context.Context, listenOn []multiaddr.Multiaddr, statedir st
118129
return nil, err
119130
}
120131

121-
rendezvousString := fmt.Sprintf("/coda/0.0.1/%s", networkID)
122-
123132
mdns, err := mdns.NewMdnsService(ctx, host, time.Minute, "_coda-discovery._udp.local")
124133
if err != nil {
125134
return nil, err
126135
}
127-
l := &mdnsListener{FoundPeer: make(chan peerstore.PeerInfo)}
136+
l := &mdnsListener{FoundPeer: make(chan peer.AddrInfo)}
128137
mdns.RegisterNotifee(l)
129138

130139
kad := <-kadch
@@ -135,44 +144,60 @@ func MakeHelper(ctx context.Context, listenOn []multiaddr.Multiaddr, statedir st
135144
routingDiscovery := discovery.NewRoutingDiscovery(kad)
136145

137146
log.Println("Announcing ourselves for", rendezvousString)
138-
discovery.Advertise(ctx, routingDiscovery, rendezvousString)
139147

140-
// try and find some peers for this chain
141-
//dhtpeers, err := routingDiscovery.FindPeers(ctx, rendezvousString, discovery.Limit(16))
142-
//if err != nil {
143-
// return nil, err
144-
//}
148+
discovered := make(chan peer.AddrInfo)
145149

146-
foundPeer := func(info peerstore.PeerInfo, source string) {
150+
foundPeer := func(info peer.AddrInfo, source string) {
147151
if info.ID != "" {
148152
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
149153
defer cancel()
150154
if err := host.Connect(ctx, info); err != nil {
151-
log.Printf("Warn: couldn't connect to %s peer %v (different chain?): %v", source, info.Loggable(), err)
155+
logger.Warning("couldn't connect to %s peer %v (maybe the network ID mismatched?): %v", source, info.Loggable(), err)
152156
} else {
153-
log.Printf("Found a %s peer: %s", source, info.Loggable())
154-
host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
157+
logger.Info("Found a %s peer: %s", source, info.Loggable())
158+
host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.ConnectedAddrTTL)
159+
discovered <- info
155160
}
156161
}
157162
}
158163

164+
// report local discovery peers
159165
go func() {
160-
for {
161-
info := <-l.FoundPeer
166+
for info := range l.FoundPeer {
162167
foundPeer(info, "local")
163168
}
164169
}()
165170

171+
// report dht peers
172+
go func() {
173+
for {
174+
// default is to yield only 100 peers at a time. for now, always be
175+
// looking... TODO: Is there a better way to use discovery? Should we only
176+
// have to explicitly search once?
177+
dhtpeers, err := routingDiscovery.FindPeers(ctx, rendezvousString)
178+
if err != nil {
179+
logger.Error("failed to find DHT peers: ", err)
180+
}
181+
for info := range dhtpeers {
182+
foundPeer(info, "dht")
183+
}
184+
}
185+
}()
186+
166187
pubsub, err := pubsub.NewFloodSub(ctx, host, pubsub.WithStrictSignatureVerification(true), pubsub.WithMessageSigning(true))
167188
if err != nil {
168189
return nil, err
169190
}
170191

171192
return &Helper{
172-
Host: host,
173-
Ctx: ctx,
174-
Mdns: mdns,
175-
Dht: kad,
176-
Pubsub: pubsub,
193+
Host: host,
194+
Ctx: ctx,
195+
Mdns: mdns,
196+
Dht: kad,
197+
Pubsub: pubsub,
198+
Logger: logger,
199+
DiscoveredPeers: discovered,
200+
Rendezvous: rendezvousString,
201+
Discovery: routingDiscovery,
177202
}, nil
178203
}

src/generate_methodidx/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func main() {
9999
Command: "generate_methodidx",
100100
PackageName: "main",
101101
TypesAndValues: map[string][]string{
102-
"methodIdx": []string{"configure", "listen", "publish", "subscribe", "unsubscribe", "validationComplete", "generateKeypair", "openStream", "closeStream", "resetStream", "sendStreamMsg", "removeStreamHandler", "addStreamHandler", "listeningAddrs"},
102+
"methodIdx": []string{"configure", "listen", "publish", "subscribe", "unsubscribe", "validationComplete", "generateKeypair", "openStream", "closeStream", "resetStream", "sendStreamMsg", "removeStreamHandler", "addStreamHandler", "listeningAddrs", "addPeer", "beginAdvertising"},
103103
},
104104
}
105105

src/libp2p_helper/main.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
net "github.com/libp2p/go-libp2p-core/network"
2121
peer "github.com/libp2p/go-libp2p-core/peer"
2222
protocol "github.com/libp2p/go-libp2p-core/protocol"
23+
discovery "github.com/libp2p/go-libp2p-discovery"
2324
pubsub "github.com/libp2p/go-libp2p-pubsub"
2425
b58 "github.com/mr-tron/base58/base58"
2526
"github.com/multiformats/go-multiaddr"
@@ -64,6 +65,8 @@ const (
6465
removeStreamHandler
6566
addStreamHandler
6667
listeningAddrs
68+
addPeer
69+
beginAdvertising
6770
)
6871

6972
type envelope struct {
@@ -134,6 +137,12 @@ type configureMsg struct {
134137
ListenOn []string `json:"ifaces"`
135138
}
136139

140+
type discoveredPeerUpcall struct {
141+
ID string `json:"peer_id"`
142+
Addrs []string `json:"multiaddrs"`
143+
Upcall string `json:"upcall"`
144+
}
145+
137146
func (m *configureMsg) run(app *app) (interface{}, error) {
138147
privkBytes, err := b58.Decode(m.Privk)
139148
if err != nil {
@@ -152,6 +161,19 @@ func (m *configureMsg) run(app *app) (interface{}, error) {
152161
maddrs[i] = res
153162
}
154163
helper, err := codanet.MakeHelper(app.Ctx, maddrs, m.Statedir, privk, m.NetworkID)
164+
go func() {
165+
for info := range helper.DiscoveredPeers {
166+
addrStrings := make([]string, len(info.Addrs))
167+
for i, a := range info.Addrs {
168+
addrStrings[i] = a.String()
169+
}
170+
app.writeMsg(discoveredPeerUpcall{
171+
ID: peer.IDB58Encode(info.ID),
172+
Addrs: addrStrings,
173+
Upcall: "discoveredPeer",
174+
})
175+
}
176+
}()
155177
if err != nil {
156178
return nil, badHelper(err)
157179
}
@@ -546,6 +568,52 @@ func (rs *removeStreamHandlerMsg) run(app *app) (interface{}, error) {
546568
return "removeStreamHandler success", nil
547569
}
548570

571+
type addPeerMsg struct {
572+
Multiaddr string `json:"multiaddr"`
573+
}
574+
575+
func (ap *addPeerMsg) run(app *app) (interface{}, error) {
576+
if app.P2p == nil {
577+
return nil, needsConfigure()
578+
}
579+
multiaddr, err := multiaddr.NewMultiaddr(ap.Multiaddr)
580+
if err != nil {
581+
// TODO: this isn't necessarily an RPC error. Perhaps the encoded multiaddr
582+
// isn't supported by this version of libp2p.
583+
// But more likely, it is an RPC error.
584+
return nil, badRPC(err)
585+
}
586+
info, err := peer.AddrInfoFromP2pAddr(multiaddr)
587+
if err != nil {
588+
// TODO: this isn't necessarily an RPC error. Perhaps the contained peer ID
589+
// isn't supported by this version of libp2p.
590+
// But more likely, it is an RPC error.
591+
return nil, badRPC(err)
592+
}
593+
594+
// discovery should notice the connection event and do the dht thing
595+
err = app.P2p.Host.Connect(app.Ctx, *info)
596+
597+
if err != nil {
598+
return nil, badp2p(err)
599+
}
600+
601+
return "addPeer success", nil
602+
}
603+
604+
type beginAdvertisingMsg struct {
605+
}
606+
607+
func (ap *beginAdvertisingMsg) run(app *app) (interface{}, error) {
608+
if app.P2p == nil {
609+
return nil, needsConfigure()
610+
}
611+
612+
discovery.Advertise(app.Ctx, app.P2p.Discovery, app.P2p.Rendezvous)
613+
614+
return "beginAdvertising success", nil
615+
}
616+
549617
var msgHandlers = map[methodIdx]func() action{
550618
configure: func() action { return &configureMsg{} },
551619
listen: func() action { return &listenMsg{} },
@@ -561,6 +629,8 @@ var msgHandlers = map[methodIdx]func() action{
561629
removeStreamHandler: func() action { return &removeStreamHandlerMsg{} },
562630
addStreamHandler: func() action { return &addStreamHandlerMsg{} },
563631
listeningAddrs: func() action { return &listeningAddrsMsg{} },
632+
addPeer: func() action { return &addPeerMsg{} },
633+
beginAdvertising: func() action { return &beginAdvertisingMsg{} },
564634
}
565635

566636
type errorResult struct {

src/libp2p_helper/methodidx_jsonenum.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ var (
2323
"removeStreamHandler": removeStreamHandler,
2424
"addStreamHandler": addStreamHandler,
2525
"listeningAddrs": listeningAddrs,
26+
"addPeer": addPeer,
27+
"beginAdvertising": beginAdvertising,
2628
}
2729

2830
_methodIdxValueToName = map[methodIdx]string{
@@ -40,6 +42,8 @@ var (
4042
removeStreamHandler: "removeStreamHandler",
4143
addStreamHandler: "addStreamHandler",
4244
listeningAddrs: "listeningAddrs",
45+
addPeer: "addPeer",
46+
beginAdvertising: "beginAdvertising",
4347
}
4448
)
4549

@@ -61,6 +65,8 @@ func init() {
6165
interface{}(removeStreamHandler).(fmt.Stringer).String(): removeStreamHandler,
6266
interface{}(addStreamHandler).(fmt.Stringer).String(): addStreamHandler,
6367
interface{}(listeningAddrs).(fmt.Stringer).String(): listeningAddrs,
68+
interface{}(addPeer).(fmt.Stringer).String(): addPeer,
69+
interface{}(beginAdvertising).(fmt.Stringer).String(): beginAdvertising,
6470
}
6571
}
6672
}

0 commit comments

Comments
 (0)