Skip to content

Commit d359d0e

Browse files
Corey Richardsonbkase
authored andcommitted
First version of coda_net2, the libp2p-integrated coda networki… (#2450)
* First version of coda_net2, the libp2p-integrated coda networking library * TODO the three functions that aren't done yet * go mod tidy * nixify the libp2p_helper build * use real logger, process startup code from membership.ml * less Or_error; Keypair.to_peerid * first test * update go deps, fix stream EOF handling, add some notes about generate_methodidx to a README * use coda_base58 * guard against unconfigured state * add listening_addrs method * add me method * reformat, add pubsub test * add multiaddr comment * PeerID.t * note about multiple validators * harmonize pk/sk * reformat, remove random_peers, comment. * TODO note about connection limits * remove binary (oops) * review feedback * Have subscribe return a pipe Previously you could specify which Strict_pipe behavior you want. This is less annoying, and hopefully Strict_pipe will be gone? * type the helper RPCs * straggling details * WIP [goupdate, integrate subscription and validation] * integrate validation with subscription the justification I used for keeping them separate is kinda weak. Topic validation is no longer optional! * some bugfix, more docs, json logs, rework validation * deriving yojson for upcall messages * Fix pipe leak in subscription error paths * Don't handle protocols we're already handling * Fix duplicate subscription detection Previously it would fail to actually look for other subscriptions for that topic... * Connection management note * Move the warning attribute * Thanks for the types @deepthiskumar! * Note some pipe leakage * Rename safe_secret The "safe" bit was that it was JSON-string-safe * Flip around duplication protocol handler condition * process->peer * Flesh out peers TODO * graceful shutdown TODO love * Do not close the pipes in reset * Swap around filter_inplace bool * make some control flow more obvious * Nicer error message * Note about stale lockfile * yeah how did that happen * remove extraneous sleeps * remove callipyge dep ended up doing keygen in the helper * flesh out the sender todo (only needed for trust) * pid file handling more notes * WIP * Use pinned nixpkgs to build libp2p_helper * Finish up stream_state stuff * reformat * mutex one line at a time * Process RPCs serially I'm 95% sure it's safe to do these concurrently minus one edge case regarding stream closure. I'm not sure how to handle the edge case, so just process them serially. * Make reformat * Attempt to install libp2p in unit tests * Nix try again * Trying nix again * Use sudo HOME too * Better yaml * Sudo the world * Better better sudo * Better sudo on the libp2p install * Assume this system is like my computer root sudo the nix-build * Attempt with chmod 777 on the dir * Qualify path to libp2p_helper * Typo on path name * Try disabiling coda_net2 tests and libp2p nix hack
0 parents  commit d359d0e

File tree

11 files changed

+1998
-0
lines changed

11 files changed

+1998
-0
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# libp2p_helper hints
2+
3+
If you are adding new `methodIdx` values, edit `generate_methodidx/main.go`
4+
(search for `TypesAndValues`) with the names of the new values. Then, run `go
5+
run generate_methodidx/main.go > libp2p_helper/methodidx_jsonenum.go`.

default.nix

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
((import (builtins.fetchTarball {
2+
name = "nixpkgs-unstable-2019-03-18";
3+
url = https://github.com/nixos/nixpkgs/archive/0125544e2a0552590c87dca1583768b49ba911c0.tar.gz;
4+
sha256 = "04xvlqw3zbq91zkfa506b2k1ajmj7pqh3nvdh9maabw6m5jhm5rl";
5+
})) {}).buildGoModule rec {
6+
name = "libp2p_helper-${version}";
7+
version = "0.1";
8+
src = ./src;
9+
modSha256 = "1spndcx0z50cmpfxfd0971nj9n0v77fghxl36hr1pvs6kv0ra5c3";
10+
}
11+

src/codanet.go

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
package codanet
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"fmt"
7+
dsb "github.com/ipfs/go-ds-badger"
8+
p2p "github.com/libp2p/go-libp2p"
9+
crypto "github.com/libp2p/go-libp2p-core/crypto"
10+
host "github.com/libp2p/go-libp2p-core/host"
11+
routing "github.com/libp2p/go-libp2p-core/routing"
12+
discovery "github.com/libp2p/go-libp2p-discovery"
13+
kad "github.com/libp2p/go-libp2p-kad-dht"
14+
kadopts "github.com/libp2p/go-libp2p-kad-dht/opts"
15+
"github.com/libp2p/go-libp2p-peerstore"
16+
"github.com/libp2p/go-libp2p-peerstore/pstoreds"
17+
pnet "github.com/libp2p/go-libp2p-pnet"
18+
pubsub "github.com/libp2p/go-libp2p-pubsub"
19+
"github.com/libp2p/go-libp2p-record"
20+
secio "github.com/libp2p/go-libp2p-secio"
21+
p2pconfig "github.com/libp2p/go-libp2p/config"
22+
mdns "github.com/libp2p/go-libp2p/p2p/discovery"
23+
tcp "github.com/libp2p/go-tcp-transport"
24+
ws "github.com/libp2p/go-ws-transport"
25+
"github.com/multiformats/go-multiaddr"
26+
"golang.org/x/crypto/blake2b"
27+
"log"
28+
"os"
29+
"path"
30+
"strconv"
31+
"time"
32+
)
33+
34+
// Helper contains all the daemon state
35+
type Helper struct {
36+
Host host.Host
37+
Mdns mdns.Service
38+
Dht *kad.IpfsDHT
39+
Ctx context.Context
40+
Pubsub *pubsub.PubSub
41+
}
42+
43+
type mdnsListener struct {
44+
FoundPeer chan peerstore.PeerInfo
45+
}
46+
47+
func (l *mdnsListener) HandlePeerFound(info peerstore.PeerInfo) {
48+
l.FoundPeer <- info
49+
}
50+
51+
type customValidator struct {
52+
Base record.Validator
53+
}
54+
55+
func (cv customValidator) Validate(key string, value []byte) error {
56+
log.Printf("DHT Validating: %s = %s", key, value)
57+
return cv.Base.Validate(key, value)
58+
}
59+
60+
func (cv customValidator) Select(key string, values [][]byte) (int, error) {
61+
log.Printf("DHT Selecting Among: %s = %s", key, bytes.Join(values, []byte("; ")))
62+
return cv.Base.Select(key, values)
63+
}
64+
65+
// MakeHelper does all the initialization to run one host
66+
func MakeHelper(ctx context.Context, listenOn []multiaddr.Multiaddr, statedir string, pk crypto.PrivKey, networkID string) (*Helper, error) {
67+
dso := dsb.DefaultOptions
68+
69+
bp := path.Join(statedir, strconv.Itoa(os.Getpid()))
70+
os.MkdirAll(bp, 0700)
71+
72+
ds, err := dsb.NewDatastore(path.Join(statedir, strconv.Itoa(os.Getpid()), "libp2p-peerstore-v0"), &dso)
73+
if err != nil {
74+
return nil, err
75+
}
76+
77+
dsoDht := dsb.DefaultOptions
78+
dsDht, err := dsb.NewDatastore(path.Join(statedir, strconv.Itoa(os.Getpid()), "libp2p-dht-v0"), &dsoDht)
79+
if err != nil {
80+
return nil, err
81+
}
82+
83+
ps, err := pstoreds.NewPeerstore(ctx, ds, pstoreds.DefaultOpts())
84+
if err != nil {
85+
return nil, err
86+
}
87+
88+
pnetKey := blake2b.Sum256([]byte("/coda/0.0.1"))
89+
prot, err := pnet.NewV1ProtectorFromBytes(&pnetKey)
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
rv := customValidator{Base: record.NamespacedValidator{"pk": record.PublicKeyValidator{}}}
95+
96+
kadch := make(chan *kad.IpfsDHT)
97+
98+
// Make sure this doesn't get too out of sync with the defaults,
99+
// NewWithoutDefaults is considered unstable.
100+
host, err := p2p.NewWithoutDefaults(ctx,
101+
p2p.Transport(tcp.NewTCPTransport),
102+
p2p.Transport(ws.New),
103+
p2p.Muxer("/mplex/6.7.0", DefaultMplexTransport),
104+
p2p.Security(secio.ID, secio.New),
105+
p2p.Identity(pk),
106+
p2p.Peerstore(ps),
107+
p2p.DisableRelay(),
108+
p2p.ListenAddrs(listenOn...),
109+
p2p.Routing(
110+
p2pconfig.RoutingC(func(host host.Host) (routing.PeerRouting, error) {
111+
kad, err := kad.New(ctx, host, kadopts.Datastore(dsDht), kadopts.Validator(rv))
112+
go func() { kadch <- kad }()
113+
return kad, err
114+
})),
115+
p2p.PrivateNetwork(prot))
116+
117+
if err != nil {
118+
return nil, err
119+
}
120+
121+
rendezvousString := fmt.Sprintf("/coda/0.0.1/%s", networkID)
122+
123+
mdns, err := mdns.NewMdnsService(ctx, host, time.Minute, "_coda-discovery._udp.local")
124+
if err != nil {
125+
return nil, err
126+
}
127+
l := &mdnsListener{FoundPeer: make(chan peerstore.PeerInfo)}
128+
mdns.RegisterNotifee(l)
129+
130+
kad := <-kadch
131+
132+
if err = kad.Bootstrap(ctx); err != nil {
133+
return nil, err
134+
}
135+
routingDiscovery := discovery.NewRoutingDiscovery(kad)
136+
137+
log.Println("Announcing ourselves for", rendezvousString)
138+
discovery.Advertise(ctx, routingDiscovery, rendezvousString)
139+
140+
// try and find some peers for this chain
141+
//dhtpeers, err := routingDiscovery.FindPeers(ctx, rendezvousString, discovery.Limit(16))
142+
//if err != nil {
143+
// return nil, err
144+
//}
145+
146+
foundPeer := func(info peerstore.PeerInfo, source string) {
147+
if info.ID != "" {
148+
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
149+
defer cancel()
150+
if err := host.Connect(ctx, info); err != nil {
151+
log.Printf("Warn: couldn't connect to %s peer %v (different chain?): %v", source, info.Loggable(), err)
152+
} else {
153+
log.Printf("Found a %s peer: %s", source, info.Loggable())
154+
host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.PermanentAddrTTL)
155+
}
156+
}
157+
}
158+
159+
go func() {
160+
for {
161+
info := <-l.FoundPeer
162+
foundPeer(info, "local")
163+
}
164+
}()
165+
166+
pubsub, err := pubsub.NewFloodSub(ctx, host, pubsub.WithStrictSignatureVerification(true), pubsub.WithMessageSigning(true))
167+
if err != nil {
168+
return nil, err
169+
}
170+
171+
return &Helper{
172+
Host: host,
173+
Ctx: ctx,
174+
Mdns: mdns,
175+
Dht: kad,
176+
Pubsub: pubsub,
177+
}, nil
178+
}

