Skip to content

Commit 98105ae

Browse files
2colorSgtPooki
andauthored
feat: request deduplication and caching in client (#151)
* feat: add request deduplication fix #148 * test: add test for request deduplication * chore: remove console .log * feat: use cache api to cache responses * ci: add browser test scripts * fix: new headers object as its immutable * feat: add ability to disable cache with ttl * docs: update with example * Update client.ts Co-authored-by: Russell Dempsey <[email protected]> * fix: remove unnecessary response clone * fix: use major helia version in interop package * docs: refine wording on how the cache works --------- Co-authored-by: Daniel N <[email protected]> Co-authored-by: Russell Dempsey <[email protected]>
1 parent 53ac586 commit 98105ae

File tree

6 files changed

+212
-10
lines changed

6 files changed

+212
-10
lines changed

packages/client/package.json

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@
130130
"dep-check": "aegir dep-check",
131131
"build": "aegir build",
132132
"test": "aegir test",
133+
"test:chrome": "aegir test -t browser --cov",
134+
"test:chrome-webworker": "aegir test -t webworker",
135+
"test:firefox": "aegir test -t browser -- --browser firefox",
136+
"test:firefox-webworker": "aegir test -t webworker -- --browser firefox",
133137
"test:node": "aegir test -t node --cov",
134138
"release": "aegir release"
135139
},
@@ -153,7 +157,8 @@
153157
"@libp2p/crypto": "^5.0.1",
154158
"aegir": "^45.0.1",
155159
"body-parser": "^1.20.3",
156-
"it-all": "^3.0.6"
160+
"it-all": "^3.0.6",
161+
"wherearewe": "^2.0.1"
157162
},
158163
"sideEffects": false
159164
}

packages/client/src/client.ts

Lines changed: 88 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ const log = logger('delegated-routing-v1-http-api-client')
2020

2121
const defaultValues = {
2222
concurrentRequests: 4,
23-
timeout: 30e3
23+
timeout: 30e3,
24+
cacheTTL: 5 * 60 * 1000 // 5 minutes default as per https://specs.ipfs.tech/routing/http-routing-v1/#response-headers
2425
}
2526

2627
export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV1HttpApiClient {
@@ -33,7 +34,9 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV
3334
private readonly peerRouting: PeerRouting
3435
private readonly filterAddrs?: string[]
3536
private readonly filterProtocols?: string[]
36-
37+
private readonly inFlightRequests: Map<string, Promise<Response>>
38+
private cache?: Cache
39+
private readonly cacheTTL: number
3740
/**
3841
* Create a new DelegatedContentRouting instance
3942
*/
@@ -44,12 +47,25 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV
4447
this.httpQueue = new PQueue({
4548
concurrency: init.concurrentRequests ?? defaultValues.concurrentRequests
4649
})
50+
this.inFlightRequests = new Map() // Tracks in-flight requests to avoid duplicate requests
4751
this.clientUrl = url instanceof URL ? url : new URL(url)
4852
this.timeout = init.timeout ?? defaultValues.timeout
4953
this.filterAddrs = init.filterAddrs
5054
this.filterProtocols = init.filterProtocols
5155
this.contentRouting = new DelegatedRoutingV1HttpApiClientContentRouting(this)
5256
this.peerRouting = new DelegatedRoutingV1HttpApiClientPeerRouting(this)
57+
58+
this.cacheTTL = init.cacheTTL ?? defaultValues.cacheTTL
59+
const cacheEnabled = (typeof globalThis.caches !== 'undefined') && (this.cacheTTL > 0)
60+
61+
if (cacheEnabled) {
62+
log('cache enabled with ttl %d', this.cacheTTL)
63+
globalThis.caches.open('delegated-routing-v1-cache').then(cache => {
64+
this.cache = cache
65+
}).catch(() => {
66+
this.cache = undefined
67+
})
68+
}
5369
}
5470

