diff --git a/.github/dictionary.txt b/.github/dictionary.txt index 71e5ed28d0..9ee454ee8c 100644 --- a/.github/dictionary.txt +++ b/.github/dictionary.txt @@ -14,3 +14,8 @@ additionals SECG Certicom RSAES +reprovide +reprovider +reproviding +reprovides +reprovided diff --git a/packages/kad-dht/src/constants.ts b/packages/kad-dht/src/constants.ts index 520772c189..652af37f28 100644 --- a/packages/kad-dht/src/constants.ts +++ b/packages/kad-dht/src/constants.ts @@ -4,8 +4,6 @@ export const second = 1000 export const minute = 60 * second export const hour = 60 * minute -export const MAX_RECORD_AGE = 36 * hour - export const PROTOCOL = '/ipfs/kad/1.0.0' /** diff --git a/packages/kad-dht/src/reprovider.ts b/packages/kad-dht/src/reprovider.ts index 4420da5997..3cc3328abf 100644 --- a/packages/kad-dht/src/reprovider.ts +++ b/packages/kad-dht/src/reprovider.ts @@ -99,10 +99,10 @@ export class Reprovider extends TypedEventEmitter { setMaxListeners(Infinity, this.shutdownController.signal) this.timeout = setTimeout(() => { - this.cleanUp({ + this.processRecords({ signal: AbortSignal.timeout(REPROVIDE_TIMEOUT) }).catch(err => { - this.log.error('error running reprovide/cleanup - %e', err) + this.log.error('error running process to reprovide/cleanup - %e', err) }) }, this.interval) } @@ -118,10 +118,10 @@ export class Reprovider extends TypedEventEmitter { * Check all provider records. Delete them if they have expired, reprovide * them if the provider is us and the expiry is within the reprovide window. */ - private async cleanUp (options?: AbortOptions): Promise { + private async processRecords (options?: AbortOptions): Promise { try { this.safeDispatchEvent('reprovide:start') - + this.log('starting reprovide/cleanup') // Get all provider entries from the datastore for await (const entry of this.datastore.query({ prefix: this.datastorePrefix @@ -133,17 +133,20 @@ export class Reprovider extends TypedEventEmitter { const expires = created + this.validity const now = Date.now() const expired = now > expires + const isSelf = this.peerId.equals(peerId) - this.log.trace('comparing: %d < %d = %s %s', created, now - this.validity, expired, expired ? '(expired)' : '') + this.log.trace('comparing: %d (now) < %d (expires) = %s %s', now, expires, expired, expired ? '(expired)' : '(valid)') - // delete the record if it has expired - if (expired) { + // delete the record if it has expired and isn't us + // so that if user node is down for a while, we still persist provide intent + if (expired && !isSelf) { await this.datastore.delete(entry.key, options) } // if the provider is us and we are within the reprovide threshold, // reprovide the record - if (this.peerId.equals(peerId) && (now - expires) < this.reprovideThreshold) { + if (this.shouldReprovide(isSelf, expires)) { + this.log('reproviding %c as it is within the reprovide threshold (%d)', cid, this.reprovideThreshold) this.queueReprovide(cid) .catch(err => { this.log.error('could not reprovide %c - %e', cid, err) @@ -159,8 +162,9 @@ export class Reprovider extends TypedEventEmitter { this.safeDispatchEvent('reprovide:end') if (this.running) { + this.log('queuing next re-provide/cleanup run in %d ms', this.interval) this.timeout = setTimeout(() => { - this.cleanUp({ + this.processRecords({ signal: AbortSignal.timeout(REPROVIDE_TIMEOUT) }).catch(err => { this.log.error('error running re-provide - %e', err) @@ -170,6 +174,24 @@ export class Reprovider extends TypedEventEmitter { } } + /** + * Determines if a record should be reprovided + */ + private shouldReprovide (isSelf: boolean, expires: number): boolean { + if (!isSelf) { + return false + } + const now = Date.now() + + if (expires < now) { + // If the record has already expired, reprovide irrespective of the threshold + return true + } + + // if the record is approaching expiration within the reprovide threshold + return expires - now < this.reprovideThreshold + } + private async queueReprovide (cid: CID, options?: AbortOptions): Promise { if (!this.running) { return diff --git a/packages/kad-dht/src/rpc/handlers/get-value.ts b/packages/kad-dht/src/rpc/handlers/get-value.ts index cde2678190..5d8eb7a256 100644 --- a/packages/kad-dht/src/rpc/handlers/get-value.ts +++ b/packages/kad-dht/src/rpc/handlers/get-value.ts @@ -2,7 +2,7 @@ import { publicKeyToProtobuf } from '@libp2p/crypto/keys' import { InvalidMessageError, NotFoundError } from '@libp2p/interface' import { Libp2pRecord } from '@libp2p/record' import { - MAX_RECORD_AGE + PROVIDERS_VALIDITY } from '../../constants.js' import { MessageType } from '../../message/dht.js' import { bufferToRecordKey, isPublicKeyKey, fromPublicKeyKey } from '../../utils.js' @@ -107,7 +107,7 @@ export class GetValueHandler implements DHTMessageHandler { * Try to fetch a given record by from the local datastore. * Returns the record if it is still valid, meaning * - it was either authored by this node, or - * - it was received less than `MAX_RECORD_AGE` ago. + * - it was received less than `PROVIDERS_VALIDITY` ago. */ async _checkLocalDatastore (key: Uint8Array): Promise { this.log('checkLocalDatastore looking for %b', key) @@ -129,7 +129,7 @@ export class GetValueHandler implements DHTMessageHandler { // Check validity: compare time received with max record age if (record.timeReceived == null || - Date.now() - record.timeReceived.getTime() > MAX_RECORD_AGE) { + Date.now() - record.timeReceived.getTime() > PROVIDERS_VALIDITY) { // If record is bad delete it and return await this.datastore.delete(dsKey) return undefined diff --git a/packages/kad-dht/test/reprovider.spec.ts b/packages/kad-dht/test/reprovider.spec.ts index 80f4b6a0d3..803599bd9b 100644 --- a/packages/kad-dht/test/reprovider.spec.ts +++ b/packages/kad-dht/test/reprovider.spec.ts @@ -60,11 +60,9 @@ describe('reprovider', () => { contentRouting, threshold: 100, validity: 200, - interval: 100, + interval: 200, operationMetrics: {} }) - - await start(reprovider) }) afterEach(async () => { @@ -74,6 +72,8 @@ describe('reprovider', () => { it('should reprovide', async () => { const cid = CID.parse('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb') + await start(reprovider) + await providers.addProvider(cid, components.peerId) expect(contentRouting.provide).to.have.property('callCount', 0) @@ -88,6 +88,8 @@ describe('reprovider', () => { it('should cancel reprovide', async () => { const cid = CID.parse('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb') + await start(reprovider) + await providers.addProvider(cid, components.peerId) expect(contentRouting.provide).to.have.property('callCount', 0) @@ -110,6 +112,9 @@ describe('reprovider', () => { it('should remove expired provider records', async () => { const cid = CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n') + + await start(reprovider) + await Promise.all([ providers.addProvider(cid, peers[0].peerId), providers.addProvider(cid, peers[1].peerId) @@ -121,9 +126,67 @@ describe('reprovider', () => { expect(provs[0].toString()).to.be.equal(peers[0].peerId.toString()) expect(provs[1].toString()).to.be.deep.equal(peers[1].peerId.toString()) - await delay(400) + await delay(450) const provsAfter = await providers.getProviders(cid) expect(provsAfter).to.have.length(0) }) + + it('should delete expired records from other peers but preserve own expired records', async () => { + const cid = CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n') + + await start(reprovider) + + // Add provider records - one from us, one from another peer + await providers.addProvider(cid, components.peerId) + await providers.addProvider(cid, peers[0].peerId) + + const provsBefore = await providers.getProviders(cid) + expect(provsBefore).to.have.length(2) + + // Wait for records to expire (validity is 200ms) + await delay(250) + + // Trigger reprovide cycle to process expired records + await pEvent(reprovider, 'reprovide:start') + await pEvent(reprovider, 'reprovide:end') + + const provsAfter = await providers.getProviders(cid) + + // Only our own record should remain, other peer's expired record should be deleted + expect(provsAfter).to.have.length(1) + expect(provsAfter[0].toString()).to.equal(components.peerId.toString()) + }) + + describe('shouldReprovide', () => { + it('should return false for non-self providers', () => { + const expires = Date.now() + 50 + const result = (reprovider as any).shouldReprovide(false, expires) + expect(result).to.be.false() + }) + + it('should return true when within reprovide threshold before expiration', () => { + const expires = Date.now() + 50 + const result = (reprovider as any).shouldReprovide(true, expires) + expect(result).to.be.true() + }) + + it('should return true when within reprovide threshold after expiration', () => { + const expires = Date.now() - 50 + const result = (reprovider as any).shouldReprovide(true, expires) + expect(result).to.be.true() + }) + + it('should return false when outside reprovide threshold before expiration', () => { + const expires = Date.now() + 150 + const result = (reprovider as any).shouldReprovide(true, expires) + expect(result).to.be.false() + }) + + it('should return true when outside reprovide threshold after expiration', () => { + const expires = Date.now() - 150 + const result = (reprovider as any).shouldReprovide(true, expires) + expect(result).to.be.true() + }) + }) }) diff --git a/packages/kad-dht/test/rpc/handlers/get-value.spec.ts b/packages/kad-dht/test/rpc/handlers/get-value.spec.ts index 961089f8ef..cf1a9dc77e 100644 --- a/packages/kad-dht/test/rpc/handlers/get-value.spec.ts +++ b/packages/kad-dht/test/rpc/handlers/get-value.spec.ts @@ -9,6 +9,7 @@ import { MemoryDatastore } from 'datastore-core' import { TypedEventEmitter } from 'main-event' import Sinon from 'sinon' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { PROVIDERS_VALIDITY } from '../../../src/constants.js' import { MessageType } from '../../../src/message/dht.js' import { PeerRouting } from '../../../src/peer-routing/index.js' import { GetValueHandler } from '../../../src/rpc/handlers/get-value.js' @@ -186,4 +187,66 @@ describe('rpc - handlers - GetValue', () => { expect(response.record).to.not.be.ok() }) }) + + describe('record expiration', () => { + it('should return valid record within PROVIDERS_VALIDITY period', async () => { + const key = uint8ArrayFromString('hello') + const value = uint8ArrayFromString('world') + const record = new Libp2pRecord(key, value, new Date()) + + await datastore.put(utils.bufferToRecordKey('/dht/record', key), record.serialize()) + + const msg: Message = { + type: T, + key, + closer: [], + providers: [] + } + + peerRouting.getClosestPeersOffline.withArgs(msg.key).resolves([]) + + const response = await handler.handle(sourcePeer.peerId, msg) + + expect(response).to.not.be.undefined() + expect(response.record).to.not.be.undefined() + + if (response.record != null) { + const responseRecord = Libp2pRecord.deserialize(response.record) + expect(responseRecord.value).to.equalBytes(value) + } + }) + + it('should delete and return no record when expired beyond PROVIDERS_VALIDITY', async () => { + const key = uint8ArrayFromString('hello') + const value = uint8ArrayFromString('world') + // Create record with old timestamp (beyond PROVIDERS_VALIDITY) + const oldTimestamp = new Date(Date.now() - PROVIDERS_VALIDITY - 1000) + const record = new Libp2pRecord(key, value, oldTimestamp) + + const dsKey = utils.bufferToRecordKey('/dht/record', key) + await datastore.put(dsKey, record.serialize()) + + // Verify record exists before the test + const existsBefore = await datastore.has(dsKey) + expect(existsBefore).to.be.true() + + const msg: Message = { + type: T, + key, + closer: [], + providers: [] + } + + peerRouting.getClosestPeersOffline.withArgs(msg.key).resolves([]) + + const response = await handler.handle(sourcePeer.peerId, msg) + + expect(response).to.not.be.undefined() + expect(response.record).to.be.undefined() + + // Verify the expired record was deleted from datastore + const existsAfter = await datastore.has(dsKey) + expect(existsAfter).to.be.false() + }) + }) })