Skip to content
This repository was archived by the owner on Aug 1, 2023. It is now read-only.

Commit 6a04d95

Browse files
committed
fix: ipns pubsub race condition
1 parent e683d81 commit 6a04d95

File tree

1 file changed

+37
-8
lines changed

1 file changed

+37
-8
lines changed

test/ipns-pubsub.js

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -123,47 +123,76 @@ describe('ipns-pubsub', () => {
123123
it('should publish the received record to a go node and a js subscriber should receive it', function (done) {
124124
this.timeout(300 * 1000)
125125

126-
subscribeToReceiveByPubsub(nodes[0], nodes[1], nodeAId.id, done)
126+
subscribeToReceiveByPubsub(nodes[0], nodes[1], nodeAId.id, nodeBId.id, done)
127127
})
128128

129129
it('should publish the received record to a js node and a go subscriber should receive it', function (done) {
130130
this.timeout(350 * 1000)
131131

132-
subscribeToReceiveByPubsub(nodes[1], nodes[0], nodeBId.id, done)
132+
subscribeToReceiveByPubsub(nodes[1], nodes[0], nodeBId.id, nodeAId.id, done)
133133
})
134134
})
135135

136-
const subscribeToReceiveByPubsub = (nodeA, nodeB, id, callback) => {
136+
// * IPNS resolve subscription test
137+
// * 1) name.resolve() , which subscribes the topic
138+
// * 2) wait to guarantee the subscription
139+
// * 3) subscribe again just to know until when to wait (inside the scope of the test)
140+
// * 4) wait for the other peer to get notified of the subscription
141+
// * 5) publish new ipns record
142+
// * 6) wait until the record is received in the test scope subscribe
143+
// * 7) resolve ipns record
144+
const subscribeToReceiveByPubsub = (nodeA, nodeB, idA, idB, callback) => {
137145
let subscribed = false
138146
function checkMessage (msg) {
139147
subscribed = true
140148
}
141149

142-
const keys = ipns.getIdKeys(fromB58String(id))
150+
const keys = ipns.getIdKeys(fromB58String(idA))
143151
const topic = `${namespace}${base64url.encode(keys.routingKey.toBuffer())}`
144152

145153
// try to resolve a unpublished record (will subscribe it)
146-
nodeB.api.name.resolve(id, (err) => {
154+
nodeB.api.name.resolve(idA, (err) => {
147155
expect(err).to.exist() // not found
148156

149157
series([
150158
(cb) => waitForPeerToSubscribe(nodeB.api, topic, cb),
151159
(cb) => nodeB.api.pubsub.subscribe(topic, checkMessage, cb),
160+
(cb) => waitForNotificationOfSubscription(nodeA.api, topic, idB, cb),
152161
(cb) => nodeA.api.name.publish(ipfsRef, { resolve: false }, cb),
153162
(cb) => waitFor(() => subscribed === true, (50 * 1000), cb),
154-
(cb) => nodeB.api.name.resolve(id, cb)
163+
(cb) => nodeB.api.name.resolve(idA, cb)
155164
], (err, res) => {
156165
expect(err).to.not.exist()
157166
expect(res).to.exist()
158167

159-
expect(res[2].name).to.equal(id) // Published to Node A ID
160-
expect(res[4]).to.equal(ipfsRef)
168+
expect(res[3].name).to.equal(idA) // Published to Node A ID
169+
expect(res[5]).to.equal(ipfsRef)
161170

162171
callback()
163172
})
164173
})
165174
}
166175

176+
// wait until a peer know about other peer to subscribe a topic
177+
const waitForNotificationOfSubscription = (daemon, topic, peerId, callback) => {
178+
retry({
179+
times: 5,
180+
interval: 2000
181+
}, (next) => {
182+
daemon.pubsub.peers(topic, (error, res) => {
183+
if (error) {
184+
return next(error)
185+
}
186+
187+
if (!res || !res.length || !res.includes(peerId)) {
188+
return next(new Error('Could not find peer subscribing'))
189+
}
190+
191+
return next(null)
192+
})
193+
}, callback)
194+
}
195+
167196
// Wait until a peer subscribes a topic
168197
const waitForPeerToSubscribe = (daemon, topic, callback) => {
169198
retry({

0 commit comments

Comments
 (0)