Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 35 additions & 22 deletions plugins/http/guest-js/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* @module
*/

import { invoke } from '@tauri-apps/api/core'
import { Channel, invoke } from '@tauri-apps/api/core'

/**
* Configuration of a proxy that a Client should pass requests to.
Expand Down Expand Up @@ -106,6 +106,20 @@ export interface DangerousSettings {
acceptInvalidHostnames?: boolean
}

/**
* Stream Packet for IPC
*/
export interface StreamMessage {
/**
* The chunk - an array of bytes sent from Rust.
*/
value?: ArrayBuffer | number[]
/**
* Is the stream done.
*/
done: boolean
}

const ERROR_REQUEST_CANCELLED = 'Request canceled'

/**
Expand Down Expand Up @@ -186,6 +200,19 @@ export async function fetch(
throw new Error(ERROR_REQUEST_CANCELLED)
}

const streamChannel = new Channel<StreamMessage>()

const readableStreamBody = new ReadableStream({
start: (controller) => {
streamChannel.onmessage = (res: StreamMessage) => {
// close early if aborted
if (signal?.aborted) controller.error(ERROR_REQUEST_CANCELLED)
if (res.done) controller.close()
controller.enqueue(res.value)
}
}
})

const rid = await invoke<number>('plugin:http|fetch', {
clientConfig: {
method: req.method,
Expand All @@ -196,7 +223,8 @@ export async function fetch(
connectTimeout,
proxy,
danger
}
},
streamChannel
})

const abort = () => invoke('plugin:http|fetch_cancel', { rid })
Expand All @@ -223,30 +251,15 @@ export async function fetch(
status,
statusText,
url,
headers: responseHeaders,
rid: responseRid
headers: responseHeaders
} = await invoke<FetchSendResponse>('plugin:http|fetch_send', {
rid
})

const body = await invoke<ArrayBuffer | number[]>(
'plugin:http|fetch_read_body',
{
rid: responseRid
}
)

const res = new Response(
body instanceof ArrayBuffer && body.byteLength !== 0
? body
: body instanceof Array && body.length > 0
? new Uint8Array(body)
: null,
{
status,
statusText
}
)
const res = new Response(readableStreamBody, {
status,
statusText
})

// url and headers are read only properties
// but seems like we can set them like this
Expand Down
44 changes: 29 additions & 15 deletions plugins/http/src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-License-Identifier: MIT


use std::{future::Future, pin::Pin, str::FromStr, sync::Arc, time::Duration};

use http::{header, HeaderMap, HeaderName, HeaderValue, Method, StatusCode};
Expand All @@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize};
use tauri::{
async_runtime::Mutex,
command,
ipc::{CommandScope, GlobalScope},
ipc::{Channel, CommandScope, GlobalScope},
Manager, ResourceId, ResourceTable, Runtime, State, Webview,
};
use tokio::sync::oneshot::{channel, Receiver, Sender};
Expand All @@ -22,6 +23,8 @@ use crate::{

const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);

// reqwest::Response is never read, but might be needed for future use.
#[allow(dead_code)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove, no need to keep it around anymore

struct ReqwestResponse(reqwest::Response);
impl tauri::Resource for ReqwestResponse {}

Expand Down Expand Up @@ -126,6 +129,12 @@ pub struct BasicAuth {
password: String,
}

#[derive(Clone, Serialize)]
pub struct StreamMessage {
value: Option<Vec<u8>>,
done: bool,
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not performant because of serialization overhead.

To avoid this, you can send a tauri::Response which is just a Vec<u8> and then you could append a single byte at the end that is either 1 or 0, indicating the status of done. Or even an empty array would suffice.

I am not sure if the channel is as performant as invoke (will have to check that later) but would be better to avoid using the channel.

Copy link
Contributor Author

@adrieljss adrieljss Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you're right, I will change the struct, how about using serde_bytes? Since serde could be serializing the Vec as an array of numbers.

I don't think there's any way to do streaming using invoke(?) so maybe a solution would be having a seperate function for seperate usecases, one for streaming and one for collecting all the contents at the end, but the user has to specify that and that will not be fetch-like.

Copy link
Contributor Author

@adrieljss adrieljss Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though, if the server doesn't want to stream, the channel will only receive 1 chunk and that should be (around) the same performance since both uses IPC anyways.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be done with invoke and an AsyncIterator in JS:

  • The removed fetch_read_body instead of using .bytes() it will use .chunk(), effictively returning a single chunk each time it is called
  • On JS side, you can create an AsyncIterator that will be used to create the ReadableStream, on each iteration it will call fetch_read_body and append the data to the stream

Copy link
Contributor Author

@adrieljss adrieljss Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would that be better though? In channels, it said that is the recommended way for streaming data (like HTTP streaming, which is our case) to the frontend. I think the invoke approach will block operations until it has returned something. While channels is just an event listener.

In my opinion, I feel like it's more intuitive rather than spamming invokes

Copy link
Member

@amrbashir amrbashir Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The channel calls invoke under the hood. Channels are more ergonomic but there is room for improvement in their implementation as it uses tauri::State with a Mutex unlike what I am proposing which would use a Mutex only until the request is sent, after that the body could be read without any Mutexes.

For now, let's use the channel approach and I can improve on it later (once I confirm that there is mutex overhead involved).

My main blocker for this PR is the use of serialization which we can avoid even with channels.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes okay, I will fix that 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amrbashir can you check the new commit I did? I can't use tauri::Response inside the channel because it doesn't impl Clone and the compiler doesn't allow me to. I used serde_bytes to try and make it as efficient as possible, instead of just sending an array of numbers.

Copy link
Member

@amrbashir amrbashir Mar 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That clone bound shouldn't be needed, I will open a PR to fix it later.
Edit: here it is tauri-apps/tauri#12876

For now, you can use InvokeResponseBody::Raw which is Clone and implements IpcResponse trait and so can be used with channels.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That clone bound shouldn't be needed, I will open a PR to fix it later.

For now, you can use InvokeResponseBody::Raw which is Clone and implements IpcResponse trait and so can be used with channels.

done! 👍

#[inline]
fn proxy_creator(
url_or_config: UrlOrConfig,
Expand Down Expand Up @@ -181,6 +190,7 @@ pub async fn fetch<R: Runtime>(
client_config: ClientConfig,
command_scope: CommandScope<Entry>,
global_scope: GlobalScope<Entry>,
stream_channel: Channel<StreamMessage>
) -> crate::Result<ResourceId> {
let ClientConfig {
method,
Expand Down Expand Up @@ -314,7 +324,24 @@ pub async fn fetch<R: Runtime>(
#[cfg(feature = "tracing")]
tracing::trace!("{:?}", request);

let fut = async move { request.send().await.map_err(Into::into) };
let fut = async move {
let mut res = request.send().await?;

// send response through IPC channel
while let Some(chunk) = res.chunk().await? {
stream_channel.send(StreamMessage{
value: Some(chunk.to_vec()),
done: false,
})?;
}

stream_channel.send(StreamMessage { value: None, done: true })?;

// return that response
Ok(res)
};


let mut resources_table = webview.resources_table();
let rid = resources_table.add_request(Box::pin(fut));

Expand Down Expand Up @@ -410,19 +437,6 @@ pub async fn fetch_send<R: Runtime>(
})
}

#[tauri::command]
pub(crate) async fn fetch_read_body<R: Runtime>(
webview: Webview<R>,
rid: ResourceId,
) -> crate::Result<tauri::ipc::Response> {
let res = {
let mut resources_table = webview.resources_table();
resources_table.take::<ReqwestResponse>(rid)?
};
let res = Arc::into_inner(res).unwrap().0;
Ok(tauri::ipc::Response::new(res.bytes().await?.to_vec()))
}

// forbidden headers per fetch spec https://fetch.spec.whatwg.org/#terminology-headers
#[cfg(not(feature = "unsafe-headers"))]
fn is_unsafe_header(header: &HeaderName) -> bool {
Expand Down
3 changes: 1 addition & 2 deletions plugins/http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ pub fn init<R: Runtime>() -> TauriPlugin<R> {
.invoke_handler(tauri::generate_handler![
commands::fetch,
commands::fetch_cancel,
commands::fetch_send,
commands::fetch_read_body,
commands::fetch_send
])
.build()
}
Loading