Skip to content

Commit aa770ab

Browse files
authored
fix: avoid wasteful reprovides outside threshold (#3238)
1 parent 7ce083d commit aa770ab

File tree

6 files changed

+169
-18
lines changed

6 files changed

+169
-18
lines changed

.github/dictionary.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,8 @@ additionals
1414
SECG
1515
Certicom
1616
RSAES
17+
reprovide
18+
reprovider
19+
reproviding
20+
reprovides
21+
reprovided

packages/kad-dht/src/constants.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ export const second = 1000
44
export const minute = 60 * second
55
export const hour = 60 * minute
66

7-
export const MAX_RECORD_AGE = 36 * hour
8-
97
export const PROTOCOL = '/ipfs/kad/1.0.0'
108

119
/**

packages/kad-dht/src/reprovider.ts

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,10 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
9999
setMaxListeners(Infinity, this.shutdownController.signal)
100100

101101
this.timeout = setTimeout(() => {
102-
this.cleanUp({
102+
this.processRecords({
103103
signal: AbortSignal.timeout(REPROVIDE_TIMEOUT)
104104
}).catch(err => {
105-
this.log.error('error running reprovide/cleanup - %e', err)
105+
this.log.error('error running process to reprovide/cleanup - %e', err)
106106
})
107107
}, this.interval)
108108
}
@@ -118,10 +118,10 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
118118
* Check all provider records. Delete them if they have expired, reprovide
119119
* them if the provider is us and the expiry is within the reprovide window.
120120
*/
121-
private async cleanUp (options?: AbortOptions): Promise<void> {
121+
private async processRecords (options?: AbortOptions): Promise<void> {
122122
try {
123123
this.safeDispatchEvent('reprovide:start')
124-
124+
this.log('starting reprovide/cleanup')
125125
// Get all provider entries from the datastore
126126
for await (const entry of this.datastore.query({
127127
prefix: this.datastorePrefix
@@ -133,17 +133,20 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
133133
const expires = created + this.validity
134134
const now = Date.now()
135135
const expired = now > expires
136+
const isSelf = this.peerId.equals(peerId)
136137

137-
this.log.trace('comparing: %d < %d = %s %s', created, now - this.validity, expired, expired ? '(expired)' : '')
138+
this.log.trace('comparing: %d (now) < %d (expires) = %s %s', now, expires, expired, expired ? '(expired)' : '(valid)')
138139

139-
// delete the record if it has expired
140-
if (expired) {
140+
// delete the record if it has expired and isn't us
141+
// so that if user node is down for a while, we still persist provide intent
142+
if (expired && !isSelf) {
141143
await this.datastore.delete(entry.key, options)
142144
}
143145

144146
// if the provider is us and we are within the reprovide threshold,
145147
// reprovide the record
146-
if (this.peerId.equals(peerId) && (now - expires) < this.reprovideThreshold) {
148+
if (this.shouldReprovide(isSelf, expires)) {
149+
this.log('reproviding %c as it is within the reprovide threshold (%d)', cid, this.reprovideThreshold)
147150
this.queueReprovide(cid)
148151
.catch(err => {
149152
this.log.error('could not reprovide %c - %e', cid, err)
@@ -159,8 +162,9 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
159162
this.safeDispatchEvent('reprovide:end')
160163

161164
if (this.running) {
165+
this.log('queuing next re-provide/cleanup run in %d ms', this.interval)
162166
this.timeout = setTimeout(() => {
163-
this.cleanUp({
167+
this.processRecords({
164168
signal: AbortSignal.timeout(REPROVIDE_TIMEOUT)
165169
}).catch(err => {
166170
this.log.error('error running re-provide - %e', err)
@@ -170,6 +174,24 @@ export class Reprovider extends TypedEventEmitter<ReprovideEvents> {
170174
}
171175
}
172176

177+
/**
178+
* Determines if a record should be reprovided
179+
*/
180+
private shouldReprovide (isSelf: boolean, expires: number): boolean {
181+
if (!isSelf) {
182+
return false
183+
}
184+
const now = Date.now()
185+
186+
if (expires < now) {
187+
// If the record has already expired, reprovide irrespective of the threshold
188+
return true
189+
}
190+
191+
// if the record is approaching expiration within the reprovide threshold
192+
return expires - now < this.reprovideThreshold
193+
}
194+
173195
private async queueReprovide (cid: CID, options?: AbortOptions): Promise<void> {
174196
if (!this.running) {
175197
return

packages/kad-dht/src/rpc/handlers/get-value.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { publicKeyToProtobuf } from '@libp2p/crypto/keys'
22
import { InvalidMessageError, NotFoundError } from '@libp2p/interface'
33
import { Libp2pRecord } from '@libp2p/record'
44
import {
5-
MAX_RECORD_AGE
5+
PROVIDERS_VALIDITY
66
} from '../../constants.js'
77
import { MessageType } from '../../message/dht.js'
88
import { bufferToRecordKey, isPublicKeyKey, fromPublicKeyKey } from '../../utils.js'
@@ -107,7 +107,7 @@ export class GetValueHandler implements DHTMessageHandler {
107107
* Try to fetch a given record by from the local datastore.
108108
* Returns the record if it is still valid, meaning
109109
* - it was either authored by this node, or
110-
* - it was received less than `MAX_RECORD_AGE` ago.
110+
* - it was received less than `PROVIDERS_VALIDITY` ago.
111111
*/
112112
async _checkLocalDatastore (key: Uint8Array): Promise<Libp2pRecord | undefined> {
113113
this.log('checkLocalDatastore looking for %b', key)
@@ -129,7 +129,7 @@ export class GetValueHandler implements DHTMessageHandler {
129129

130130
// Check validity: compare time received with max record age
131131
if (record.timeReceived == null ||
132-
Date.now() - record.timeReceived.getTime() > MAX_RECORD_AGE) {
132+
Date.now() - record.timeReceived.getTime() > PROVIDERS_VALIDITY) {
133133
// If record is bad delete it and return
134134
await this.datastore.delete(dsKey)
135135
return undefined

packages/kad-dht/test/reprovider.spec.ts

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,9 @@ describe('reprovider', () => {
6060
contentRouting,
6161
threshold: 100,
6262
validity: 200,
63-
interval: 100,
63+
interval: 200,
6464
operationMetrics: {}
6565
})
66-
67-
await start(reprovider)
6866
})
6967

7068
afterEach(async () => {
@@ -74,6 +72,8 @@ describe('reprovider', () => {
7472
it('should reprovide', async () => {
7573
const cid = CID.parse('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb')
7674

75+
await start(reprovider)
76+
7777
await providers.addProvider(cid, components.peerId)
7878

7979
expect(contentRouting.provide).to.have.property('callCount', 0)
@@ -88,6 +88,8 @@ describe('reprovider', () => {
8888
it('should cancel reprovide', async () => {
8989
const cid = CID.parse('QmZ8eiDPqQqWR17EPxiwCDgrKPVhCHLcyn6xSCNpFAdAZb')
9090

91+
await start(reprovider)
92+
9193
await providers.addProvider(cid, components.peerId)
9294

9395
expect(contentRouting.provide).to.have.property('callCount', 0)
@@ -110,6 +112,9 @@ describe('reprovider', () => {
110112

111113
it('should remove expired provider records', async () => {
112114
const cid = CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n')
115+
116+
await start(reprovider)
117+
113118
await Promise.all([
114119
providers.addProvider(cid, peers[0].peerId),
115120
providers.addProvider(cid, peers[1].peerId)
@@ -121,9 +126,67 @@ describe('reprovider', () => {
121126
expect(provs[0].toString()).to.be.equal(peers[0].peerId.toString())
122127
expect(provs[1].toString()).to.be.deep.equal(peers[1].peerId.toString())
123128

124-
await delay(400)
129+
await delay(450)
125130

126131
const provsAfter = await providers.getProviders(cid)
127132
expect(provsAfter).to.have.length(0)
128133
})
134+
135+
it('should delete expired records from other peers but preserve own expired records', async () => {
136+
const cid = CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n')
137+
138+
await start(reprovider)
139+
140+
// Add provider records - one from us, one from another peer
141+
await providers.addProvider(cid, components.peerId)
142+
await providers.addProvider(cid, peers[0].peerId)
143+
144+
const provsBefore = await providers.getProviders(cid)
145+
expect(provsBefore).to.have.length(2)
146+
147+
// Wait for records to expire (validity is 200ms)
148+
await delay(250)
149+
150+
// Trigger reprovide cycle to process expired records
151+
await pEvent(reprovider, 'reprovide:start')
152+
await pEvent(reprovider, 'reprovide:end')
153+
154+
const provsAfter = await providers.getProviders(cid)
155+
156+
// Only our own record should remain, other peer's expired record should be deleted
157+
expect(provsAfter).to.have.length(1)
158+
expect(provsAfter[0].toString()).to.equal(components.peerId.toString())
159+
})
160+
161+
describe('shouldReprovide', () => {
162+
it('should return false for non-self providers', () => {
163+
const expires = Date.now() + 50
164+
const result = (reprovider as any).shouldReprovide(false, expires)
165+
expect(result).to.be.false()
166+
})
167+
168+
it('should return true when within reprovide threshold before expiration', () => {
169+
const expires = Date.now() + 50
170+
const result = (reprovider as any).shouldReprovide(true, expires)
171+
expect(result).to.be.true()
172+
})
173+
174+
it('should return true when within reprovide threshold after expiration', () => {
175+
const expires = Date.now() - 50
176+
const result = (reprovider as any).shouldReprovide(true, expires)
177+
expect(result).to.be.true()
178+
})
179+
180+
it('should return false when outside reprovide threshold before expiration', () => {
181+
const expires = Date.now() + 150
182+
const result = (reprovider as any).shouldReprovide(true, expires)
183+
expect(result).to.be.false()
184+
})
185+
186+
it('should return true when outside reprovide threshold after expiration', () => {
187+
const expires = Date.now() - 150
188+
const result = (reprovider as any).shouldReprovide(true, expires)
189+
expect(result).to.be.true()
190+
})
191+
})
129192
})

packages/kad-dht/test/rpc/handlers/get-value.spec.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import { MemoryDatastore } from 'datastore-core'
99
import { TypedEventEmitter } from 'main-event'
1010
import Sinon from 'sinon'
1111
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
12+
import { PROVIDERS_VALIDITY } from '../../../src/constants.js'
1213
import { MessageType } from '../../../src/message/dht.js'
1314
import { PeerRouting } from '../../../src/peer-routing/index.js'
1415
import { GetValueHandler } from '../../../src/rpc/handlers/get-value.js'
@@ -186,4 +187,66 @@ describe('rpc - handlers - GetValue', () => {
186187
expect(response.record).to.not.be.ok()
187188
})
188189
})
190+
191+
describe('record expiration', () => {
192+
it('should return valid record within PROVIDERS_VALIDITY period', async () => {
193+
const key = uint8ArrayFromString('hello')
194+
const value = uint8ArrayFromString('world')
195+
const record = new Libp2pRecord(key, value, new Date())
196+
197+
await datastore.put(utils.bufferToRecordKey('/dht/record', key), record.serialize())
198+
199+
const msg: Message = {
200+
type: T,
201+
key,
202+
closer: [],
203+
providers: []
204+
}
205+
206+
peerRouting.getClosestPeersOffline.withArgs(msg.key).resolves([])
207+
208+
const response = await handler.handle(sourcePeer.peerId, msg)
209+
210+
expect(response).to.not.be.undefined()
211+
expect(response.record).to.not.be.undefined()
212+
213+
if (response.record != null) {
214+
const responseRecord = Libp2pRecord.deserialize(response.record)
215+
expect(responseRecord.value).to.equalBytes(value)
216+
}
217+
})
218+
219+
it('should delete and return no record when expired beyond PROVIDERS_VALIDITY', async () => {
220+
const key = uint8ArrayFromString('hello')
221+
const value = uint8ArrayFromString('world')
222+
// Create record with old timestamp (beyond PROVIDERS_VALIDITY)
223+
const oldTimestamp = new Date(Date.now() - PROVIDERS_VALIDITY - 1000)
224+
const record = new Libp2pRecord(key, value, oldTimestamp)
225+
226+
const dsKey = utils.bufferToRecordKey('/dht/record', key)
227+
await datastore.put(dsKey, record.serialize())
228+
229+
// Verify record exists before the test
230+
const existsBefore = await datastore.has(dsKey)
231+
expect(existsBefore).to.be.true()
232+
233+
const msg: Message = {
234+
type: T,
235+
key,
236+
closer: [],
237+
providers: []
238+
}
239+
240+
peerRouting.getClosestPeersOffline.withArgs(msg.key).resolves([])
241+
242+
const response = await handler.handle(sourcePeer.peerId, msg)
243+
244+
expect(response).to.not.be.undefined()
245+
expect(response.record).to.be.undefined()
246+
247+
// Verify the expired record was deleted from datastore
248+
const existsAfter = await datastore.has(dsKey)
249+
expect(existsAfter).to.be.false()
250+
})
251+
})
189252
})

0 commit comments

Comments
 (0)