5571
get [contentRoutingSymbol] (): ContentRouting {
@@ -72,6 +88,11 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV
7288
this.httpQueue.clear()
7389
this.shutDownController.abort()
7490
this.started = false
91+
92+
// Clear the cache when stopping
93+
if (this.cache != null) {
94+
void this.cache.delete('delegated-routing-v1-cache')
95+
}
7596
}
7697

7798
async * getProviders (cid: CID, options: GetProvidersOptions = {}): AsyncGenerator<PeerRecord> {
@@ -95,7 +116,7 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV
95116
const url = new URL(`${this.clientUrl}routing/v1/providers/${cid.toString()}`)
96117
this.#addFilterParams(url, options.filterAddrs, options.filterProtocols)
97118
const getOptions = { headers: { Accept: 'application/x-ndjson' }, signal }
98-
const res = await fetch(url, getOptions)
119+
const res = await this.#makeRequest(url.toString(), getOptions)
99120

100121
if (res.status === 404) {
101122
// https://specs.ipfs.tech/routing/http-routing-v1/#response-status-codes
@@ -162,7 +183,7 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV
162183
this.#addFilterParams(url, options.filterAddrs, options.filterProtocols)
163184

164185
const getOptions = { headers: { Accept: 'application/x-ndjson' }, signal }
165-
const res = await fetch(url, getOptions)
186+
const res = await this.#makeRequest(url.toString(), getOptions)
166187

167188
if (res.status === 404) {
168189
// https://specs.ipfs.tech/routing/http-routing-v1/#response-status-codes
@@ -228,7 +249,7 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV
228249
await onStart.promise
229250

230251
const getOptions = { headers: { Accept: 'application/vnd.ipfs.ipns-record' }, signal }
231-
const res = await fetch(resource, getOptions)
252+
const res = await this.#makeRequest(resource, getOptions)
232253

233254
log('getIPNS GET %s %d', resource, res.status)
234255

@@ -290,7 +311,7 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV
290311
const body = marshalIPNSRecord(record)
291312

292313
const getOptions = { method: 'PUT', headers: { 'Content-Type': 'application/vnd.ipfs.ipns-record' }, body, signal }
293-
const res = await fetch(resource, getOptions)
314+
const res = await this.#makeRequest(resource, getOptions)
294315

295316
log('putIPNS PUT %s %d', resource, res.status)
296317

@@ -349,4 +370,65 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV
349370
}
350371
}
351372
}
373+
374+
/**
375+
* makeRequest has two features:
376+
* - Ensures only one concurrent request is made for the same URL
377+
* - Caches GET requests if the Cache API is available
378+
*/
379+
async #makeRequest (url: string, options: RequestInit): Promise<Response> {
380+
const requestMethod = options.method ?? 'GET'
381+
const key = `${requestMethod}-${url}`
382+
383+
// Only try to use cache for GET requests
384+
if (requestMethod === 'GET') {
385+
const cachedResponse = await this.cache?.match(url)
386+
if (cachedResponse != null) {
387+
// Check if the cached response has expired
388+
const expires = parseInt(cachedResponse.headers.get('x-cache-expires') ?? '0', 10)
389+
if (expires > Date.now()) {
390+
log('returning cached response for %s', key)
391+
return cachedResponse
392+
} else {
393+
// Remove expired response from cache
394+
await this.cache?.delete(url)
395+
}
396+
}
397+
}
398+
399+
// Check if there's already an in-flight request for this URL
400+
const existingRequest = this.inFlightRequests.get(key)
401+
if (existingRequest != null) {
402+
const response = await existingRequest
403+
log('deduplicating outgoing request for %s', key)
404+
return response.clone()
405+
}
406+
407+
// Create new request and track it
408+
const requestPromise = fetch(url, options).then(async response => {
409+
// Only cache successful GET requests
410+
if (this.cache != null && response.ok && requestMethod === 'GET') {
411+
const expires = Date.now() + this.cacheTTL
412+
const headers = new Headers(response.headers)
413+
headers.set('x-cache-expires', expires.toString())
414+
415+
// Create a new response with expiration header
416+
const cachedResponse = new Response(response.clone().body, {
417+
status: response.status,
418+
statusText: response.statusText,
419+
headers
420+
})
421+
422+
await this.cache.put(url, cachedResponse)
423+
}
424+
return response
425+
}).finally(() => {
426+
// Clean up the tracked request when it completes
427+
this.inFlightRequests.delete(key)
428+
})
429+
430+
this.inFlightRequests.set(key, requestPromise)
431+
const response = await requestPromise
432+
return response
433+
}
352434
}

