diff --git a/.coveralls.yml b/.coveralls.yml new file mode 100644 index 0000000..e69de29 diff --git a/README.md b/README.md index dccfb94..73bde91 100644 --- a/README.md +++ b/README.md @@ -13,12 +13,12 @@ This library bundles different components for lower-level peer-to-peer connectio - Light Ethereum Subprotocol (LES/2) The library is based on [ethereumjs/node-devp2p](https://github.com/ethereumjs/node-devp2p) as well -as other sub-libraries (``node-*`` named) (all outdated). +as other sub-libraries (`node-*` named) (all outdated). ## Run/Build -This library has to be compiled with babel to a ``Node 6`` friendly source format. -For triggering a (first) build to create the ``lib/`` directory run: +This library has to be compiled with babel to a `Node 6` friendly source format. +For triggering a (first) build to create the `lib/` directory run: ``` npm run build @@ -32,7 +32,7 @@ node -r babel-register [YOUR_SCRIPT_TO_RUN.js] ## Usage/Examples -All components of this library are implemented as Node ``EventEmitter`` objects +All components of this library are implemented as Node `EventEmitter` objects and make heavy use of the Node.js network stack. You can react on events from the network like this: @@ -45,11 +45,11 @@ dpt.on('peer:added', (peer) => { Basic example to connect to some bootstrap nodes and get basic peer info: - - [simple](examples/simple.js) +- [simple](examples/simple.js) Communicate with peers to read new transaction and block information: - - [peer-communication](examples/peer-communication.js) +- [peer-communication](examples/peer-communication.js) Run an example with: @@ -59,9 +59,62 @@ node -r babel-register ./examples/peer-communication.js ## Distributed Peer Table (DPT) / Node Discovery -Maintain/manage a list of peers, see [./src/dpt/](./src/dpt/), also +Maintain/manage a list of peers, see [./src/dpt/](./src/dpt/), also includes node discovery ([./src/dpt/server.js](./src/dpt/server.js)) + +## Branches +- [master](https://github.com/ethereumjs/ethereumjs-devp2p) +- [discovery-v5](https://github.com/tcsiwula/ethereumjs-devp2p/tree/discovery-v5) (wip) +- [discv4-enr-extension-support](https://github.com/tcsiwula/ethereumjs-devp2p/tree/discv4-enr-extension-support) (wip) +- [les](https://github.com/ethereumjs/ethereumjs-devp2p) (todo) + + +### Node discovery v5 support (dscv5) + +Node discovery v5 (dscv5) support is turned off by default. + +It is currently in development on this branch: +[github.com/tcsiwula/ethereumjs-devp2p/tree/discovery-v5](https://github.com/tcsiwula/ethereumjs-devp2p/tree/discovery-v5). + +#### Running node discovery v5 + +Try the experimental discv5 peer-communication.js example. +Run the following: + +``` +npm install +npm run build +npm run v5 +``` + +or: + +``` +npm install +npm run build +node -r babel-register ./examples/peer-communication.js 5 +``` + +#### Running node discovery v4 + +Try the stable discv4 peer-communication.js example. +Run the following: + +``` +npm install +npm run build +npm run v4 +``` + +or: + +``` +npm install +npm run build +node -r babel-register ./examples/peer-communication.js 4 +``` + ### Usage Create your peer table: @@ -76,7 +129,7 @@ const dpt = new DPT(Buffer.from(PRIVATE_KEY, 'hex'), { }) ``` -Add some bootstrap nodes (or some custom nodes with ``dpt.addPeer()``): +Add some bootstrap nodes (or some custom nodes with `dpt.addPeer()`): ``` dpt.bootstrap(bootnode).catch((err) => console.error('Something went wrong!')) @@ -84,42 +137,48 @@ dpt.bootstrap(bootnode).catch((err) => console.error('Something went wrong!')) ### API - #### `DPT` (extends `EventEmitter`) -Distributed Peer Table. Manages a Kademlia DHT K-bucket (``Kbucket``) for storing peer information -and a ``BanList`` for keeping a list of bad peers. ``Server`` implements the node discovery (``ping``, -``pong``, ``findNeighbours``). + +Distributed Peer Table. Manages a Kademlia DHT K-bucket (`Kbucket`) for storing peer information +and a `BanList` for keeping a list of bad peers. `Server` implements the node discovery (`ping`, +`pong`, `findNeighbours`). ##### `new DPT(privateKey, options)` + Creates new DPT object + - `privateKey` - Key for message encoding/signing. -- `options.refreshInterval` - Interval in ms for refreshing (calling ``findNeighbours``) the peer list (default: ``60s``). -- `options.createSocket` - A datagram (dgram) ``createSocket`` function, passed to ``Server`` (default: ``dgram.createSocket.bind(null, 'udp4')``). -- `options.timeout` - Timeout in ms for server ``ping``, passed to ``Server`` (default: ``10s``). -- `options.endpoint` - Endpoint information to send with the server ``ping``, passed to ``Server`` (default: ``{ address: '0.0.0.0', udpPort: null, tcpPort: null }``). +- `options.refreshInterval` - Interval in ms for refreshing (calling `findNeighbours`) the peer list (default: `60s`). +- `options.createSocket` - A datagram (dgram) `createSocket` function, passed to `Server` (default: `dgram.createSocket.bind(null, 'udp4')`). +- `options.timeout` - Timeout in ms for server `ping`, passed to `Server` (default: `10s`). +- `options.endpoint` - Endpoint information to send with the server `ping`, passed to `Server` (default: `{ address: '0.0.0.0', udpPort: null, tcpPort: null }`). + +#### `dpt.bootstrap(peer)` (`async`) + +Uses a peer as new bootstrap peer and calls `findNeighbouts`. + +- `peer` - Peer to be added, format `{ address: [ADDRESS], udpPort: [UDPPORT], tcpPort: [TCPPORT] }`. -#### `dpt.bootstrap(peer)` (``async``) -Uses a peer as new bootstrap peer and calls ``findNeighbouts``. -- `peer` - Peer to be added, format ``{ address: [ADDRESS], udpPort: [UDPPORT], tcpPort: [TCPPORT] }``. +#### `dpt.addPeer(object)` (`async`) -#### `dpt.addPeer(object)` (``async``) Adds a new peer. -- `object` - Peer to be added, format ``{ address: [ADDRESS], udpPort: [UDPPORT], tcpPort: [TCPPORT] }``. -For other utility functions like ``getPeer``, ``getPeers`` see [./src/dpt/index.js](./src/dpt/index.js). +- `object` - Peer to be added, format `{ address: [ADDRESS], udpPort: [UDPPORT], tcpPort: [TCPPORT] }`. + +For other utility functions like `getPeer`, `getPeers` see [./src/dpt/index.js](./src/dpt/index.js). ### Events Events emitted: -| Event | Description | -| ------------- |:----------------------------------------:| -| peer:added | Peer added to DHT bucket | -| peer:removed | Peer removed from DHT bucket | -| peer:new | New peer added | -| listening | Forwarded from server | -| close | Forwarded from server | -| error | Forwarded from server | +| Event | Description | +| ------------ | :--------------------------: | +| peer:added | Peer added to DHT bucket | +| peer:removed | Peer removed from DHT bucket | +| peer:new | New peer added | +| listening | Forwarded from server | +| close | Forwarded from server | +| error | Forwarded from server | ### Reference @@ -133,7 +192,7 @@ Connect to a peer, organize the communication, see [./src/rlpx/](./src/rlpx/) ### Usage -Create your ``RLPx`` object, e.g.: +Create your `RLPx` object, e.g.: ``` const rlpx = new devp2p.RLPx(PRIVATE_KEY, { @@ -150,38 +209,42 @@ const rlpx = new devp2p.RLPx(PRIVATE_KEY, { ### API #### `RLPx` (extends `EventEmitter`) -Manages the handshake (`ECIES`) and the handling of the peer communication (``Peer``). + +Manages the handshake (`ECIES`) and the handling of the peer communication (`Peer`). ##### `new RLPx(privateKey, options)` + Creates new RLPx object + - `privateKey` - Key for message encoding/signing. -- `options.timeout` - Peer `ping` timeout in ms (default: ``10s``). -- `options.maxPeers` - Max number of peer connections (default: ``10``). -- `options.clientId` - Client ID string (default example: ``ethereumjs-devp2p/v2.1.3/darwin-x64/nodejs``). +- `options.timeout` - Peer `ping` timeout in ms (default: `10s`). +- `options.maxPeers` - Max number of peer connections (default: `10`). +- `options.clientId` - Client ID string (default example: `ethereumjs-devp2p/v2.1.3/darwin-x64/nodejs`). - `options.remoteClientIdFilter` - Optional list of client ID filter strings (e.g. `['go1.5', 'quorum']`). - `options.capabilities` - Upper layer protocol capabilities, e.g. `[devp2p.ETH.eth63, devp2p.ETH.eth62]`. -- `options.listenPort` - The listening port for the server or ``null`` for default. -- `options.dpt` - `DPT` object for the peers to connect to (default: ``null``, no `DPT` peer management). +- `options.listenPort` - The listening port for the server or `null` for default. +- `options.dpt` - `DPT` object for the peers to connect to (default: `null`, no `DPT` peer management). + +#### `rlpx.connect(peer)` (`async`) -#### `rlpx.connect(peer)` (``async``) Manually connect to peer without `DPT`. -- `peer` - Peer to connect to, format ``{ id: PEER_ID, address: PEER_ADDRESS, port: PEER_PORT }``. -For other connection/utility functions like ``listen``, ``getPeers`` see [./src/rlpx/index.js](./src/rlpx/index.js). +- `peer` - Peer to connect to, format `{ id: PEER_ID, address: PEER_ADDRESS, port: PEER_PORT }`. + +For other connection/utility functions like `listen`, `getPeers` see [./src/rlpx/index.js](./src/rlpx/index.js). ### Events Events emitted: -| Event | Description | -| ------------- |:----------------------------------------:| -| peer:added | Handshake with peer successful | -| peer:removed | Disconnected from peer | -| peer:error | Error connecting to peer | -| listening | Forwarded from server | -| close | Forwarded from server | -| error | Forwarded from server | - +| Event | Description | +| ------------ | :----------------------------: | +| peer:added | Handshake with peer successful | +| peer:removed | Disconnected from peer | +| peer:error | Error connecting to peer | +| listening | Forwarded from server | +| close | Forwarded from server | +| error | Forwarded from server | ### Reference @@ -194,7 +257,7 @@ Upper layer protocol for exchanging Ethereum network data like block headers or ### Usage -Send the initial status message with ``sendStatus()``, then wait for the corresponding `status` message +Send the initial status message with `sendStatus()`, then wait for the corresponding `status` message to arrive to start the communication. ``` @@ -204,7 +267,7 @@ eth.once('status', () => { }) ``` -Wait for follow-up messages to arrive, send your responses. +Wait for follow-up messages to arrive, send your responses. ``` eth.on('message', async (code, payload) => { @@ -214,26 +277,33 @@ eth.on('message', async (code, payload) => { }) ``` -See the ``peer-communication.js`` example for a more detailed use case. +See the `peer-communication.js` example for a more detailed use case. ### API #### `ETH` (extends `EventEmitter`) + Handles the different message types like `NEW_BLOCK_HASHES` or `GET_NODE_DATA` (see `MESSAGE_CODES`) for a complete list. Currently protocol versions `PV62` and `PV63` are supported. ##### `new ETH(privateKey, options)` -Normally not instantiated directly but created as a ``SubProtocol`` in the ``Peer`` object. + +Normally not instantiated directly but created as a `SubProtocol` in the `Peer` object. + - `version` - The protocol version for communicating, e.g. `63`. - `peer` - `Peer` object to communicate with. -- `send` - Wrapped ``peer.sendMessage()`` function where the communication is routed to. +- `send` - Wrapped `peer.sendMessage()` function where the communication is routed to. #### `eth.sendStatus(status)` + Send initial status message. -- `status` - Status message to send, format ``{ networkId: CHAIN_ID, td: TOTAL_DIFFICULTY_BUFFER, bestHash: BEST_HASH_BUFFER, genesisHash: GENESIS_HASH_BUFFER }``. + +- `status` - Status message to send, format `{ networkId: CHAIN_ID, td: TOTAL_DIFFICULTY_BUFFER, bestHash: BEST_HASH_BUFFER, genesisHash: GENESIS_HASH_BUFFER }`. #### `eth.sendMessage(code, payload)` + Send initial status message. + - `code` - The message code, see `MESSAGE_CODES` for available message types. - `payload` - Payload as a list, will be rlp-encoded. @@ -241,10 +311,10 @@ Send initial status message. Events emitted: -| Event | Description | -| ------------- |:----------------------------------------:| -| message | Message received | -| status | Status info received | +| Event | Description | +| ------- | :------------------: | +| message | Message received | +| status | Status info received | ### Reference @@ -256,7 +326,7 @@ Upper layer protocol used by light clients, see [./src/les/](./src/les/). ### Usage -Send the initial status message with ``sendStatus()``, then wait for the corresponding `status` message +Send the initial status message with `sendStatus()`, then wait for the corresponding `status` message to arrive to start the communication. ``` @@ -266,7 +336,7 @@ les.once('status', () => { }) ``` -Wait for follow-up messages to arrive, send your responses. +Wait for follow-up messages to arrive, send your responses. ``` les.on('message', async (code, payload) => { @@ -276,26 +346,33 @@ les.on('message', async (code, payload) => { }) ``` -See the ``peer-communication-les.js`` example for a more detailed use case. +See the `peer-communication-les.js` example for a more detailed use case. ### API #### `LES` (extends `EventEmitter`) + Handles the different message types like `BLOCK_HEADERS` or `GET_PROOFS_V2` (see `MESSAGE_CODES`) for -a complete list. Currently protocol version ``LES/2`` running in client-mode is supported. +a complete list. Currently protocol version `LES/2` running in client-mode is supported. ##### `new LES(privateKey, options)` -Normally not instantiated directly but created as a ``SubProtocol`` in the ``Peer`` object. + +Normally not instantiated directly but created as a `SubProtocol` in the `Peer` object. + - `version` - The protocol version for communicating, e.g. `2`. - `peer` - `Peer` object to communicate with. -- `send` - Wrapped ``peer.sendMessage()`` function where the communication is routed to. +- `send` - Wrapped `peer.sendMessage()` function where the communication is routed to. #### `les.sendStatus(status)` + Send initial status message. -- `status` - Status message to send, format ``{ networkId: CHAIN_ID, headTd: TOTAL_DIFFICULTY_BUFFER, headHash: HEAD_HASH_BUFFER, headNum: HEAD_NUM_BUFFER, genesisHash: GENESIS_HASH_BUFFER }``. + +- `status` - Status message to send, format `{ networkId: CHAIN_ID, headTd: TOTAL_DIFFICULTY_BUFFER, headHash: HEAD_HASH_BUFFER, headNum: HEAD_NUM_BUFFER, genesisHash: GENESIS_HASH_BUFFER }`. #### `les.sendMessage(code, reqId, payload)` + Send initial status message. + - `code` - The message code, see `MESSAGE_CODES` for available message types. - `reqId` - Request ID, will be echoed back on response. - `payload` - Payload as a list, will be rlp-encoded. @@ -304,10 +381,10 @@ Send initial status message. Events emitted: -| Event | Description | -| ------------- |:----------------------------------------:| -| message | Message received | -| status | Status info received | +| Event | Description | +| ------- | :------------------: | +| message | Message received | +| status | Status info received | ### Reference @@ -315,7 +392,7 @@ Events emitted: ## Tests -There are unit tests in the ``test/`` directory which can be run with: +There are unit tests in the `test/` directory which can be run with: ``` npm run test @@ -325,8 +402,8 @@ npm run test This library uses [debug](https://github.com/visionmedia/debug) debugging utility package. -For the debugging output to show up, set the ``DEBUG`` environment variable (e.g. in Linux/Mac OS: -``export DEBUG=*,-babel``). +For the debugging output to show up, set the `DEBUG` environment variable (e.g. in Linux/Mac OS: +`export DEBUG=*,-babel`). You should now see debug output like to following when running one of the examples above (the indented lines): @@ -343,7 +420,7 @@ Remove peer: 52.169.42.101:30303 (peer disconnect, reason code: 16) (total: 1) ### Other Implementations -The following is a list of major implementations of the ``devp2p`` stack in other languages: +The following is a list of major implementations of the `devp2p` stack in other languages: - [pydevp2p](https://github.com/ethereum/pydevp2p) (Python) - [Go Ethereum](https://github.com/ethereum/go-ethereum/tree/master/p2p) (Go) @@ -351,7 +428,7 @@ The following is a list of major implementations of the ``devp2p`` stack in othe ### Links -- [Blog article series](https://ocalog.com/post/10/) on implementing Ethereum protocol stack +- [Blog article series](https://ocalog.com/post/10/) on implementing Ethereum protocol stack ## License diff --git a/examples/bootstrapNodes.json b/examples/bootstrapNodes.json new file mode 100644 index 0000000..08811e2 --- /dev/null +++ b/examples/bootstrapNodes.json @@ -0,0 +1,47 @@ +[ + { + "ip": "52.16.188.185", + "port": "30303", + "id": "a979fb575495b8d6db44f750317d0f4622bf4c2aa3365d6af7c284339968eef29b69ad0dce72a4d8db5ebb4968de0e3bec910127f134779fbcb0cb6d3331163c", + "network": "Mainnet", + "chainId": 1, + "location": "MAINNET_BOOTNODES_Node1", + "comment": "MAINNET_BOOTNODES_Node1" + }, + { + "ip": "13.93.211.84", + "port": "30303", + "id": "aa36fdf33dd030378a0168efe6ed7d5cc587fafa3cdd375854fe735a2e11ea3650ba29644e2db48368c46e1f60e716300ba49396cd63778bf8a818c09bded46f", + "network": "Mainnet", + "chainId": 1, + "location": "MAINNET_BOOTNODES_Node2", + "comment": "MAINNET_BOOTNODES_Node2" + }, + { + "ip": "191.235.84.50", + "port": "30303", + "id": "78de8a0916848093c73790ead81d1928bec737d565119932b98c6b100d944b7a95e94f847f689fc723399d2e31129d182f7ef3863f2b4c820abbf3ab2722344d", + "network": "Mainnet", + "chainId": 1, + "location": "MAINNET_BOOTNODES_Node3", + "comment": "MAINNET_BOOTNODES_Node3" + }, + { + "ip": "13.75.154.138", + "port": "30303", + "id": "158f8aab45f6d19c6cbf4a089c2670541a8da11978a2f90dbf6a502a4a3bab80d288afdbeb7ec0ef6d92de563767f3b1ea9e8e334ca711e9f8e2df5a0385e8e6", + "network": "Mainnet", + "chainId": 1, + "location": "MAINNET_BOOTNODES_Node4", + "comment": "MAINNET_BOOTNODES_Node4" + }, + { + "ip": "52.74.57.123", + "port": "30303", + "id": "1118980bf48b0a3640bdba04e0fe78b1add18e1cd99bf22d53daac1fd9972ad650df52176e7c7d89d1114cfef2bc23a2959aa54998a46afcf7d91809f0855082", + "network": "Mainnet", + "chainId": 1, + "location": "MAINNET_BOOTNODES_Node5", + "comment": "MAINNET_BOOTNODES_Node5" + } +] diff --git a/examples/discv5/peer-communication.js b/examples/discv5/peer-communication.js new file mode 100644 index 0000000..2a33128 --- /dev/null +++ b/examples/discv5/peer-communication.js @@ -0,0 +1,420 @@ +// run it: +// npm install +// node -r babel-register ./examples/peer-communication.js + +const devp2p = require("../src"); +const EthereumTx = require("ethereumjs-tx"); +const EthereumBlock = require("ethereumjs-block"); +const LRUCache = require("lru-cache"); +const ms = require("ms"); +const chalk = require("chalk"); +const assert = require("assert"); +const { randomBytes } = require("crypto"); +const rlp = require("rlp-encoding"); +const Buffer = require("safe-buffer").Buffer; + +const PRIVATE_KEY = randomBytes(32); +const CHAIN_ID = 1; + +const BOOTNODES = require("./bootstrapNodes.json") + .filter(node => { + return node.chainId === CHAIN_ID; + }) + .map(node => { + return { + address: node.ip, + udpPort: node.port, + tcpPort: node.port + }; + }); + +const CHECK_BLOCK_TITLE = "Byzantium Fork"; // Only for debugging/console output +const CHECK_BLOCK_NR = 4370000; +const CHECK_BLOCK = + "b1fcff633029ee18ab6482b58ff8b6e95dd7c82a954c852157152a7a6d32785e"; +const CHECK_BLOCK_HEADER = rlp.decode( + Buffer.from( + "f9020aa0a0890da724dd95c90a72614c3a906e402134d3859865f715f5dfb398ac00f955a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942a65aca4d5fc5b5c859090a6c34d164135398226a074cccff74c5490fbffc0e6883ea15c0e1139e2652e671f31f25f2a36970d2f87a00e750bf284c2b3ed1785b178b6f49ff3690a3a91779d400de3b9a3333f699a80a0c68e3e82035e027ade5d966c36a1d49abaeec04b83d64976621c355e58724b8bb90100040019000040000000010000000000021000004020100688001a05000020816800000010a0000100201400000000080100020000000400080000800004c0200000201040000000018110400c000000200001000000280000000100000010010080000120010000050041004000018000204002200804000081000011800022002020020140000000020005080001800000000008102008140008600000000100000500000010080082002000102080000002040120008820400020100004a40801000002a0040c000010000114000000800000050008300020100000000008010000000100120000000040000000808448200000080a00000624013000000080870552416761fabf83475b02836652b383661a72845a25c530894477617266506f6f6ca0dc425fdb323c469c91efac1d2672dfdd3ebfde8fa25d68c1b3261582503c433788c35ca7100349f430", + "hex" + ) +); + +const getPeerAddr = peer => + `${peer._socket.remoteAddress}:${peer._socket.remotePort}`; + +// set the default version to 4 +let VERSION = devp2p._util.v4; + +// option to run version 5 via cli: node -r babel-register ./examples/peer-communication.js 5 +const cliVersion = process.argv[2]; + +if (cliVersion == 5) { + VERSION = devp2p._util.v5; +} + +// DPT +const dpt = new devp2p.DPT(PRIVATE_KEY, { + refreshInterval: 30000, + version: VERSION, + endpoint: { + address: "0.0.0.0", + udpPort: null, + tcpPort: null + } +}); + +dpt.on("error", err => console.error(chalk.red(`DPT error: ${err}`))); + +// RLPx +const rlpx = new devp2p.RLPx(PRIVATE_KEY, { + dpt: dpt, + maxPeers: 25, + capabilities: [devp2p.ETH.eth63, devp2p.ETH.eth62], + listenPort: null +}); + +rlpx.on("error", err => + console.error(chalk.red(`RLPx error: ${err.stack || err}`)) +); + +rlpx.on("peer:added", peer => { + const addr = getPeerAddr(peer); + const eth = peer.getProtocols()[0]; + const requests = { headers: [], bodies: [], msgTypes: {} }; + + const clientId = peer.getHelloMessage().clientId; + + console.log( + chalk.green( + `Add peer: ${addr} ${clientId} (eth${eth.getVersion()}) (total: ${ + rlpx.getPeers().length + })` + ) + ); + + eth.sendStatus({ + networkId: CHAIN_ID, + td: devp2p._util.int2buffer(17179869184), // total difficulty in genesis block + bestHash: Buffer.from( + "d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", + "hex" + ), + genesisHash: Buffer.from( + "d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3", + "hex" + ) + }); + + // check CHECK_BLOCK + let forkDrop = null; + let forkVerified = false; + eth.once("status", () => { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.GET_BLOCK_HEADERS, [ + CHECK_BLOCK_NR, + 1, + 0, + 0 + ]); + forkDrop = setTimeout(() => { + peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER); + }, ms("15s")); + peer.once("close", () => clearTimeout(forkDrop)); + }); + + eth.on("message", async (code, payload) => { + if (code in requests.msgTypes) { + requests.msgTypes[code] += 1; + } else { + requests.msgTypes[code] = 1; + } + + switch (code) { + case devp2p.ETH.MESSAGE_CODES.NEW_BLOCK_HASHES: + if (!forkVerified) break; + + for (let item of payload) { + const blockHash = item[0]; + if (blocksCache.has(blockHash.toString("hex"))) continue; + setTimeout(() => { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.GET_BLOCK_HEADERS, [ + blockHash, + 1, + 0, + 0 + ]); + requests.headers.push(blockHash); + }, ms("0.1s")); + } + break; + + case devp2p.ETH.MESSAGE_CODES.TX: + if (!forkVerified) break; + + for (let item of payload) { + const tx = new EthereumTx(item); + if (isValidTx(tx)) onNewTx(tx, peer); + } + + break; + + case devp2p.ETH.MESSAGE_CODES.GET_BLOCK_HEADERS: + const headers = []; + // hack + if (devp2p._util.buffer2int(payload[0]) === CHECK_BLOCK_NR) { + headers.push(CHECK_BLOCK_HEADER); + } + + if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) { + peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER); + } else { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.BLOCK_HEADERS, headers); + } + break; + + case devp2p.ETH.MESSAGE_CODES.BLOCK_HEADERS: + if (!forkVerified) { + if (payload.length !== 1) { + console.log( + `${addr} expected one header for ${CHECK_BLOCK_TITLE} verify (received: ${ + payload.length + })` + ); + peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER); + break; + } + + const expectedHash = CHECK_BLOCK; + const header = new EthereumBlock.Header(payload[0]); + if (header.hash().toString("hex") === expectedHash) { + // console.log(`${addr} verified to be on the same side of the ${CHECK_BLOCK_TITLE}`) + clearTimeout(forkDrop); + forkVerified = true; + } + } else { + if (payload.length > 1) { + console.log( + `${addr} not more than one block header expected (received: ${ + payload.length + })` + ); + break; + } + + let isValidPayload = false; + const header = new EthereumBlock.Header(payload[0]); + while (requests.headers.length > 0) { + const blockHash = requests.headers.shift(); + if (header.hash().equals(blockHash)) { + isValidPayload = true; + setTimeout(() => { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.GET_BLOCK_BODIES, [ + blockHash + ]); + requests.bodies.push(header); + }, ms("0.1s")); + break; + } + } + + if (!isValidPayload) { + console.log( + `${addr} received wrong block header ${header + .hash() + .toString("hex")}` + ); + } + } + + break; + + case devp2p.ETH.MESSAGE_CODES.GET_BLOCK_BODIES: + if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) { + peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER); + } else { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.BLOCK_BODIES, []); + } + break; + + case devp2p.ETH.MESSAGE_CODES.BLOCK_BODIES: + if (!forkVerified) break; + + if (payload.length !== 1) { + console.log( + `${addr} not more than one block body expected (received: ${ + payload.length + })` + ); + break; + } + + let isValidPayload = false; + while (requests.bodies.length > 0) { + const header = requests.bodies.shift(); + const block = new EthereumBlock([ + header.raw, + payload[0][0], + payload[0][1] + ]); + const isValid = await isValidBlock(block); + if (isValid) { + isValidPayload = true; + onNewBlock(block, peer); + break; + } + } + + if (!isValidPayload) { + console.log(`${addr} received wrong block body`); + } + + break; + + case devp2p.ETH.MESSAGE_CODES.NEW_BLOCK: + if (!forkVerified) break; + + const newBlock = new EthereumBlock(payload[0]); + const isValidNewBlock = await isValidBlock(newBlock); + if (isValidNewBlock) onNewBlock(newBlock, peer); + + break; + + case devp2p.ETH.MESSAGE_CODES.GET_NODE_DATA: + if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) { + peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER); + } else { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.NODE_DATA, []); + } + break; + + case devp2p.ETH.MESSAGE_CODES.NODE_DATA: + break; + + case devp2p.ETH.MESSAGE_CODES.GET_RECEIPTS: + if (requests.headers.length === 0 && requests.msgTypes[code] >= 8) { + peer.disconnect(devp2p.RLPx.DISCONNECT_REASONS.USELESS_PEER); + } else { + eth.sendMessage(devp2p.ETH.MESSAGE_CODES.RECEIPTS, []); + } + break; + + case devp2p.ETH.MESSAGE_CODES.RECEIPTS: + break; + } + }); +}); + +rlpx.on("peer:removed", (peer, reasonCode, disconnectWe) => { + const who = disconnectWe ? "we disconnect" : "peer disconnect"; + const total = rlpx.getPeers().length; + console.log( + chalk.yellow( + `Remove peer: ${getPeerAddr( + peer + )} - ${who}, reason: ${peer.getDisconnectPrefix(reasonCode)} (${String( + reasonCode + )}) (total: ${total})` + ) + ); +}); + +rlpx.on("peer:error", (peer, err) => { + if (err.code === "ECONNRESET") return; + + if (err instanceof assert.AssertionError) { + const peerId = peer.getId(); + if (peerId !== null) dpt.banPeer(peerId, ms("5m")); + + console.error( + chalk.red(`Peer error (${getPeerAddr(peer)}): ${err.message}`) + ); + return; + } + + console.error( + chalk.red(`Peer error (${getPeerAddr(peer)}): ${err.stack || err}`) + ); +}); + +// // uncomment, if you want accept incoming connections +// rlpx.listen(30303, '0.0.0.0') +// dpt.bind(30303, '0.0.0.0') + +for (let bootnode of BOOTNODES) { + dpt.bootstrap(bootnode).catch(err => { + console.error(chalk.bold.red(`DPT bootstrap error: ${err.stack || err}`)); + }); +} + +// connect to local ethereum node (debug) +/* +dpt.addPeer({ address: '127.0.0.1', udpPort: 30303, tcpPort: 30303 }) + .then((peer) => { + return rlpx.connect({ + id: peer.id, + address: peer.address, + port: peer.tcpPort + }) + }) + .catch((err) => console.log(`error on connection to local node: ${err.stack || err}`)) +*/ + +const txCache = new LRUCache({ max: 1000 }); +function onNewTx(tx, peer) { + const txHashHex = tx.hash().toString("hex"); + if (txCache.has(txHashHex)) return; + + txCache.set(txHashHex, true); + + // uncomment if you want tx:hostname:port details (debug) + // console.log(`New tx: ${txHashHex} (from ${getPeerAddr(peer)})`) +} + +const blocksCache = new LRUCache({ max: 100 }); +function onNewBlock(block, peer) { + const blockHashHex = block.hash().toString("hex"); + const blockNumber = devp2p._util.buffer2int(block.header.number); + if (blocksCache.has(blockHashHex)) return; + + blocksCache.set(blockHashHex, true); + console.log( + `----------------------------------------------------------------------------------------------------------` + ); + console.log( + `New block ${blockNumber}: ${blockHashHex} (from ${getPeerAddr(peer)})` + ); + console.log( + `----------------------------------------------------------------------------------------------------------` + ); + for (let tx of block.transactions) onNewTx(tx, peer); +} + +function isValidTx(tx) { + return tx.validate(false); +} + +async function isValidBlock(block) { + if (!block.validateUnclesHash()) return false; + if (!block.transactions.every(isValidTx)) return false; + return new Promise((resolve, reject) => { + block.genTxTrie(() => { + try { + resolve(block.validateTransactionsTrie()); + } catch (err) { + reject(err); + } + }); + }); +} + +setInterval(() => { + const peersCount = dpt.getPeers().length; + const openSlots = rlpx._getOpenSlots(); + const queueLength = rlpx._peersQueue.length; + const queueLength2 = rlpx._peersQueue.filter(o => o.ts <= Date.now()).length; + + console.log( + chalk.yellow( + `Total nodes in DPT: ${peersCount}, open slots: ${openSlots}, queue: ${queueLength} / ${queueLength2}` + ) + ); +}, ms("30s")); diff --git a/examples/discv5/simple.js b/examples/discv5/simple.js new file mode 100644 index 0000000..a2771a3 --- /dev/null +++ b/examples/discv5/simple.js @@ -0,0 +1,52 @@ +const chalk = require("chalk"); +const { DISCV5 } = require("../../src"); +const Buffer = require("safe-buffer").Buffer; + +const PRIVATE_KEY = + "d772e3d6a001a38064dd23964dd2836239fa0e6cec8b28972a87460a17210fe9"; +const BOOTNODES = require("./../bootstrapNodes.json").map(node => { + return { + address: node.ip, + udpPort: node.port, + tcpPort: node.port + }; +}); + +const discv5 = new DISCV5(Buffer.from(PRIVATE_KEY, "hex"), { + version: "5", + endpoint: { + address: "0.0.0.0", + udpPort: null, + tcpPort: null + } +}); + +discv5.on("error", err => console.error(chalk.red(err.stack || err))); + +discv5.on("peer:added", peer => { + const info = `(${peer.id.toString("hex")},${peer.address},${peer.udpPort},${ + peer.tcpPort + })`; + console.log( + chalk.green(`New peer: ${info} (total: ${discv5.getPeers().length})`) + ); +}); + +discv5.on("peer:removed", peer => { + console.log( + chalk.yellow( + `Remove peer: ${peer.id.toString("hex")} (total: ${ + discv5.getPeers().length + })` + ) + ); +}); + +// for accept incoming connections uncomment next line +discv5.bind(30303, "0.0.0.0"); + +for (let bootnode of BOOTNODES) { + discv5 + .bootstrap(bootnode) + .catch(err => console.error(chalk.bold.red(err.stack || err))); +} diff --git a/examples/peer-communication.js b/examples/peer-communication.js index c71cdd5..0e4dc9a 100644 --- a/examples/peer-communication.js +++ b/examples/peer-communication.js @@ -1,3 +1,7 @@ +// run it: +// npm install +// node -r babel-register ./examples/peer-communication.js + const devp2p = require('../src') const EthereumTx = require('ethereumjs-tx') const EthereumBlock = require('ethereumjs-block') @@ -8,11 +12,10 @@ const assert = require('assert') const { randomBytes } = require('crypto') const rlp = require('rlp-encoding') const Buffer = require('safe-buffer').Buffer - const PRIVATE_KEY = randomBytes(32) const CHAIN_ID = 1 -const BOOTNODES = require('ethereum-common').bootstrapNodes.filter((node) => { +const BOOTNODES = require('./bootstrapNodes.json').filter((node) => { return node.chainId === CHAIN_ID }).map((node) => { return { @@ -21,7 +24,6 @@ const BOOTNODES = require('ethereum-common').bootstrapNodes.filter((node) => { tcpPort: node.port } }) -const REMOTE_CLIENTID_FILTER = ['go1.5', 'go1.6', 'go1.7', 'quorum', 'pirl', 'ubiq', 'gmc', 'gwhale', 'prichain'] const CHECK_BLOCK_TITLE = 'Byzantium Fork' // Only for debugging/console output const CHECK_BLOCK_NR = 4370000 @@ -30,9 +32,27 @@ const CHECK_BLOCK_HEADER = rlp.decode(Buffer.from('f9020aa0a0890da724dd95c90a726 const getPeerAddr = (peer) => `${peer._socket.remoteAddress}:${peer._socket.remotePort}` +// set the default version to 4 +let VERSION = devp2p._util.v4 + +// option to run version 5 via cli: node -r babel-register ./examples/peer-communication.js 5 +const cliVersion = process.argv[2] + +if (cliVersion === "5") { + VERSION = devp2p._util.v5 +} + +console.log(`peer-communication.js`) +console.log(`VERSION = ${VERSION}`) +console.log(`cliVersion = ${cliVersion} `) +console.log(`cliVersion === 5 ===> ${cliVersion === 5} `) +console.log(`cliVersion === "5" ===> ${cliVersion === "5"} `) + + // DPT const dpt = new devp2p.DPT(PRIVATE_KEY, { refreshInterval: 30000, + version: VERSION, endpoint: { address: '0.0.0.0', udpPort: null, @@ -50,7 +70,6 @@ const rlpx = new devp2p.RLPx(PRIVATE_KEY, { devp2p.ETH.eth63, devp2p.ETH.eth62 ], - remoteClientIdFilter: REMOTE_CLIENTID_FILTER, listenPort: null }) @@ -62,6 +81,7 @@ rlpx.on('peer:added', (peer) => { const requests = { headers: [], bodies: [], msgTypes: {} } const clientId = peer.getHelloMessage().clientId + console.log(chalk.green(`Add peer: ${addr} ${clientId} (eth${eth.getVersion()}) (total: ${rlpx.getPeers().length})`)) eth.sendStatus({ @@ -138,7 +158,7 @@ rlpx.on('peer:added', (peer) => { const expectedHash = CHECK_BLOCK const header = new EthereumBlock.Header(payload[0]) if (header.hash().toString('hex') === expectedHash) { - console.log(`${addr} verified to be on the same side of the ${CHECK_BLOCK_TITLE}`) + // console.log(`${addr} verified to be on the same side of the ${CHECK_BLOCK_TITLE}`) clearTimeout(forkDrop) forkVerified = true } @@ -286,7 +306,9 @@ function onNewTx (tx, peer) { if (txCache.has(txHashHex)) return txCache.set(txHashHex, true) - console.log(`New tx: ${txHashHex} (from ${getPeerAddr(peer)})`) + + // uncomment if you want tx:hostname:port details (debug) + // console.log(`New tx: ${txHashHex} (from ${getPeerAddr(peer)})`) } const blocksCache = new LRUCache({ max: 100 }) diff --git a/examples/simple.js b/examples/simple.js index be5be48..4b8f0bf 100644 --- a/examples/simple.js +++ b/examples/simple.js @@ -1,38 +1,51 @@ -const chalk = require('chalk') -const { DPT } = require('../src') -const Buffer = require('safe-buffer').Buffer +const chalk = require("chalk"); +const { DPT } = require("../src"); +const Buffer = require("safe-buffer").Buffer; -const PRIVATE_KEY = 'd772e3d6a001a38064dd23964dd2836239fa0e6cec8b28972a87460a17210fe9' -const BOOTNODES = require('ethereum-common').bootstrapNodes.map((node) => { +const PRIVATE_KEY = + "d772e3d6a001a38064dd23964dd2836239fa0e6cec8b28972a87460a17210fe9"; +const BOOTNODES = require("ethereum-common").bootstrapNodes.map(node => { return { address: node.ip, udpPort: node.port, tcpPort: node.port - } -}) + }; +}); -const dpt = new DPT(Buffer.from(PRIVATE_KEY, 'hex'), { +const dpt = new DPT(Buffer.from(PRIVATE_KEY, "hex"), { endpoint: { - address: '0.0.0.0', + address: "0.0.0.0", udpPort: null, tcpPort: null } -}) +}); -dpt.on('error', (err) => console.error(chalk.red(err.stack || err))) +dpt.on("error", err => console.error(chalk.red(err.stack || err))); -dpt.on('peer:added', (peer) => { - const info = `(${peer.id.toString('hex')},${peer.address},${peer.udpPort},${peer.tcpPort})` - console.log(chalk.green(`New peer: ${info} (total: ${dpt.getPeers().length})`)) -}) +dpt.on("peer:added", peer => { + const info = `(${peer.id.toString("hex")},${peer.address},${peer.udpPort},${ + peer.tcpPort + })`; + console.log( + chalk.green(`New peer: ${info} (total: ${dpt.getPeers().length})`) + ); +}); -dpt.on('peer:removed', (peer) => { - console.log(chalk.yellow(`Remove peer: ${peer.id.toString('hex')} (total: ${dpt.getPeers().length})`)) -}) +dpt.on("peer:removed", peer => { + console.log( + chalk.yellow( + `Remove peer: ${peer.id.toString("hex")} (total: ${ + dpt.getPeers().length + })` + ) + ); +}); // for accept incoming connections uncomment next line -// dpt.bind(30303, '0.0.0.0') +dpt.bind(30303, "0.0.0.0"); for (let bootnode of BOOTNODES) { - dpt.bootstrap(bootnode).catch((err) => console.error(chalk.bold.red(err.stack || err))) + dpt + .bootstrap(bootnode) + .catch(err => console.error(chalk.bold.red(err.stack || err))); } diff --git a/package.json b/package.json index 265a4ef..f8e51a5 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,8 @@ "eth62", "eth63", "les", - "les2" + "les2", + "discv5" ], "homepage": "https://github.com/fanatid/ethereumjs-devp2p", "bugs": { @@ -23,7 +24,8 @@ "Alex Beregszaszi ", "Kirill Fomichev (https://github.com/fanatid)", "Martin Becze ", - "Holger Drewes " + "Holger Drewes ", + "Tim Siwula " ], "files": [ "src", @@ -38,14 +40,18 @@ "node": ">=6.0" }, "scripts": { - "coverage": "nyc npm run test && nyc report --reporter=text-lcov > .nyc_output/lcov.info", + "coverage": "npm run test && nyc report --reporter=text-lcov > .nyc_output/lcov.info", "coveralls": "npm run coverage && coveralls <.nyc_output/lcov.info", "build": "babel src -d lib", "integration": "tape -r babel-register test/integration/*.js", - "lint": "standard", + "lint": "standard --fix", "prepublish": "npm run build", "test": "npm run lint && npm run unit && npm run integration", - "unit": "tape -r babel-register test/*.js" + "unit": "tape -r babel-register test/*.js", + "dev": "nodemon lib/index.js", + "v5-simple": "node examples/discv5/simple.js", + "v5-peer": "node -r babel-register ./examples/peer-communication.js 5", + "v4": "node -r babel-register ./examples/peer-communication.js 4" }, "dependencies": { "babel-runtime": "^6.11.6", @@ -70,11 +76,17 @@ "babel-preset-env": "^1.6.1", "babel-register": "^6.14.0", "chalk": "^1.1.3", - "coveralls": "^3.0.0", + "coveralls": "^3.0.2", + "eslint-config-standard": "^11.0.0", + "eslint-plugin-import": "^2.13.0", + "eslint-plugin-node": "^6.0.1", + "eslint-plugin-promise": "^3.8.0", + "eslint-plugin-standard": "^3.1.0", "ethereum-common": "~0.2.0", "ethereumjs-block": "^1.3.0", "ethereumjs-tx": "^1.1.1", - "nyc": "^11.4.1", + "nodemon": "^1.18.4", + "nyc": "^11.9.0", "standard": "*", "tape": "^4.5.1" }, diff --git a/src/discv5/index.js b/src/discv5/index.js new file mode 100644 index 0000000..9101140 --- /dev/null +++ b/src/discv5/index.js @@ -0,0 +1,143 @@ +const { EventEmitter } = require("events"); +const secp256k1 = require("secp256k1"); +const Buffer = require("safe-buffer").Buffer; +const { randomBytes } = require("crypto"); +const createDebugLogger = require("debug"); +const ms = require("ms"); +const { pk2id, id2pk } = require("../util"); +const KBucket = require("./kbucket"); +const BanList = require("../dpt/ban-list"); +const Server = require("./server"); +const debug = createDebugLogger("devp2p:dpt"); +const chalk = require("chalk"); +const message = require("./message"); + +class DISCV5 extends EventEmitter { + constructor(privateKey, options) { + super(); + + this._privateKey = Buffer.from(privateKey); + this._id = pk2id(secp256k1.publicKeyCreate(this._privateKey, false)); + + // debug binary data + const info = id2pk(this._id); + + console.log(chalk.red(`+++++ index.js == DISCV5.this._id == ${info}`)); + + this._banlist = new BanList(); + this._kbucket = new KBucket(this._id); + this._kbucket.on("added", peer => this.emit("peer:added", peer)); + this._kbucket.on("removed", peer => this.emit("peer:removed", peer)); + this._kbucket.on("hey", (...args) => this._onKBucketPing(...args)); + + this._server = new Server(this, this._privateKey, { + createSocket: options.createSocket, + timeout: options.timeout, + version: options.version, + endpoint: options.endpoint + }); + + this._server.once("listening", () => this.emit("listening")); + this._server.once("close", () => this.emit("close")); + this._server.on("peers", peers => this._onServerPeers(peers)); + this._server.on("error", err => this.emit("error", err)); + + const refreshInterval = options.refreshInterval || ms("60s"); + this._refreshIntervalId = setInterval( + () => this.refresh(), + refreshInterval + ); + } + + bind(...args) { + this._server.bind(...args); + } + + destroy(...args) { + clearInterval(this._refreshIntervalId); + this._server.destroy(...args); + } + + _onKBucketPing(oldPeers, newPeer) { + if (this._banlist.has(newPeer)) return; + + let count = 0; + let err = null; + for (let peer of oldPeers) { + this._server + .hey(peer) + .catch(_err => { + this._banlist.add(peer, ms("5m")); + this._kbucket.remove(peer); + err = err || _err; + }) + .then(() => { + if (++count < oldPeers.length) return; + + if (err === null) this._banlist.add(newPeer, ms("5m")); + else this._kbucket.add(newPeer); + }); + } + } + + _onServerPeers(peers) { + for (let peer of peers) this.addPeer(peer).catch(() => {}); + } + + async bootstrap(peer) { + debug(`bootstrap with peer ${peer.address}:${peer.udpPort}`); + + peer = await this.addPeer(peer); + this._server.neighbors(peer, this._id); + } + + async addPeer(obj) { + if (this._banlist.has(obj)) throw new Error("Peer is banned"); + debug(`attempt adding peer ${obj.address}:${obj.udpPort}`); + + // check k-bucket first + const peer = this._kbucket.get(obj); + if (peer !== null) return peer; + + // check that peer is alive + try { + const peer = await this._server.hey(obj); + this.emit("peer:new", peer); + this._kbucket.add(peer); + return peer; + } catch (err) { + this._banlist.add(obj, ms("10m")); + throw err; + } + } + + getPeer(obj) { + return this._kbucket.get(obj); + } + + getPeers() { + return this._kbucket.getAll(); + } + + getClosestPeers(id) { + return this._kbucket.closest(id); + } + + removePeer(obj) { + this._kbucket.remove(obj); + } + + banPeer(obj, maxAge) { + this._banlist.add(obj, maxAge); + this._kbucket.remove(obj); + } + + refresh() { + const peers = this.getPeers(); + debug(`call .refresh (${peers.length} peers in table)`); + + for (let peer of peers) this._server.neighbors(peer, randomBytes(64)); + } +} + +module.exports = DISCV5; diff --git a/src/discv5/kbucket.js b/src/discv5/kbucket.js new file mode 100644 index 0000000..0298ef0 --- /dev/null +++ b/src/discv5/kbucket.js @@ -0,0 +1,73 @@ +const { EventEmitter } = require("events"); +const Buffer = require("safe-buffer").Buffer; +const _KBucket = require("k-bucket"); + +const KBUCKET_SIZE = 16; +const KBUCKET_CONCURRENCY = 3; + +class KBucket extends EventEmitter { + constructor(id) { + super(); + + this._peers = new Map(); + + this._kbucket = new _KBucket({ + localNodeId: id, + numberOfNodesPerKBucket: KBUCKET_SIZE, + numberOfNodesToPing: KBUCKET_CONCURRENCY + }); + + this._kbucket.on("added", peer => { + KBucket.getKeys(peer).forEach(key => this._peers.set(key, peer)); + this.emit("added", peer); + }); + + this._kbucket.on("removed", peer => { + KBucket.getKeys(peer).forEach(key => this._peers.delete(key, peer)); + this.emit("removed", peer); + }); + + this._kbucket.on("hey", (...args) => this.emit("hey", ...args)); + + // this._kbucket.on("ping", (...args) => this.emit("ping", ...args)); + } + + static getKeys(obj) { + if (Buffer.isBuffer(obj)) return [obj.toString("hex")]; + if (typeof obj === "string") return [obj]; + + const keys = []; + if (Buffer.isBuffer(obj.id)) keys.push(obj.id.toString("hex")); + if (obj.address && obj.port) keys.push(`${obj.address}:${obj.port}`); + return keys; + } + + add(peer) { + const isExists = KBucket.getKeys(peer).some(key => this._peers.has(key)); + if (!isExists) this._kbucket.add(peer); + } + + get(obj) { + for (let key of KBucket.getKeys(obj)) { + const peer = this._peers.get(key); + if (peer !== undefined) return peer; + } + + return null; + } + + getAll() { + return this._kbucket.toArray(); + } + + closest(id) { + return this._kbucket.closest(id, KBUCKET_SIZE); + } + + remove(obj) { + const peer = this.get(obj); + if (peer !== null) this._kbucket.remove(peer.id); + } +} + +module.exports = KBucket; diff --git a/src/discv5/message.js b/src/discv5/message.js new file mode 100644 index 0000000..156cfb1 --- /dev/null +++ b/src/discv5/message.js @@ -0,0 +1,301 @@ +/* + discv5 message packet types + UDP {packet types} for node discovery protocol version 5 + Max packet size = 1280 bytes + See: https://github.com/fjl/p2p-drafts/blob/master/discv5-packets.md#packets + */ +const ip = require("ip"); +const rlp = require("rlp-encoding"); +const secp256k1 = require("secp256k1"); +const Buffer = require("safe-buffer").Buffer; +const { keccak256, int2buffer, buffer2int, assertEq } = require("../util"); + +// ping +const hey = { + encode: function(obj) { + return [ + int2buffer(obj.version), + endpoint.encode(obj.from), + endpoint.encode(obj.to), + timestamp.encode(obj.timestamp) + ]; + + // message = _pack(CMD_PING.id, payload, self.privkey) + // self.send(node, message) + // # Return the msg hash, which is used as a token to identify pongs. + // return message[:MAC_SIZE] + }, + decode: function(payload) { + return { + version: buffer2int(payload[0]), + from: endpoint.decode(payload[1]), + to: endpoint.decode(payload[2]), + timestamp: timestamp.decode(payload[3]) + }; + } +}; + +// const pong = { +// encode: function(obj) { +// return [endpoint.encode(obj.to), obj.hash, timestamp.encode(obj.timestamp)]; +// }, +// decode: function(payload) { +// return { +// to: endpoint.decode(payload[0]), +// hash: payload[1], +// timestamp: timestamp.decode(payload[2]) +// }; +// } +// }; + +/* + findNode packet (0x03) + requests a neightbors packet containing the closest know nodes to the target hash. +*/ +const findNode = { + // console.log(chalk.yellow(`+++++ +++++`)); + // console.log(chalk.yellow("******* findNode == " + info.typename)); + // console.log(chalk.yellow(`+++++ +++++`)); + encode: function(obj) { + console.log(chalk.blue("******* findNode.encode == ")); + console.log(chalk.blue(`+++++ +++++`)); + return [endpoint.encode(obj.to), obj.hash, timestamp.encode(obj.timestamp)]; + }, + decode: function(payload) { + return { + to: endpoint.decode(payload[0]), + hash: payload[1], + timestamp: timestamp.decode(payload[2]) + }; + } +}; + +const neighbors = { + encode: function(obj) { + return [obj.id, timestamp.encode(obj.timestamp)]; + }, + decode: function(payload) { + return { + id: payload[0], + timestamp: timestamp.decode(payload[1]) + }; + } +}; + +const requestTicket = { + encode: function(obj) { + return [obj.id, timestamp.encode(obj.timestamp)]; + }, + decode: function(payload) { + return { + id: payload[0], + timestamp: timestamp.decode(payload[1]) + }; + } +}; + +const ticket = { + encode: function(obj) { + return [ + obj.peers.map(peer => endpoint.encode(peer).concat(peer.id)), + timestamp.encode(obj.timestamp) + ]; + }, + decode: function(payload) { + return { + peers: payload[0].map(data => { + return { endpoint: endpoint.decode(data), id: data[3] }; // hack for id + }), + timestamp: timestamp.decode(payload[1]) + }; + } +}; + +const topicRegister = { + encode: function(obj) { + return [ + obj.peers.map(peer => endpoint.encode(peer).concat(peer.id)), + timestamp.encode(obj.timestamp) + ]; + }, + decode: function(payload) { + return { + peers: payload[0].map(data => { + return { endpoint: endpoint.decode(data), id: data[3] }; + }), + timestamp: timestamp.decode(payload[1]) + }; + } +}; + +const topicQuery = { + encode: function(obj) { + return [ + obj.peers.map(peer => endpoint.encode(peer).concat(peer.id)), + timestamp.encode(obj.timestamp) + ]; + }, + decode: function(payload) { + return { + peers: payload[0].map(data => { + return { endpoint: endpoint.decode(data), id: data[3] }; + }), + timestamp: timestamp.decode(payload[1]) + }; + } +}; + +const topicNodes = { + encode: function(obj) { + return [ + obj.peers.map(peer => endpoint.encode(peer).concat(peer.id)), + timestamp.encode(obj.timestamp) + ]; + }, + decode: function(payload) { + return { + peers: payload[0].map(data => { + return { endpoint: endpoint.decode(data), id: data[3] }; + }), + timestamp: timestamp.decode(payload[1]) + }; + } +}; + +function getTimestamp() { + return (Date.now() / 1000) | 0; +} + +const timestamp = { + encode: function(value = getTimestamp() + 60) { + const buffer = Buffer.allocUnsafe(4); + buffer.writeUInt32BE(value); + return buffer; + }, + decode: function(buffer) { + if (buffer.length !== 4) + throw new RangeError( + `Invalid timestamp buffer :${buffer.toString("hex")}` + ); + return buffer.readUInt32BE(0); + } +}; + +const address = { + encode: function(value) { + if (ip.isV4Format(value)) return ip.toBuffer(value); + if (ip.isV6Format(value)) return ip.toBuffer(value); + throw new Error(`Invalid address: ${value}`); + }, + decode: function(buffer) { + if (buffer.length === 4) return ip.toString(buffer); + if (buffer.length === 16) return ip.toString(buffer); + + const str = buffer.toString(); + if (ip.isV4Format(str) || ip.isV6Format(str)) return str; + + // also can be host, but skip it right now (because need async function for resolve) + // throw new Error(`Invalid address buffer: ${buffer.toString("hex")}`); + } +}; + +const port = { + encode: function(value) { + if (value === null) return Buffer.allocUnsafe(0); + if (value >>> 16 > 0) throw new RangeError(`Invalid port: ${value}`); + return Buffer.from([(value >>> 8) & 0xff, (value >>> 0) & 0xff]); + }, + decode: function(buffer) { + if (buffer.length === 0) return null; + // if (buffer.length !== 2) throw new RangeError(`Invalid port buffer: ${buffer.toString('hex')}`) + return buffer2int(buffer); + } +}; + +const endpoint = { + encode: function(obj) { + return [ + address.encode(obj.address), + port.encode(obj.udpPort), + port.encode(obj.tcpPort) + ]; + }, + decode: function(payload) { + return { + address: address.decode(payload[0]), + udpPort: port.decode(payload[1]), + tcpPort: port.decode(payload[2]) + }; + } +}; + +function encode(typename, data, privateKey) { + const type = types.byName[typename]; + if (type === undefined) throw new Error(`Invalid typename: ${typename}`); + const encodedMsg = messages[typename].encode(data); + const typedata = Buffer.concat([Buffer.from([type]), rlp.encode(encodedMsg)]); + + const sighash = keccak256(typedata); + const sig = secp256k1.sign(sighash, privateKey); + const hashdata = Buffer.concat([ + sig.signature, + Buffer.from([sig.recovery]), + typedata + ]); + const hash = keccak256(hashdata); + return Buffer.concat([hash, hashdata]); +} + +function decode(buffer) { + const hash = keccak256(buffer.slice(32)); + assertEq(buffer.slice(0, 32), hash, "Hash verification failed"); + + const typedata = buffer.slice(97); + const type = typedata[0]; + const typename = types.byType[type]; + if (typename === undefined) throw new Error(`Invalid type: ${type}`); + const data = messages[typename].decode(rlp.decode(typedata.slice(1))); + + const sighash = keccak256(typedata); + const signature = buffer.slice(32, 96); + const recoverId = buffer[96]; + const publicKey = secp256k1.recover(sighash, signature, recoverId, false); + + return { typename, data, publicKey }; +} + +const messages = { + hey, + findNode, + neighbors, + requestTicket, + ticket, + topicRegister, + topicQuery, + topicNodes +}; + +const types = { + byName: { + hey: 0x01, + findNode: 0x02, + neighbors: 0x03, + requestTicket: 0x04, + ticket: 0x05, + topicRegister: 0x06, + topicQuery: 0x07, + topicNodes: 0x08 + }, + byType: { + 0x01: "hey", + 0x02: "findNode", + 0x03: "neighbors", + 0x04: "requestTicket", + 0x05: "ticket", + 0x06: "topicRegister", + 0x07: "topicQuery", + 0x08: "topicNodes" + } +}; + +module.exports = { encode, decode }; diff --git a/src/discv5/node_record.js b/src/discv5/node_record.js new file mode 100644 index 0000000..5fe093f --- /dev/null +++ b/src/discv5/node_record.js @@ -0,0 +1,131 @@ +/* ------------------------------------------------------------------------------ + File: node_record.js + Description: Ethereum Node Record (ENR) data structure + ------------------------------------------------------------------------------ + Specification: https://github.com/fjl/p2p-drafts/blob/master/discv5-enr.md +*/ +const secp256k1 = require("secp256k1"); +const ip = require("ip"); +const rlp = require("rlp-encoding"); +const Buffer = require("safe-buffer").Buffer; +const { keccak256, int2buffer, buffer2int, assertEq } = require("../util"); + +/* --------------------------------------- + *** Constants *** + --------------------------------------- + MAX_RECORD_SIZE = 300 BYTES + SEQUENCE_SIZE = 64BITS = 8 bytes +*/ +const MAX_RECORD_SIZE = 300; +const SEQUENCE_SIZE = 8; + +/* --------------------------------------- + *** Node Record Specification *** + --------------------------------------- + Record { + signature: IdentityScheme, + sequence: 64bit int, + key/value: mapping + } +*/ + +class EthereumNodeRecord { + + // sorted key/value list + constructor(publicKey) { + } + + // let identityScheme = new DefaultIdentityScheme(); + + // signature of record contents + //let signature = "a"; + + // sequence number that acts like a nonce for record updates + let sequence = BigInt(0); + + // this is how records are signed and encoded + let content = Buffer.concat([ + rlp.encode(signature), + rpl.encode(sequence), + rlp.encode("id"), rlp.encode(defaultKeyValuePairs.get("id")), + rlp.encode("secp256k1"), rlp.encode(defaultKeyValuePairs.get("secp256k1")), + rlp.encode("ip"), rlp.encode(defaultKeyValuePairs.get("ip")), + rlp.encode("tcp"), rlp.encode(defaultKeyValuePairs.get("tcp")), + rlp.encode("udp"), rlp.encode(defaultKeyValuePairs.get("udp"))]); + + printRecord() { + console.log(this.signature); + } + +} + +/* --------------------------------------- + *** RLP Encoding *** + --------------------------------------- +*/ + function encode(typename, data, privateKey) { + const type = types.byName[typename]; + if (type === undefined) throw new Error(`Invalid typename: ${typename}`); + const encodedMsg = messages[typename].encode(data); + const typedata = Buffer.concat([Buffer.from([type]), rlp.encode(encodedMsg)]); + const sighash = keccak256(typedata); + const sig = secp256k1.sign(sighash, privateKey); + const hashdata = Buffer.concat([ + sig.signature, + Buffer.from([sig.recovery]), + typedata + ]); + const hash = keccak256(hashdata); + return Buffer.concat([hash, hashdata]); + } + +/* --------------------------------------- + *** Pre-defined Key/Value Pairs *** + --------------------------------------- + Key { + id = vlaue, + secp256k1 = vlaue, + ip = vlaue, + tcp = vlaue, + udp = vlaue + } +*/ + +let defaultKeyValuePairs = new Map( + [["id", "v4"], // name of identity scheme + ["secp256k1", ""], // compressed pub key 33 bytes + ["ip", ""], // ip address 4 or 6 bytes + ["tcp", ""], // tcp port number + ["udp", ""]]); // udp port number + +/* --------------------------------------- + *** "v4" Inentity Scheme *** + --------------------------------------- + IdentityScheme{ + sign() -> createRecordSignature(record contents), + verify() -> validateRecordSignature(), + derive() -> deriveNodeAddress() + } +*/ +class DefaultIdentityScheme { + let defaultSchemeList = "v4"; + + // signs a records content + function sign(content) { + const sighash = keccak256(content); + } + + // verifies a node record + function verify(signature, publicKey) { + let isValid = new Boolean(signature == defaultKeyValuePairs.get("secp256k1")); + return isValid; + } + + // derives the node address + function derive(publicKey) { + const nodeAddress = keccak256(publicKey); + return nodeAddress; + } +} + +module.exports = { EthereumNodeRecord, DefaultIdentityScheme }; diff --git a/src/discv5/server.js b/src/discv5/server.js new file mode 100644 index 0000000..770769a --- /dev/null +++ b/src/discv5/server.js @@ -0,0 +1,375 @@ +const { EventEmitter } = require("events"); +const dgram = require("dgram"); +const ms = require("ms"); +const createDebugLogger = require("debug"); +const LRUCache = require("lru-cache"); +const message = require("./message"); +const { keccak256, pk2id, createDeferred, v4, v5 } = require("../util"); +const chalk = require("chalk"); +const debug = createDebugLogger("devp2p:dpt:server"); + +const createSocketUDP4 = dgram.createSocket.bind(null, "udp4"); + +class Server extends EventEmitter { + constructor(dpt, privateKey, options) { + super(); + this._dpt = dpt; + this._privateKey = privateKey; + + if (options.version === "5") { + this._version = v5; + } else { + this._version = v4; + } + + console.log( + chalk.green( + `Starting node discovery protocol with version: ${this._version}` + ) + ); + + this._timeout = options.timeout || ms("10s"); + + this._endpoint = options.endpoint || { + address: "0.0.0.0", + udpPort: null, + tcpPort: null + }; + this._requests = new Map(); + this._parityRequestMap = new Map(); + this._requestsCache = new LRUCache({ + max: 1000, + maxAge: ms("1s"), + stale: false + }); + + const createSocket = options.createSocket || createSocketUDP4; + this._socket = createSocket(); + this._socket.once("listening", () => this.emit("listening")); + this._socket.once("close", () => this.emit("close")); + this._socket.on("error", err => this.emit("error", err)); + + // processes incoming messages + this._socket.on("message", (msg, rinfo) => { + try { + console.log(chalk.green(`+++++ +++++ message received --> server.js`)); + // console.log( + // chalk.green(`msg = ${JSON.stringify(message.decode(msg))}`) + // ); + console.log(chalk.green(`rinfo = ${JSON.stringify(rinfo)}`)); + + this._handler(msg, rinfo); + } catch (err) { + this.emit("error", err); + } + }); + } + + bind(...args) { + this._isAliveCheck(); + debug("call .bind"); + + this._socket.bind(...args); + } + + destroy(...args) { + this._isAliveCheck(); + debug("call .destroy"); + + this._socket.close(...args); + this._socket = null; + } + + async hey(peer) { + this._isAliveCheck(); + + const rckey = `${peer.address}:${peer.udpPort}`; + const promise = this._requestsCache.get(rckey); + if (promise !== undefined) return promise; + + const hash = this._send(peer, "hey", { + version: this._version, + from: this._endpoint, + to: peer + }); + + const deferred = createDeferred(); + const rkey = hash.toString("hex"); + + this._requests.set(rkey, { + peer, + deferred, + timeoutId: setTimeout(() => { + if (this._requests.get(rkey) !== undefined) { + debug( + `ping timeout: ${peer.address}:${peer.udpPort} ${peer.id && + peer.id.toString("hex")}` + ); + this._requests.delete(rkey); + deferred.reject( + new Error(`Timeout error: ping ${peer.address}:${peer.udpPort}`) + ); + } else { + return deferred.promise; + } + }, this._timeout) + }); + + this._requestsCache.set(rckey, deferred.promise); + return deferred.promise; + } + + // hey(peer, id) { + // this._isAliveCheck(); + // this._send(peer, "hey", { id }); + // } + + // requestTicket(peer, id) { + // this._isAliveCheck(); + // this._send(peer, "requestTicket", { id }); + // } + + neighbors(peer, id) { + this._isAliveCheck(); + this._send(peer, "neighbors", { id }); + } + + _isAliveCheck() { + if (this._socket === null) throw new Error("Server already destroyed"); + } + + _send(peer, typename, data) { + // debug( + // `send ${typename} to ${peer.address}:${peer.udpPort} (peerId: ${peer.id && + // peer.id.toString("hex")})` + // ); + + const msg = message.encode(typename, data, this._privateKey); + // Parity hack + // There is a bug in Parity up to at lease 1.8.10 not echoing the hash from + // discovery spec (hash: sha3(signature || packet-type || packet-data)) + // but just hashing the RLP-encoded packet data (see discovery.rs, on_ping()) + // 2018-02-28 + if (typename === "hey") { + const rkeyParity = keccak256(msg.slice(98)).toString("hex"); + this._parityRequestMap.set(rkeyParity, msg.slice(0, 32).toString("hex")); + setTimeout(() => { + if (this._parityRequestMap.get(rkeyParity) !== undefined) { + this._parityRequestMap.delete(rkeyParity); + } + }, this._timeout); + } + + this._socket.send(msg, 0, msg.length, peer.udpPort, peer.address); + return msg.slice(0, 32); // message id + } + + // processes each incoming message by it's message type, msg in binary data + _handler(msg, rinfo) { + const info = message.decode(msg); + + console.log("******* info.typename == " + info.typename); + console.log(chalk.green(`+++++ +++++`)); + + const peerId = pk2id(info.publicKey); + debug( + `received ${info.typename} from ${rinfo.address}:${ + rinfo.port + } (peerId: ${peerId.toString("hex")})` + ); + + // add peer if not in our table + const peer = this._dpt.getPeer(peerId); + if ( + peer === null && + info.typename === "hey" && + info.data.from.udpPort !== null + ) { + setTimeout(() => this.emit("peers", [info.data.from]), ms("100ms")); + } + + switch (info.typename) { + // case "ping": + // Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }); + // this._send(rinfo, "pong", { + // to: { + // address: rinfo.address, + // udpPort: rinfo.port, + // tcpPort: info.data.from.tcpPort + // }, + // hash: msg.slice(0, 32) + // }); + // break; + + // case "pong": + // var rkey = info.data.hash.toString("hex"); + // const rkeyParity = this._parityRequestMap.get(rkey); + // if (rkeyParity) { + // rkey = rkeyParity; + // this._parityRequestMap.delete(rkeyParity); + // } + // const request = this._requests.get(rkey); + // if (request) { + // this._requests.delete(rkey); + // request.deferred.resolve({ + // id: peerId, + // address: request.peer.address, + // udpPort: request.peer.udpPort, + // tcpPort: request.peer.tcpPort + // }); + // } + // break; + + case "hey": + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }); + this._send(rinfo, "hey", { + to: { + address: rinfo.address, + udpPort: rinfo.port, + tcpPort: info.data.from.tcpPort + }, + hash: msg.slice(0, 32) + }); + break; + + /* + findNode packet (0x03) + requests a neighbors packet containing the closest know nodes to the target hash. + */ + case "findNode": + //ping example, denotes response message type + // Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }); + // this._send(rinfo, "neighbors", { + // to: { + // address: rinfo.address, + // udpPort: rinfo.port, + // tcpPort: info.data.from.tcpPort + // }, + // hash: msg.slice(0, 32) + // }); + // break; + + console.log(chalk.blue("-------- server.findNode()")); + var rkey = info.data.hash.toString("hex"); + + const rkeyParity = this._parityRequestMap.get(rkey); + console.log(chalk.blue("-------- rkeyParity = " + rkeyParity)); + + if (rkeyParity) { + rkey = rkeyParity; + this._parityRequestMap.delete(rkeyParity); + } + const request = this._requests.get(rkey); + console.log( + chalk.blue("-------- request = " + JSON.stringify(request)) + ); + + if (request) { + this._requests.delete(rkey); + request.deferred.resolve({ + id: peerId, + address: request.peer.address, + udpPort: request.peer.udpPort, + tcpPort: request.peer.tcpPort + }); + } + break; + + /* + findNode packet (0x03) + requests a neighbors packet containing the closest know nodes to the target hash. + */ + // const findNode = { + // // console.log(chalk.yellow(`+++++ +++++`)); + // + // // console.log(chalk.yellow("******* findNode == " + info.typename)); + // // console.log(chalk.yellow(`+++++ +++++`)); + // encode: function(obj) { + // console.log(chalk.blue("******* findNode.encode == ")); + // console.log(chalk.blue(`+++++ +++++`)); + // return [endpoint.encode(obj.to), obj.hash, timestamp.encode(obj.timestamp)]; + // }, + // decode: function(payload) { + // return { + // to: endpoint.decode(payload[0]), + // hash: payload[1], + // timestamp: timestamp.decode(payload[2]) + // }; + // } + // }; + + case "neighbors": + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }); + this._send(rinfo, "neighbors", { + peers: this._dpt.getClosestPeers(info.data.id) + }); + break; + + // case "neighbors": + // this.emit("peers", info.data.peers.map(peer => peer.endpoint)); + // break; + + case "requestTicket": + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }); + this._send(rinfo, "pong", { + to: { + address: rinfo.address, + udpPort: rinfo.port, + tcpPort: info.data.from.tcpPort + }, + hash: msg.slice(0, 32) + }); + break; + + case "ticket": + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }); + this._send(rinfo, "pong", { + to: { + address: rinfo.address, + udpPort: rinfo.port, + tcpPort: info.data.from.tcpPort + }, + hash: msg.slice(0, 32) + }); + break; + + case "topicRegister": + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }); + this._send(rinfo, "pong", { + to: { + address: rinfo.address, + udpPort: rinfo.port, + tcpPort: info.data.from.tcpPort + }, + hash: msg.slice(0, 32) + }); + break; + + case "topicQuery": + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }); + this._send(rinfo, "pong", { + to: { + address: rinfo.address, + udpPort: rinfo.port, + tcpPort: info.data.from.tcpPort + }, + hash: msg.slice(0, 32) + }); + break; + + case "topicNodes": + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }); + this._send(rinfo, "pong", { + to: { + address: rinfo.address, + udpPort: rinfo.port, + tcpPort: info.data.from.tcpPort + }, + hash: msg.slice(0, 32) + }); + break; + } + } +} + +module.exports = Server; diff --git a/src/discv5/topic.js b/src/discv5/topic.js new file mode 100644 index 0000000..5312482 --- /dev/null +++ b/src/discv5/topic.js @@ -0,0 +1,5 @@ +/* + This is a placeholder. + Geth v5 protocol is a placeholder for now. + https://github.com/ethereumjs/ethereumjs-devp2p/pull/42#issuecomment-418909954 +*/ diff --git a/src/dpt/index.js b/src/dpt/index.js index 7c0fd58..d80a7b1 100644 --- a/src/dpt/index.js +++ b/src/dpt/index.js @@ -28,6 +28,7 @@ class DPT extends EventEmitter { this._server = new DPTServer(this, this._privateKey, { createSocket: options.createSocket, timeout: options.timeout, + version: options.version, endpoint: options.endpoint }) this._server.once('listening', () => this.emit('listening')) @@ -95,7 +96,7 @@ class DPT extends EventEmitter { this._kbucket.add(peer) return peer } catch (err) { - this._banlist.add(obj, ms('5m')) + this._banlist.add(obj, ms('10m')) throw err } } diff --git a/src/dpt/message.js b/src/dpt/message.js index a207afb..42883a7 100644 --- a/src/dpt/message.js +++ b/src/dpt/message.js @@ -1,142 +1,147 @@ -const ip = require('ip') -const rlp = require('rlp-encoding') -const secp256k1 = require('secp256k1') -const Buffer = require('safe-buffer').Buffer -const { keccak256, int2buffer, buffer2int, assertEq } = require('../util') - -function getTimestamp () { - return (Date.now() / 1000) | 0 +/* + This file implements Node Discovery Protocol Version 5 as defined here: + https://github.com/fjl/p2p-drafts/blob/master/discv5-packets.md +*/ +const ip = require("ip"); +const rlp = require("rlp-encoding"); +const secp256k1 = require("secp256k1"); +const Buffer = require("safe-buffer").Buffer; +const { keccak256, int2buffer, buffer2int, assertEq } = require("../util"); + +function getTimestamp() { + return (Date.now() / 1000) | 0; } const timestamp = { - encode: function (value = getTimestamp() + 60) { - const buffer = Buffer.allocUnsafe(4) - buffer.writeUInt32BE(value) - return buffer + encode: function(value = getTimestamp() + 60) { + const buffer = Buffer.allocUnsafe(4); + buffer.writeUInt32BE(value); + return buffer; }, - decode: function (buffer) { - if (buffer.length !== 4) throw new RangeError(`Invalid timestamp buffer :${buffer.toString('hex')}`) - return buffer.readUInt32BE(0) + decode: function(buffer) { + if (buffer.length !== 4) + throw new RangeError( + `Invalid timestamp buffer :${buffer.toString("hex")}` + ); + return buffer.readUInt32BE(0); } -} +}; const address = { - encode: function (value) { - if (ip.isV4Format(value)) return ip.toBuffer(value) - if (ip.isV6Format(value)) return ip.toBuffer(value) - throw new Error(`Invalid address: ${value}`) + encode: function(value) { + if (ip.isV4Format(value)) return ip.toBuffer(value); + if (ip.isV6Format(value)) return ip.toBuffer(value); + throw new Error(`Invalid address: ${value}`); }, - decode: function (buffer) { - if (buffer.length === 4) return ip.toString(buffer) - if (buffer.length === 16) return ip.toString(buffer) + decode: function(buffer) { + if (buffer.length === 4) return ip.toString(buffer); + if (buffer.length === 16) return ip.toString(buffer); - const str = buffer.toString() - if (ip.isV4Format(str) || ip.isV6Format(str)) return str + const str = buffer.toString(); + if (ip.isV4Format(str) || ip.isV6Format(str)) return str; // also can be host, but skip it right now (because need async function for resolve) - throw new Error(`Invalid address buffer: ${buffer.toString('hex')}`) + throw new Error(`Invalid address buffer: ${buffer.toString("hex")}`); } -} +}; const port = { - encode: function (value) { - if (value === null) return Buffer.allocUnsafe(0) - if ((value >>> 16) > 0) throw new RangeError(`Invalid port: ${value}`) - return Buffer.from([ (value >>> 8) & 0xff, (value >>> 0) & 0xff ]) + encode: function(value) { + if (value === null) return Buffer.allocUnsafe(0); + if (value >>> 16 > 0) throw new RangeError(`Invalid port: ${value}`); + return Buffer.from([(value >>> 8) & 0xff, (value >>> 0) & 0xff]); }, - decode: function (buffer) { - if (buffer.length === 0) return null + decode: function(buffer) { + if (buffer.length === 0) return null; // if (buffer.length !== 2) throw new RangeError(`Invalid port buffer: ${buffer.toString('hex')}`) - return buffer2int(buffer) + return buffer2int(buffer); } -} +}; const endpoint = { - encode: function (obj) { + encode: function(obj) { return [ address.encode(obj.address), port.encode(obj.udpPort), port.encode(obj.tcpPort) - ] + ]; }, - decode: function (payload) { + decode: function(payload) { return { address: address.decode(payload[0]), udpPort: port.decode(payload[1]), tcpPort: port.decode(payload[2]) - } + }; } -} +}; const ping = { - encode: function (obj) { + encode: function(obj) { return [ int2buffer(obj.version), endpoint.encode(obj.from), endpoint.encode(obj.to), timestamp.encode(obj.timestamp) - ] + ]; + + // message = _pack(CMD_PING.id, payload, self.privkey) + // self.send(node, message) + // # Return the msg hash, which is used as a token to identify pongs. + // return message[:MAC_SIZE] }, - decode: function (payload) { + decode: function(payload) { return { version: buffer2int(payload[0]), from: endpoint.decode(payload[1]), to: endpoint.decode(payload[2]), timestamp: timestamp.decode(payload[3]) - } + }; } -} +}; const pong = { - encode: function (obj) { - return [ - endpoint.encode(obj.to), - obj.hash, - timestamp.encode(obj.timestamp) - ] + encode: function(obj) { + return [endpoint.encode(obj.to), obj.hash, timestamp.encode(obj.timestamp)]; }, - decode: function (payload) { + decode: function(payload) { return { to: endpoint.decode(payload[0]), hash: payload[1], timestamp: timestamp.decode(payload[2]) - } + }; } -} +}; const findneighbours = { - encode: function (obj) { - return [ - obj.id, - timestamp.encode(obj.timestamp) - ] + encode: function(obj) { + return [obj.id, timestamp.encode(obj.timestamp)]; }, - decode: function (payload) { + decode: function(payload) { return { id: payload[0], timestamp: timestamp.decode(payload[1]) - } + }; } -} +}; const neighbours = { - encode: function (obj) { + encode: function(obj) { return [ - obj.peers.map((peer) => endpoint.encode(peer).concat(peer.id)), + obj.peers.map(peer => endpoint.encode(peer).concat(peer.id)), timestamp.encode(obj.timestamp) - ] + ]; }, - decode: function (payload) { + decode: function(payload) { return { - peers: payload[0].map((data) => { - return { endpoint: endpoint.decode(data), id: data[3] } // hack for id + peers: payload[0].map(data => { + return { endpoint: endpoint.decode(data), id: data[3] }; // hack for id }), timestamp: timestamp.decode(payload[1]) - } + }; } -} +}; -const messages = { ping, pong, findneighbours, neighbours } +const messages = { ping, pong, findneighbours, neighbours }; const types = { byName: { @@ -146,12 +151,12 @@ const types = { neighbours: 0x04 }, byType: { - 0x01: 'ping', - 0x02: 'pong', - 0x03: 'findneighbours', - 0x04: 'neighbours' + 0x01: "ping", + 0x02: "pong", + 0x03: "findneighbours", + 0x04: "neighbours" } -} +}; // [0, 32) data hash // [32, 96) signature @@ -159,35 +164,39 @@ const types = { // 97 type // [98, length) data -function encode (typename, data, privateKey) { - const type = types.byName[typename] - if (type === undefined) throw new Error(`Invalid typename: ${typename}`) - const encodedMsg = messages[typename].encode(data) - const typedata = Buffer.concat([ Buffer.from([ type ]), rlp.encode(encodedMsg) ]) - - const sighash = keccak256(typedata) - const sig = secp256k1.sign(sighash, privateKey) - const hashdata = Buffer.concat([ sig.signature, Buffer.from([ sig.recovery ]), typedata ]) - const hash = keccak256(hashdata) - return Buffer.concat([ hash, hashdata ]) +function encode(typename, data, privateKey) { + const type = types.byName[typename]; + if (type === undefined) throw new Error(`Invalid typename: ${typename}`); + const encodedMsg = messages[typename].encode(data); + const typedata = Buffer.concat([Buffer.from([type]), rlp.encode(encodedMsg)]); + + const sighash = keccak256(typedata); + const sig = secp256k1.sign(sighash, privateKey); + const hashdata = Buffer.concat([ + sig.signature, + Buffer.from([sig.recovery]), + typedata + ]); + const hash = keccak256(hashdata); + return Buffer.concat([hash, hashdata]); } -function decode (buffer) { - const hash = keccak256(buffer.slice(32)) - assertEq(buffer.slice(0, 32), hash, 'Hash verification failed') +function decode(buffer) { + const hash = keccak256(buffer.slice(32)); + assertEq(buffer.slice(0, 32), hash, "Hash verification failed"); - const typedata = buffer.slice(97) - const type = typedata[0] - const typename = types.byType[type] - if (typename === undefined) throw new Error(`Invalid type: ${type}`) - const data = messages[typename].decode(rlp.decode(typedata.slice(1))) + const typedata = buffer.slice(97); + const type = typedata[0]; + const typename = types.byType[type]; + if (typename === undefined) throw new Error(`Invalid type: ${type}`); + const data = messages[typename].decode(rlp.decode(typedata.slice(1))); - const sighash = keccak256(typedata) - const signature = buffer.slice(32, 96) - const recoverId = buffer[96] - const publicKey = secp256k1.recover(sighash, signature, recoverId, false) + const sighash = keccak256(typedata); + const signature = buffer.slice(32, 96); + const recoverId = buffer[96]; + const publicKey = secp256k1.recover(sighash, signature, recoverId, false); - return { typename, data, publicKey } + return { typename, data, publicKey }; } -module.exports = { encode, decode } +module.exports = { encode, decode }; diff --git a/src/dpt/server.js b/src/dpt/server.js index b848a5a..275c5da 100644 --- a/src/dpt/server.js +++ b/src/dpt/server.js @@ -1,175 +1,214 @@ -const { EventEmitter } = require('events') -const dgram = require('dgram') -const ms = require('ms') -const createDebugLogger = require('debug') -const LRUCache = require('lru-cache') -const message = require('./message') -const { keccak256, pk2id, createDeferred } = require('../util') - -const debug = createDebugLogger('devp2p:dpt:server') -const VERSION = 0x04 -const createSocketUDP4 = dgram.createSocket.bind(null, 'udp4') +const { EventEmitter } = require("events"); +const dgram = require("dgram"); +const ms = require("ms"); +const createDebugLogger = require("debug"); +const LRUCache = require("lru-cache"); +const message = require("./message"); +const { keccak256, pk2id, createDeferred, v4, v5 } = require("../util"); +const chalk = require("chalk"); +const debug = createDebugLogger("devp2p:dpt:server"); + +const createSocketUDP4 = dgram.createSocket.bind(null, "udp4"); class Server extends EventEmitter { - constructor (dpt, privateKey, options) { - super() - - this._dpt = dpt - this._privateKey = privateKey - - this._timeout = options.timeout || ms('10s') - this._endpoint = options.endpoint || { address: '0.0.0.0', udpPort: null, tcpPort: null } - this._requests = new Map() - this._parityRequestMap = new Map() - this._requestsCache = new LRUCache({ max: 1000, maxAge: ms('1s'), stale: false }) - - const createSocket = options.createSocket || createSocketUDP4 - this._socket = createSocket() - this._socket.once('listening', () => this.emit('listening')) - this._socket.once('close', () => this.emit('close')) - this._socket.on('error', (err) => this.emit('error', err)) - this._socket.on('message', (msg, rinfo) => { + constructor(dpt, privateKey, options) { + super(); + + this._dpt = dpt; + this._privateKey = privateKey; + + if (options.version === "5") { + this._version = v5; + } else { + this._version = v4; + } + + console.log( + chalk.green( + `Starting node discovery protocol with version: ${this._version}` + ) + ); + + this._timeout = options.timeout || ms("10s"); + this._endpoint = options.endpoint || { + address: "0.0.0.0", + udpPort: null, + tcpPort: null + }; + this._requests = new Map(); + this._parityRequestMap = new Map(); + this._requestsCache = new LRUCache({ + max: 1000, + maxAge: ms("1s"), + stale: false + }); + + const createSocket = options.createSocket || createSocketUDP4; + this._socket = createSocket(); + this._socket.once("listening", () => this.emit("listening")); + this._socket.once("close", () => this.emit("close")); + this._socket.on("error", err => this.emit("error", err)); + + this._socket.on("message", (msg, rinfo) => { try { - this._handler(msg, rinfo) + this._handler(msg, rinfo); } catch (err) { - this.emit('error', err) + this.emit("error", err); } - }) + }); } - bind (...args) { - this._isAliveCheck() - debug('call .bind') + bind(...args) { + this._isAliveCheck(); + debug("call .bind"); - this._socket.bind(...args) + this._socket.bind(...args); } - destroy (...args) { - this._isAliveCheck() - debug('call .destroy') + destroy(...args) { + this._isAliveCheck(); + debug("call .destroy"); - this._socket.close(...args) - this._socket = null + this._socket.close(...args); + this._socket = null; } - async ping (peer) { - this._isAliveCheck() + async ping(peer) { + this._isAliveCheck(); - const rckey = `${peer.address}:${peer.udpPort}` - const promise = this._requestsCache.get(rckey) - if (promise !== undefined) return promise + const rckey = `${peer.address}:${peer.udpPort}`; + const promise = this._requestsCache.get(rckey); + if (promise !== undefined) return promise; - const hash = this._send(peer, 'ping', { - version: VERSION, + const hash = this._send(peer, "ping", { + version: this._version, from: this._endpoint, to: peer - }) + }); + + const deferred = createDeferred(); + const rkey = hash.toString("hex"); - const deferred = createDeferred() - const rkey = hash.toString('hex') this._requests.set(rkey, { peer, deferred, timeoutId: setTimeout(() => { if (this._requests.get(rkey) !== undefined) { - debug(`ping timeout: ${peer.address}:${peer.udpPort} ${peer.id && peer.id.toString('hex')}`) - this._requests.delete(rkey) - deferred.reject(new Error(`Timeout error: ping ${peer.address}:${peer.udpPort}`)) + debug( + `ping timeout: ${peer.address}:${peer.udpPort} ${peer.id && + peer.id.toString("hex")}` + ); + this._requests.delete(rkey); + deferred.reject( + new Error(`Timeout error: ping ${peer.address}:${peer.udpPort}`) + ); } else { - return deferred.promise + return deferred.promise; } }, this._timeout) - }) - this._requestsCache.set(rckey, deferred.promise) - return deferred.promise + }); + + this._requestsCache.set(rckey, deferred.promise); + return deferred.promise; } - findneighbours (peer, id) { - this._isAliveCheck() - this._send(peer, 'findneighbours', { id }) + findneighbours(peer, id) { + this._isAliveCheck(); + this._send(peer, "findneighbours", { id }); } - _isAliveCheck () { - if (this._socket === null) throw new Error('Server already destroyed') + _isAliveCheck() { + if (this._socket === null) throw new Error("Server already destroyed"); } - _send (peer, typename, data) { - debug(`send ${typename} to ${peer.address}:${peer.udpPort} (peerId: ${peer.id && peer.id.toString('hex')})`) + _send(peer, typename, data) { + debug( + `send ${typename} to ${peer.address}:${peer.udpPort} (peerId: ${peer.id && + peer.id.toString("hex")})` + ); - const msg = message.encode(typename, data, this._privateKey) + const msg = message.encode(typename, data, this._privateKey); // Parity hack // There is a bug in Parity up to at lease 1.8.10 not echoing the hash from // discovery spec (hash: sha3(signature || packet-type || packet-data)) // but just hashing the RLP-encoded packet data (see discovery.rs, on_ping()) // 2018-02-28 - if (typename === 'ping') { - const rkeyParity = keccak256(msg.slice(98)).toString('hex') - this._parityRequestMap.set(rkeyParity, msg.slice(0, 32).toString('hex')) + if (typename === "ping") { + const rkeyParity = keccak256(msg.slice(98)).toString("hex"); + this._parityRequestMap.set(rkeyParity, msg.slice(0, 32).toString("hex")); setTimeout(() => { if (this._parityRequestMap.get(rkeyParity) !== undefined) { - this._parityRequestMap.delete(rkeyParity) + this._parityRequestMap.delete(rkeyParity); } - }, this._timeout) + }, this._timeout); } - this._socket.send(msg, 0, msg.length, peer.udpPort, peer.address) - return msg.slice(0, 32) // message id + this._socket.send(msg, 0, msg.length, peer.udpPort, peer.address); + return msg.slice(0, 32); // message id } - _handler (msg, rinfo) { - const info = message.decode(msg) - const peerId = pk2id(info.publicKey) - debug(`received ${info.typename} from ${rinfo.address}:${rinfo.port} (peerId: ${peerId.toString('hex')})`) + _handler(msg, rinfo) { + const info = message.decode(msg); + const peerId = pk2id(info.publicKey); + debug( + `received ${info.typename} from ${rinfo.address}:${ + rinfo.port + } (peerId: ${peerId.toString("hex")})` + ); // add peer if not in our table - const peer = this._dpt.getPeer(peerId) - if (peer === null && info.typename === 'ping' && info.data.from.udpPort !== null) { - setTimeout(() => this.emit('peers', [ info.data.from ]), ms('100ms')) + const peer = this._dpt.getPeer(peerId); + if ( + peer === null && + info.typename === "ping" && + info.data.from.udpPort !== null + ) { + setTimeout(() => this.emit("peers", [info.data.from]), ms("100ms")); } switch (info.typename) { - case 'ping': - Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }) - this._send(rinfo, 'pong', { + case "ping": + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }); + this._send(rinfo, "pong", { to: { address: rinfo.address, udpPort: rinfo.port, tcpPort: info.data.from.tcpPort }, hash: msg.slice(0, 32) - }) - break + }); + break; - case 'pong': - var rkey = info.data.hash.toString('hex') - const rkeyParity = this._parityRequestMap.get(rkey) + case "pong": + var rkey = info.data.hash.toString("hex"); + const rkeyParity = this._parityRequestMap.get(rkey); if (rkeyParity) { - rkey = rkeyParity - this._parityRequestMap.delete(rkeyParity) + rkey = rkeyParity; + this._parityRequestMap.delete(rkeyParity); } - const request = this._requests.get(rkey) + const request = this._requests.get(rkey); if (request) { - this._requests.delete(rkey) + this._requests.delete(rkey); request.deferred.resolve({ id: peerId, address: request.peer.address, udpPort: request.peer.udpPort, tcpPort: request.peer.tcpPort - }) + }); } - break + break; - case 'findneighbours': - Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }) - this._send(rinfo, 'neighbours', { + case "findneighbours": + Object.assign(rinfo, { id: peerId, udpPort: rinfo.port }); + this._send(rinfo, "neighbours", { peers: this._dpt.getClosestPeers(info.data.id) - }) - break + }); + break; - case 'neighbours': - this.emit('peers', info.data.peers.map((peer) => peer.endpoint)) - break + case "neighbours": + this.emit("peers", info.data.peers.map(peer => peer.endpoint)); + break; } } } -module.exports = Server +module.exports = Server; diff --git a/src/eth/index.js b/src/eth/index.js index 6c8d663..5d83ac3 100644 --- a/src/eth/index.js +++ b/src/eth/index.js @@ -1,12 +1,12 @@ -const { EventEmitter } = require('events') -const rlp = require('rlp-encoding') -const ms = require('ms') -const Buffer = require('safe-buffer').Buffer -const { int2buffer, buffer2int, assertEq } = require('../util') -const Peer = require('../rlpx/peer') +const { EventEmitter } = require("events"); +const rlp = require("rlp-encoding"); +const ms = require("ms"); +const Buffer = require("safe-buffer").Buffer; +const { int2buffer, buffer2int, assertEq } = require("../util"); +const Peer = require("../rlpx/peer"); -const createDebugLogger = require('debug') -const debug = createDebugLogger('devp2p:eth') +const createDebugLogger = require("debug"); +const debug = createDebugLogger("devp2p:eth"); const MESSAGE_CODES = { // eth62 @@ -24,40 +24,50 @@ const MESSAGE_CODES = { NODE_DATA: 0x0e, GET_RECEIPTS: 0x0f, RECEIPTS: 0x10 -} +}; class ETH extends EventEmitter { - constructor (version, peer, send) { - super() + constructor(version, peer, send) { + super(); - this._version = version - this._peer = peer - this._send = send + this._version = version; + this._peer = peer; + this._send = send; - this._status = null - this._peerStatus = null + this._status = null; + this._peerStatus = null; this._statusTimeoutId = setTimeout(() => { - this._peer.disconnect(Peer.DISCONNECT_REASONS.TIMEOUT) - }, ms('5s')) + this._peer.disconnect(Peer.DISCONNECT_REASONS.TIMEOUT); + }, ms("5s")); } - static eth62 = { name: 'eth', version: 62, length: 8, constructor: ETH } - static eth63 = { name: 'eth', version: 63, length: 17, constructor: ETH } + static eth62 = { name: "eth", version: 62, length: 8, constructor: ETH }; + static eth63 = { name: "eth", version: 63, length: 17, constructor: ETH }; - static MESSAGE_CODES = MESSAGE_CODES + static MESSAGE_CODES = MESSAGE_CODES; - _handleMessage (code, data) { - const payload = rlp.decode(data) + _handleMessage(code, data) { + const payload = rlp.decode(data); if (code !== MESSAGE_CODES.STATUS) { - debug(`Received ${this.getMsgPrefix(code)} message from ${this._peer._socket.remoteAddress}:${this._peer._socket.remotePort}: ${data.toString('hex')}`) + debug( + `Received ${this.getMsgPrefix(code)} message from ${ + this._peer._socket.remoteAddress + }:${this._peer._socket.remotePort}: ${data.toString("hex")}` + ); } switch (code) { case MESSAGE_CODES.STATUS: - assertEq(this._peerStatus, null, 'Uncontrolled status message') - this._peerStatus = payload - debug(`Received ${this.getMsgPrefix(code)} message from ${this._peer._socket.remoteAddress}:${this._peer._socket.remotePort}: : ${this._getStatusString(this._peerStatus)}`) - this._handleStatus() - break + assertEq(this._peerStatus, null, "Uncontrolled status message"); + this._peerStatus = payload; + debug( + `Received ${this.getMsgPrefix(code)} message from ${ + this._peer._socket.remoteAddress + }:${this._peer._socket.remotePort}: : ${this._getStatusString( + this._peerStatus + )}` + ); + this._handleStatus(); + break; case MESSAGE_CODES.NEW_BLOCK_HASHES: case MESSAGE_CODES.TX: @@ -66,69 +76,83 @@ class ETH extends EventEmitter { case MESSAGE_CODES.GET_BLOCK_BODIES: case MESSAGE_CODES.BLOCK_BODIES: case MESSAGE_CODES.NEW_BLOCK: - if (this._version >= ETH.eth62.version) break - return + if (this._version >= ETH.eth62.version) break; + return; case MESSAGE_CODES.GET_NODE_DATA: case MESSAGE_CODES.NODE_DATA: case MESSAGE_CODES.GET_RECEIPTS: case MESSAGE_CODES.RECEIPTS: - if (this._version >= ETH.eth63.version) break - return + if (this._version >= ETH.eth63.version) break; + return; default: - return + return; } - this.emit('message', code, payload) + this.emit("message", code, payload); } - _handleStatus () { - if (this._status === null || this._peerStatus === null) return - clearTimeout(this._statusTimeoutId) + _handleStatus() { + if (this._status === null || this._peerStatus === null) return; + clearTimeout(this._statusTimeoutId); - assertEq(this._status[0], this._peerStatus[0], 'Protocol version mismatch') - assertEq(this._status[1], this._peerStatus[1], 'NetworkId mismatch') - assertEq(this._status[4], this._peerStatus[4], 'Genesis block mismatch') + assertEq(this._status[0], this._peerStatus[0], "Protocol version mismatch"); + assertEq(this._status[1], this._peerStatus[1], "NetworkId mismatch"); + assertEq(this._status[4], this._peerStatus[4], "Genesis block mismatch"); - this.emit('status', { + this.emit("status", { networkId: this._peerStatus[1], td: Buffer.from(this._peerStatus[2]), bestHash: Buffer.from(this._peerStatus[3]), genesisHash: Buffer.from(this._peerStatus[4]) - }) + }); } - getVersion () { - return this._version + getVersion() { + return this._version; } - _getStatusString (status) { - var sStr = `[V:${buffer2int(status[0])}, NID:${buffer2int(status[1])}, TD:${buffer2int(status[2])}` - sStr += `, BestH:${status[3].toString('hex')}, GenH:${status[4].toString('hex')}]` - return sStr + _getStatusString(status) { + var sStr = `[V:${buffer2int(status[0])}, NID:${buffer2int( + status[1] + )}, TD:${buffer2int(status[2])}`; + sStr += `, BestH:${status[3].toString("hex")}, GenH:${status[4].toString( + "hex" + )}]`; + return sStr; } - sendStatus (status) { - if (this._status !== null) return + sendStatus(status) { + if (this._status !== null) return; this._status = [ int2buffer(this._version), int2buffer(status.networkId), status.td, status.bestHash, status.genesisHash - ] - - debug(`Send STATUS message to ${this._peer._socket.remoteAddress}:${this._peer._socket.remotePort} (eth${this._version}): ${this._getStatusString(this._status)}`) - this._send(MESSAGE_CODES.STATUS, rlp.encode(this._status)) - this._handleStatus() + ]; + + debug( + `Send STATUS message to ${this._peer._socket.remoteAddress}:${ + this._peer._socket.remotePort + } (eth${this._version}): ${this._getStatusString(this._status)}` + ); + this._send(MESSAGE_CODES.STATUS, rlp.encode(this._status)); + this._handleStatus(); } - sendMessage (code, payload) { - debug(`Send ${this.getMsgPrefix(code)} message to ${this._peer._socket.remoteAddress}:${this._peer._socket.remotePort}: ${rlp.encode(payload).toString('hex')}`) + sendMessage(code, payload) { + debug( + `Send ${this.getMsgPrefix(code)} message to ${ + this._peer._socket.remoteAddress + }:${this._peer._socket.remotePort}: ${rlp + .encode(payload) + .toString("hex")}` + ); switch (code) { case MESSAGE_CODES.STATUS: - throw new Error('Please send status message through .sendStatus') + throw new Error("Please send status message through .sendStatus"); case MESSAGE_CODES.NEW_BLOCK_HASHES: case MESSAGE_CODES.TX: @@ -137,26 +161,32 @@ class ETH extends EventEmitter { case MESSAGE_CODES.GET_BLOCK_BODIES: case MESSAGE_CODES.BLOCK_BODIES: case MESSAGE_CODES.NEW_BLOCK: - if (this._version >= ETH.eth62.version) break - throw new Error(`Code ${code} not allowed with version ${this._version}`) + if (this._version >= ETH.eth62.version) break; + throw new Error( + `Code ${code} not allowed with version ${this._version}` + ); case MESSAGE_CODES.GET_NODE_DATA: case MESSAGE_CODES.NODE_DATA: case MESSAGE_CODES.GET_RECEIPTS: case MESSAGE_CODES.RECEIPTS: - if (this._version >= ETH.eth63.version) break - throw new Error(`Code ${code} not allowed with version ${this._version}`) + if (this._version >= ETH.eth63.version) break; + throw new Error( + `Code ${code} not allowed with version ${this._version}` + ); default: - throw new Error(`Unknown code ${code}`) + throw new Error(`Unknown code ${code}`); } - this._send(code, rlp.encode(payload)) + this._send(code, rlp.encode(payload)); } - getMsgPrefix (msgCode) { - return Object.keys(MESSAGE_CODES).find(key => MESSAGE_CODES[key] === msgCode) + getMsgPrefix(msgCode) { + return Object.keys(MESSAGE_CODES).find( + key => MESSAGE_CODES[key] === msgCode + ); } } -module.exports = ETH +module.exports = ETH; diff --git a/src/index.js b/src/index.js index 7cccdff..29d13ac 100644 --- a/src/index.js +++ b/src/index.js @@ -1,6 +1,7 @@ -exports.DPT = require('./dpt') -exports.ETH = require('./eth') -exports.LES = require('./les') -exports.RLPx = require('./rlpx') +exports.DISCV5 = require("./discv5"); +exports.DPT = require("./dpt"); -exports._util = require('./util') +exports.ETH = require("./eth"); +exports.LES = require("./les"); +exports.RLPx = require("./rlpx"); +exports._util = require("./util"); diff --git a/src/rlpx/peer.js b/src/rlpx/peer.js index 0ab2a87..0d53f73 100644 --- a/src/rlpx/peer.js +++ b/src/rlpx/peer.js @@ -152,7 +152,9 @@ class Peer extends EventEmitter { case 'Body': const body = this._eciesSession.parseBody(data) - debug(`Received body ${this._socket.remoteAddress}:${this._socket.remotePort} ${body.toString('hex')}`) + + // TODO: uncomment if you want tx:hostname:port details (debug) + // console.log(`Received body ${this._socket.remoteAddress}:${this._socket.remotePort} ${body.toString('hex')}`) this._state = 'Header' this._nextPacketSize = 32 @@ -170,7 +172,7 @@ class Peer extends EventEmitter { const msgCode = code - obj.offset const prefix = this.getMsgPrefix(msgCode) - debug(`Received ${prefix} (message code: ${code} - ${obj.offset} = ${msgCode}) ${this._socket.remoteAddress}:${this._socket.remotePort}`) + console.log(`Received ${prefix} (message code: ${code} - ${obj.offset} = ${msgCode}) ${this._socket.remoteAddress}:${this._socket.remotePort}`) try { obj.protocol._handleMessage(msgCode, body.slice(1)) diff --git a/src/util.js b/src/util.js index 56b6661..218ed88 100644 --- a/src/util.js +++ b/src/util.js @@ -1,87 +1,95 @@ -const { randomBytes } = require('crypto') -const secp256k1 = require('secp256k1') -const Buffer = require('safe-buffer').Buffer -const createDebugLogger = require('debug') -const createKeccakHash = require('keccak') -const assert = require('assert') - -const debug = createDebugLogger('devp2p:util') - -function keccak256 (...buffers) { - const buffer = Buffer.concat(buffers) - return createKeccakHash('keccak256').update(buffer).digest() +const { randomBytes } = require("crypto"); +const secp256k1 = require("secp256k1"); +const Buffer = require("safe-buffer").Buffer; +const createDebugLogger = require("debug"); +const createKeccakHash = require("keccak"); +const assert = require("assert"); +const debug = createDebugLogger("devp2p:util"); + +// node discovery protocol versions +const v4 = "4"; +const v5 = "5"; + +// max packet size in bytes +const MAXPACKETSIZE = 1280; + +function keccak256(...buffers) { + const buffer = Buffer.concat(buffers); + return createKeccakHash("keccak256") + .update(buffer) + .digest(); } -function genPrivateKey () { +function genPrivateKey() { while (true) { - const privateKey = randomBytes(32) - if (secp256k1.privateKeyVerify(privateKey)) return privateKey + const privateKey = randomBytes(32); + if (secp256k1.privateKeyVerify(privateKey)) return privateKey; } } -function pk2id (pk) { - if (pk.length === 33) pk = secp256k1.publicKeyConvert(pk, false) - return pk.slice(1) +function pk2id(pk) { + if (pk.length === 33) pk = secp256k1.publicKeyConvert(pk, false); + return pk.slice(1); } -function id2pk (id) { - return Buffer.concat([ Buffer.from([ 0x04 ]), id ]) +function id2pk(id) { + return Buffer.concat([Buffer.from([0x04]), id]); } -function int2buffer (v) { - let hex = v.toString(16) - if (hex.length % 2 === 1) hex = '0' + hex - return Buffer.from(hex, 'hex') +function int2buffer(v) { + let hex = v.toString(16); + if (hex.length % 2 === 1) hex = "0" + hex; + return Buffer.from(hex, "hex"); } -function buffer2int (buffer) { - if (buffer.length === 0) return NaN +function buffer2int(buffer) { + if (buffer.length === 0) return NaN; - let n = 0 - for (let i = 0; i < buffer.length; ++i) n = n * 256 + buffer[i] - return n + let n = 0; + for (let i = 0; i < buffer.length; ++i) n = n * 256 + buffer[i]; + return n; } -function zfill (buffer, size, leftpad) { - if (buffer.length >= size) return buffer - if (leftpad === undefined) leftpad = true - const pad = Buffer.allocUnsafe(size - buffer.length).fill(0x00) - return leftpad ? Buffer.concat([ pad, buffer ]) : Buffer.concat([ buffer, pad ]) +function zfill(buffer, size, leftpad) { + if (buffer.length >= size) return buffer; + if (leftpad === undefined) leftpad = true; + const pad = Buffer.allocUnsafe(size - buffer.length).fill(0x00); + return leftpad ? Buffer.concat([pad, buffer]) : Buffer.concat([buffer, pad]); } -function xor (a, b) { - const length = Math.min(a.length, b.length) - const buffer = Buffer.allocUnsafe(length) - for (let i = 0; i < length; ++i) buffer[i] = a[i] ^ b[i] - return buffer +function xor(a, b) { + const length = Math.min(a.length, b.length); + const buffer = Buffer.allocUnsafe(length); + for (let i = 0; i < length; ++i) buffer[i] = a[i] ^ b[i]; + return buffer; } -function assertEq (expected, actual, msg) { - var message +function assertEq(expected, actual, msg) { + var message; if (Buffer.isBuffer(expected) && Buffer.isBuffer(actual)) { - if (expected.equals(actual)) return - message = `${msg}: ${expected.toString('hex')} / ${actual.toString('hex')}` - debug(message) + if (expected.equals(actual)) return; + message = `${msg}: ${expected.toString("hex")} / ${actual.toString("hex")}`; + debug(message); throw new assert.AssertionError({ message: message - }) + }); } - if (expected === actual) return - message = `${msg}: ${expected} / ${actual}` - debug(message) + if (expected === actual) return; + message = `${msg}: ${expected} / ${actual}`; + debug(message); throw new assert.AssertionError({ message: message - }) + }); } -function createDeferred () { - const deferred = {} +function createDeferred() { + const deferred = {}; deferred.promise = new Promise((resolve, reject) => { - deferred.resolve = resolve - deferred.reject = reject - }) - return deferred + deferred.resolve = resolve; + deferred.reject = reject; + }); + return deferred; } module.exports = { @@ -94,5 +102,14 @@ module.exports = { zfill, xor, assertEq, - createDeferred + createDeferred, + v4, + v5, + MAXPACKETSIZE +}; + +// used for v5 nonce packet. see https://github.com/fjl/p2p-drafts/blob/master/discv5-packets.md#packets +function generateNonce() { + const nonce = randomBytes(16); + return nonce; } diff --git a/test/discv5/dpt-message.js b/test/discv5/dpt-message.js new file mode 100644 index 0000000..e69de29 diff --git a/test/discv5/dpt-simulator.js b/test/discv5/dpt-simulator.js new file mode 100644 index 0000000..e69de29 diff --git a/test/discv5/eth-simulator.js b/test/discv5/eth-simulator.js new file mode 100644 index 0000000..e69de29 diff --git a/test/discv5/les-simulator.js b/test/discv5/les-simulator.js new file mode 100644 index 0000000..e69de29 diff --git a/test/integration/util.js b/test/integration/util.js index 9e6245f..29fcd1e 100644 --- a/test/integration/util.js +++ b/test/integration/util.js @@ -7,14 +7,16 @@ exports.getTestDPTs = function (numDPTs) { const dpts = [] for (let i = 0; i < numDPTs; ++i) { - const dpt = new devp2p.DPT(devp2p._util.genPrivateKey(), { - endpoint: { - address: localhost, - udpPort: basePort + i, - tcpPort: basePort + i - }, - timeout: 100 - }) + const dpt = new devp2p.DPT( + devp2p._util.genPrivateKey(), { + version: devp2p._util.v4, + endpoint: { + address: localhost, + udpPort: basePort + i, + tcpPort: basePort + i + }, + timeout: 100 + }) dpt.bind(basePort + i) dpts.push(dpt) } @@ -23,7 +25,34 @@ exports.getTestDPTs = function (numDPTs) { exports.initTwoPeerDPTSetup = function () { const dpts = exports.getTestDPTs(2) - const peer = { address: localhost, udpPort: basePort + 1 } + const peer = {address: localhost, udpPort: basePort + 1} + dpts[0].addPeer(peer) + return dpts +} + +exports.getTestDPTsV5 = function (numDPTs) { + const dpts = [] + + for (let i = 0; i < numDPTs; ++i) { + const dpt = new devp2p.DPT( + devp2p._util.genPrivateKey(), { + version: devp2p._util.v5, + endpoint: { + address: localhost, + udpPort: basePort + i, + tcpPort: basePort + i + }, + timeout: 100 + }) + dpt.bind(basePort + i) + dpts.push(dpt) + } + return dpts +} + +exports.initTwoPeerDPTSetupV5 = function () { + const dpts = exports.getTestDPTsV5(2) + const peer = {address: localhost, udpPort: basePort + 1} dpts[0].addPeer(peer) return dpts } @@ -57,7 +86,7 @@ exports.getTestRLPXs = function (numRLPXs, maxPeers, capabilities) { exports.initTwoPeerRLPXSetup = function (maxPeers, capabilities) { const rlpxs = exports.getTestRLPXs(2, maxPeers, capabilities) - const peer = { address: localhost, udpPort: basePort + 1, tcpPort: basePort + 1 } + const peer = {address: localhost, udpPort: basePort + 1, tcpPort: basePort + 1} rlpxs[0]._dpt.addPeer(peer) return rlpxs }