src/gen_keys/main.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package main
2+
import (
3+
pnet "github.com/libp2p/go-libp2p-pnet"
4+
"log"
5+
"io"
6+
"os"
7+
)
8+
9+
func main() {
10+
key, err := pnet.GenerateV1PSK()
11+
if err != nil {
12+
log.Fatal("sad %s", err);
13+
}
14+
io.Copy(os.Stdout, key)
15+
}

src/generate_methodidx/main.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Copyright 2017 Google Inc. All rights reserved.
2+
// Licensed under the Apache License, Version 2.0 (the "License");
3+
// you may not use this file except in compliance with the License.
4+
// You may obtain a copy of the License at
5+
// http://www.apache.org/licenses/LICENSE-2.0
6+
//
7+
// Unless required by applicable law or agreed to writing, software distributed
8+
// under the License is distributed on a "AS IS" BASIS, WITHOUT WARRANTIES OR
9+
// CONDITIONS OF ANY KIND, either express or implied.
10+
//
11+
// See the License for the specific language governing permissions and
12+
// limitations under the License.
13+
14+
// Added as a .go file to avoid embedding issues of the template.
15+
16+
package main
17+
18+
import (
19+
"bytes"
20+
"go/format"
21+
"log"
22+
"os"
23+
"text/template"
24+
)
25+
26+
var generatedTmpl = template.Must(template.New("generated").Parse(`
27+
// generated by {{.Command}}; DO NOT EDIT
28+
29+
package {{.PackageName}}
30+
31+
import (
32+
"encoding/json"
33+
"fmt"
34+
)
35+
36+
{{range $typename, $values := .TypesAndValues}}
37+
38+
var (
39+
_{{$typename}}NameToValue = map[string]{{$typename}} {
40+
{{range $values}}"{{.}}": {{.}},
41+
{{end}}
42+
}
43+
44+
_{{$typename}}ValueToName = map[{{$typename}}]string {
45+
{{range $values}}{{.}}: "{{.}}",
46+
{{end}}
47+
}
48+
)
49+
50+
func init() {
51+
var v {{$typename}}
52+
if _, ok := interface{}(v).(fmt.Stringer); ok {
53+
_{{$typename}}NameToValue = map[string]{{$typename}} {
54+
{{range $values}}interface{}({{.}}).(fmt.Stringer).String(): {{.}},
55+
{{end}}
56+
}
57+
}
58+
}
59+
60+
// MarshalJSON is generated so {{$typename}} satisfies json.Marshaler.
61+
func (r {{$typename}}) MarshalJSON() ([]byte, error) {
62+
if s, ok := interface{}(r).(fmt.Stringer); ok {
63+
return json.Marshal(s.String())
64+
}
65+
s, ok := _{{$typename}}ValueToName[r]
66+
if !ok {
67+
return nil, fmt.Errorf("invalid {{$typename}}: %d", r)
68+
}
69+
return json.Marshal(s)
70+
}
71+
72+
// UnmarshalJSON is generated so {{$typename}} satisfies json.Unmarshaler.
73+
func (r *{{$typename}}) UnmarshalJSON(data []byte) error {
74+
var s string
75+
if err := json.Unmarshal(data, &s); err != nil {
76+
return fmt.Errorf("{{$typename}} should be a string, got %s", data)
77+
}
78+
v, ok := _{{$typename}}NameToValue[s]
79+
if !ok {
80+
return fmt.Errorf("invalid {{$typename}} %q", s)
81+
}
82+
*r = v
83+
return nil
84+
}
85+
86+
{{end}}
87+
`))
88+
89+
type analysis struct {
90+
Command string
91+
PackageName string
92+
TypesAndValues map[string][]string
93+
}
94+
95+
func main() {
96+
var buf bytes.Buffer
97+
98+
an := analysis{
99+
Command: "generate_methodidx",
100+
PackageName: "main",
101+
TypesAndValues: map[string][]string{
102+
"methodIdx": []string{"configure", "listen", "publish", "subscribe", "unsubscribe", "validationComplete", "generateKeypair", "openStream", "closeStream", "resetStream", "sendStreamMsg", "removeStreamHandler", "addStreamHandler", "listeningAddrs"},
103+
},
104+
}
105+
106+
if err := generatedTmpl.Execute(&buf, an); err != nil {
107+
log.Fatalf("generating code: %v", err)
108+
}
109+
110+
src, err := format.Source(buf.Bytes())
111+
if err != nil {
112+
// Should never happen, but can arise when developing this code.
113+
// The user can compile the output to see the error.
114+
log.Printf("warning: internal error: invalid Go generated: %s", err)
115+
log.Printf("warning: compile the package to analyze the error")
116+
src = buf.Bytes()
117+
}
118+
os.Stdout.Write(src)
119+
}

0 commit comments

Comments
 (0)