Skip to content

Commit 9feec51

Browse files
lmarsfjl
authored andcommitted
p2p: add network simulation framework (#14982)
This commit introduces a network simulation framework which can be used to run simulated networks of devp2p nodes. The intention is to use this for testing protocols, performing benchmarks and visualising emergent network behaviour.
1 parent 673007d commit 9feec51

34 files changed

+6522
-69
lines changed

cmd/p2psim/main.go

Lines changed: 414 additions & 0 deletions
Large diffs are not rendered by default.

node/api.go

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package node
1818

1919
import (
20+
"context"
2021
"fmt"
2122
"strings"
2223
"time"
@@ -25,6 +26,7 @@ import (
2526
"github.com/ethereum/go-ethereum/crypto"
2627
"github.com/ethereum/go-ethereum/p2p"
2728
"github.com/ethereum/go-ethereum/p2p/discover"
29+
"github.com/ethereum/go-ethereum/rpc"
2830
"github.com/rcrowley/go-metrics"
2931
)
3032

@@ -73,6 +75,44 @@ func (api *PrivateAdminAPI) RemovePeer(url string) (bool, error) {
7375
return true, nil
7476
}
7577

78+
// PeerEvents creates an RPC subscription which receives peer events from the
79+
// node's p2p.Server
80+
func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {
81+
// Make sure the server is running, fail otherwise
82+
server := api.node.Server()
83+
if server == nil {
84+
return nil, ErrNodeStopped
85+
}
86+
87+
// Create the subscription
88+
notifier, supported := rpc.NotifierFromContext(ctx)
89+
if !supported {
90+
return nil, rpc.ErrNotificationsUnsupported
91+
}
92+
rpcSub := notifier.CreateSubscription()
93+
94+
go func() {
95+
events := make(chan *p2p.PeerEvent)
96+
sub := server.SubscribeEvents(events)
97+
defer sub.Unsubscribe()
98+
99+
for {
100+
select {
101+
case event := <-events:
102+
notifier.Notify(rpcSub.ID, event)
103+
case <-sub.Err():
104+
return
105+
case <-rpcSub.Err():
106+
return
107+
case <-notifier.Closed():
108+
return
109+
}
110+
}
111+
}()
112+
113+
return rpcSub, nil
114+
}
115+
76116
// StartRPC starts the HTTP RPC API server.
77117
func (api *PrivateAdminAPI) StartRPC(host *string, port *int, cors *string, apis *string) (bool, error) {
78118
api.node.lock.Lock()
@@ -163,7 +203,7 @@ func (api *PrivateAdminAPI) StartWS(host *string, port *int, allowedOrigins *str
163203
}
164204
}
165205

166-
if err := api.node.startWS(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, origins); err != nil {
206+
if err := api.node.startWS(fmt.Sprintf("%s:%d", *host, *port), api.node.rpcAPIs, modules, origins, api.node.config.WSExposeAll); err != nil {
167207
return false, err
168208
}
169209
return true, nil

node/config.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,13 @@ type Config struct {
128128
// If the module list is empty, all RPC API endpoints designated public will be
129129
// exposed.
130130
WSModules []string `toml:",omitempty"`
131+
132+
// WSExposeAll exposes all API modules via the WebSocket RPC interface rather
133+
// than just the public ones.
134+
//
135+
// *WARNING* Only set this if the node is running in a trusted network, exposing
136+
// private APIs to untrusted users is a major security risk.
137+
WSExposeAll bool `toml:",omitempty"`
131138
}
132139

133140
// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into

node/node.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ func (n *Node) startRPC(services map[reflect.Type]Service) error {
261261
n.stopInProc()
262262
return err
263263
}
264-
if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins); err != nil {
264+
if err := n.startWS(n.wsEndpoint, apis, n.config.WSModules, n.config.WSOrigins, n.config.WSExposeAll); err != nil {
265265
n.stopHTTP()
266266
n.stopIPC()
267267
n.stopInProc()
@@ -412,7 +412,7 @@ func (n *Node) stopHTTP() {
412412
}
413413

414414
// startWS initializes and starts the websocket RPC endpoint.
415-
func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string) error {
415+
func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrigins []string, exposeAll bool) error {
416416
// Short circuit if the WS endpoint isn't being exposed
417417
if endpoint == "" {
418418
return nil
@@ -425,7 +425,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
425425
// Register all the APIs exposed by the services
426426
handler := rpc.NewServer()
427427
for _, api := range apis {
428-
if whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
428+
if exposeAll || whitelist[api.Namespace] || (len(whitelist) == 0 && api.Public) {
429429
if err := handler.RegisterName(api.Namespace, api.Service); err != nil {
430430
return err
431431
}
@@ -441,7 +441,7 @@ func (n *Node) startWS(endpoint string, apis []rpc.API, modules []string, wsOrig
441441
return err
442442
}
443443
go rpc.NewWSServer(wsOrigins, handler).Serve(listener)
444-
log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", endpoint))
444+
log.Info(fmt.Sprintf("WebSocket endpoint opened: ws://%s", listener.Addr()))
445445

446446
// All listeners booted successfully
447447
n.wsEndpoint = endpoint
@@ -556,6 +556,17 @@ func (n *Node) Attach() (*rpc.Client, error) {
556556
return rpc.DialInProc(n.inprocHandler), nil
557557
}
558558

559+
// RPCHandler returns the in-process RPC request handler.
560+
func (n *Node) RPCHandler() (*rpc.Server, error) {
561+
n.lock.RLock()
562+
defer n.lock.RUnlock()
563+
564+
if n.inprocHandler == nil {
565+
return nil, ErrNodeStopped
566+
}
567+
return n.inprocHandler, nil
568+
}
569+
559570
// Server retrieves the currently running P2P network layer. This method is meant
560571
// only to inspect fields of the currently running server, life cycle management
561572
// should be left to this Node entity.

p2p/dial.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,24 @@ const (
4747
maxResolveDelay = time.Hour
4848
)
4949

50+
// NodeDialer is used to connect to nodes in the network, typically by using
51+
// an underlying net.Dialer but also using net.Pipe in tests
52+
type NodeDialer interface {
53+
Dial(*discover.Node) (net.Conn, error)
54+
}
55+
56+
// TCPDialer implements the NodeDialer interface by using a net.Dialer to
57+
// create TCP connections to nodes in the network
58+
type TCPDialer struct {
59+
*net.Dialer
60+
}
61+
62+
// Dial creates a TCP connection to the node
63+
func (t TCPDialer) Dial(dest *discover.Node) (net.Conn, error) {
64+
addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
65+
return t.Dialer.Dial("tcp", addr.String())
66+
}
67+
5068
// dialstate schedules dials and discovery lookups.
5169
// it get's a chance to compute new tasks on every iteration
5270
// of the main loop in Server.run.
@@ -318,14 +336,13 @@ func (t *dialTask) resolve(srv *Server) bool {
318336

319337
// dial performs the actual connection attempt.
320338
func (t *dialTask) dial(srv *Server, dest *discover.Node) bool {
321-
addr := &net.TCPAddr{IP: dest.IP, Port: int(dest.TCP)}
322-
fd, err := srv.Dialer.Dial("tcp", addr.String())
339+
fd, err := srv.Dialer.Dial(dest)
323340
if err != nil {
324341
log.Trace("Dial error", "task", t, "err", err)
325342
return false
326343
}
327344
mfd := newMeteredConn(fd, false)
328-
srv.setupConn(mfd, t.flags, dest)
345+
srv.SetupConn(mfd, t.flags, dest)
329346
return true
330347
}
331348

p2p/dial_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -597,7 +597,7 @@ func TestDialResolve(t *testing.T) {
597597
}
598598

599599
// Now run the task, it should resolve the ID once.
600-
config := Config{Dialer: &net.Dialer{Deadline: time.Now().Add(-5 * time.Minute)}}
600+
config := Config{Dialer: TCPDialer{&net.Dialer{Deadline: time.Now().Add(-5 * time.Minute)}}}
601601
srv := &Server{ntab: table, Config: config}
602602
tasks[0].Do(srv)
603603
if !reflect.DeepEqual(table.resolveCalls, []discover.NodeID{dest.ID}) {

p2p/discover/node.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,11 @@ func (n *Node) UnmarshalText(text []byte) error {
225225
// The node identifier is a marshaled elliptic curve public key.
226226
type NodeID [NodeIDBits / 8]byte
227227

228+
// Bytes returns a byte slice representation of the NodeID
229+
func (n NodeID) Bytes() []byte {
230+
return n[:]
231+
}
232+
228233
// NodeID prints as a long hexadecimal number.
229234
func (n NodeID) String() string {
230235
return fmt.Sprintf("%x", n[:])
@@ -240,6 +245,41 @@ func (n NodeID) TerminalString() string {
240245
return hex.EncodeToString(n[:8])
241246
}
242247

248+
// MarshalText implements the encoding.TextMarshaler interface.
249+
func (n NodeID) MarshalText() ([]byte, error) {
250+
return []byte(hex.EncodeToString(n[:])), nil
251+
}
252+
253+
// UnmarshalText implements the encoding.TextUnmarshaler interface.
254+
func (n *NodeID) UnmarshalText(text []byte) error {
255+
id, err := HexID(string(text))
256+
if err != nil {
257+
return err
258+
}
259+
*n = id
260+
return nil
261+
}
262+
263+
// BytesID converts a byte slice to a NodeID
264+
func BytesID(b []byte) (NodeID, error) {
265+
var id NodeID
266+
if len(b) != len(id) {
267+
return id, fmt.Errorf("wrong length, want %d bytes", len(id))
268+
}
269+
copy(id[:], b)
270+
return id, nil
271+
}
272+
273+
// MustBytesID converts a byte slice to a NodeID.
274+
// It panics if the byte slice is not a valid NodeID.
275+
func MustBytesID(b []byte) NodeID {
276+
id, err := BytesID(b)
277+
if err != nil {
278+
panic(err)
279+
}
280+
return id
281+
}
282+
243283
// HexID converts a hex string to a NodeID.
244284
// The string may be prefixed with 0x.
245285
func HexID(in string) (NodeID, error) {

p2p/discover/node_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package discover
1818

1919
import (
20+
"bytes"
2021
"fmt"
2122
"math/big"
2223
"math/rand"
@@ -192,6 +193,35 @@ func TestHexID(t *testing.T) {
192193
}
193194
}
194195

196+
func TestNodeID_textEncoding(t *testing.T) {
197+
ref := NodeID{
198+
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x10,
199+
0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x20,
200+
0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x30,
201+
0x31, 0x32, 0x33, 0x34, 0x35, 0x36, 0x37, 0x38, 0x39, 0x40,
202+
0x41, 0x42, 0x43, 0x44, 0x45, 0x46, 0x47, 0x48, 0x49, 0x50,
203+
0x51, 0x52, 0x53, 0x54, 0x55, 0x56, 0x57, 0x58, 0x59, 0x60,
204+
0x61, 0x62, 0x63, 0x64,
205+
}
206+
hex := "01020304050607080910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364"
207+
208+
text, err := ref.MarshalText()
209+
if err != nil {
210+
t.Fatal(err)
211+
}
212+
if !bytes.Equal(text, []byte(hex)) {
213+
t.Fatalf("text encoding did not match\nexpected: %s\ngot: %s", hex, text)
214+
}
215+
216+
id := new(NodeID)
217+
if err := id.UnmarshalText(text); err != nil {
218+
t.Fatal(err)
219+
}
220+
if *id != ref {
221+
t.Fatalf("text decoding did not match\nexpected: %s\ngot: %s", ref, id)
222+
}
223+
}
224+
195225
func TestNodeID_recover(t *testing.T) {
196226
prv := newkey()
197227
hash := make([]byte, 32)

p2p/message.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ import (
2727
"sync/atomic"
2828
"time"
2929

30+
"github.com/ethereum/go-ethereum/event"
31+
"github.com/ethereum/go-ethereum/p2p/discover"
3032
"github.com/ethereum/go-ethereum/rlp"
3133
)
3234

@@ -271,3 +273,67 @@ func ExpectMsg(r MsgReader, code uint64, content interface{}) error {
271273
}
272274
return nil
273275
}
276+
277+
// msgEventer wraps a MsgReadWriter and sends events whenever a message is sent
278+
// or received
279+
type msgEventer struct {
280+
MsgReadWriter
281+
282+
feed *event.Feed
283+
peerID discover.NodeID
284+
Protocol string
285+
}
286+
287+
// newMsgEventer returns a msgEventer which sends message events to the given
288+
// feed
289+
func newMsgEventer(rw MsgReadWriter, feed *event.Feed, peerID discover.NodeID, proto string) *msgEventer {
290+
return &msgEventer{
291+
MsgReadWriter: rw,
292+
feed: feed,
293+
peerID: peerID,
294+
Protocol: proto,
295+
}
296+
}
297+
298+
// ReadMsg reads a message from the underlying MsgReadWriter and emits a
299+
// "message received" event
300+
func (self *msgEventer) ReadMsg() (Msg, error) {
301+
msg, err := self.MsgReadWriter.ReadMsg()
302+
if err != nil {
303+
return msg, err
304+
}
305+
self.feed.Send(&PeerEvent{
306+
Type: PeerEventTypeMsgRecv,
307+
Peer: self.peerID,
308+
Protocol: self.Protocol,
309+
MsgCode: &msg.Code,
310+
MsgSize: &msg.Size,
311+
})
312+
return msg, nil
313+
}
314+
315+
// WriteMsg writes a message to the underlying MsgReadWriter and emits a
316+
// "message sent" event
317+
func (self *msgEventer) WriteMsg(msg Msg) error {
318+
err := self.MsgReadWriter.WriteMsg(msg)
319+
if err != nil {
320+
return err
321+
}
322+
self.feed.Send(&PeerEvent{
323+
Type: PeerEventTypeMsgSend,
324+
Peer: self.peerID,
325+
Protocol: self.Protocol,
326+
MsgCode: &msg.Code,
327+
MsgSize: &msg.Size,
328+
})
329+
return nil
330+
}
331+
332+
// Close closes the underlying MsgReadWriter if it implements the io.Closer
333+
// interface
334+
func (self *msgEventer) Close() error {
335+
if v, ok := self.MsgReadWriter.(io.Closer); ok {
336+
return v.Close()
337+
}
338+
return nil
339+
}

0 commit comments

Comments
 (0)