Skip to content
7 changes: 6 additions & 1 deletion packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,10 @@
"dep-check": "aegir dep-check",
"build": "aegir build",
"test": "aegir test",
"test:chrome": "aegir test -t browser --cov",
"test:chrome-webworker": "aegir test -t webworker",
"test:firefox": "aegir test -t browser -- --browser firefox",
"test:firefox-webworker": "aegir test -t webworker -- --browser firefox",
"test:node": "aegir test -t node --cov",
"release": "aegir release"
},
Expand All @@ -153,7 +157,8 @@
"@libp2p/crypto": "^5.0.1",
"aegir": "^45.0.1",
"body-parser": "^1.20.3",
"it-all": "^3.0.6"
"it-all": "^3.0.6",
"wherearewe": "^2.0.1"
},
"sideEffects": false
}
94 changes: 88 additions & 6 deletions packages/client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

const defaultValues = {
concurrentRequests: 4,
timeout: 30e3
timeout: 30e3,
cacheTTL: 5 * 60 * 1000 // 5 minutes default as per https://specs.ipfs.tech/routing/http-routing-v1/#response-headers
}

export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV1HttpApiClient {
Expand All @@ -33,7 +34,9 @@
private readonly peerRouting: PeerRouting
private readonly filterAddrs?: string[]
private readonly filterProtocols?: string[]

private readonly inFlightRequests: Map<string, Promise<Response>>
private cache?: Cache
private readonly cacheTTL: number
/**
* Create a new DelegatedContentRouting instance
*/
Expand All @@ -44,12 +47,25 @@
this.httpQueue = new PQueue({
concurrency: init.concurrentRequests ?? defaultValues.concurrentRequests
})
this.inFlightRequests = new Map() // Tracks in-flight requests to avoid duplicate requests
this.clientUrl = url instanceof URL ? url : new URL(url)
this.timeout = init.timeout ?? defaultValues.timeout
this.filterAddrs = init.filterAddrs
this.filterProtocols = init.filterProtocols
this.contentRouting = new DelegatedRoutingV1HttpApiClientContentRouting(this)
this.peerRouting = new DelegatedRoutingV1HttpApiClientPeerRouting(this)

this.cacheTTL = init.cacheTTL ?? defaultValues.cacheTTL
const cacheEnabled = (typeof globalThis.caches !== 'undefined') && (this.cacheTTL > 0)

if (cacheEnabled) {
log('cache enabled with ttl %d', this.cacheTTL)
globalThis.caches.open('delegated-routing-v1-cache').then(cache => {
this.cache = cache
}).catch(() => {
this.cache = undefined

Check warning on line 66 in packages/client/src/client.ts

View check run for this annotation

Codecov / codecov/patch

packages/client/src/client.ts#L66

Added line #L66 was not covered by tests
})
}
}

get [contentRoutingSymbol] (): ContentRouting {
Expand All @@ -72,6 +88,11 @@
this.httpQueue.clear()
this.shutDownController.abort()
this.started = false

// Clear the cache when stopping
if (this.cache != null) {
void this.cache.delete('delegated-routing-v1-cache')
Copy link
Member Author

@2color 2color Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realised that this can be a problem if you have multiple instances of the client in scope as we do in Verified Fetch.

But even then, it's most likely that they all get stopped at the same time rather than individually.

}
}

async * getProviders (cid: CID, options: GetProvidersOptions = {}): AsyncGenerator<PeerRecord> {
Expand All @@ -95,7 +116,7 @@
const url = new URL(`${this.clientUrl}routing/v1/providers/${cid.toString()}`)
this.#addFilterParams(url, options.filterAddrs, options.filterProtocols)
const getOptions = { headers: { Accept: 'application/x-ndjson' }, signal }
const res = await fetch(url, getOptions)
const res = await this.#makeRequest(url.toString(), getOptions)

if (res.status === 404) {
// https://specs.ipfs.tech/routing/http-routing-v1/#response-status-codes
Expand Down Expand Up @@ -162,7 +183,7 @@
this.#addFilterParams(url, options.filterAddrs, options.filterProtocols)

const getOptions = { headers: { Accept: 'application/x-ndjson' }, signal }
const res = await fetch(url, getOptions)
const res = await this.#makeRequest(url.toString(), getOptions)

if (res.status === 404) {
// https://specs.ipfs.tech/routing/http-routing-v1/#response-status-codes
Expand Down Expand Up @@ -228,7 +249,7 @@
await onStart.promise

const getOptions = { headers: { Accept: 'application/vnd.ipfs.ipns-record' }, signal }
const res = await fetch(resource, getOptions)
const res = await this.#makeRequest(resource, getOptions)

log('getIPNS GET %s %d', resource, res.status)

Expand Down Expand Up @@ -290,7 +311,7 @@
const body = marshalIPNSRecord(record)

const getOptions = { method: 'PUT', headers: { 'Content-Type': 'application/vnd.ipfs.ipns-record' }, body, signal }
const res = await fetch(resource, getOptions)
const res = await this.#makeRequest(resource, getOptions)

log('putIPNS PUT %s %d', resource, res.status)

Expand Down Expand Up @@ -349,4 +370,65 @@
}
}
}

