Skip to content

Commit d904ba9

Browse files
authored
Bug/remove_pubsub (#652)
* remove pubsub * remove broadcast command & cleanup
1 parent af75e1e commit d904ba9

File tree

10 files changed

+39
-178
lines changed

10 files changed

+39
-178
lines changed

docs/API.md

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -404,32 +404,6 @@ returns list of logs
404404

405405
---
406406

407-
## Broadcast Command
408-
409-
### `HTTP` POST /broadcastCommand
410-
411-
#### Description
412-
413-
returns an empty if command is valid
414-
415-
#### Parameters
416-
417-
| name | type | required | description |
418-
| ------- | ------ | -------- | ---------------------------- |
419-
| command | string | v | command name |
420-
| ... | any | | any other command parameters |
421-
422-
#### Request
423-
424-
```json
425-
{
426-
"command": "echo",
427-
"...": "..."
428-
}
429-
```
430-
431-
---
432-
433407
## Advertise Did
434408

435409
### `HTTP` GET /advertiseDid/?did=did:op:123"

src/@types/OceanNode.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,6 @@ export interface OceanNodeStatus {
125125
supportedSchemas?: Schema[]
126126
}
127127

128-
export interface P2PBroadcastResponse {
129-
command: string // original broadcast command
130-
message: any // original broadcast message
131-
response: any // the actual response to the original command and message
132-
}
133-
134128
export interface FindDDOResponse {
135129
provider: string
136130
id: string

src/@types/commands.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,6 @@ export interface ICommandHandler {
137137
validate(command: Command): ValidateParams
138138
}
139139

140-
export interface BroadcastCommand {
141-
command: string // the name of the command
142-
message: any // the message to broadcast
143-
}
144-
145140
export interface ComputeGetEnvironmentsCommand extends Command {
146141
chainId?: number
147142
}

src/components/P2P/handleBroadcasts.ts

Lines changed: 0 additions & 29 deletions
This file was deleted.

src/components/P2P/handlers.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
export * from './handleBroadcasts.js'
21
export * from './handleProtocolCommands.js'

src/components/P2P/index.ts

Lines changed: 37 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import EventEmitter from 'node:events'
44
import clone from 'lodash.clonedeep'
55

66
import {
7-
// handleBroadcasts,
87
// handlePeerConnect,
98
// handlePeerDiscovery,
109
// handlePeerDisconnect,
@@ -19,7 +18,7 @@ import { mdns } from '@libp2p/mdns'
1918
import { yamux } from '@chainsafe/libp2p-yamux'
2019
import { peerIdFromString } from '@libp2p/peer-id'
2120
import { pipe } from 'it-pipe'
22-
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'
21+
// import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'
2322

2423
import { tcp } from '@libp2p/tcp'
2524
import { webSockets } from '@libp2p/websockets'
@@ -31,7 +30,7 @@ import { uPnPNAT } from '@libp2p/upnp-nat'
3130
import { ping } from '@libp2p/ping'
3231
import { dcutr } from '@libp2p/dcutr'
3332
import { kadDHT, passthroughMapper } from '@libp2p/kad-dht'
34-
import { gossipsub } from '@chainsafe/libp2p-gossipsub'
33+
// import { gossipsub } from '@chainsafe/libp2p-gossipsub'
3534

3635
import { EVENTS, cidFromRawString } from '../../utils/index.js'
3736
import { Transform } from 'stream'
@@ -40,11 +39,7 @@ import { OceanNodeConfig, FindDDOResponse } from '../../@types/OceanNode'
4039
// eslint-disable-next-line camelcase
4140
import is_ip_private from 'private-ip'
4241
import ip from 'ip'
43-
import {
44-
GENERIC_EMOJIS,
45-
LOG_LEVELS_STR,
46-
getLoggerLevelEmoji
47-
} from '../../utils/logging/Logger.js'
42+
import { GENERIC_EMOJIS, LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
4843
import { INDEXER_DDO_EVENT_EMITTER } from '../Indexer/index.js'
4944
import { P2P_LOGGER } from '../../utils/logging/common.js'
5045
import { CoreHandlersRegistry } from '../core/handler/coreHandlersRegistry'
@@ -119,9 +114,10 @@ export class OceanP2P extends EventEmitter {
119114
this._libp2p.addEventListener('peer:disconnect', (evt: any) => {
120115
this.handlePeerDisconnect(evt)
121116
})
122-
this._libp2p.addEventListener('peer:discovery', (evt: any) => {
123-
this.handlePeerDiscovery(evt)
117+
this._libp2p.addEventListener('peer:discovery', (details: any) => {
118+
this.handlePeerDiscovery(details)
124119
})
120+
125121
this._options = Object.assign({}, clone(DEFAULT_OPTIONS), clone(options))
126122
this._peers = []
127123
this._connections = {}
@@ -161,9 +157,22 @@ export class OceanP2P extends EventEmitter {
161157
P2P_LOGGER.debug('Connection closed to:' + peerId.toString()) // Emitted when a peer has been found
162158
}
163159

164-
handlePeerDiscovery(details: any) {
165-
const peerInfo = details.detail
166-
P2P_LOGGER.debug('Discovered new peer:' + peerInfo.id.toString())
160+
async handlePeerDiscovery(details: any) {
161+
try {
162+
const peerInfo = details.detail
163+
P2P_LOGGER.debug('Discovered new peer:' + peerInfo.id.toString())
164+
if (peerInfo.multiaddrs) {
165+
await this._libp2p.peerStore.save(peerInfo.id, {
166+
multiaddrs: peerInfo.multiaddrs
167+
})
168+
await this._libp2p.peerStore.patch(peerInfo.id, {
169+
multiaddrs: peerInfo.multiaddrs
170+
})
171+
}
172+
} catch (e) {
173+
// no panic if it failed
174+
// console.error(e)
175+
}
167176
}
168177

169178
handlePeerJoined(details: any) {
@@ -233,7 +242,6 @@ export class OceanP2P extends EventEmitter {
233242
this._privateKey = config.keys.privateKey
234243
/** @type {import('libp2p').Libp2pOptions} */
235244
// start with some default, overwrite based on config later
236-
let doPx = false
237245
const bindInterfaces = []
238246
if (config.p2pConfig.enableIPV4) {
239247
P2P_LOGGER.info('Binding P2P sockets to IPV4')
@@ -258,7 +266,6 @@ export class OceanP2P extends EventEmitter {
258266
config.p2pConfig.announceAddresses &&
259267
config.p2pConfig.announceAddresses.length > 0
260268
) {
261-
doPx = true
262269
addresses = {
263270
listen: bindInterfaces,
264271
announceFilter: (multiaddrs: any[]) =>
@@ -274,6 +281,7 @@ export class OceanP2P extends EventEmitter {
274281
}
275282
let servicesConfig = {
276283
identify: identify(),
284+
/*
277285
pubsub: gossipsub({
278286
fallbackToFloodsub: false,
279287
batchPublish: false,
@@ -286,7 +294,7 @@ export class OceanP2P extends EventEmitter {
286294
// canRelayMessage: true,
287295
// enabled: true
288296
allowedTopics: ['oceanprotocol._peer-discovery._p2p._pubsub', 'oceanprotocol']
289-
}),
297+
}), */
290298
dht: kadDHT({
291299
// this is necessary because this node is not connected to the public network
292300
// it can be removed if, for example bootstrappers are configured
@@ -372,7 +380,7 @@ export class OceanP2P extends EventEmitter {
372380
}),
373381
mdns({
374382
interval: config.p2pConfig.mDNSInterval
375-
}),
383+
}) /*,
376384
pubsubPeerDiscovery({
377385
interval: config.p2pConfig.pubsubPeerDiscoveryInterval,
378386
topics: [
@@ -381,7 +389,7 @@ export class OceanP2P extends EventEmitter {
381389
// '_peer-discovery._p2p._pubsub' // Include if you want to participate in the global space
382390
],
383391
listenOnly: false
384-
})
392+
}) */
385393
]
386394
}
387395
}
@@ -393,7 +401,7 @@ export class OceanP2P extends EventEmitter {
393401
peerDiscovery: [
394402
mdns({
395403
interval: config.p2pConfig.mDNSInterval
396-
}),
404+
}) /*,
397405
pubsubPeerDiscovery({
398406
interval: config.p2pConfig.pubsubPeerDiscoveryInterval,
399407
topics: [
@@ -402,37 +410,14 @@ export class OceanP2P extends EventEmitter {
402410
// '_peer-discovery._p2p._pubsub' // Include if you want to participate in the global space
403411
],
404412
listenOnly: false
405-
})
413+
}) */
406414
]
407415
}
408416
}
409417
}
410418
const node = await createLibp2p(options)
411419
await node.start()
412420

413-
// node.services.pubsub.addEventListener( 'peer joined', (evt:any) => {handlePeerJoined(evt)})
414-
// node.services.pubsub.addEventListener('peer left', (evt:any) => {handlePeerLeft(evt)})
415-
// node.services.pubsub.addEventListener('subscription-change', (evt:any) => { handleSubscriptionCHange(evt)})
416-
417-
// this._libp2p.services.pubsub.on('peer joined', (peer:any) => {
418-
// console.log('New peer joined us:', peer)
419-
// })
420-
// this._libp2p.services.pubsub.addEventListener('peer left', (evt:any) => {
421-
// console.log('Peer left...', evt)
422-
// })
423-
// this._libp2p.services.pubsub.on('peer left', (peer:any) => {
424-
// console.log('Peer left...', peer)
425-
// })
426-
427-
/* since we don't have broadcasts implemented, comment this part of the code
428-
node.services.pubsub.addEventListener('message', (message: any) => {
429-
handleBroadcasts(this._topic, message)
430-
})
431-
*/
432-
433-
node.services.pubsub.subscribe(this._topic)
434-
node.services.pubsub.publish(this._topic, encoding('online'))
435-
436421
const upnpService = (node.services as any).upnpNAT
437422
if (config.p2pConfig.upnp && upnpService) {
438423
this._upnp_interval = setInterval(this.UPnpCron.bind(this), 3000)
@@ -489,24 +474,24 @@ export class OceanP2P extends EventEmitter {
489474

490475
async getOceanPeers(running: boolean = true, known: boolean = true) {
491476
const peers: string[] = []
492-
if (running) {
477+
/* if (running) {
493478
// get pubsub peers
494479
const node = <any>this._libp2p
495480
const newPeers = (await node.services.pubsub.getSubscribers(this._topic)).sort()
496481
for (const peer of newPeers.slice(0)) {
497482
if (!peers.includes(peer.toString)) peers.push(peer.toString())
498483
}
499-
}
484+
} */
500485
if (known) {
501486
// get p2p peers and filter them by protocol
502487
for (const peer of await this._libp2p.peerStore.all()) {
503-
if (peer && peer.protocols) {
504-
for (const protocol of peer.protocols) {
505-
if (protocol === this._protocol) {
506-
if (!peers.includes(peer.id.toString())) peers.push(peer.id.toString())
507-
}
508-
}
509-
}
488+
// if (peer && peer.protocols) {
489+
// for (const protocol of peer.protocols) {
490+
// if (protocol === this._protocol) {
491+
if (!peers.includes(peer.id.toString())) peers.push(peer.id.toString())
492+
// }
493+
// }
494+
// }
510495
}
511496
}
512497

@@ -518,18 +503,6 @@ export class OceanP2P extends EventEmitter {
518503
return Boolean(s.find((p: any) => p.toString() === peer.toString()))
519504
}
520505

521-
async broadcast(_message: any) {
522-
P2P_LOGGER.logMessage('Broadcasting:', true)
523-
P2P_LOGGER.logMessageWithEmoji(
524-
_message,
525-
true,
526-
getLoggerLevelEmoji(LOG_LEVELS_STR.LEVEL_INFO),
527-
LOG_LEVELS_STR.LEVEL_INFO
528-
)
529-
const message = encoding(_message)
530-
await this._libp2p.services.pubsub.publish(this._topic, message)
531-
}
532-
533506
async getPeerDetails(peerName: string) {
534507
try {
535508
const peerId = peerIdFromString(peerName)
@@ -1013,11 +986,3 @@ export class OceanP2P extends EventEmitter {
1013986
this._upnp_interval = setInterval(this.UPnpCron.bind(this), 3000)
1014987
}
1015988
}
1016-
1017-
function encoding(message: any) {
1018-
if (!(message instanceof Uint8Array)) {
1019-
return uint8ArrayFromString(message)
1020-
}
1021-
1022-
return message
1023-
}

src/components/httpRoutes/commands.ts

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,35 +3,10 @@ import express, { Request, Response } from 'express'
33
import { P2PCommandResponse } from '../../@types'
44
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
55

6-
import { getDefaultLevel } from '../../utils/logging/Logger.js'
7-
86
import { HTTP_LOGGER } from '../../utils/logging/common.js'
9-
import { hasP2PInterface, sendMissingP2PResponse } from './index.js'
7+
import { hasP2PInterface } from './index.js'
108
import { validateCommandParameters } from './validateCommands.js'
119

12-
export const broadcastCommandRoute = express.Router()
13-
14-
broadcastCommandRoute.post(
15-
'/broadcastCommand',
16-
express.json(),
17-
async (req: Request, res: Response): Promise<void> => {
18-
const validate = validateCommandParameters(req.body, [])
19-
if (!validate.valid) {
20-
res.status(validate.status).send(validate.reason)
21-
return
22-
}
23-
24-
HTTP_LOGGER.log(getDefaultLevel(), `broadcastCommand received ${req.body}`, true)
25-
26-
if (hasP2PInterface) {
27-
await req.oceanNode.getP2PNode().broadcast(JSON.stringify(req.body))
28-
res.sendStatus(200)
29-
} else {
30-
sendMissingP2PResponse(res)
31-
}
32-
}
33-
)
34-
3510
export const directCommandRoute = express.Router()
3611
directCommandRoute.post(
3712
'/directCommand',

src/components/httpRoutes/index.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import express, { Response } from 'express'
22
import { getOceanPeersRoute, getP2PPeersRoute, getP2PPeerRoute } from './getOceanPeers.js'
33
import { advertiseDidRoute, getProvidersForDidRoute } from './dids.js'
4-
import { broadcastCommandRoute, directCommandRoute } from './commands.js'
4+
import { directCommandRoute } from './commands.js'
55
import { logRoutes } from './logs.js'
66
import { providerRoutes } from './provider.js'
77
import { aquariusRoutes } from './aquarius.js'
@@ -34,8 +34,6 @@ httpRoutes.use(getP2PPeerRoute)
3434
httpRoutes.use(advertiseDidRoute)
3535
// /getProvidersForDid
3636
httpRoutes.use(getProvidersForDidRoute)
37-
// /broadcastCommand
38-
httpRoutes.use(broadcastCommandRoute)
3937
// /directCommand
4038
httpRoutes.use(directCommandRoute)
4139
// /logs

src/components/httpRoutes/routeUtils.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,6 @@ routesNames.set('directCommand', {
9292
method: 'post'
9393
})
9494

95-
routesNames.set('broadcastCommand', {
96-
path: '/broadcastCommand',
97-
method: 'post'
98-
})
9995
// fileInfo
10096
routesNames.set('fileInfo', {
10197
path: `${SERVICES_API_BASE_PATH}/fileInfo`,

0 commit comments

Comments
 (0)