packages/client/src/index.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,20 @@
3939
* await libp2p.peerRouting.findPeer(peerIdFromString('QmFoo'))
4040
* ```
4141
*
42+
* ### Caching
43+
*
44+
* By default, the client caches successful (200) delegated routing responses in browser environments (that support the [Cache API](https://developer.mozilla.org/en-US/docs/Web/API/Cache)) for a duration of 5 minutes. The client does this by adding an `x-cache-expires` header to the response object.
45+
*
46+
* If caching is enabled, the client will cache responses for the duration of `cacheTTL` milliseconds.
47+
* If `cacheTTL` is 0, caching is disabled:
48+
*
49+
* @example
50+
*
51+
* ```typescript
52+
* // disable caching
53+
* const client = createDelegatedRoutingV1HttpApiClient('https://example.org', { cacheTTL: 0 })
54+
* ```
55+
*
4256
* ### Filtering with IPIP-484
4357
*
4458
* The client can be configured to pass filter options to the delegated routing server as defined in IPIP-484.
@@ -124,6 +138,12 @@ export interface DelegatedRoutingV1HttpApiClientInit extends FilterOptions {
124138
* How long a request is allowed to take in ms (default: 30 seconds)
125139
*/
126140
timeout?: number
141+
142+
/**
143+
* How long to cache responses for in ms (default: 5 minutes)
144+
* If 0, caching is disabled
145+
*/
146+
cacheTTL?: number
127147
}
128148

129149
export interface GetIPNSOptions extends AbortOptions {

packages/client/test/index.spec.ts

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,21 @@ import { expect } from 'aegir/chai'
77
import { createIPNSRecord, marshalIPNSRecord } from 'ipns'
88
import all from 'it-all'
99
import { CID } from 'multiformats/cid'
10+
import { isBrowser } from 'wherearewe'
1011
import { createDelegatedRoutingV1HttpApiClient, type DelegatedRoutingV1HttpApiClient } from '../src/index.js'
1112

1213
if (process.env.ECHO_SERVER == null) {
1314
throw new Error('Echo server not configured correctly')
1415
}
1516

1617
const serverUrl = process.env.ECHO_SERVER
18+
const itBrowser = (isBrowser ? it : it.skip)
1719

1820
describe('delegated-routing-v1-http-api-client', () => {
1921
let client: DelegatedRoutingV1HttpApiClient
2022

2123
beforeEach(() => {
22-
client = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl))
24+
client = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl), { cacheTTL: 0 })
2325
})
2426

2527
afterEach(async () => {
@@ -302,4 +304,97 @@ describe('delegated-routing-v1-http-api-client', () => {
302304
const receivedRecord = new Uint8Array(await res.arrayBuffer())
303305
expect(marshalIPNSRecord(record)).to.equalBytes(receivedRecord)
304306
})
307+
308+
it('should deduplicate concurrent requests to the same URL', async () => {
309+
const cid = CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn')
310+
const providers = [{
311+
Protocol: 'transport-bitswap',
312+
Schema: 'bitswap',
313+
Metadata: 'gBI=',
314+
ID: (await generateKeyPair('Ed25519')).publicKey.toString(),
315+
Addrs: ['/ip4/41.41.41.41/tcp/1234']
316+
}]
317+
318+
// load providers for the router to fetch
319+
await fetch(`${process.env.ECHO_SERVER}/add-providers/${cid.toString()}`, {
320+
method: 'POST',
321+
body: providers.map(prov => JSON.stringify(prov)).join('\n')
322+
})
323+
324+
// Reset call count before our test
325+
await fetch(`${process.env.ECHO_SERVER}/reset-call-count`)
326+
327+
// Make multiple concurrent requests
328+
const results = await Promise.all([
329+
all(client.getProviders(cid)),
330+
all(client.getProviders(cid)),
331+
all(client.getProviders(cid)),
332+
all(client.getProviders(cid))
333+
])
334+
335+
// Get the number of times the server was called
336+
const callCountRes = await fetch(`${process.env.ECHO_SERVER}/get-call-count`)
337+
const callCount = parseInt(await callCountRes.text(), 10)
338+
339+
// Verify server was only called once
340+
expect(callCount).to.equal(1)
341+
342+
// Verify all results are the same
343+
results.forEach(resultProviders => {
344+
expect(resultProviders.map(prov => ({
345+
id: prov.ID.toString(),
346+
// eslint-disable-next-line max-nested-callbacks
347+
addrs: prov.Addrs?.map(ma => ma.toString())
348+
}))).to.deep.equal(providers.map(prov => ({
349+
id: prov.ID,
350+
addrs: prov.Addrs
351+
})))
352+
})
353+
})
354+
355+
itBrowser('should respect cache TTL', async () => {
356+
const shortTTL = 100 // 100ms TTL for testing
357+
const clientWithShortTTL = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl), {
358+
cacheTTL: shortTTL
359+
})
360+
361+
const cid = CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn')
362+
const providers = [{
363+
Protocol: 'transport-bitswap',
364+
Schema: 'bitswap',
365+
Metadata: 'gBI=',
366+
ID: (await generateKeyPair('Ed25519')).publicKey.toString(),
367+
Addrs: ['/ip4/41.41.41.41/tcp/1234']
368+
}]
369+
370+
// load providers for the router to fetch
371+
await fetch(`${process.env.ECHO_SERVER}/add-providers/${cid.toString()}`, {
372+
method: 'POST',
373+
body: providers.map(prov => JSON.stringify(prov)).join('\n')
374+
})
375+
376+
// Reset call count
377+
await fetch(`${process.env.ECHO_SERVER}/reset-call-count`)
378+
379+
// First request should hit the server
380+
await all(clientWithShortTTL.getProviders(cid))
381+
382+
// Second and third request should use cache
383+
await all(clientWithShortTTL.getProviders(cid))
384+
await all(clientWithShortTTL.getProviders(cid))
385+
386+
let callCount = parseInt(await (await fetch(`${process.env.ECHO_SERVER}/get-call-count`)).text(), 10)
387+
expect(callCount).to.equal(1) // Only one server call so far
388+
389+
// Wait for cache to expire
390+
await new Promise(resolve => setTimeout(resolve, shortTTL + 50))
391+
392+
// This request should hit the server again because cache expired
393+
await all(clientWithShortTTL.getProviders(cid))
394+
395+
callCount = parseInt(await (await fetch(`${process.env.ECHO_SERVER}/get-call-count`)).text(), 10)
396+
expect(callCount).to.equal(2) // Second server call after cache expired
397+
398+
clientWithShortTTL.stop()
399+
})
305400
})

packages/client/test/routings.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ describe('libp2p content-routing', () => {
2323
let client: DelegatedRoutingV1HttpApiClient
2424

2525
beforeEach(() => {
26-
client = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl))
26+
client = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl), { cacheTTL: 0 })
2727
})
2828

2929
afterEach(async () => {

packages/interop/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@
125125
"@libp2p/kad-dht": "^14.0.1",
126126
"aegir": "^45.0.1",
127127
"fastify": "^5.0.0",
128-
"helia": "next",
128+
"helia": "^5.1.0",
129129
"ipns": "^10.0.0",
130130
"it-first": "^3.0.6",
131131
"multiformats": "^13.3.0"

0 commit comments

Comments
 (0)