/**
* makeRequest has two features:
* - Ensures only one concurrent request is made for the same URL
* - Caches GET requests if the Cache API is available
*/
async #makeRequest (url: string, options: RequestInit): Promise<Response> {
const requestMethod = options.method ?? 'GET'
const key = `${requestMethod}-${url}`

// Only try to use cache for GET requests
if (requestMethod === 'GET') {
const cachedResponse = await this.cache?.match(url)
if (cachedResponse != null) {
// Check if the cached response has expired
const expires = parseInt(cachedResponse.headers.get('x-cache-expires') ?? '0', 10)
if (expires > Date.now()) {
log('returning cached response for %s', key)
return cachedResponse
} else {
// Remove expired response from cache
await this.cache?.delete(url)
}
}
}

// Check if there's already an in-flight request for this URL
const existingRequest = this.inFlightRequests.get(key)
if (existingRequest != null) {
const response = await existingRequest
log('deduplicating outgoing request for %s', key)
return response.clone()
}

// Create new request and track it
const requestPromise = fetch(url, options).then(async response => {
// Only cache successful GET requests
if (this.cache != null && response.ok && requestMethod === 'GET') {
const expires = Date.now() + this.cacheTTL
const headers = new Headers(response.headers)
headers.set('x-cache-expires', expires.toString())

// Create a new response with expiration header
const cachedResponse = new Response(response.clone().body, {
status: response.status,
statusText: response.statusText,
headers
})

await this.cache.put(url, cachedResponse)
}
return response
}).finally(() => {
// Clean up the tracked request when it completes
this.inFlightRequests.delete(key)
})

