Skip to content
This repository was archived by the owner on Dec 28, 2022. It is now read-only.

Commit ffe3907

Browse files
Add ifAvailable flag to core.download() (#110)
* Add `ifAvailable` flag to `core.download()` * Add initial tests * Fix assertions * Check `ifAvailable` ranges in `updatePeer()` and `updateAll()` * if async work is happening, defer ifavailable decision * test a -> b -> c replication with ifavail * more tests * fix no solo Co-authored-by: Mathias Buus <[email protected]>
1 parent 70b2716 commit ffe3907

File tree

2 files changed

+139
-12
lines changed

2 files changed

+139
-12
lines changed

lib/replicator.js

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,13 @@ class BlockRequest extends Attachable {
102102
}
103103

104104
class RangeRequest extends Attachable {
105-
constructor (ranges, start, end, linear, blocks) {
105+
constructor (ranges, start, end, linear, ifAvailable, blocks) {
106106
super()
107107

108108
this.start = start
109109
this.end = end
110110
this.linear = linear
111+
this.ifAvailable = ifAvailable
111112
this.blocks = blocks
112113
this.ranges = ranges
113114

@@ -172,6 +173,10 @@ class InflightTracker {
172173
this._free = []
173174
}
174175

176+
get idle () {
177+
return this._requests.length === this._free.length
178+
}
179+
175180
* [Symbol.iterator] () {
176181
for (const req of this._requests) {
177182
if (req !== null) yield req
@@ -260,6 +265,7 @@ class Peer {
260265

261266
this.inflight = 0
262267
this.maxInflight = DEFAULT_MAX_INFLIGHT
268+
this.dataProcessing = 0
263269

264270
this.canUpgrade = true
265271

@@ -515,6 +521,8 @@ class Peer {
515521

516522
if (reorg === true) return this.replicator._onreorgdata(this, req, data)
517523

524+
this.dataProcessing++
525+
518526
try {
519527
if (!matchingRequest(req, data) || !(await this.core.verify(data, this))) {
520528
this.replicator._onnodata(this, req)
@@ -523,6 +531,8 @@ class Peer {
523531
} catch (err) {
524532
this.replicator._onnodata(this, req)
525533
throw err
534+
} finally {
535+
this.dataProcessing--
526536
}
527537

528538
this.replicator._ondata(this, req, data)
@@ -909,13 +919,21 @@ module.exports = class Replicator {
909919
return ref
910920
}
911921

912-
addRange (session, { start = 0, end = -1, length = toLength(start, end), blocks = null, linear = false } = {}) {
922+
addRange (session, { start = 0, end = -1, length = toLength(start, end), blocks = null, linear = false, ifAvailable = false } = {}) {
913923
if (blocks !== null) { // if using blocks, start, end just acts as frames around the blocks array
914924
start = 0
915925
end = length = blocks.length
916926
}
917927

918-
const r = new RangeRequest(this._ranges, start, length === -1 ? -1 : start + length, linear, blocks)
928+
const r = new RangeRequest(
929+
this._ranges,
930+
start,
931+
length === -1 ? -1 : start + length,
932+
linear,
933+
ifAvailable,
934+
blocks
935+
)
936+
919937
const ref = r.attach(session)
920938

921939
this._ranges.push(r)
@@ -1115,6 +1133,14 @@ module.exports = class Replicator {
11151133
return true
11161134
}
11171135

1136+
_resolveRangeRequest (req, index) {
1137+
const head = this._ranges.pop()
1138+
1139+
if (index < this._ranges.length) this._ranges[index] = head
1140+
1141+
req.resolve(true)
1142+
}
1143+
11181144
_clearInflightBlock (tracker, req) {
11191145
const isBlock = tracker === this._blocks
11201146
const index = isBlock === true ? req.block.index : req.hash.index / 2
@@ -1168,14 +1194,9 @@ module.exports = class Replicator {
11681194
while (r.start < r.end && this.core.bitfield.get(mapIndex(r.blocks, r.start)) === true) r.start++
11691195
while (r.start < r.end && this.core.bitfield.get(mapIndex(r.blocks, r.end - 1)) === true) r.end--
11701196

1171-
if (r.end === -1 || r.start < r.end) continue
1172-
1173-
if (i < this._ranges.length - 1) this._ranges[i] = this._ranges.pop()
1174-
else this._ranges.pop()
1175-
1176-
i--
1177-
1178-
r.resolve(true)
1197+
if (r.end !== -1 && r.start >= r.end) {
1198+
this._resolveRangeRequest(r, i--)
1199+
}
11791200
}
11801201

11811202
for (let i = 0; i < this._seeks.length; i++) {
@@ -1201,7 +1222,7 @@ module.exports = class Replicator {
12011222
else s.resolve(res)
12021223
}
12031224

1204-
this.updateAll()
1225+
if (this._inflight.idle) this.updateAll()
12051226

12061227
// No additional updates scheduled - return
12071228
if (--this._updatesPending === 0) return
@@ -1210,6 +1231,22 @@ module.exports = class Replicator {
12101231
}
12111232
}
12121233

1234+
_maybeResolveIfAvailableRanges () {
1235+
if (this._ifAvailable > 0 || !this._inflight.idle || !this._ranges.length) return
1236+
1237+
for (let i = 0; i < this.peers.length; i++) {
1238+
if (this.peers[i].dataProcessing > 0) return
1239+
}
1240+
1241+
for (let i = 0; i < this._ranges.length; i++) {
1242+
const r = this._ranges[i]
1243+
1244+
if (r.ifAvailable) {
1245+
this._resolveRangeRequest(r, i--)
1246+
}
1247+
}
1248+
}
1249+
12131250
_clearRequest (peer, req) {
12141251
if (req.block !== null) {
12151252
this._clearInflightBlock(this._blocks, req)
@@ -1407,6 +1444,7 @@ module.exports = class Replicator {
14071444
while (this._updatePeerNonPrimary(peer) === true);
14081445

14091446
this._checkUpgradeIfAvailable()
1447+
this._maybeResolveIfAvailableRanges()
14101448
}
14111449

14121450
updateAll () {
@@ -1434,6 +1472,7 @@ module.exports = class Replicator {
14341472
}
14351473

14361474
this._checkUpgradeIfAvailable()
1475+
this._maybeResolveIfAvailableRanges()
14371476
}
14381477

14391478
attachTo (protomux) {

test/replicate.js

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -638,3 +638,91 @@ test('non-sparse replication', async function (t) {
638638

639639
t.is(contiguousLength, b.length)
640640
})
641+
642+
test('download blocks if available', async function (t) {
643+
const a = await create()
644+
const b = await create(a.key)
645+
646+
replicate(a, b, t)
647+
648+
await a.append(['a', 'b', 'c', 'd', 'e'])
649+
650+
let d = 0
651+
b.on('download', () => d++)
652+
653+
const r = b.download({ blocks: [1, 3, 6], ifAvailable: true })
654+
await r.downloaded()
655+
656+
t.is(d, 2)
657+
})
658+
659+
test('download range if available', async function (t) {
660+
const a = await create()
661+
const b = await create(a.key)
662+
663+
replicate(a, b, t)
664+
665+
await a.append(['a', 'b', 'c', 'd', 'e'])
666+
667+
let d = 0
668+
b.on('download', () => d++)
669+
670+
const r = b.download({ start: 2, end: 6, ifAvailable: true })
671+
await r.downloaded()
672+
673+
t.is(d, 3)
674+
})
675+
676+
test('download blocks if available, destroy midway', async function (t) {
677+
const a = await create()
678+
const b = await create(a.key)
679+
680+
const s = replicate(a, b, t)
681+
682+
await a.append(['a', 'b', 'c', 'd', 'e'])
683+
684+
let d = 0
685+
b.on('download', () => {
686+
if (d++ === 0) unreplicate(s)
687+
})
688+
689+
const r = b.download({ blocks: [1, 3, 6], ifAvailable: true })
690+
await r.downloaded()
691+
692+
t.pass('range resolved')
693+
})
694+
695+
test('download blocks available from when only a partial set is available', async function (t) {
696+
const a = await create()
697+
const b = await create(a.key)
698+
const c = await create(a.key)
699+
700+
replicate(a, b, t)
701+
replicate(b, c, t)
702+
703+
await a.append(['a', 'b', 'c', 'd', 'e'])
704+
705+
await b.get(2)
706+
await b.get(3)
707+
708+
const r = c.download({ start: 0, end: -1, ifAvailable: true })
709+
await r.downloaded()
710+
711+
t.ok(!(await c.has(0)))
712+
t.ok(!(await c.has(1)))
713+
t.ok(await c.has(2))
714+
t.ok(await c.has(3))
715+
t.ok(!(await c.has(4)))
716+
})
717+
718+
test('download range resolves immediately if no peers', async function (t) {
719+
const a = await create()
720+
const b = await create(a.key)
721+
722+
// no replication
723+
724+
const r = b.download({ start: 0, end: 5, ifAvailable: true })
725+
await r.downloaded()
726+
727+
t.pass('range resolved')
728+
})

0 commit comments

Comments
 (0)