diff --git a/packages/client/package.json b/packages/client/package.json index ad2012c..e0d9ffa 100644 --- a/packages/client/package.json +++ b/packages/client/package.json @@ -130,6 +130,10 @@ "dep-check": "aegir dep-check", "build": "aegir build", "test": "aegir test", + "test:chrome": "aegir test -t browser --cov", + "test:chrome-webworker": "aegir test -t webworker", + "test:firefox": "aegir test -t browser -- --browser firefox", + "test:firefox-webworker": "aegir test -t webworker -- --browser firefox", "test:node": "aegir test -t node --cov", "release": "aegir release" }, @@ -153,7 +157,8 @@ "@libp2p/crypto": "^5.0.1", "aegir": "^45.0.1", "body-parser": "^1.20.3", - "it-all": "^3.0.6" + "it-all": "^3.0.6", + "wherearewe": "^2.0.1" }, "sideEffects": false } diff --git a/packages/client/src/client.ts b/packages/client/src/client.ts index 9173b2a..5417aa0 100644 --- a/packages/client/src/client.ts +++ b/packages/client/src/client.ts @@ -20,7 +20,8 @@ const log = logger('delegated-routing-v1-http-api-client') const defaultValues = { concurrentRequests: 4, - timeout: 30e3 + timeout: 30e3, + cacheTTL: 5 * 60 * 1000 // 5 minutes default as per https://specs.ipfs.tech/routing/http-routing-v1/#response-headers } export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV1HttpApiClient { @@ -33,7 +34,9 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV private readonly peerRouting: PeerRouting private readonly filterAddrs?: string[] private readonly filterProtocols?: string[] - + private readonly inFlightRequests: Map> + private cache?: Cache + private readonly cacheTTL: number /** * Create a new DelegatedContentRouting instance */ @@ -44,12 +47,25 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV this.httpQueue = new PQueue({ concurrency: init.concurrentRequests ?? defaultValues.concurrentRequests }) + this.inFlightRequests = new Map() // Tracks in-flight requests to avoid duplicate requests this.clientUrl = url instanceof URL ? url : new URL(url) this.timeout = init.timeout ?? defaultValues.timeout this.filterAddrs = init.filterAddrs this.filterProtocols = init.filterProtocols this.contentRouting = new DelegatedRoutingV1HttpApiClientContentRouting(this) this.peerRouting = new DelegatedRoutingV1HttpApiClientPeerRouting(this) + + this.cacheTTL = init.cacheTTL ?? defaultValues.cacheTTL + const cacheEnabled = (typeof globalThis.caches !== 'undefined') && (this.cacheTTL > 0) + + if (cacheEnabled) { + log('cache enabled with ttl %d', this.cacheTTL) + globalThis.caches.open('delegated-routing-v1-cache').then(cache => { + this.cache = cache + }).catch(() => { + this.cache = undefined + }) + } } get [contentRoutingSymbol] (): ContentRouting { @@ -72,6 +88,11 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV this.httpQueue.clear() this.shutDownController.abort() this.started = false + + // Clear the cache when stopping + if (this.cache != null) { + void this.cache.delete('delegated-routing-v1-cache') + } } async * getProviders (cid: CID, options: GetProvidersOptions = {}): AsyncGenerator { @@ -95,7 +116,7 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV const url = new URL(`${this.clientUrl}routing/v1/providers/${cid.toString()}`) this.#addFilterParams(url, options.filterAddrs, options.filterProtocols) const getOptions = { headers: { Accept: 'application/x-ndjson' }, signal } - const res = await fetch(url, getOptions) + const res = await this.#makeRequest(url.toString(), getOptions) if (res.status === 404) { // https://specs.ipfs.tech/routing/http-routing-v1/#response-status-codes @@ -162,7 +183,7 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV this.#addFilterParams(url, options.filterAddrs, options.filterProtocols) const getOptions = { headers: { Accept: 'application/x-ndjson' }, signal } - const res = await fetch(url, getOptions) + const res = await this.#makeRequest(url.toString(), getOptions) if (res.status === 404) { // https://specs.ipfs.tech/routing/http-routing-v1/#response-status-codes @@ -228,7 +249,7 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV await onStart.promise const getOptions = { headers: { Accept: 'application/vnd.ipfs.ipns-record' }, signal } - const res = await fetch(resource, getOptions) + const res = await this.#makeRequest(resource, getOptions) log('getIPNS GET %s %d', resource, res.status) @@ -290,7 +311,7 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV const body = marshalIPNSRecord(record) const getOptions = { method: 'PUT', headers: { 'Content-Type': 'application/vnd.ipfs.ipns-record' }, body, signal } - const res = await fetch(resource, getOptions) + const res = await this.#makeRequest(resource, getOptions) log('putIPNS PUT %s %d', resource, res.status) @@ -349,4 +370,65 @@ export class DefaultDelegatedRoutingV1HttpApiClient implements DelegatedRoutingV } } } + + /** + * makeRequest has two features: + * - Ensures only one concurrent request is made for the same URL + * - Caches GET requests if the Cache API is available + */ + async #makeRequest (url: string, options: RequestInit): Promise { + const requestMethod = options.method ?? 'GET' + const key = `${requestMethod}-${url}` + + // Only try to use cache for GET requests + if (requestMethod === 'GET') { + const cachedResponse = await this.cache?.match(url) + if (cachedResponse != null) { + // Check if the cached response has expired + const expires = parseInt(cachedResponse.headers.get('x-cache-expires') ?? '0', 10) + if (expires > Date.now()) { + log('returning cached response for %s', key) + return cachedResponse + } else { + // Remove expired response from cache + await this.cache?.delete(url) + } + } + } + + // Check if there's already an in-flight request for this URL + const existingRequest = this.inFlightRequests.get(key) + if (existingRequest != null) { + const response = await existingRequest + log('deduplicating outgoing request for %s', key) + return response.clone() + } + + // Create new request and track it + const requestPromise = fetch(url, options).then(async response => { + // Only cache successful GET requests + if (this.cache != null && response.ok && requestMethod === 'GET') { + const expires = Date.now() + this.cacheTTL + const headers = new Headers(response.headers) + headers.set('x-cache-expires', expires.toString()) + + // Create a new response with expiration header + const cachedResponse = new Response(response.clone().body, { + status: response.status, + statusText: response.statusText, + headers + }) + + await this.cache.put(url, cachedResponse) + } + return response + }).finally(() => { + // Clean up the tracked request when it completes + this.inFlightRequests.delete(key) + }) + + this.inFlightRequests.set(key, requestPromise) + const response = await requestPromise + return response + } } diff --git a/packages/client/src/index.ts b/packages/client/src/index.ts index 67b81e9..393db52 100644 --- a/packages/client/src/index.ts +++ b/packages/client/src/index.ts @@ -39,6 +39,20 @@ * await libp2p.peerRouting.findPeer(peerIdFromString('QmFoo')) * ``` * + * ### Caching + * + * By default, the client caches successful (200) delegated routing responses in browser environments (that support the [Cache API](https://developer.mozilla.org/en-US/docs/Web/API/Cache)) for a duration of 5 minutes. The client does this by adding an `x-cache-expires` header to the response object. + * + * If caching is enabled, the client will cache responses for the duration of `cacheTTL` milliseconds. + * If `cacheTTL` is 0, caching is disabled: + * + * @example + * + * ```typescript + * // disable caching + * const client = createDelegatedRoutingV1HttpApiClient('https://example.org', { cacheTTL: 0 }) + * ``` + * * ### Filtering with IPIP-484 * * The client can be configured to pass filter options to the delegated routing server as defined in IPIP-484. @@ -124,6 +138,12 @@ export interface DelegatedRoutingV1HttpApiClientInit extends FilterOptions { * How long a request is allowed to take in ms (default: 30 seconds) */ timeout?: number + + /** + * How long to cache responses for in ms (default: 5 minutes) + * If 0, caching is disabled + */ + cacheTTL?: number } export interface GetIPNSOptions extends AbortOptions { diff --git a/packages/client/test/index.spec.ts b/packages/client/test/index.spec.ts index 70e5d90..2b6d96f 100644 --- a/packages/client/test/index.spec.ts +++ b/packages/client/test/index.spec.ts @@ -7,6 +7,7 @@ import { expect } from 'aegir/chai' import { createIPNSRecord, marshalIPNSRecord } from 'ipns' import all from 'it-all' import { CID } from 'multiformats/cid' +import { isBrowser } from 'wherearewe' import { createDelegatedRoutingV1HttpApiClient, type DelegatedRoutingV1HttpApiClient } from '../src/index.js' if (process.env.ECHO_SERVER == null) { @@ -14,12 +15,13 @@ if (process.env.ECHO_SERVER == null) { } const serverUrl = process.env.ECHO_SERVER +const itBrowser = (isBrowser ? it : it.skip) describe('delegated-routing-v1-http-api-client', () => { let client: DelegatedRoutingV1HttpApiClient beforeEach(() => { - client = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl)) + client = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl), { cacheTTL: 0 }) }) afterEach(async () => { @@ -302,4 +304,97 @@ describe('delegated-routing-v1-http-api-client', () => { const receivedRecord = new Uint8Array(await res.arrayBuffer()) expect(marshalIPNSRecord(record)).to.equalBytes(receivedRecord) }) + + it('should deduplicate concurrent requests to the same URL', async () => { + const cid = CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn') + const providers = [{ + Protocol: 'transport-bitswap', + Schema: 'bitswap', + Metadata: 'gBI=', + ID: (await generateKeyPair('Ed25519')).publicKey.toString(), + Addrs: ['/ip4/41.41.41.41/tcp/1234'] + }] + + // load providers for the router to fetch + await fetch(`${process.env.ECHO_SERVER}/add-providers/${cid.toString()}`, { + method: 'POST', + body: providers.map(prov => JSON.stringify(prov)).join('\n') + }) + + // Reset call count before our test + await fetch(`${process.env.ECHO_SERVER}/reset-call-count`) + + // Make multiple concurrent requests + const results = await Promise.all([ + all(client.getProviders(cid)), + all(client.getProviders(cid)), + all(client.getProviders(cid)), + all(client.getProviders(cid)) + ]) + + // Get the number of times the server was called + const callCountRes = await fetch(`${process.env.ECHO_SERVER}/get-call-count`) + const callCount = parseInt(await callCountRes.text(), 10) + + // Verify server was only called once + expect(callCount).to.equal(1) + + // Verify all results are the same + results.forEach(resultProviders => { + expect(resultProviders.map(prov => ({ + id: prov.ID.toString(), + // eslint-disable-next-line max-nested-callbacks + addrs: prov.Addrs?.map(ma => ma.toString()) + }))).to.deep.equal(providers.map(prov => ({ + id: prov.ID, + addrs: prov.Addrs + }))) + }) + }) + + itBrowser('should respect cache TTL', async () => { + const shortTTL = 100 // 100ms TTL for testing + const clientWithShortTTL = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl), { + cacheTTL: shortTTL + }) + + const cid = CID.parse('QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn') + const providers = [{ + Protocol: 'transport-bitswap', + Schema: 'bitswap', + Metadata: 'gBI=', + ID: (await generateKeyPair('Ed25519')).publicKey.toString(), + Addrs: ['/ip4/41.41.41.41/tcp/1234'] + }] + + // load providers for the router to fetch + await fetch(`${process.env.ECHO_SERVER}/add-providers/${cid.toString()}`, { + method: 'POST', + body: providers.map(prov => JSON.stringify(prov)).join('\n') + }) + + // Reset call count + await fetch(`${process.env.ECHO_SERVER}/reset-call-count`) + + // First request should hit the server + await all(clientWithShortTTL.getProviders(cid)) + + // Second and third request should use cache + await all(clientWithShortTTL.getProviders(cid)) + await all(clientWithShortTTL.getProviders(cid)) + + let callCount = parseInt(await (await fetch(`${process.env.ECHO_SERVER}/get-call-count`)).text(), 10) + expect(callCount).to.equal(1) // Only one server call so far + + // Wait for cache to expire + await new Promise(resolve => setTimeout(resolve, shortTTL + 50)) + + // This request should hit the server again because cache expired + await all(clientWithShortTTL.getProviders(cid)) + + callCount = parseInt(await (await fetch(`${process.env.ECHO_SERVER}/get-call-count`)).text(), 10) + expect(callCount).to.equal(2) // Second server call after cache expired + + clientWithShortTTL.stop() + }) }) diff --git a/packages/client/test/routings.spec.ts b/packages/client/test/routings.spec.ts index 354c576..fb7cb62 100644 --- a/packages/client/test/routings.spec.ts +++ b/packages/client/test/routings.spec.ts @@ -23,7 +23,7 @@ describe('libp2p content-routing', () => { let client: DelegatedRoutingV1HttpApiClient beforeEach(() => { - client = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl)) + client = createDelegatedRoutingV1HttpApiClient(new URL(serverUrl), { cacheTTL: 0 }) }) afterEach(async () => { diff --git a/packages/interop/package.json b/packages/interop/package.json index 0626662..09fb6c5 100644 --- a/packages/interop/package.json +++ b/packages/interop/package.json @@ -125,7 +125,7 @@ "@libp2p/kad-dht": "^14.0.1", "aegir": "^45.0.1", "fastify": "^5.0.0", - "helia": "next", + "helia": "^5.1.0", "ipns": "^10.0.0", "it-first": "^3.0.6", "multiformats": "^13.3.0"