this.inFlightRequests.set(key, requestPromise)
const response = await requestPromise
return response
}
}
20 changes: 20 additions & 0 deletions packages/client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@
* await libp2p.peerRouting.findPeer(peerIdFromString('QmFoo'))
* ```
*
* ### Caching
*
* By default, the client caches successful (200) delegated routing responses in browser environments (that support the [Cache API](https://developer.mozilla.org/en-US/docs/Web/API/Cache)) for a duration of 5 minutes. The client does this by adding an `x-cache-expires` header to the response object.
*
* If caching is enabled, the client will cache responses for the duration of `cacheTTL` milliseconds.
* If `cacheTTL` is 0, caching is disabled:
*
* @example
*
* ```typescript
* // disable caching
* const client = createDelegatedRoutingV1HttpApiClient('https://example.org', { cacheTTL: 0 })
* ```
*
* ### Filtering with IPIP-484
*
* The client can be configured to pass filter options to the delegated routing server as defined in IPIP-484.
Expand Down Expand Up @@ -124,6 +138,12 @@ export interface DelegatedRoutingV1HttpApiClientInit extends FilterOptions {
* How long a request is allowed to take in ms (default: 30 seconds)
*/
timeout?: number

/**
* How long to cache responses for in ms (default: 5 minutes)
* If 0, caching is disabled
*/
cacheTTL?: number
}

export interface GetIPNSOptions extends AbortOptions {
Expand Down
97 changes: 96 additions & 1 deletion packages/client/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,21 @@
import { createIPNSRecord, marshalIPNSRecord } from 'ipns'
import all from 'it-all'
import { CID } from 'multiformats/cid'
import { isBrowser } from 'wherearewe'
import { createDelegatedRoutingV1HttpApiClient, type DelegatedRoutingV1HttpApiClient } from '../src/index.js'

if (process.env.ECHO_SERVER == null) {
throw new Error('Echo server not configured correctly')
}

const serverUrl = process.env.ECHO_SERVER
const itBrowser = (isBrowser ? it : it.skip)

describe('delegated-routing-v1-http-api-client', () => {
let client: DelegatedRoutingV1HttpApiClient

beforeEach(() => {
client = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl))
client = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl), { cacheTTL: 0 })
})

afterEach(async () => {
Expand Down Expand Up @@ -302,4 +304,97 @@
const receivedRecord = new Uint8Array(await res.arrayBuffer())
expect(marshalIPNSRecord(record)).to.equalBytes(receivedRecord)
})

it('should deduplicate concurrent requests to the same URL', async () => {
const cid = CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn')
const providers = [{
Protocol: 'transport-bitswap',
Schema: 'bitswap',
Metadata: 'gBI=',
ID: (await generateKeyPair('Ed25519')).publicKey.toString(),
Addrs: ['/ip4/41.41.41.41/tcp/1234']
}]

// load providers for the router to fetch
await fetch(`${process.env.ECHO_SERVER}/add-providers/${cid.toString()}`, {
method: 'POST',
body: providers.map(prov => JSON.stringify(prov)).join('\n')
})

// Reset call count before our test
await fetch(`${process.env.ECHO_SERVER}/reset-call-count`)

// Make multiple concurrent requests
const results = await Promise.all([
all(client.getProviders(cid)),
all(client.getProviders(cid)),
all(client.getProviders(cid)),
all(client.getProviders(cid))
])

// Get the number of times the server was called
const callCountRes = await fetch(`${process.env.ECHO_SERVER}/get-call-count`)
const callCount = parseInt(await callCountRes.text(), 10)

// Verify server was only called once
expect(callCount).to.equal(1)

// Verify all results are the same
results.forEach(resultProviders => {
expect(resultProviders.map(prov => ({
id: prov.ID.toString(),
// eslint-disable-next-line max-nested-callbacks
addrs: prov.Addrs?.map(ma => ma.toString())
}))).to.deep.equal(providers.map(prov => ({
id: prov.ID,
addrs: prov.Addrs
})))
})
})

itBrowser('should respect cache TTL', async () => {
const shortTTL = 100 // 100ms TTL for testing
const clientWithShortTTL = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl), {
cacheTTL: shortTTL
})

const cid = CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn')
const providers = [{
Protocol: 'transport-bitswap',
Schema: 'bitswap',
Metadata: 'gBI=',
ID: (await generateKeyPair('Ed25519')).publicKey.toString(),
Addrs: ['/ip4/41.41.41.41/tcp/1234']
}]

// load providers for the router to fetch
await fetch(`${process.env.ECHO_SERVER}/add-providers/${cid.toString()}`, {
method: 'POST',
body: providers.map(prov => JSON.stringify(prov)).join('\n')
})

// Reset call count
await fetch(`${process.env.ECHO_SERVER}/reset-call-count`)

// First request should hit the server
await all(clientWithShortTTL.getProviders(cid))

// Second and third request should use cache
await all(clientWithShortTTL.getProviders(cid))
await all(clientWithShortTTL.getProviders(cid))

let callCount = parseInt(await (await fetch(`${process.env.ECHO_SERVER}/get-call-count`)).text(), 10)
expect(callCount).to.equal(1) // Only one server call so far

// Wait for cache to expire
await new Promise(resolve => setTimeout(resolve, shortTTL + 50))

// This request should hit the server again because cache expired
await all(clientWithShortTTL.getProviders(cid))

callCount = parseInt(await (await fetch(`${process.env.ECHO_SERVER}/get-call-count`)).text(), 10)
expect(callCount).to.equal(2) // Second server call after cache expired

clientWithShortTTL.stop()

Check warning on line 398 in packages/client/test/index.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/client/test/index.spec.ts#L356-L398

Added lines #L356 - L398 were not covered by tests
})
})
2 changes: 1 addition & 1 deletion packages/client/test/routings.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ describe('libp2p content-routing', () => {
let client: DelegatedRoutingV1HttpApiClient

beforeEach(() => {
client = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl))
client = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl), { cacheTTL: 0 })
})

afterEach(async () => {
Expand Down
2 changes: 1 addition & 1 deletion packages/interop/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@
"@libp2p/kad-dht": "^14.0.1",
"aegir": "^45.0.1",
"fastify": "^5.0.0",
"helia": "next",
"helia": "^5.1.0",
"ipns": "^10.0.0",
"it-first": "^3.0.6",
"multiformats": "^13.3.0"
Expand Down
Loading