Skip to content

Commit 6b1054d

Browse files
adrieljssFabianLars
authored andcommitted
HTTP add stream support (tauri-apps#2479)
* feat: add stream support * feat: add stream support * Revert "feat: add stream support" This reverts commit 5edea81. * feat: add stream support * Discard changes to pnpm-lock.yaml * Discard changes to plugins/http/package.json * fix(stream): change IPC packet * fix: update stream message guest-js * fix: return early when aborted * fix: use InvokeResponseBody as packet * fix: remove serde_bytes * fix: remove reqwest response * fix: content conversion bug * fix: remove ReqwestResponses along with its implementations * formatting and update changelog * build api-iife.js --------- Co-authored-by: Fabian-Lars <[email protected]>
1 parent 3563609 commit 6b1054d

File tree

6 files changed

+67
-51
lines changed

6 files changed

+67
-51
lines changed

.changes/http-stream-support.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"http": minor
3+
"http-js": minor
4+
---
5+
6+
Add stream support for HTTP stream responses.

Cargo.lock

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/http/api-iife.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/http/guest-js/index.ts

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* @module
2727
*/
2828

29-
import { invoke } from '@tauri-apps/api/core'
29+
import { Channel, invoke } from '@tauri-apps/api/core'
3030

3131
/**
3232
* Configuration of a proxy that a Client should pass requests to.
@@ -186,6 +186,35 @@ export async function fetch(
186186
throw new Error(ERROR_REQUEST_CANCELLED)
187187
}
188188

189+
const streamChannel = new Channel<ArrayBuffer | number[]>()
190+
191+
const readableStreamBody = new ReadableStream({
192+
start: (controller) => {
193+
streamChannel.onmessage = (res: ArrayBuffer | number[]) => {
194+
// close early if aborted
195+
if (signal?.aborted) {
196+
controller.error(ERROR_REQUEST_CANCELLED)
197+
controller.close()
198+
return
199+
}
200+
201+
// close when the signal to close (an empty chunk)
202+
// is sent from the IPC.
203+
if (
204+
res instanceof ArrayBuffer ? res.byteLength == 0 : res.length == 0
205+
) {
206+
controller.close()
207+
return
208+
}
209+
210+
// the content conversion (like .text(), .json(), etc.) in Response
211+
// must have Uint8Array as its content, else it will
212+
// have untraceable error that's hard to debug.
213+
controller.enqueue(new Uint8Array(res))
214+
}
215+
}
216+
})
217+
189218
const rid = await invoke<number>('plugin:http|fetch', {
190219
clientConfig: {
191220
method: req.method,
@@ -196,7 +225,8 @@ export async function fetch(
196225
connectTimeout,
197226
proxy,
198227
danger
199-
}
228+
},
229+
streamChannel
200230
})
201231

202232
const abort = () => invoke('plugin:http|fetch_cancel', { rid })
@@ -223,30 +253,15 @@ export async function fetch(
223253
status,
224254
statusText,
225255
url,
226-
headers: responseHeaders,
227-
rid: responseRid
256+
headers: responseHeaders
228257
} = await invoke<FetchSendResponse>('plugin:http|fetch_send', {
229258
rid
230259
})
231260

232-
const body = await invoke<ArrayBuffer | number[]>(
233-
'plugin:http|fetch_read_body',
234-
{
235-
rid: responseRid
236-
}
237-
)
238-
239-
const res = new Response(
240-
body instanceof ArrayBuffer && body.byteLength !== 0
241-
? body
242-
: body instanceof Array && body.length > 0
243-
? new Uint8Array(body)
244-
: null,
245-
{
246-
status,
247-
statusText
248-
}
249-
)
261+
const res = new Response(readableStreamBody, {
262+
status,
263+
statusText
264+
})
250265

251266
// url and headers are read only properties
252267
// but seems like we can set them like this

plugins/http/src/commands.rs

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use serde::{Deserialize, Serialize};
1010
use tauri::{
1111
async_runtime::Mutex,
1212
command,
13-
ipc::{CommandScope, GlobalScope},
13+
ipc::{Channel, CommandScope, GlobalScope},
1414
Manager, ResourceId, ResourceTable, Runtime, State, Webview,
1515
};
1616
use tokio::sync::oneshot::{channel, Receiver, Sender};
@@ -22,9 +22,6 @@ use crate::{
2222

2323
const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
2424

25-
struct ReqwestResponse(reqwest::Response);
26-
impl tauri::Resource for ReqwestResponse {}
27-
2825
type CancelableResponseResult = Result<reqwest::Response>;
2926
type CancelableResponseFuture =
3027
Pin<Box<dyn Future<Output = CancelableResponseResult> + Send + Sync>>;
@@ -181,6 +178,7 @@ pub async fn fetch<R: Runtime>(
181178
client_config: ClientConfig,
182179
command_scope: CommandScope<Entry>,
183180
global_scope: GlobalScope<Entry>,
181+
stream_channel: Channel<tauri::ipc::InvokeResponseBody>,
184182
) -> crate::Result<ResourceId> {
185183
let ClientConfig {
186184
method,
@@ -314,7 +312,21 @@ pub async fn fetch<R: Runtime>(
314312
#[cfg(feature = "tracing")]
315313
tracing::trace!("{:?}", request);
316314

317-
let fut = async move { request.send().await.map_err(Into::into) };
315+
let fut = async move {
316+
let mut res = request.send().await?;
317+
318+
// send response through IPC channel
319+
while let Some(chunk) = res.chunk().await? {
320+
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(chunk.to_vec()))?;
321+
}
322+
323+
// send empty vector when done
324+
stream_channel.send(tauri::ipc::InvokeResponseBody::Raw(Vec::new()))?;
325+
326+
// return that response
327+
Ok(res)
328+
};
329+
318330
let mut resources_table = webview.resources_table();
319331
let rid = resources_table.add_request(Box::pin(fut));
320332

@@ -398,9 +410,6 @@ pub async fn fetch_send<R: Runtime>(
398410
));
399411
}
400412

401-
let mut resources_table = webview.resources_table();
402-
let rid = resources_table.add(ReqwestResponse(res));
403-
404413
Ok(FetchResponse {
405414
status: status.as_u16(),
406415
status_text: status.canonical_reason().unwrap_or_default().to_string(),
@@ -410,19 +419,6 @@ pub async fn fetch_send<R: Runtime>(
410419
})
411420
}
412421

413-
#[tauri::command]
414-
pub(crate) async fn fetch_read_body<R: Runtime>(
415-
webview: Webview<R>,
416-
rid: ResourceId,
417-
) -> crate::Result<tauri::ipc::Response> {
418-
let res = {
419-
let mut resources_table = webview.resources_table();
420-
resources_table.take::<ReqwestResponse>(rid)?
421-
};
422-
let res = Arc::into_inner(res).unwrap().0;
423-
Ok(tauri::ipc::Response::new(res.bytes().await?.to_vec()))
424-
}
425-
426422
// forbidden headers per fetch spec https://fetch.spec.whatwg.org/#terminology-headers
427423
#[cfg(not(feature = "unsafe-headers"))]
428424
fn is_unsafe_header(header: &HeaderName) -> bool {

plugins/http/src/lib.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
3636
.invoke_handler(tauri::generate_handler![
3737
commands::fetch,
3838
commands::fetch_cancel,
39-
commands::fetch_send,
40-
commands::fetch_read_body,
39+
commands::fetch_send
4140
])
4241
.build()
4342
}

0 commit comments

Comments
 (0)