Skip to content
This repository was archived by the owner on Aug 1, 2023. It is now read-only.

Commit 59b19c1

Browse files
authored
Merge pull request #33 from ipfs/re-enable-pubsub-tests
test: re-enable pubsub tests
2 parents de333b8 + 99b5bef commit 59b19c1

File tree

1 file changed

+82
-37
lines changed

1 file changed

+82
-37
lines changed

test/pubsub.js

Lines changed: 82 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ chai.use(dirtyChai)
88

99
const series = require('async/series')
1010
const parallel = require('async/parallel')
11+
const retry = require('async/retry')
12+
const auto = require('async/auto')
1113

1214
const DaemonFactory = require('ipfsd-ctl')
1315

@@ -28,7 +30,60 @@ function waitFor (predicate, callback) {
2830
}, 500)
2931
}
3032

31-
describe.skip('pubsub', function () {
33+
const connect = (jsD, goD, callback) => {
34+
parallel([
35+
(cb) => jsD.api.id(cb),
36+
(cb) => goD.api.id(cb)
37+
], (error, ids) => {
38+
if (error) {
39+
return callback(error)
40+
}
41+
42+
const jsLocalAddr = ids[0].addresses.find(a => a.includes('127.0.0.1'))
43+
const goLocalAddr = ids[1].addresses.find(a => a.includes('127.0.0.1'))
44+
45+
parallel([
46+
(cb) => jsD.api.swarm.connect(goLocalAddr, cb),
47+
(cb) => goD.api.swarm.connect(jsLocalAddr, cb)
48+
], callback)
49+
})
50+
}
51+
52+
const waitForTopicPeer = (topic, peer, daemon, callback) => {
53+
retry({
54+
times: 5,
55+
interval: 1000
56+
}, (next) => {
57+
daemon.api.pubsub.peers(topic, (error, peers) => {
58+
if (error) {
59+
return next(error)
60+
}
61+
62+
if (!peers.includes(peer.id)) {
63+
return next(new Error(`Could not find peer ${peer.id}`))
64+
}
65+
66+
return next()
67+
})
68+
}, callback)
69+
}
70+
71+
const subscribe = (topic, onMessage, localDaemon, remoteDaemon, callback) => {
72+
auto({
73+
// get the ID of the local daemon
74+
localDaemonId: (cb) => localDaemon.api.id(cb),
75+
76+
// subscribe to the topic on our local daemon
77+
subscribeLocalDaemon: (cb) => localDaemon.api.pubsub.subscribe(topic, onMessage, cb),
78+
79+
// wait for the local daemon to appear in the peer list for the remote daemon
80+
waitForRemotePeer: ['localDaemonId', 'subscribeLocalDaemon', (results, cb) => {
81+
waitForTopicPeer(topic, results.localDaemonId, remoteDaemon, cb)
82+
}]
83+
}, (error) => callback(error))
84+
}
85+
86+
describe('pubsub', function () {
3287
this.timeout(10 * 1000)
3388

3489
let jsD
@@ -54,7 +109,20 @@ describe.skip('pubsub', function () {
54109
nodes = n
55110
goD = nodes[0]
56111
jsD = nodes[1]
57-
done()
112+
113+
parallel([
114+
(cb) => jsD.api.id(cb),
115+
(cb) => goD.api.id(cb)
116+
], (error, ids) => {
117+
if (error) {
118+
return done(error)
119+
}
120+
121+
jsId = ids[0].id
122+
goId = ids[1].id
123+
124+
done()
125+
})
58126
})
59127
})
60128

@@ -63,29 +131,6 @@ describe.skip('pubsub', function () {
63131
parallel(nodes.map((node) => (cb) => node.stop(cb)), done)
64132
})
65133

66-
it('make connections', (done) => {
67-
series([
68-
(cb) => jsD.api.id(cb),
69-
(cb) => goD.api.id(cb)
70-
], (err, ids) => {
71-
expect(err).to.not.exist()
72-
73-
jsId = ids[0].id
74-
goId = ids[1].id
75-
76-
const jsLocalAddr = ids[0].addresses.find(a => a.includes('127.0.0.1'))
77-
const goLocalAddr = ids[1].addresses.find(a => a.includes('127.0.0.1'))
78-
79-
parallel([
80-
(cb) => jsD.api.swarm.connect(goLocalAddr, cb),
81-
(cb) => goD.api.swarm.connect(jsLocalAddr, cb),
82-
(cb) => setTimeout(() => {
83-
cb()
84-
}, 1000)
85-
], done)
86-
})
87-
})
88-
89134
describe('ascii data', () => {
90135
const data = Buffer.from('hello world')
91136

@@ -143,8 +188,8 @@ describe.skip('pubsub', function () {
143188
}
144189

145190
series([
146-
(cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb),
147-
(cb) => setTimeout(() => { cb() }, 500),
191+
(cb) => connect(jsD, goD, cb),
192+
(cb) => subscribe(topic, checkMessage, goD, jsD, cb),
148193
(cb) => jsD.api.pubsub.publish(topic, data, cb),
149194
(cb) => waitFor(() => n === 1, cb)
150195
], done)
@@ -164,8 +209,8 @@ describe.skip('pubsub', function () {
164209
}
165210

166211
series([
167-
(cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb),
168-
(cb) => setTimeout(() => { cb() }, 1000),
212+
(cb) => connect(jsD, goD, cb),
213+
(cb) => subscribe(topic, checkMessage, jsD, goD, cb),
169214
(cb) => goD.api.pubsub.publish(topic, data, cb),
170215
(cb) => waitFor(() => n === 1, cb)
171216
], done)
@@ -229,8 +274,8 @@ describe.skip('pubsub', function () {
229274
}
230275

231276
series([
232-
(cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb),
233-
(cb) => setTimeout(() => { cb() }, 500),
277+
(cb) => connect(jsD, goD, cb),
278+
(cb) => subscribe(topic, checkMessage, goD, jsD, cb),
234279
(cb) => jsD.api.pubsub.publish(topic, data, cb),
235280
(cb) => waitFor(() => n === 1, cb)
236281
], done)
@@ -250,8 +295,8 @@ describe.skip('pubsub', function () {
250295
}
251296

252297
series([
253-
(cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb),
254-
(cb) => setTimeout(() => { cb() }, 500),
298+
(cb) => connect(jsD, goD, cb),
299+
(cb) => subscribe(topic, checkMessage, jsD, goD, cb),
255300
(cb) => goD.api.pubsub.publish(topic, data, cb),
256301
(cb) => waitFor(() => n === 1, cb)
257302
], done)
@@ -296,8 +341,8 @@ describe.skip('pubsub', function () {
296341
}
297342

298343
series([
299-
(cb) => jsD.api.pubsub.subscribe(topic, checkMessage, cb),
300-
(cb) => setTimeout(() => { cb() }, 500),
344+
(cb) => connect(jsD, goD, cb),
345+
(cb) => subscribe(topic, checkMessage, jsD, goD, cb),
301346
(cb) => goD.api.pubsub.publish(topic, data, cb),
302347
(cb) => waitFor(() => n === 1, cb)
303348
], done)
@@ -317,8 +362,8 @@ describe.skip('pubsub', function () {
317362
}
318363

319364
series([
320-
(cb) => goD.api.pubsub.subscribe(topic, checkMessage, cb),
321-
(cb) => setTimeout(() => { cb() }, 500),
365+
(cb) => connect(jsD, goD, cb),
366+
(cb) => subscribe(topic, checkMessage, goD, jsD, cb),
322367
(cb) => jsD.api.pubsub.publish(topic, data, cb),
323368
(cb) => waitFor(() => n === 1, cb)
324369
], done)

0 commit comments

Comments
 (0)