Skip to content

Commit a5d538f

Browse files
committed
Move flush logic to _flush
1 parent a68a6f3 commit a5d538f

File tree

1 file changed

+32
-32
lines changed

1 file changed

+32
-32
lines changed

index.js

Lines changed: 32 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,37 @@ class CorestoreNetworker extends Nanoresource {
4646
})
4747
}
4848

49+
async _flush (keyString, keyBuf) {
50+
await new Promise((resolve, reject) => {
51+
this.swarm.flush(err => {
52+
if (err) reject(err)
53+
else resolve()
54+
})
55+
})
56+
if (!this._joined.has(keyString)) {
57+
return
58+
}
59+
const processingAfterFlush = this._streamsProcessing
60+
if (this._streamsProcessed >= processingAfterFlush) {
61+
this._flushed.add(keyString)
62+
this.emit('flushed', keyBuf)
63+
} else {
64+
// Wait until the stream processing has caught up.
65+
const processedListener = () => {
66+
if (!this._joined.has(keyString)) {
67+
this.removeListener('stream-processed', processedListener)
68+
return
69+
}
70+
if (this._streamsProcessed >= processingAfterFlush) {
71+
this._flushed.add(keyString)
72+
this.emit('flushed', keyBuf)
73+
this.removeListener('stream-processed', processedListener)
74+
}
75+
}
76+
this.on('stream-processed', processedListener)
77+
}
78+
}
79+
4980
async _join (discoveryKey, opts = {}) {
5081
const keyString = toString(discoveryKey)
5182
const keyBuf = (discoveryKey instanceof Buffer) ? discoveryKey : Buffer.from(discoveryKey, 'hex')
@@ -57,40 +88,9 @@ class CorestoreNetworker extends Nanoresource {
5788
lookup: opts.lookup
5889
})
5990

60-
const flushedProm = flush.bind(this)()
91+
const flushedProm = this._flush(keyString, keyBuf)
6192
if (opts.flush !== false) await flushedProm
6293
else flushedProm.catch(() => {})
63-
64-
async function flush () {
65-
await new Promise((resolve, reject) => {
66-
this.swarm.flush(err => {
67-
if (err) reject(err)
68-
else resolve()
69-
})
70-
})
71-
if (!this._joined.has(keyString)) {
72-
return
73-
}
74-
const processingAfterFlush = this._streamsProcessing
75-
if (this._streamsProcessed >= processingAfterFlush) {
76-
this._flushed.add(keyString)
77-
this.emit('flushed', keyBuf)
78-
} else {
79-
// Wait until the stream processing has caught up.
80-
const processedListener = () => {
81-
if (!this._joined.has(keyString)) {
82-
this.removeListener('stream-processed', processedListener)
83-
return
84-
}
85-
if (this._streamsProcessed >= processingAfterFlush) {
86-
this._flushed.add(keyString)
87-
this.emit('flushed', keyBuf)
88-
this.removeListener('stream-processed', processedListener)
89-
}
90-
}
91-
this.on('stream-processed', processedListener)
92-
}
93-
}
9494
}
9595

9696
async _leave (discoveryKey) {

0 commit comments

Comments
 (0)