Skip to content

Commit c94b0dd

Browse files
authored
add findPeer & dht refactor (#793)
* add findPeer & dht refactor
1 parent e05ab46 commit c94b0dd

File tree

7 files changed

+116
-49
lines changed

7 files changed

+116
-49
lines changed

.github/workflows/ci.yml

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -132,15 +132,6 @@ jobs:
132132
- run: docker image ls
133133
- name: Delete default runner images
134134
run: |
135-
docker image rm node:20
136-
docker image rm node:20-alpine
137-
docker image rm node:18
138-
docker image rm node:18-alpine
139-
docker image rm debian:10
140-
docker image rm debian:11
141-
docker image rm ubuntu:22.04
142-
docker image rm ubuntu:20.04
143-
docker image rm moby/buildkit:latest
144135
rm -rf /usr/share/swift/
145136
- name: Wait for contracts deployment and C2D cluster to be ready
146137
working-directory: ${{ github.workspace }}/barge
@@ -226,15 +217,6 @@ jobs:
226217
- run: docker image ls
227218
- name: Delete default runner images
228219
run: |
229-
docker image rm node:20
230-
docker image rm node:20-alpine
231-
docker image rm node:18
232-
docker image rm node:18-alpine
233-
docker image rm debian:10
234-
docker image rm debian:11
235-
docker image rm ubuntu:22.04
236-
docker image rm ubuntu:20.04
237-
docker image rm moby/buildkit:latest
238220
rm -rf /usr/share/swift/
239221
240222
- name: Wait for contracts deployment and C2D cluster to be ready

docs/API.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -449,6 +449,38 @@ returns P2P peer
449449

450450
---
451451

452+
## find peer multiaddress
453+
454+
### `HTTP` GET /findPeer/?
455+
456+
#### Description
457+
458+
returns P2P peer multiaddresses if found in DHT
459+
460+
#### Query Parameters
461+
462+
| name | type | required | description |
463+
| ------- | ------ | -------- | ----------- |
464+
| peerId | string | v | peer id |
465+
| timeout | int | optional | timeout |
466+
467+
#### Response
468+
469+
```
470+
{
471+
"id": "16Uiu2HAmLhRDqfufZiQnxvQs2XHhd6hwkLSPfjAQg1gH8wgRixiP",
472+
"multiaddrs": [
473+
"/ip4/127.0.0.1/tcp/9000",
474+
"/ip4/127.0.0.1/tcp/9001/ws",
475+
"/ip4/172.18.0.2/tcp/9000",
476+
"/ip4/172.18.0.2/tcp/9001/ws",
477+
"/ip6/::1/tcp/9002"
478+
]
479+
}
480+
```
481+
482+
---
483+
452484
## Get P2P Peers
453485

454486
### `HTTP` GET /getP2PPeers

docs/env.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ Environmental variables are also tracked in `ENVIRONMENT_VARIABLES` within `src/
5555
- `P2P_pubsubPeerDiscoveryInterval`: Interval (in ms) for discovery using pubsub. Defaults to `10000` (three seconds). Example: `10000`
5656
- `P2P_dhtMaxInboundStreams`: Maximum number of DHT inbound streams. Defaults to `500`. Example: `500`
5757
- `P2P_dhtMaxOutboundStreams`: Maximum number of DHT outbound streams. Defaults to `500`. Example: `500`
58-
- `P2P_ENABLE_DHT_SERVER`: Enable DHT server mode. This should be enabled for bootstrapers & well established nodes. Default: `false`
58+
- `P2P_DHT_FILTER`: Filter address in DHT. 0 = (Default) No filter 1. Filter private ddresses. 2. Filter public addresses
5959
- `P2P_mDNSInterval`: Interval (in ms) for discovery using mDNS. Defaults to `20000` (20 seconds). Example: `20000`
6060
- `P2P_connectionsMaxParallelDials`: Maximum number of parallel dials. Defaults to `150`. Example: `150`
6161
- `P2P_connectionsDialTimeout`: Timeout for dial commands. Defaults to `10000` (10 seconds). Example: `10000`

src/@types/OceanNode.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,12 @@ export interface OceanNodeKeys {
2323
privateKey: any
2424
ethAddress: string
2525
}
26-
26+
/* eslint-disable no-unused-vars */
27+
export enum dhtFilterMethod {
28+
filterPrivate = 'filterPrivate', // default, remove all private addresses from DHT
29+
filterPublic = 'filterPublic', // remove all public addresses from DHT
30+
filterNone = 'filterNone' // do not remove all any addresses from DHT
31+
}
2732
export interface OceanNodeP2PConfig {
2833
bootstrapNodes: string[]
2934
bootstrapTimeout: number
@@ -41,7 +46,7 @@ export interface OceanNodeP2PConfig {
4146
pubsubPeerDiscoveryInterval: number
4247
dhtMaxInboundStreams: number
4348
dhtMaxOutboundStreams: number
44-
enableDHTServer: boolean
49+
dhtFilter: dhtFilterMethod
4550
mDNSInterval: number
4651
connectionsMaxParallelDials: number
4752
connectionsDialTimeout: number

src/components/P2P/index.ts

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,22 @@ import { autoNAT } from '@libp2p/autonat'
2929
import { uPnPNAT } from '@libp2p/upnp-nat'
3030
import { ping } from '@libp2p/ping'
3131
import { dcutr } from '@libp2p/dcutr'
32-
import { kadDHT, passthroughMapper } from '@libp2p/kad-dht'
32+
import {
33+
kadDHT,
34+
passthroughMapper,
35+
removePrivateAddressesMapper,
36+
removePublicAddressesMapper
37+
} from '@libp2p/kad-dht'
3338
// import { gossipsub } from '@chainsafe/libp2p-gossipsub'
3439

3540
import { EVENTS, cidFromRawString } from '../../utils/index.js'
3641
import { Transform } from 'stream'
3742
import { Database } from '../database'
38-
import { OceanNodeConfig, FindDDOResponse } from '../../@types/OceanNode'
43+
import {
44+
OceanNodeConfig,
45+
FindDDOResponse,
46+
dhtFilterMethod
47+
} from '../../@types/OceanNode.js'
3948
// eslint-disable-next-line camelcase
4049
import is_ip_private from 'private-ip'
4150
import ip from 'ip'
@@ -275,8 +284,22 @@ export class OceanP2P extends EventEmitter {
275284
multiaddrs.filter((m) => this.shouldAnnounce(m))
276285
}
277286
}
287+
const dhtOptions = {
288+
allowQueryWithZeroPeers: false,
289+
maxInboundStreams: config.p2pConfig.dhtMaxInboundStreams,
290+
maxOutboundStreams: config.p2pConfig.dhtMaxOutboundStreams,
291+
clientMode: false, // always be a server
292+
kBucketSize: 20,
293+
protocol: '/ocean/nodes/1.0.0/kad/1.0.0',
294+
peerInfoMapper: passthroughMapper // see below
295+
}
296+
if (config.p2pConfig.dhtFilter === dhtFilterMethod.filterPrivate)
297+
dhtOptions.peerInfoMapper = removePrivateAddressesMapper
298+
if (config.p2pConfig.dhtFilter === dhtFilterMethod.filterPublic)
299+
dhtOptions.peerInfoMapper = removePublicAddressesMapper
278300
let servicesConfig = {
279301
identify: identify(),
302+
dht: kadDHT(dhtOptions),
280303
identifyPush: identifyPush(),
281304
/*
282305
pubsub: gossipsub({
@@ -292,27 +315,10 @@ export class OceanP2P extends EventEmitter {
292315
// enabled: true
293316
allowedTopics: ['oceanprotocol._peer-discovery._p2p._pubsub', 'oceanprotocol']
294317
}), */
295-
dht: kadDHT({
296-
// this is necessary because this node is not connected to the public network
297-
// it can be removed if, for example bootstrappers are configured
298-
allowQueryWithZeroPeers: true,
299-
maxInboundStreams: config.p2pConfig.dhtMaxInboundStreams,
300-
maxOutboundStreams: config.p2pConfig.dhtMaxOutboundStreams,
301-
302-
clientMode: false,
303-
kBucketSize: 20,
304-
protocol: '/ocean/nodes/1.0.0/kad/1.0.0',
305-
peerInfoMapper: passthroughMapper
306-
// protocolPrefix: '/ocean/nodes/1.0.0'
307-
// randomWalk: {
308-
// enabled: true, // Allows to disable discovery (enabled by default)
309-
// interval: 300e3,
310-
// timeout: 10e3
311-
// }
312-
}),
313318
ping: ping(),
314319
dcutr: dcutr()
315320
}
321+
316322
// eslint-disable-next-line no-constant-condition, no-self-compare
317323
if (config.p2pConfig.enableCircuitRelayServer) {
318324
P2P_LOGGER.info('Enabling Circuit Relay Server')
@@ -420,13 +426,6 @@ export class OceanP2P extends EventEmitter {
420426
this._upnp_interval = setInterval(this.UPnpCron.bind(this), 3000)
421427
}
422428

423-
if (config.p2pConfig.enableDHTServer) {
424-
try {
425-
await node.services.dht.setMode('server')
426-
} catch (e) {
427-
P2P_LOGGER.warn(`Failed to set mode server for DHT`)
428-
}
429-
}
430429
return node
431430
} catch (e) {
432431
P2P_LOGGER.logMessageWithEmoji(
@@ -598,6 +597,22 @@ export class OceanP2P extends EventEmitter {
598597
return finalmultiaddrs
599598
}
600599

600+
async findPeerInDht(peerName: string, timeout?: number) {
601+
try {
602+
const peer = peerIdFromString(peerName)
603+
const data = await this._libp2p.peerRouting.findPeer(peer, {
604+
signal:
605+
isNaN(timeout) || timeout === 0
606+
? AbortSignal.timeout(5000)
607+
: AbortSignal.timeout(timeout),
608+
useCache: true,
609+
useNetwork: true
610+
})
611+
return data
612+
} catch (e) {}
613+
return null
614+
}
615+
601616
async sendTo(
602617
peerName: string,
603618
message: string,

src/components/httpRoutes/getOceanPeers.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,26 @@ getOceanPeersRoute.get(
1818
}
1919
}
2020
)
21+
22+
getOceanPeersRoute.get(
23+
'/findPeer',
24+
express.urlencoded({ extended: true }),
25+
async (req: Request, res: Response): Promise<void> => {
26+
if (!req.query.peerId) {
27+
res.sendStatus(400)
28+
return
29+
}
30+
if (hasP2PInterface) {
31+
const peers = await req.oceanNode
32+
.getP2PNode()
33+
.findPeerInDht(String(req.query.peerId), parseInt(String(req.query.timeout)))
34+
if (peers) res.json(peers)
35+
else res.sendStatus(404).send('Cannot find peer')
36+
} else {
37+
sendMissingP2PResponse(res)
38+
}
39+
}
40+
)
2141
getOceanPeersRoute.get(
2242
'/getOceanPeers',
2343
async (req: Request, res: Response): Promise<void> => {

src/utils/config.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import type {
44
OceanNodeKeys,
55
OceanNodeDockerConfig
66
} from '../@types/OceanNode'
7+
import { dhtFilterMethod } from '../@types/OceanNode.js'
78
import type { C2DClusterInfo } from '../@types/C2D.js'
89
import { C2DClusterType } from '../@types/C2D.js'
910
import { createFromPrivKey } from '@libp2p/peer-id-factory'
@@ -548,6 +549,18 @@ async function getEnvConfig(isStartup?: boolean): Promise<OceanNodeConfig> {
548549
const interfaces = getNodeInterfaces(isStartup)
549550
let bootstrapTtl = getIntEnvValue(process.env.P2P_BOOTSTRAP_TTL, 120000)
550551
if (bootstrapTtl === 0) bootstrapTtl = Infinity
552+
let dhtFilterOption
553+
switch (getIntEnvValue(process.env.P2P_DHT_FILTER, 0)) {
554+
case 1:
555+
dhtFilterOption = dhtFilterMethod.filterPrivate
556+
break
557+
case 2:
558+
dhtFilterOption = dhtFilterMethod.filterPublic
559+
break
560+
default:
561+
dhtFilterOption = dhtFilterMethod.filterNone
562+
}
563+
551564
const config: OceanNodeConfig = {
552565
authorizedDecrypters: getAuthorizedDecrypters(isStartup),
553566
allowedValidators: getAllowedValidators(isStartup),
@@ -584,7 +597,7 @@ async function getEnvConfig(isStartup?: boolean): Promise<OceanNodeConfig> {
584597
),
585598
dhtMaxInboundStreams: getIntEnvValue(process.env.P2P_dhtMaxInboundStreams, 500),
586599
dhtMaxOutboundStreams: getIntEnvValue(process.env.P2P_dhtMaxOutboundStreams, 500),
587-
enableDHTServer: getBoolEnvValue('P2P_ENABLE_DHT_SERVER', false),
600+
dhtFilter: dhtFilterOption,
588601
mDNSInterval: getIntEnvValue(process.env.P2P_mDNSInterval, 20e3), // 20 seconds
589602
connectionsMaxParallelDials: getIntEnvValue(
590603
process.env.P2P_connectionsMaxParallelDials,

0 commit comments

Comments
 (0)