Skip to content

Commit af4b426

Browse files
authored
fix: handle aborted requests properly (#241)
* fix: abort signal handling when walking the path * fix: remove all throwIfAborted calls * chore: normalize check for aborted signal
1 parent 95397aa commit af4b426

File tree

7 files changed

+69
-10
lines changed

7 files changed

+69
-10
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/* eslint-env mocha */
2+
import { createVerifiedFetch } from '@helia/verified-fetch'
3+
import { expect } from 'aegir/chai'
4+
import type { VerifiedFetch } from '@helia/verified-fetch'
5+
6+
describe('verified-fetch abort handling', () => {
7+
let verifiedFetch: VerifiedFetch
8+
before(async () => {
9+
if (process.env.KUBO_DIRECT_RETRIEVAL_ROUTER == null || process.env.KUBO_DIRECT_RETRIEVAL_ROUTER === '') {
10+
throw new Error('KUBO_DIRECT_RETRIEVAL_ROUTER environment variable is required')
11+
}
12+
13+
verifiedFetch = await createVerifiedFetch({
14+
gateways: [process.env.KUBO_DIRECT_RETRIEVAL_ROUTER],
15+
routers: [process.env.KUBO_DIRECT_RETRIEVAL_ROUTER],
16+
allowInsecure: true,
17+
allowLocal: true
18+
})
19+
})
20+
21+
after(async () => {
22+
await verifiedFetch.stop()
23+
})
24+
25+
it('should handle aborts properly', async function () {
26+
this.timeout(2000)
27+
const controller = new AbortController()
28+
const timeout = setTimeout(() => {
29+
controller.abort()
30+
}, 70)
31+
32+
const fetchPromise = verifiedFetch('ipfs://QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm/1 - Barrel - Part 1/1 - Barrel - Part 1 - alt.txt', {
33+
signal: controller.signal
34+
})
35+
await expect(fetchPromise).to.eventually.be.rejected.with.property('name', 'AbortError')
36+
clearTimeout(timeout)
37+
})
38+
})

packages/verified-fetch/src/plugins/plugin-handle-dag-pb.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { unixfs } from '@helia/unixfs'
22
import { code as dagPbCode } from '@ipld/dag-pb'
3+
import { AbortError } from '@libp2p/interface'
34
import { exporter } from 'ipfs-unixfs-exporter'
45
import { CustomProgressEvent } from 'progress-events'
56
import { getStreamFromAsyncIterable } from '../utils/get-stream-from-async-iterable.js'
@@ -98,8 +99,10 @@ export class DagPbPlugin extends BasePlugin {
9899
path = rootFilePath
99100
resolvedCID = entry.cid
100101
} catch (err: any) {
102+
if (options?.signal?.aborted) {
103+
throw new AbortError(options?.signal?.reason)
104+
}
101105
this.log.error('error loading path %c/%s', dirCid, rootFilePath, err)
102-
options?.signal?.throwIfAborted()
103106
context.isDirectory = true
104107
context.directoryEntries = []
105108
context.modified++
@@ -161,7 +164,9 @@ export class DagPbPlugin extends BasePlugin {
161164

162165
return response
163166
} catch (err: any) {
164-
options?.signal?.throwIfAborted()
167+
if (options?.signal?.aborted) {
168+
throw new AbortError(options?.signal?.reason)
169+
}
165170
log.error('error streaming %c/%s', cid, path, err)
166171
if (byteRangeContext.isRangeRequest && err.code === 'ERR_INVALID_PARAMS') {
167172
return badRangeResponse(resource)

packages/verified-fetch/src/plugins/plugin-handle-dag-walk.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ export class DagWalkPlugin extends BasePlugin {
2929
const { cid, resource, options, withServerTiming = false } = context
3030
const { getBlockstore, handleServerTiming } = this.pluginOptions
3131
const blockstore = getBlockstore(cid, resource, options?.session ?? true, options)
32+
3233
// TODO: migrate handlePathWalking into this plugin
3334
const pathDetails = await handleServerTiming('path-walking', '', async () => handlePathWalking({ ...context, blockstore, log: this.log }), withServerTiming)
3435

35-
context.modified++
3636
if (pathDetails instanceof Response) {
3737
this.log.trace('path walking failed')
3838

@@ -42,10 +42,11 @@ export class DagWalkPlugin extends BasePlugin {
4242
return pathDetails
4343
}
4444

45-
// some error walking the path
45+
// some other error walking the path (codec doesn't support pathing, etc..), let the next plugin try to handle it
4646
return null
4747
}
4848

49+
context.modified++
4950
context.pathDetails = pathDetails
5051

5152
return null

packages/verified-fetch/src/utils/get-stream-from-async-iterable.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ export async function getStreamFromAsyncIterable (iterator: AsyncIterable<Uint8A
2525
},
2626
async pull (controller) {
2727
const { value, done } = await reader.next()
28-
if (options?.signal?.aborted === true) {
28+
if (options?.signal?.aborted) {
2929
controller.error(new AbortError(options.signal.reason ?? 'signal aborted by user'))
3030
controller.close()
3131
return

packages/verified-fetch/src/utils/parse-url-string.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { AbortError } from '@libp2p/interface'
12
import { CID } from 'multiformats/cid'
23
import { getPeerIdFromString } from './get-peer-id-from-string.js'
34
import { serverTiming } from './server-timing.js'
@@ -212,7 +213,9 @@ export async function parseUrlString ({ urlString, ipns, logger, withServerTimin
212213
resolvedPath = resolveResult?.path
213214
log.trace('resolved %s to %c', cidOrPeerIdOrDnsLink, cid)
214215
} catch (err) {
215-
options?.signal?.throwIfAborted()
216+
if (options?.signal?.aborted) {
217+
throw new AbortError(options?.signal?.reason)
218+
}
216219
if (peerId == null) {
217220
log.error('could not parse PeerId string "%s"', cidOrPeerIdOrDnsLink, err)
218221
errors.push(new TypeError(`Could not parse PeerId in ipns url "${cidOrPeerIdOrDnsLink}", ${(err as Error).message}`))
@@ -249,7 +252,10 @@ export async function parseUrlString ({ urlString, ipns, logger, withServerTimin
249252
resolvedPath = resolveResult?.path
250253
log.trace('resolved %s to %c', decodedDnsLinkLabel, cid)
251254
} catch (err: any) {
252-
options?.signal?.throwIfAborted()
255+
// eslint-disable-next-line max-depth
256+
if (options?.signal?.aborted) {
257+
throw new AbortError(options?.signal?.reason)
258+
}
253259
log.error('could not resolve DnsLink for "%s"', cidOrPeerIdOrDnsLink, err)
254260
errors.push(err)
255261
}

packages/verified-fetch/src/utils/walk-path.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { DoesNotExistError } from '@helia/unixfs/errors'
2+
import { AbortError } from '@libp2p/interface'
23
import { walkPath as exporterWalk } from 'ipfs-unixfs-exporter'
34
import { badGatewayResponse, notFoundResponse } from './responses.js'
45
import type { PluginContext } from '../plugins/types.js'
@@ -53,7 +54,10 @@ export async function handlePathWalking ({ cid, path, resource, options, blockst
5354
try {
5455
return await walkPath(blockstore, `${cid.toString()}/${path}`, options)
5556
} catch (err: any) {
56-
options?.signal?.throwIfAborted()
57+
if (options?.signal?.aborted) {
58+
throw new AbortError(options?.signal?.reason)
59+
}
60+
5761
if (['ERR_NO_PROP', 'ERR_NO_TERMINAL_ELEMENT', 'ERR_NOT_FOUND'].includes(err.code)) {
5862
return notFoundResponse(resource)
5963
}

packages/verified-fetch/src/verified-fetch.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { ipns as heliaIpns } from '@helia/ipns'
2+
import { AbortError } from '@libp2p/interface'
23
import { prefixLogger } from '@libp2p/logger'
34
import { CustomProgressEvent } from 'progress-events'
45
import QuickLRU from 'quick-lru'
@@ -282,7 +283,9 @@ export class VerifiedFetch {
282283
break
283284
}
284285
} catch (err: any) {
285-
context.options?.signal?.throwIfAborted()
286+
if (context.options?.signal?.aborted) {
287+
throw new AbortError(context.options?.signal?.reason)
288+
}
286289
this.log.error('Error in plugin:', plugin.constructor.name, err)
287290
// if fatal, short-circuit the pipeline
288291
if (err.name === 'PluginFatalError') {
@@ -341,7 +344,9 @@ export class VerifiedFetch {
341344
parsedResult = await this.handleServerTiming('parse-resource', '', async () => parseResource(resource, { ipns: this.ipns, logger: this.helia.logger }, { withServerTiming, ...options }), withServerTiming)
342345
this.serverTimingHeaders.push(...parsedResult.serverTimings.map(({ header }) => header))
343346
} catch (err: any) {
344-
options?.signal?.throwIfAborted()
347+
if (options?.signal?.aborted) {
348+
throw new AbortError(options?.signal?.reason)
349+
}
345350
this.log.error('error parsing resource %s', resource, err)
346351

347352
return this.handleFinalResponse(badRequestResponse(resource.toString(), err))

0 commit comments

Comments
 (0)