Skip to content

Commit d3183aa

Browse files
authored
fix(http): return response early in JS before waiting for chunks on the rust side (#2522)
1 parent 1e9e496 commit d3183aa

File tree

6 files changed

+81
-52
lines changed

6 files changed

+81
-52
lines changed

.changes/http-stream-support.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"http": patch
3+
"http-js": patch
4+
---
5+
6+
Fix `fetch` blocking until the whole response is read even if it was a streaming response.

plugins/http/api-iife.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/http/guest-js/index.ts

Lines changed: 39 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ export interface DangerousSettings {
106106
acceptInvalidHostnames?: boolean
107107
}
108108

109-
const ERROR_REQUEST_CANCELLED = 'Request canceled'
109+
const ERROR_REQUEST_CANCELLED = 'Request cancelled'
110110

111111
/**
112112
* Fetch a resource from the network. It returns a `Promise` that resolves to the
@@ -186,35 +186,6 @@ export async function fetch(
186186
throw new Error(ERROR_REQUEST_CANCELLED)
187187
}
188188

189-
const streamChannel = new Channel<ArrayBuffer | number[]>()
190-
191-
const readableStreamBody = new ReadableStream({
192-
start: (controller) => {
193-
streamChannel.onmessage = (res: ArrayBuffer | number[]) => {
194-
// close early if aborted
195-
if (signal?.aborted) {
196-
controller.error(ERROR_REQUEST_CANCELLED)
197-
controller.close()
198-
return
199-
}
200-
201-
// close when the signal to close (an empty chunk)
202-
// is sent from the IPC.
203-
if (
204-
res instanceof ArrayBuffer ? res.byteLength == 0 : res.length == 0
205-
) {
206-
controller.close()
207-
return
208-
}
209-
210-
// the content conversion (like .text(), .json(), etc.) in Response
211-
// must have Uint8Array as its content, else it will
212-
// have untraceable error that's hard to debug.
213-
controller.enqueue(new Uint8Array(res))
214-
}
215-
}
216-
})
217-
218189
const rid = await invoke<number>('plugin:http|fetch', {
219190
clientConfig: {
220191
method: req.method,
@@ -225,8 +196,7 @@ export async function fetch(
225196
connectTimeout,
226197
proxy,
227198
danger
228-
},
229-
streamChannel
199+
}
230200
})
231201

232202
const abort = () => invoke('plugin:http|fetch_cancel', { rid })
@@ -253,11 +223,47 @@ export async function fetch(
253223
status,
254224
statusText,
255225
url,
256-
headers: responseHeaders
226+
headers: responseHeaders,
227+
rid: responseRid
257228
} = await invoke<FetchSendResponse>('plugin:http|fetch_send', {
258229
rid
259230
})
260231

232+
const readableStreamBody = new ReadableStream({
233+
start: (controller) => {
234+
const streamChannel = new Channel<ArrayBuffer | number[]>()
235+
streamChannel.onmessage = (res: ArrayBuffer | number[]) => {
236+
// close early if aborted
237+
if (signal?.aborted) {
238+
controller.error(ERROR_REQUEST_CANCELLED)
239+
return
240+
}
241+
242+
// close when the signal to close (an empty chunk)
243+
// is sent from the IPC.
244+
if (
245+
res instanceof ArrayBuffer ? res.byteLength == 0 : res.length == 0
246+
) {
247+
controller.close()
248+
return
249+
}
250+
251+
// the content conversion (like .text(), .json(), etc.) in Response
252+
// must have Uint8Array as its content, else it will
253+
// have untraceable error that's hard to debug.
254+
controller.enqueue(new Uint8Array(res))
255+
}
256+
257+
// run a non-blocking body stream fetch
258+
invoke('plugin:http|fetch_read_body', {
259+
rid: responseRid,
260+
streamChannel
261+
}).catch((e) => {
262+
controller.error(e)
263+
})
264+
}
265+
})
266+
261267
const res = new Response(readableStreamBody, {
262268
status,
263269
statusText

plugins/http/src/commands.rs

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ use crate::{
2222

2323
const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
2424

25+
struct ReqwestResponse(reqwest::Response);
26+
impl tauri::Resource for ReqwestResponse {}
27+
2528
type CancelableResponseResult = Result<reqwest::Response>;
2629
type CancelableResponseFuture =
2730
Pin<Box<dyn Future<Output = CancelableResponseResult> + Send + Sync>>;
@@ -178,7 +181,6 @@ pub async fn fetch<R: Runtime>(
178181
client_config: ClientConfig,
179182
command_scope: CommandScope<Entry>,
180183
global_scope: GlobalScope<Entry>,
181-
stream_channel: Channel<tauri::ipc::InvokeResponseBody>,
182184
) -> crate::Result<ResourceId> {
183185
let ClientConfig {
184186
method,
@@ -312,20 +314,7 @@ pub async fn fetch<R: Runtime>(
312314
#[cfg(feature = "tracing")]
313315
tracing::trace!("{:?}", request);
314316

315-
let fut = async move {
316-
let mut res = request.send().await?;
317-
318-
// send response through IPC channel
319-
while let Some(chunk) = res.chunk().await? {
320-
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(chunk.to_vec()))?;
321-
}
322-
323-
// send empty vector when done
324-
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(Vec::new()))?;
325-
326-
// return that response
327-
Ok(res)
328-
};
317+
let fut = async move { request.send().await.map_err(Into::into) };
329318

330319
let mut resources_table = webview.resources_table();
331320
let rid = resources_table.add_request(Box::pin(fut));
@@ -370,7 +359,7 @@ pub fn fetch_cancel<R: Runtime>(webview: Webview<R>, rid: ResourceId) -> crate::
370359
Ok(())
371360
}
372361

373-
#[tauri::command]
362+
#[command]
374363
pub async fn fetch_send<R: Runtime>(
375364
webview: Webview<R>,
376365
rid: ResourceId,
@@ -410,6 +399,9 @@ pub async fn fetch_send<R: Runtime>(
410399
));
411400
}
412401

402+
let mut resources_table = webview.resources_table();
403+
let rid = resources_table.add(ReqwestResponse(res));
404+
413405
Ok(FetchResponse {
414406
status: status.as_u16(),
415407
status_text: status.canonical_reason().unwrap_or_default().to_string(),
@@ -419,6 +411,30 @@ pub async fn fetch_send<R: Runtime>(
419411
})
420412
}
421413

414+
#[command]
415+
pub async fn fetch_read_body<R: Runtime>(
416+
webview: Webview<R>,
417+
rid: ResourceId,
418+
stream_channel: Channel<tauri::ipc::InvokeResponseBody>,
419+
) -> crate::Result<()> {
420+
let res = {
421+
let mut resources_table = webview.resources_table();
422+
resources_table.take::<ReqwestResponse>(rid)?
423+
};
424+
425+
let mut res = Arc::into_inner(res).unwrap().0;
426+
427+
// send response through IPC channel
428+
while let Some(chunk) = res.chunk().await? {
429+
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(chunk.to_vec()))?;
430+
}
431+
432+
// send empty vector when done
433+
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(Vec::new()))?;
434+
435+
Ok(())
436+
}
437+
422438
// forbidden headers per fetch spec https://fetch.spec.whatwg.org/#terminology-headers
423439
#[cfg(not(feature = "unsafe-headers"))]
424440
fn is_unsafe_header(header: &HeaderName) -> bool {

plugins/http/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
3636
.invoke_handler(tauri::generate_handler![
3737
commands::fetch,
3838
commands::fetch_cancel,
39-
commands::fetch_send
39+
commands::fetch_send,
40+
commands::fetch_read_body
4041
])
4142
.build()
4243
}

pnpm-lock.yaml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)