Skip to content

Commit 1f1c3ca

Browse files
feat: resume download on network errors (#133)
* fix: retry on network errors * wrap in error * format * . * test interruption * short and fast * test the tests (1) * test the tests (2) * test the tests (3) * test the tests (4) * test the tests (5) * revert * try a few times * rename * thanks copilot * less spam * hmm ok Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * warn * fewer repeats * still works * spaces * , * test: reduce retryDelay * cleanup * jsdoc * byteLength - thanks copilot --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 93caaf0 commit 1f1c3ca

File tree

4 files changed

+228
-16
lines changed

4 files changed

+228
-16
lines changed

src/utils/image.js

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import { useEffect, useRef } from 'react'
22
import { XzReadableStream } from 'xz-decompress'
33

4+
import { fetchStream } from './stream'
5+
46
/**
57
* Progress callback
68
*
@@ -49,21 +51,7 @@ export class ImageManager {
4951
}
5052

5153
console.debug(`[ImageManager] Downloading ${image.name} from ${archiveUrl}`)
52-
const response = await fetch(archiveUrl, { mode: 'cors' })
53-
if (!response.ok) {
54-
throw new Error(`Fetch failed: ${response.status} ${response.statusText}`)
55-
}
56-
57-
const contentLength = +response.headers.get('Content-Length')
58-
let receivedLength = 0
59-
const transform = new TransformStream({
60-
transform(chunk, controller) {
61-
receivedLength += chunk.byteLength
62-
onProgress?.(receivedLength / contentLength)
63-
controller.enqueue(chunk)
64-
},
65-
})
66-
let stream = response.body.pipeThrough(transform)
54+
let stream = await fetchStream(archiveUrl, { mode: 'cors' }, { onProgress })
6755
try {
6856
if (image.compressed) {
6957
stream = new XzReadableStream(stream)

src/utils/manifest.test.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import config from '../config'
44
import { ImageManager } from './image'
55
import { getManifest } from './manifest'
66

7+
const CI = import.meta.env.CI
78
const MANIFEST_BRANCH = import.meta.env.MANIFEST_BRANCH
89

910
const imageManager = new ImageManager()
@@ -64,7 +65,10 @@ for (const [branch, manifestUrl] of Object.entries(config.manifests)) {
6465
}
6566
})
6667

67-
test.skipIf(big && !MANIFEST_BRANCH)('download', { timeout: (big ? 11 * 60 : 20) * 1000 }, async () => {
68+
test.skipIf(big && !MANIFEST_BRANCH)('download', {
69+
timeout: (big ? 11 * 60 : 20) * 1000,
70+
repeats: CI && !MANIFEST_BRANCH ? 2 : 1,
71+
}, async () => {
6872
await imageManager.downloadImage(image)
6973
})
7074
})

src/utils/stream.js

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/**
2+
* @param {string|URL} url
3+
* @param {RequestInit} [requestOptions]
4+
* @param {object} [options]
5+
* @param {number} options.maxRetries
6+
* @param {number} options.retryDelay
7+
*/
8+
export async function fetchStream(url, requestOptions = {}, options = {}) {
9+
const maxRetries = options.maxRetries || 3
10+
const retryDelay = options.retryDelay || 1000
11+
12+
/** @param {Response} response */
13+
const getContentLength = (response) => {
14+
const total = response.headers.get('Content-Length')
15+
const range = response.headers.get('Content-Range')
16+
if (range) {
17+
const match = range.match(/\/(\d+)$/)
18+
if (match) return parseInt(match[1], 10)
19+
}
20+
if (total) return parseInt(total, 10)
21+
throw new Error('Content-Length not found in response headers')
22+
}
23+
24+
/**
25+
* @param {number} startByte
26+
* @param {AbortSignal} signal
27+
*/
28+
const fetchRange = async (startByte, signal) => {
29+
const headers = { ...(requestOptions.headers || {}) }
30+
if (startByte > 0) {
31+
headers['range'] = `bytes=${startByte}-`
32+
}
33+
34+
const response = await fetch(url, {
35+
...requestOptions,
36+
headers,
37+
signal
38+
})
39+
if (!response.ok && response.status !== 206 && response.status !== 200) {
40+
throw new Error(`Fetch error: ${response.status}`)
41+
}
42+
return response
43+
}
44+
45+
return new ReadableStream({
46+
start() {
47+
this.startByte = 0
48+
this.contentLength = null
49+
this.abortController = new AbortController()
50+
},
51+
52+
async pull(streamController) {
53+
for (let attempt = 0; attempt <= maxRetries; attempt++) {
54+
try {
55+
const response = await fetchRange(this.startByte, this.abortController.signal)
56+
if (this.contentLength === null) {
57+
this.contentLength = getContentLength(response)
58+
}
59+
60+
const reader = response.body.getReader()
61+
while (true) {
62+
const { done, value } = await reader.read()
63+
if (done) {
64+
streamController.close()
65+
return
66+
}
67+
68+
this.startByte += value.byteLength
69+
streamController.enqueue(value)
70+
options.onProgress?.(this.startByte / this.contentLength)
71+
}
72+
} catch (err) {
73+
console.warn(`Attempt ${attempt + 1} failed:`, err)
74+
if (attempt === maxRetries) {
75+
this.abortController.abort()
76+
streamController.error(new Error('Max retries reached', { cause: err }))
77+
return
78+
}
79+
await new Promise((res) => setTimeout(res, retryDelay))
80+
}
81+
}
82+
},
83+
84+
cancel(reason) {
85+
console.warn('Stream canceled:', reason)
86+
this.abortController.abort()
87+
},
88+
})
89+
}

src/utils/stream.test.js

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
import { describe, it, expect, vi, beforeEach } from 'vitest'
2+
import { fetchStream } from './stream'
3+
4+
const encoder = new TextEncoder()
5+
const decoder = new TextDecoder()
6+
7+
function mockResponse({
8+
body = [],
9+
headers = {},
10+
status = 200,
11+
failAfter = -1,
12+
}) {
13+
const chunks = body.map(chunk => encoder.encode(chunk))
14+
return {
15+
ok: status >= 200 && status < 300,
16+
status,
17+
headers: {
18+
get: (key) => headers[key.toLowerCase()] || null,
19+
},
20+
body: {
21+
getReader() {
22+
let readCount = 0
23+
return {
24+
read() {
25+
if (failAfter >= 0 && readCount > failAfter) {
26+
return Promise.reject(new Error('Network error'))
27+
}
28+
if (readCount < chunks.length) {
29+
return Promise.resolve({ done: false, value: chunks[readCount++] })
30+
}
31+
return Promise.resolve({ done: true })
32+
},
33+
}
34+
},
35+
}
36+
}
37+
}
38+
39+
async function readText(stream) {
40+
const reader = stream.getReader()
41+
const chunks = []
42+
while (true) {
43+
const { done, value } = await reader.read()
44+
if (done) break
45+
chunks.push(decoder.decode(value))
46+
}
47+
return chunks.join('')
48+
}
49+
50+
describe('fetchStream', () => {
51+
const retryDelay = 1
52+
53+
beforeEach(() => {
54+
global.fetch = vi.fn()
55+
})
56+
57+
it('downloads content and tracks progress', async () => {
58+
global.fetch.mockResolvedValueOnce(mockResponse({
59+
body: ['Hello ', 'World'],
60+
headers: { 'content-length': '11' },
61+
}))
62+
63+
const progress = []
64+
const stream = await fetchStream('https://example.com', {}, {
65+
onProgress: progress.push.bind(progress),
66+
})
67+
68+
expect(await readText(stream)).toBe('Hello World')
69+
expect(progress[progress.length - 1]).toBe(1)
70+
})
71+
72+
it('retries and uses Range header after failure', async () => {
73+
global.fetch
74+
.mockResolvedValueOnce(
75+
mockResponse({
76+
body: ['partial'],
77+
headers: { 'content-length': '12' },
78+
status: 500,
79+
})
80+
)
81+
.mockResolvedValueOnce(
82+
mockResponse({
83+
body: [' data'],
84+
headers: {
85+
'content-length': '12',
86+
'content-range': 'bytes 7-11/12',
87+
},
88+
})
89+
)
90+
91+
const stream = await fetchStream('https://example.com', {}, { maxRetries: 1, retryDelay })
92+
93+
expect(await readText(stream)).toBe(' data')
94+
expect(global.fetch).toHaveBeenCalledTimes(2)
95+
})
96+
97+
it('resumes download when reader fails mid-stream', async () => {
98+
global.fetch
99+
.mockResolvedValueOnce(
100+
mockResponse({
101+
body: ['First part'],
102+
headers: { 'content-length': '22' },
103+
failAfter: 0,
104+
})
105+
)
106+
.mockResolvedValueOnce(
107+
mockResponse({
108+
body: [' second part'],
109+
headers: {
110+
'content-length': '12',
111+
'content-range': 'bytes 10-21/22',
112+
},
113+
})
114+
)
115+
116+
const stream = await fetchStream('https://example.com', {}, { maxRetries: 1, retryDelay })
117+
118+
expect(await readText(stream)).toBe('First part second part')
119+
const { headers } = global.fetch.mock.calls[1][1]
120+
expect(headers['range']).toBe('bytes=10-')
121+
})
122+
123+
it('throws after max retries', async () => {
124+
global.fetch.mockRejectedValue(new Error('network error'))
125+
126+
const stream = await fetchStream('https://example.com', {}, { maxRetries: 2, retryDelay })
127+
128+
await expect(stream.getReader().read()).rejects.toThrow('Max retries reached')
129+
expect(global.fetch).toHaveBeenCalledTimes(3)
130+
})
131+
})

0 commit comments

Comments
 (0)