Skip to content
This repository was archived by the owner on Dec 28, 2022. It is now read-only.

Commit ea913cd

Browse files
authored
fix reentry on contig update (#119)
* fix reentry on contig update * fix non-sparse truncations * move events further down for safety
1 parent e75b05e commit ea913cd

File tree

3 files changed

+79
-55
lines changed

3 files changed

+79
-55
lines changed

index.js

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -312,8 +312,7 @@ module.exports = class Hypercore extends EventEmitter {
312312
crypto: this.crypto,
313313
legacy: opts.legacy,
314314
auth: opts.auth,
315-
onupdate: this._oncoreupdate.bind(this),
316-
oncontigupdate: this._oncorecontigupdate.bind(this)
315+
onupdate: this._oncoreupdate.bind(this)
317316
})
318317

319318
if (opts.userData) {
@@ -496,30 +495,38 @@ module.exports = class Hypercore extends EventEmitter {
496495

497496
_oncoreupdate (status, bitfield, value, from) {
498497
if (status !== 0) {
499-
const truncated = (status & 0b10) !== 0
500-
const appended = (status & 0b01) !== 0
498+
const truncatedNonSparse = (status & 0b1000) !== 0
499+
const appendedNonSparse = (status & 0b0100) !== 0
500+
const truncated = (status & 0b0010) !== 0
501+
const appended = (status & 0b0001) !== 0
501502

502503
if (truncated) {
503504
this.replicator.ontruncate(bitfield.start)
504505
}
505506

507+
if ((status & 0b0011) !== 0) {
508+
this.replicator.onupgrade()
509+
}
510+
506511
for (let i = 0; i < this.sessions.length; i++) {
507512
const s = this.sessions[i]
508513

509514
if (truncated) {
510515
if (s.cache) s.cache.clear()
511-
s.emit('truncate', bitfield.start, this.core.tree.fork)
516+
512517
// If snapshotted, make sure to update our compat so we can fail gets
513518
if (s._snapshot && bitfield.start < s._snapshot.compatLength) s._snapshot.compatLength = bitfield.start
514519
}
515520

516-
// For sparse sessions, immediately emit appends. Non-sparse sessions
517-
// are handled separately and only emit appends when their contiguous
518-
// length is updated.
519-
if (appended && s.sparse) s.emit('append')
520-
}
521+
if (s.sparse ? truncated : truncatedNonSparse) {
522+
s.emit('truncate', bitfield.start, this.core.tree.fork)
523+
}
521524

522-
this.replicator.onupgrade()
525+
// For sparse sessions, immediately emit appends. If non-sparse, emit if contig length has updated
526+
if (s.sparse ? appended : appendedNonSparse) {
527+
s.emit('append')
528+
}
529+
}
523530
}
524531

525532
if (bitfield) {
@@ -535,16 +542,6 @@ module.exports = class Hypercore extends EventEmitter {
535542
}
536543
}
537544

538-
_oncorecontigupdate () {
539-
// For non-sparse sessions, emit appends only when the contiguous length is
540-
// updated.
541-
for (let i = 0; i < this.sessions.length; i++) {
542-
const s = this.sessions[i]
543-
544-
if (!s.sparse) s.emit('append')
545-
}
546-
}
547-
548545
_onpeerupdate (added, peer) {
549546
const name = added ? 'peer-add' : 'peer-remove'
550547

lib/core.js

Lines changed: 54 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,8 @@ const { BAD_ARGUMENT, STORAGE_EMPTY, STORAGE_CONFLICT, INVALID_SIGNATURE } = req
99
const m = require('./messages')
1010

1111
module.exports = class Core {
12-
constructor (header, crypto, oplog, tree, blocks, bitfield, auth, legacy, onupdate, oncontigupdate) {
12+
constructor (header, crypto, oplog, tree, blocks, bitfield, auth, legacy, onupdate) {
1313
this.onupdate = onupdate
14-
this.oncontigupdate = oncontigupdate
1514
this.header = header
1615
this.crypto = crypto
1716
this.oplog = oplog
@@ -27,8 +26,6 @@ module.exports = class Core {
2726
this._verifiesFlushed = null
2827
this._mutex = new Mutex()
2928
this._legacy = legacy
30-
31-
this._updateContiguousLength(header.contiguousLength)
3229
}
3330

3431
static async open (storage, opts = {}) {
@@ -134,6 +131,11 @@ module.exports = class Core {
134131
await bitfield.clear()
135132
}
136133

134+
// compat from earlier version that do not store contig length
135+
if (header.contiguousLength === 0) {
136+
while (bitfield.get(header.contiguousLength)) header.contiguousLength++
137+
}
138+
137139
const auth = opts.auth || this.createAuth(crypto, header.signer)
138140

139141
for (const e of entries) {
@@ -149,6 +151,7 @@ module.exports = class Core {
149151

150152
if (e.bitfield) {
151153
bitfield.setRange(e.bitfield.start, e.bitfield.length, !e.bitfield.drop)
154+
updateContig(header, e.bitfield, bitfield)
152155
}
153156

154157
if (e.treeUpgrade) {
@@ -165,7 +168,7 @@ module.exports = class Core {
165168
}
166169
}
167170

168-
return new this(header, crypto, oplog, tree, blocks, bitfield, auth, !!opts.legacy, opts.onupdate || noop, opts.oncontigupdate || noop)
171+
return new this(header, crypto, oplog, tree, blocks, bitfield, auth, !!opts.legacy, opts.onupdate || noop)
169172
}
170173

171174
_shouldFlush () {
@@ -196,17 +199,6 @@ module.exports = class Core {
196199
await this.blocks.put(index, value, byteOffset)
197200
}
198201

199-
_updateContiguousLength (index, length = 0) {
200-
if (index === this.header.contiguousLength) {
201-
let i = this.header.contiguousLength + length
202-
203-
while (this.bitfield.get(i)) i++
204-
205-
this.header.contiguousLength = i
206-
this.oncontigupdate()
207-
}
208-
}
209-
210202
async userData (key, value) {
211203
// TODO: each oplog append can set user data, so we should have a way
212204
// to just hitch a ride on one of the other ongoing appends?
@@ -286,12 +278,12 @@ module.exports = class Core {
286278
this.bitfield.setRange(batch.ancestors, batch.length - batch.ancestors, true)
287279
batch.commit()
288280

289-
this.header.contiguousLength = batch.length
290-
this.oncontigupdate()
291281
this.header.tree.length = batch.length
292282
this.header.tree.rootHash = hash
293283
this.header.tree.signature = batch.signature
294-
this.onupdate(0b01, entry.bitfield, null, null)
284+
285+
const status = 0b0001 | updateContig(this.header, entry.bitfield, this.bitfield)
286+
this.onupdate(status, entry.bitfield, null, null)
295287

296288
if (this._shouldFlush()) await this._flushOplog()
297289

@@ -329,9 +321,11 @@ module.exports = class Core {
329321

330322
await this.oplog.append([entry], false)
331323

324+
let status = 0b0001
325+
332326
if (bitfield) {
333327
this.bitfield.set(bitfield.start, true)
334-
this._updateContiguousLength(bitfield.start, bitfield.length)
328+
status |= updateContig(this.header, bitfield, this.bitfield)
335329
}
336330

337331
batch.commit()
@@ -340,7 +334,8 @@ module.exports = class Core {
340334
this.header.tree.length = batch.length
341335
this.header.tree.rootHash = batch.rootHash
342336
this.header.tree.signature = batch.signature
343-
this.onupdate(0b01, bitfield, value, from)
337+
338+
this.onupdate(status, bitfield, value, from)
344339

345340
if (this._shouldFlush()) await this._flushOplog()
346341
} finally {
@@ -387,14 +382,16 @@ module.exports = class Core {
387382
continue
388383
}
389384

385+
let status = 0
386+
390387
if (bitfield) {
391388
this.bitfield.set(bitfield.start, true)
392-
this._updateContiguousLength(bitfield.start, bitfield.length)
389+
status = updateContig(this.header, bitfield, this.bitfield)
393390
}
394391

395392
batch.commit()
396393

397-
this.onupdate(0, bitfield, value, from)
394+
this.onupdate(status, bitfield, value, from)
398395
}
399396

400397
if (this._shouldFlush()) await this._flushOplog()
@@ -472,15 +469,15 @@ module.exports = class Core {
472469
addReorgHint(this.header.hints.reorgs, this.tree, batch)
473470
batch.commit()
474471

475-
const appended = batch.length > batch.ancestors
472+
const contigStatus = updateContig(this.header, entry.bitfield, this.bitfield)
473+
const status = ((batch.length > batch.ancestors) ? 0b0011 : 0b0010) | contigStatus
476474

477-
this.header.contiguousLength = Math.min(batch.ancestors, this.header.contiguousLength)
478-
this.oncontigupdate()
479475
this.header.tree.fork = batch.fork
480476
this.header.tree.length = batch.length
481477
this.header.tree.rootHash = batch.hash()
482478
this.header.tree.signature = batch.signature
483-
this.onupdate(appended ? 0b11 : 0b10, entry.bitfield, null, from)
479+
480+
this.onupdate(status, entry.bitfield, null, from)
484481

485482
// TODO: there is a bug in the merkle tree atm where it cannot handle unflushed
486483
// truncates if we append or download anything after the truncation point later on
@@ -501,6 +498,36 @@ module.exports = class Core {
501498
}
502499
}
503500

501+
function updateContig (header, upd, bitfield) {
502+
const end = upd.start + upd.length
503+
504+
let c = header.contiguousLength
505+
506+
if (upd.drop) {
507+
// If we dropped a block in the current contig range, "downgrade" it
508+
if (c <= end && c > upd.start) {
509+
c = upd.start
510+
}
511+
} else {
512+
if (c <= end && c >= upd.start) {
513+
c = end
514+
while (bitfield.get(c)) c++
515+
}
516+
}
517+
518+
if (c === header.contiguousLength) {
519+
return 0b0000
520+
}
521+
522+
if (c > header.contiguousLength) {
523+
header.contiguousLength = c
524+
return 0b0100
525+
}
526+
527+
header.contiguousLength = c
528+
return 0b1000
529+
}
530+
504531
function addReorgHint (list, tree, batch) {
505532
if (tree.length === 0 || tree.fork === batch.fork) return
506533

test/core.js

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ test('core - update hook is triggered', async function (t) {
178178
let ran = 0
179179

180180
core.onupdate = (status, bitfield, value, from) => {
181-
t.is(status, 0b01, 'was appended')
181+
t.ok(status & 0b01, 'was appended')
182182
t.is(from, null, 'was local')
183183
t.alike(bitfield, { drop: false, start: 0, length: 4 })
184184
ran |= 1
@@ -189,7 +189,7 @@ test('core - update hook is triggered', async function (t) {
189189
const peer = {}
190190

191191
clone.onupdate = (status, bitfield, value, from) => {
192-
t.is(status, 0b01, 'was appended')
192+
t.ok(status & 0b01, 'was appended')
193193
t.is(from, peer, 'was remote')
194194
t.alike(bitfield, { drop: false, start: 1, length: 1 })
195195
t.alike(value, Buffer.from('b'))
@@ -217,7 +217,7 @@ test('core - update hook is triggered', async function (t) {
217217
}
218218

219219
core.onupdate = (status, bitfield, value, from) => {
220-
t.is(status, 0b10, 'was truncated')
220+
t.ok(status & 0b10, 'was truncated')
221221
t.is(from, null, 'was local')
222222
t.alike(bitfield, { drop: true, start: 1, length: 3 })
223223
ran |= 8
@@ -226,7 +226,7 @@ test('core - update hook is triggered', async function (t) {
226226
await core.truncate(1, 1)
227227

228228
core.onupdate = (status, bitfield, value, from) => {
229-
t.is(status, 0b01, 'was appended')
229+
t.ok(status & 0b01, 'was appended')
230230
t.is(from, null, 'was local')
231231
t.alike(bitfield, { drop: false, start: 1, length: 1 })
232232
ran |= 16
@@ -235,7 +235,7 @@ test('core - update hook is triggered', async function (t) {
235235
await core.append([Buffer.from('e')])
236236

237237
clone.onupdate = (status, bitfield, value, from) => {
238-
t.is(status, 0b11, 'was appended and truncated')
238+
t.ok(status & 0b11, 'was appended and truncated')
239239
t.is(from, peer, 'was remote')
240240
t.alike(bitfield, { drop: true, start: 1, length: 3 })
241241
ran |= 32
@@ -248,7 +248,7 @@ test('core - update hook is triggered', async function (t) {
248248
}
249249

250250
core.onupdate = (status, bitfield, value, from) => {
251-
t.is(status, 0b10, 'was truncated')
251+
t.ok(status & 0b10, 'was truncated')
252252
t.is(from, null, 'was local')
253253
t.alike(bitfield, { drop: true, start: 1, length: 1 })
254254
ran |= 64
@@ -257,7 +257,7 @@ test('core - update hook is triggered', async function (t) {
257257
await core.truncate(1, 2)
258258

259259
clone.onupdate = (status, bitfield, value, from) => {
260-
t.is(status, 0b10, 'was truncated')
260+
t.ok(status & 0b10, 'was truncated')
261261
t.is(from, peer, 'was remote')
262262
t.alike(bitfield, { drop: true, start: 1, length: 1 })
263263
ran |= 128

0 commit comments

Comments
 (0)