Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 9bf80b8

Browse files
committed
feat: add pingReadableStream and pingPullStream
License: MIT Signed-off-by: Alan Shaw <[email protected]>
1 parent 08da824 commit 9bf80b8

File tree

8 files changed

+145
-122
lines changed

8 files changed

+145
-122
lines changed

src/cli/commands/ping.js

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,13 @@ module.exports = {
2222
const count = argv.count || 10
2323
pull(
2424
argv.ipfs.pingPullStream(peerId, { count }),
25-
pull.drain(({ Time, Text }) => {
25+
pull.drain(({ time, text }) => {
2626
// Check if it's a pong
27-
if (Time) {
28-
print(`Pong received: time=${Time} ms`)
27+
if (time) {
28+
print(`Pong received: time=${time} ms`)
2929
// Status response
3030
} else {
31-
print(Text)
31+
print(text)
3232
}
3333
})
3434
)

src/core/components/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ exports.dag = require('./dag')
1616
exports.libp2p = require('./libp2p')
1717
exports.swarm = require('./swarm')
1818
exports.ping = require('./ping')
19+
exports.pingPullStream = require('./ping-pull-stream')
20+
exports.pingReadableStream = require('./ping-readable-stream')
1921
exports.files = require('./files')
2022
exports.bitswap = require('./bitswap')
2123
exports.pubsub = require('./pubsub')
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
'use strict'
2+
3+
const debug = require('debug')
4+
const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR
5+
const PeerId = require('peer-id')
6+
const pull = require('pull-stream')
7+
const Pushable = require('pull-pushable')
8+
const waterfall = require('async/waterfall')
9+
10+
const log = debug('jsipfs:pingPullStream')
11+
log.error = debug('jsipfs:pingPullStream:error')
12+
13+
module.exports = function pingPullStream (self) {
14+
return (peerId, opts) => {
15+
if (!self.isOnline()) {
16+
return pull.error(new Error(OFFLINE_ERROR))
17+
}
18+
19+
opts = Object.assign({ count: 10 }, opts)
20+
21+
const source = Pushable()
22+
23+
waterfall([
24+
(cb) => getPeer(self._libp2pNode, source, peerId, cb),
25+
(peer, cb) => runPing(self._libp2pNode, source, opts.count, peer, cb)
26+
], (err) => {
27+
if (err) {
28+
log.error(err)
29+
source.push(getPacket({ text: err.toString() }))
30+
source.end(err)
31+
}
32+
})
33+
34+
return source
35+
}
36+
}
37+
38+
function getPacket (msg) {
39+
// Default msg
40+
const basePacket = { success: true, time: 0, text: '' }
41+
return Object.assign(basePacket, msg)
42+
}
43+
44+
function getPeer (libp2pNode, statusStream, peerId, cb) {
45+
let peer
46+
try {
47+
peer = libp2pNode.peerBook.get(peerId)
48+
} catch (err) {
49+
log('Peer not found in peer book, trying peer routing')
50+
// Share lookup status just as in the go implemmentation
51+
statusStream.push(getPacket({ text: `Looking up peer ${peerId}` }))
52+
// Try to use peerRouting
53+
return libp2pNode.peerRouting.findPeer(PeerId.createFromB58String(peerId), cb)
54+
}
55+
return cb(null, peer)
56+
}
57+
58+
function runPing (libp2pNode, statusStream, count, peer, cb) {
59+
libp2pNode.ping(peer, (err, p) => {
60+
if (err) {
61+
return cb(err)
62+
}
63+
64+
log('Got peer', peer)
65+
66+
let packetCount = 0
67+
let totalTime = 0
68+
statusStream.push(getPacket({ text: `PING ${peer.id.toB58String()}` }))
69+
70+
p.on('ping', (time) => {
71+
statusStream.push(getPacket({ time: time }))
72+
totalTime += time
73+
packetCount++
74+
if (packetCount >= count) {
75+
const average = totalTime / count
76+
p.stop()
77+
statusStream.push(getPacket({ text: `Average latency: ${average}ms` }))
78+
statusStream.end()
79+
}
80+
})
81+
82+
p.on('error', (err) => {
83+
log.error(err)
84+
p.stop()
85+
statusStream.push(getPacket({ success: false, text: err.toString() }))
86+
statusStream.end(err)
87+
})
88+
89+
p.start()
90+
91+
return cb()
92+
})
93+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
'use strict'
2+
3+
const toStream = require('pull-stream-to-stream')
4+
5+
module.exports = function pingReadableStream (self) {
6+
return (peerId, opts) => toStream.source(self.pingPullStream(peerId, opts))
7+
}

src/core/components/ping.js

Lines changed: 4 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -1,97 +1,13 @@
11
'use strict'
22

33
const promisify = require('promisify-es6')
4-
const debug = require('debug')
5-
const OFFLINE_ERROR = require('../utils').OFFLINE_ERROR
6-
const PeerId = require('peer-id')
74
const pull = require('pull-stream/pull')
8-
const Pushable = require('pull-pushable')
9-
const ndjson = require('pull-ndjson')
10-
const waterfall = require('async/waterfall')
11-
12-
const log = debug('jsipfs:ping')
13-
log.error = debug('jsipfs:ping:error')
145

156
module.exports = function ping (self) {
16-
return promisify((peerId, count, cb) => {
17-
if (!self.isOnline()) {
18-
return cb(new Error(OFFLINE_ERROR))
19-
}
20-
21-
const source = Pushable()
22-
23-
const response = pull(
24-
source,
25-
ndjson.serialize()
7+
return promisify((peerId, opts, cb) => {
8+
pull(
9+
self.pingPullStream(peerId, opts),
10+
pull.collect(cb)
2611
)
27-
waterfall([
28-
getPeer.bind(null, self._libp2pNode, source, peerId),
29-
runPing.bind(null, self._libp2pNode, source, count)
30-
], (err) => {
31-
if (err) {
32-
log.error(err)
33-
source.push(getPacket({Text: err.toString()}))
34-
source.end(err)
35-
}
36-
})
37-
38-
cb(null, response)
39-
})
40-
}
41-
42-
function getPacket (msg) {
43-
// Default msg
44-
const basePacket = {Success: false, Time: 0, Text: ''}
45-
return Object.assign(basePacket, msg)
46-
}
47-
48-
function getPeer (libp2pNode, statusStream, peerId, cb) {
49-
let peer
50-
try {
51-
peer = libp2pNode.peerBook.get(peerId)
52-
} catch (err) {
53-
log('Peer not found in peer book, trying peer routing')
54-
// Share lookup status just as in the go implemmentation
55-
statusStream.push(getPacket({Success: true, Text: `Looking up peer ${peerId}`}))
56-
// Try to use peerRouting
57-
return libp2pNode.peerRouting.findPeer(PeerId.createFromB58String(peerId), cb)
58-
}
59-
return cb(null, peer)
60-
}
61-
62-
function runPing (libp2pNode, statusStream, count, peer, cb) {
63-
libp2pNode.ping(peer, (err, p) => {
64-
if (err) {
65-
return cb(err)
66-
}
67-
68-
log('Got peer', peer)
69-
70-
let packetCount = 0
71-
let totalTime = 0
72-
statusStream.push(getPacket({Success: true, Text: `PING ${peer.id.toB58String()}`}))
73-
74-
p.on('ping', (time) => {
75-
statusStream.push(getPacket({ Success: true, Time: time }))
76-
totalTime += time
77-
packetCount++
78-
if (packetCount >= count) {
79-
const average = totalTime / count
80-
p.stop()
81-
statusStream.push(getPacket({ Success: true, Text: `Average latency: ${average}ms` }))
82-
statusStream.end()
83-
}
84-
})
85-
86-
p.on('error', (err) => {
87-
log.error(err)
88-
p.stop()
89-
statusStream.push(getPacket({Text: err.toString()}))
90-
statusStream.end(err)
91-
})
92-
93-
p.start()
94-
95-
return cb()
9612
})
9713
}

src/core/index.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ class IPFS extends EventEmitter {
102102
this.files = components.files(this)
103103
this.bitswap = components.bitswap(this)
104104
this.ping = components.ping(this)
105+
this.pingPullStream = components.pingPullStream(this)
106+
this.pingReadableStream = components.pingReadableStream(this)
105107
this.pubsub = components.pubsub(this)
106108
this.dht = components.dht(this)
107109
this.dns = components.dns(this)

src/http/api/resources/ping.js

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
'use strict'
22

33
const Joi = require('joi')
4-
const boom = require('boom')
4+
const pull = require('pull-stream')
55
const toStream = require('pull-stream-to-stream')
6+
const ndjson = require('pull-ndjson')
67
const PassThrough = require('readable-stream').PassThrough
78
const pump = require('pump')
89

9-
exports = module.exports
10-
1110
exports.get = {
1211
validate: {
1312
query: Joi.object().keys({
@@ -26,18 +25,24 @@ exports.get = {
2625
const peerId = request.query.arg
2726
// Default count to 10
2827
const count = request.query.n || request.query.count || 10
29-
ipfs.ping(peerId, count, (err, pullStream) => {
30-
if (err) {
31-
return reply(boom.badRequest(err))
32-
}
33-
// Streams from pull-stream-to-stream don't seem to be compatible
34-
// with the stream2 readable interface
35-
// see: https://github.com/hapijs/hapi/blob/c23070a3de1b328876d5e64e679a147fafb04b38/lib/response.js#L533
36-
// and: https://github.com/pull-stream/pull-stream-to-stream/blob/e436acee18b71af8e71d1b5d32eee642351517c7/index.js#L28
37-
const responseStream = toStream.source(pullStream)
38-
const stream2 = new PassThrough()
39-
pump(responseStream, stream2)
40-
return reply(stream2).type('application/json').header('X-Chunked-Output', '1')
41-
})
28+
29+
const source = pull(
30+
ipfs.pingPullStream(peerId, { count: count }),
31+
pull.map((chunk) => ({
32+
Success: chunk.success,
33+
Time: chunk.time,
34+
Text: chunk.text
35+
})),
36+
ndjson.serialize()
37+
)
38+
39+
// Streams from pull-stream-to-stream don't seem to be compatible
40+
// with the stream2 readable interface
41+
// see: https://github.com/hapijs/hapi/blob/c23070a3de1b328876d5e64e679a147fafb04b38/lib/response.js#L533
42+
// and: https://github.com/pull-stream/pull-stream-to-stream/blob/e436acee18b71af8e71d1b5d32eee642351517c7/index.js#L28
43+
const responseStream = toStream.source(source)
44+
const stream2 = new PassThrough()
45+
pump(responseStream, stream2)
46+
return reply(stream2).type('application/json').header('X-Chunked-Output', '1')
4247
}
4348
}

test/core/ping.spec.js

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ function spawnNode ({ dht = false }, cb) {
3232
}, cb)
3333
}
3434

35-
describe('ping', function () {
35+
describe.only('ping', function () {
3636
this.timeout(60 * 1000)
3737

3838
if (!isNode) return
@@ -85,10 +85,10 @@ describe('ping', function () {
8585
const count = 3
8686
pull(
8787
ipfsdA.api.pingPullStream(ipfsdBId, { count }),
88-
drain(({ Success, Time, Text }) => {
89-
expect(Success).to.be.true()
88+
drain(({ success, time, text }) => {
89+
expect(success).to.be.true()
9090
// It's a pong
91-
if (Time) {
91+
if (time) {
9292
packetNum++
9393
}
9494
}, (err) => {
@@ -104,16 +104,16 @@ describe('ping', function () {
104104
const count = 2
105105
pull(
106106
ipfsdA.api.pingPullStream('unknown', { count }),
107-
drain(({ Success, Time, Text }) => {
107+
drain(({ success, time, text }) => {
108108
messageNum++
109109
// Assert that the ping command falls back to the peerRouting
110110
if (messageNum === 1) {
111-
expect(Text).to.include('Looking up')
111+
expect(text).to.include('Looking up')
112112
}
113113

114114
// Fails accordingly while trying to use peerRouting
115115
if (messageNum === 2) {
116-
expect(Success).to.be.false()
116+
expect(success).to.be.false()
117117
}
118118
}, (err) => {
119119
expect(err).to.not.exist()
@@ -175,11 +175,9 @@ describe('ping', function () {
175175
parallel([
176176
ipfsdA.api.swarm.connect.bind(ipfsdA.api, bMultiaddr),
177177
ipfsdB.api.swarm.connect.bind(ipfsdB.api, cMultiaddr)
178-
], (err) => setTimeout(() => done(err), 500)) // FIXME timeout needed for connections to succeed
178+
], (err) => setTimeout(() => done(err), 500)) // FIXME timeout needed for connections to succeed
179179
})
180180

181-
182-
183181
after((done) => ipfsdA.stop(done))
184182
after((done) => ipfsdB.stop(done))
185183
after((done) => ipfsdC.stop(done))
@@ -190,15 +188,15 @@ describe('ping', function () {
190188
const count = 3
191189
pull(
192190
ipfsdA.api.pingPullStream(ipfsdCId, { count }),
193-
drain(({ Success, Time, Text }) => {
191+
drain(({ success, time, text }) => {
194192
messageNum++
195-
expect(Success).to.be.true()
193+
expect(success).to.be.true()
196194
// Assert that the ping command falls back to the peerRouting
197195
if (messageNum === 1) {
198-
expect(Text).to.include('Looking up')
196+
expect(text).to.include('Looking up')
199197
}
200198
// It's a pong
201-
if (Time) {
199+
if (time) {
202200
packetNum++
203201
}
204202
}, (err) => {

0 commit comments

Comments
 (0)