Skip to content

Commit 6f62f70

Browse files
mafintoshandrewosh
authored andcommitted
add unconfigure
1 parent f756581 commit 6f62f70

File tree

3 files changed

+48
-27
lines changed

3 files changed

+48
-27
lines changed

index.js

Lines changed: 39 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
11
const crypto = require('crypto')
2-
const { EventEmitter } = require('events')
32
const { promisify } = require('util')
43

54
const { NanoresourcePromise: Nanoresource } = require('nanoresource-promise/emitter')
65
const HypercoreProtocol = require('hypercore-protocol')
76
const hyperswarm = require('hyperswarm')
87
const codecs = require('codecs')
98
const pump = require('pump')
10-
const eos = require('end-of-stream')
119

12-
const log = require('debug')('corestore:network')
13-
14-
const OUTER_STREAM = Symbol('networker-outer-stream')
1510
const STREAM_PEER = Symbol('networker-stream-peer')
1611

1712
class CorestoreNetworker extends Nanoresource {
@@ -56,7 +51,7 @@ class CorestoreNetworker extends Nanoresource {
5651
}
5752

5853
async _join (discoveryKey, opts = {}) {
59-
const keyString = (typeof discoveryKey === 'string') ? discoveryKey : discoveryKey.toString('hex')
54+
const keyString = toString(discoveryKey)
6055
const keyBuf = (discoveryKey instanceof Buffer) ? discoveryKey : Buffer.from(discoveryKey, 'hex')
6156

6257
this._joined.add(keyString)
@@ -76,7 +71,7 @@ class CorestoreNetworker extends Nanoresource {
7671
this.emit('flushed', keyBuf)
7772
} else {
7873
// Wait until the stream processing has caught up.
79-
const processedListener = () => {
74+
const processedListener = () => {
8075
if (!this._joined.has(keyString)) {
8176
this.removeListener('stream-processed', processedListener)
8277
return
@@ -93,7 +88,7 @@ class CorestoreNetworker extends Nanoresource {
9388
}
9489

9590
async _leave (discoveryKey) {
96-
const keyString = (typeof discoveryKey === 'string') ? discoveryKey : discoveryKey.toString('hex')
91+
const keyString = toString(discoveryKey)
9792
const keyBuf = (discoveryKey instanceof Buffer) ? discoveryKey : Buffer.from(discoveryKey, 'hex')
9893

9994
this._joined.delete(keyString)
@@ -105,7 +100,7 @@ class CorestoreNetworker extends Nanoresource {
105100
})
106101
})
107102

108-
for (let stream of this.streams) {
103+
for (const stream of this.streams) {
109104
stream.close(keyBuf)
110105
}
111106
}
@@ -159,8 +154,6 @@ class CorestoreNetworker extends Nanoresource {
159154
this.swarm.on('connection', (socket, info) => {
160155
const isInitiator = !!info.client
161156
if (socket.remoteAddress === '::ffff:127.0.0.1' || socket.remoteAddress === '127.0.0.1') return null
162-
const peerInfo = info.peer
163-
const discoveryKey = peerInfo && peerInfo.topic
164157

165158
var finishedHandshake = false
166159
var processed = false
@@ -231,22 +224,48 @@ class CorestoreNetworker extends Nanoresource {
231224
return this._configurations
232225
}
233226

234-
async configure (discoveryKey, opts = {}) {
227+
configure (discoveryKey, opts = {}) {
228+
if (!this.swarm) this.open() // it is sync, which makes this easier below inregards to race conditions
229+
if (this.swarm && this.swarm.destroyed) return Promise.resolve()
230+
231+
const id = Symbol('id')
232+
const prom = this._configure(discoveryKey, opts, id)
233+
const keyString = toString(discoveryKey)
234+
const prev = this._configurations.get(keyString) || { lookup: false, announce: false, id: null }
235+
236+
prom.configureId = id
237+
prom.discoveryKey = discoveryKey
238+
prom.previous = prev
239+
240+
return prom
241+
}
242+
243+
async unconfigure (prom) {
235244
if (this.swarm && this.swarm.destroyed) return null
236245
if (!this.swarm) {
237246
await this.listen()
238-
return this.configure(discoveryKey, opts)
247+
return this.unconfigure(prom)
239248
}
240-
const self = this
241249

250+
const discoveryKey = prom.discoveryKey
251+
const keyString = toString(discoveryKey)
252+
const conf = this._configurations.get(keyString)
253+
254+
if (!conf || conf.id !== prom.configureId) return
255+
return this._configure(discoveryKey, prom.previous, prom.previous.id)
256+
}
257+
258+
async _configure (discoveryKey, opts = {}, id) {
242259
const config = {
243260
announce: opts.announce !== false,
244261
lookup: opts.lookup !== false
245262
}
246-
opts = { ...opts, ...config }
263+
opts = { ...opts, ...config, id }
247264

248-
const keyString = (typeof discoveryKey === 'string') ? discoveryKey : discoveryKey.toString('hex')
249-
this._configurations.set(keyString, opts)
265+
const keyString = toString(discoveryKey)
266+
267+
if (id) this._configurations.set(keyString, opts)
268+
else this._configurations.delete(keyString)
250269

251270
const joining = config.announce || config.lookup
252271
if (joining) {
@@ -275,7 +294,6 @@ class CorestoreNetworker extends Nanoresource {
275294
}
276295
return ext
277296
}
278-
279297
}
280298

281299
module.exports = CorestoreNetworker
@@ -342,4 +360,6 @@ function intoPeer (stream) {
342360
}
343361
}
344362

345-
function noop () {}
363+
function toString (dk) {
364+
return typeof dk === 'string' ? dk : dk.toString('hex')
365+
}

package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,16 @@
44
"description": "A corestore networking module based on Hyperswarm.",
55
"main": "index.js",
66
"dependencies": {
7-
"debug": "^4.1.1",
8-
"end-of-stream": "^1.4.4",
7+
"codecs": "^2.1.0",
98
"hypercore-protocol": "^8.0.0",
109
"hyperswarm": "^2.14.1",
1110
"nanoresource-promise": "^1.2.2",
1211
"pump": "^3.0.0"
1312
},
1413
"devDependencies": {
14+
"@hyperswarm/dht": "^4.0.1",
1515
"corestore": "^5.0.0",
16+
"hypercore-crypto": "^2.1.1",
1617
"random-access-memory": "^3.1.1",
1718
"standard": "^12.0.1",
1819
"tape": "^4.9.0"

test/all.js

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ test('replicate sub-cores', async t => {
8787
await networker2.configure(core3.discoveryKey)
8888

8989
const core2 = store1.get({ parents: [core1.key] })
90-
const core4 = store2.get({ key: core2.key, parents: [core3.key]})
90+
const core4 = store2.get({ key: core2.key, parents: [core3.key] })
9191

9292
await append(core1, 'hello')
9393
await append(core2, 'world')
@@ -257,7 +257,7 @@ test('can register stream-wide extensions', async t => {
257257
const extension = {
258258
name: 'test-extension',
259259
encoding: 'utf8',
260-
onmessage,
260+
onmessage
261261
}
262262
networker1.registerExtension(extension)
263263
const n2Ext = networker2.registerExtension(extension)
@@ -308,7 +308,7 @@ test('can register extensions with the same name', async t => {
308308
const extensionTwo = {
309309
name: 'test-extension',
310310
encoding: 'utf8',
311-
onmessage,
311+
onmessage
312312
}
313313
networker1.registerExtension(extensionOne)
314314
networker1.registerExtension(extensionTwo)
@@ -406,9 +406,9 @@ async function create (opts = {}) {
406406
return bootstrap.once('listening', resolve)
407407
})
408408
}
409-
const store = new Corestore(ram)
409+
const store = new Corestore(ram)
410410
await store.ready()
411-
const networker = new CorestoreNetworker(store, { ...opts, bootstrap: `localhost:${BOOTSTRAP_PORT}` })
411+
const networker = new CorestoreNetworker(store, { ...opts, bootstrap: `localhost:${BOOTSTRAP_PORT}` })
412412
return { store, networker }
413413
}
414414

@@ -431,7 +431,7 @@ function get (core, idx, opts = {}) {
431431
}
432432

433433
async function cleanup (networkers) {
434-
for (let networker of networkers) {
434+
for (const networker of networkers) {
435435
await networker.close()
436436
}
437437
if (bootstrap) {

0 commit comments

Comments
 (0)