Skip to content

fix: avoid wasteful reprovides outside threshold #3238

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
5 changes: 5 additions & 0 deletions .github/dictionary.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@ additionals
SECG
Certicom
RSAES
reprovide
reprovider
reproviding
reprovides
reprovided
2 changes: 0 additions & 2 deletions packages/kad-dht/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ export const second = 1000
export const minute = 60 * second
export const hour = 60 * minute

export const MAX_RECORD_AGE = 36 * hour

export const PROTOCOL = '/ipfs/kad/1.0.0'

/**
Expand Down
40 changes: 31 additions & 9 deletions packages/kad-dht/src/reprovider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
setMaxListeners(Infinity, this.shutdownController.signal)

this.timeout = setTimeout(() => {
this.cleanUp({
this.processRecords({
signal: AbortSignal.timeout(REPROVIDE_TIMEOUT)
}).catch(err => {
this.log.error('error running reprovide/cleanup - %e', err)
this.log.error('error running process to reprovide/cleanup - %e', err)
})
}, this.interval)
}
Expand All @@ -118,10 +118,10 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
* Check all provider records. Delete expired records published by other
* peers; reprovide records where the provider is us and the record has either
* already expired or will expire within the reprovide threshold.
*/
private async cleanUp (options?: AbortOptions): Promise<void> {
private async processRecords (options?: AbortOptions): Promise<void> {
try {
this.safeDispatchEvent('reprovide:start')

this.log('Starting reprovide/cleanup')
// Get all provider entries from the datastore
for await (const entry of this.datastore.query({
prefix: this.datastorePrefix
Expand All @@ -133,17 +133,20 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
const expires = created + this.validity
const now = Date.now()
const expired = now > expires
const isSelf = this.peerId.equals(peerId)

this.log.trace('comparing: %d < %d = %s %s', created, now - this.validity, expired, expired ? '(expired)' : '')
this.log.trace('comparing: %d (now) < %d (expires) = %s %s', now, expires, expired, expired ? '(expired)' : '(valid)')

// delete the record if it has expired
if (expired) {
// delete the record if it has expired and isn't us
// so that if user node is down for a while, we still persist provide intent
if (expired && !isSelf) {
await this.datastore.delete(entry.key, options)
}

// if the provider is us and we are within the reprovide threshold,
// reprovide the record
if (this.peerId.equals(peerId) && (now - expires) < this.reprovideThreshold) {
if (this.shouldReprovide(isSelf, expires)) {
this.log('reproviding %c as it is within the reprovide threshold (%d)', cid, this.reprovideThreshold)
this.queueReprovide(cid)
.catch(err => {
this.log.error('could not reprovide %c - %e', cid, err)
Expand All @@ -159,8 +162,9 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
this.safeDispatchEvent('reprovide:end')

if (this.running) {
this.log('queuing next re-provide/cleanup run in %d ms', this.interval)
this.timeout = setTimeout(() => {
this.cleanUp({
this.processRecords({
signal: AbortSignal.timeout(REPROVIDE_TIMEOUT)
}).catch(err => {
this.log.error('error running re-provide - %e', err)
Expand All @@ -170,6 +174,24 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
}
}

/**
 * Decides whether a provider record needs to be announced again.
 *
 * Records published by other peers are never reprovided. Our own records are
 * reprovided once they have expired, or when the time left until expiry is
 * smaller than the configured reprovide threshold.
 *
 * @param isSelf - whether the record's provider is this node
 * @param expires - expiry time of the record, in ms since the epoch
 * @returns true if the record should be reprovided now
 */
private shouldReprovide (isSelf: boolean, expires: number): boolean {
  if (isSelf) {
    // time left until the record expires; negative once it has expired
    const remaining = expires - Date.now()

    // an already expired record is reprovided regardless of the threshold,
    // otherwise only when expiry is closer than the threshold
    return remaining < 0 || remaining < this.reprovideThreshold
  }

  return false
}

private async queueReprovide (cid: CID, options?: AbortOptions): Promise<void> {
if (!this.running) {
return
Expand Down
6 changes: 3 additions & 3 deletions packages/kad-dht/src/rpc/handlers/get-value.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { publicKeyToProtobuf } from '@libp2p/crypto/keys'
import { InvalidMessageError, NotFoundError } from '@libp2p/interface'
import { Libp2pRecord } from '@libp2p/record'
import {
MAX_RECORD_AGE
PROVIDERS_VALIDITY
} from '../../constants.js'
import { MessageType } from '../../message/dht.js'
import { bufferToRecordKey, isPublicKeyKey, fromPublicKeyKey } from '../../utils.js'
Expand Down Expand Up @@ -107,7 +107,7 @@ export class GetValueHandler implements DHTMessageHandler {
* Try to fetch a given record from the local datastore.
* Returns the record if it is still valid, meaning
* - it was either authored by this node, or
* - it was received less than `MAX_RECORD_AGE` ago.
* - it was received less than `PROVIDERS_VALIDITY` ago.
*/
async _checkLocalDatastore (key: Uint8Array): Promise<Libp2pRecord | undefined> {
this.log('checkLocalDatastore looking for %b', key)
Expand All @@ -129,7 +129,7 @@ export class GetValueHandler implements DHTMessageHandler {

// Check validity: compare time received with max record age
if (record.timeReceived == null ||
Date.now() - record.timeReceived.getTime() > MAX_RECORD_AGE) {
Date.now() - record.timeReceived.getTime() > PROVIDERS_VALIDITY) {
// If record is bad delete it and return
await this.datastore.delete(dsKey)
return undefined
Expand Down
71 changes: 67 additions & 4 deletions packages/kad-dht/test/reprovider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,9 @@ describe('reprovider', () => {
contentRouting,
threshold: 100,
validity: 200,
interval: 100,
interval: 200,
operationMetrics: {}
})

await start(reprovider)
})

afterEach(async () => {
Expand All @@ -74,6 +72,8 @@ describe('reprovider', () => {
it('should reprovide', async () => {
const cid = CID.parse('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb')

await start(reprovider)

await providers.addProvider(cid, components.peerId)

expect(contentRouting.provide).to.have.property('callCount', 0)
Expand All @@ -88,6 +88,8 @@ describe('reprovider', () => {
it('should cancel reprovide', async () => {
const cid = CID.parse('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb')

await start(reprovider)

await providers.addProvider(cid, components.peerId)

expect(contentRouting.provide).to.have.property('callCount', 0)
Expand All @@ -110,6 +112,9 @@ describe('reprovider', () => {

it('should remove expired provider records', async () => {
const cid = CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n')

await start(reprovider)

await Promise.all([
providers.addProvider(cid, peers[0].peerId),
providers.addProvider(cid, peers[1].peerId)
Expand All @@ -121,9 +126,67 @@ describe('reprovider', () => {
expect(provs[0].toString()).to.be.equal(peers[0].peerId.toString())
expect(provs[1].toString()).to.be.deep.equal(peers[1].peerId.toString())

await delay(400)
await delay(450)

const provsAfter = await providers.getProviders(cid)
expect(provsAfter).to.have.length(0)
})

// Verifies that a reprovide/cleanup run removes expired provider records that
// belong to other peers while leaving this node's own record in place.
it('should delete expired records from other peers but preserve own expired records', async () => {
const cid = CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n')

await start(reprovider)

// Add provider records - one from us, one from another peer
await providers.addProvider(cid, components.peerId)
await providers.addProvider(cid, peers[0].peerId)

// Sanity check: both records are visible before anything expires
const provsBefore = await providers.getProviders(cid)
expect(provsBefore).to.have.length(2)

// Wait for records to expire (validity is 200ms)
await delay(250)

// Wait for a complete reprovide cycle (start, then end) so the expired
// records have been processed. Nothing here triggers the cycle - it is
// driven by the reprovider's own timer.
await pEvent(reprovider, 'reprovide:start')
await pEvent(reprovider, 'reprovide:end')

const provsAfter = await providers.getProviders(cid)

// Only our own record should remain, other peer's expired record should be deleted
expect(provsAfter).to.have.length(1)
expect(provsAfter[0].toString()).to.equal(components.peerId.toString())
})

describe('shouldReprovide', () => {
  // Each scenario gives the record's expiry as an offset (ms) from "now":
  // positive offsets are still valid, negative offsets have already expired.
  // The offsets are chosen around the `threshold: 100` set in the fixture.
  const scenarios: Array<{
    title: string
    isSelf: boolean
    offset: number
    expected: boolean
  }> = [
    { title: 'should return false for non-self providers', isSelf: false, offset: 50, expected: false },
    { title: 'should return true when within reprovide threshold before expiration', isSelf: true, offset: 50, expected: true },
    { title: 'should return true when within reprovide threshold after expiration', isSelf: true, offset: -50, expected: true },
    { title: 'should return false when outside reprovide threshold before expiration', isSelf: true, offset: 150, expected: false },
    { title: 'should return true when outside reprovide threshold after expiration', isSelf: true, offset: -150, expected: true }
  ]

  scenarios.forEach(({ title, isSelf, offset, expected }) => {
    it(title, () => {
      // shouldReprovide is private, so reach it through an untyped view
      const outcome = (reprovider as any).shouldReprovide(isSelf, Date.now() + offset)

      if (expected) {
        expect(outcome).to.be.true()
      } else {
        expect(outcome).to.be.false()
      }
    })
  })
})
})
63 changes: 63 additions & 0 deletions packages/kad-dht/test/rpc/handlers/get-value.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { MemoryDatastore } from 'datastore-core'
import { TypedEventEmitter } from 'main-event'
import Sinon from 'sinon'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { PROVIDERS_VALIDITY } from '../../../src/constants.js'
import { MessageType } from '../../../src/message/dht.js'
import { PeerRouting } from '../../../src/peer-routing/index.js'
import { GetValueHandler } from '../../../src/rpc/handlers/get-value.js'
Expand Down Expand Up @@ -186,4 +187,66 @@ describe('rpc - handlers - GetValue', () => {
expect(response.record).to.not.be.ok()
})
})

describe('record expiration', () => {
  // Builds a request for `key` carrying no closer peers or providers
  const requestFor = (key: Uint8Array): Message => ({
    type: T,
    key,
    closer: [],
    providers: []
  })

  it('should return valid record within PROVIDERS_VALIDITY period', async () => {
    const key = uint8ArrayFromString('hello')
    const value = uint8ArrayFromString('world')

    // Store a record that was received just now, so it is still valid
    await datastore.put(
      utils.bufferToRecordKey('/dht/record', key),
      new Libp2pRecord(key, value, new Date()).serialize()
    )

    const msg = requestFor(key)
    peerRouting.getClosestPeersOffline.withArgs(msg.key).resolves([])

    const response = await handler.handle(sourcePeer.peerId, msg)

    expect(response).to.not.be.undefined()
    expect(response.record).to.not.be.undefined()

    if (response.record == null) {
      return
    }

    // The returned record must carry the value that was stored
    const responseRecord = Libp2pRecord.deserialize(response.record)
    expect(responseRecord.value).to.equalBytes(value)
  })

  it('should delete and return no record when expired beyond PROVIDERS_VALIDITY', async () => {
    const key = uint8ArrayFromString('hello')
    const value = uint8ArrayFromString('world')
    const dsKey = utils.bufferToRecordKey('/dht/record', key)

    // Receive time is one second older than PROVIDERS_VALIDITY allows
    const receivedAt = new Date(Date.now() - PROVIDERS_VALIDITY - 1000)
    await datastore.put(dsKey, new Libp2pRecord(key, value, receivedAt).serialize())

    // The record has to be present before the handler runs
    expect(await datastore.has(dsKey)).to.be.true()

    const msg = requestFor(key)
    peerRouting.getClosestPeersOffline.withArgs(msg.key).resolves([])

    const response = await handler.handle(sourcePeer.peerId, msg)

    expect(response).to.not.be.undefined()
    expect(response.record).to.be.undefined()

    // Handling the request must also have removed the expired record
    expect(await datastore.has(dsKey)).to.be.false()
  })
})
})
Loading