Skip to content

Commit ac9a25c

Browse files
authored
fix(http): use tokio oneshot channel for detecting abort (#1395)
* fix(http): properly handle aborting closes #1376 * abort early in JS * avoid using unnecessary mutexes * fix lint * update bundle * simplify
1 parent b07c092 commit ac9a25c

File tree

6 files changed

+109
-34
lines changed

6 files changed

+109
-34
lines changed

.changes/http-abort.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"http": "patch"
3+
"http-js": "patch"
4+
---
5+
6+
Fix cancelling requests using `AbortSignal`.

Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/http/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ serde = { workspace = true }
2626
serde_json = { workspace = true }
2727
tauri = { workspace = true }
2828
thiserror = { workspace = true }
29+
tokio = { version = "1", features = [ "sync", "macros" ] }
2930
tauri-plugin-fs = { path = "../fs", version = "2.0.0-beta.10" }
3031
urlpattern = "0.2"
3132
regex = "1"

plugins/http/api-iife.js

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

plugins/http/guest-js/index.ts

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ export interface ClientOptions {
8686
proxy?: Proxy;
8787
}
8888

89+
const ERROR_REQUEST_CANCELLED = "Request canceled";
90+
8991
/**
9092
* Fetch a resource from the network. It returns a `Promise` that resolves to the
9193
* `Response` to that `Request`, whether it is successful or not.
@@ -104,6 +106,12 @@ export async function fetch(
104106
input: URL | Request | string,
105107
init?: RequestInit & ClientOptions,
106108
): Promise<Response> {
109+
// abort early here if needed
110+
const signal = init?.signal;
111+
if (signal?.aborted) {
112+
throw new Error(ERROR_REQUEST_CANCELLED);
113+
}
114+
107115
const maxRedirections = init?.maxRedirections;
108116
const connectTimeout = init?.connectTimeout;
109117
const proxy = init?.proxy;
@@ -115,8 +123,6 @@ export async function fetch(
115123
delete init.proxy;
116124
}
117125

118-
const signal = init?.signal;
119-
120126
const headers = init?.headers
121127
? init.headers instanceof Headers
122128
? init.headers
@@ -153,6 +159,11 @@ export async function fetch(
153159
],
154160
);
155161

162+
// abort early here if needed
163+
if (signal?.aborted) {
164+
throw new Error(ERROR_REQUEST_CANCELLED);
165+
}
166+
156167
const rid = await invoke<number>("plugin:http|fetch", {
157168
clientConfig: {
158169
method: req.method,
@@ -165,11 +176,17 @@ export async function fetch(
165176
},
166177
});
167178

168-
signal?.addEventListener("abort", () => {
169-
void invoke("plugin:http|fetch_cancel", {
170-
rid,
171-
});
172-
});
179+
const abort = () => invoke("plugin:http|fetch_cancel", { rid });
180+
181+
// abort early here if needed
182+
if (signal?.aborted) {
183+
// we don't care about the result of this proimse
184+
// eslint-disable-next-line @typescript-eslint/no-floating-promises
185+
abort();
186+
throw new Error(ERROR_REQUEST_CANCELLED);
187+
}
188+
189+
signal?.addEventListener("abort", () => abort);
173190

174191
interface FetchSendResponse {
175192
status: number;
@@ -203,7 +220,6 @@ export async function fetch(
203220
? new Uint8Array(body)
204221
: null,
205222
{
206-
headers: responseHeaders,
207223
status,
208224
statusText,
209225
},

plugins/http/src/commands.rs

Lines changed: 64 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ use tauri::{
1111
async_runtime::Mutex,
1212
command,
1313
ipc::{CommandScope, GlobalScope},
14-
Manager, ResourceId, Runtime, State, Webview,
14+
Manager, ResourceId, ResourceTable, Runtime, State, Webview,
1515
};
16+
use tokio::sync::oneshot::{channel, Receiver, Sender};
1617

1718
use crate::{
1819
scope::{Entry, Scope},
@@ -22,20 +23,47 @@ use crate::{
2223
const HTTP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
2324

2425
struct ReqwestResponse(reqwest::Response);
26+
impl tauri::Resource for ReqwestResponse {}
2527

26-
type CancelableResponseResult = Result<Result<reqwest::Response>>;
28+
type CancelableResponseResult = Result<reqwest::Response>;
2729
type CancelableResponseFuture =
2830
Pin<Box<dyn Future<Output = CancelableResponseResult> + Send + Sync>>;
2931

30-
struct FetchRequest(Mutex<CancelableResponseFuture>);
31-
impl FetchRequest {
32-
fn new(f: CancelableResponseFuture) -> Self {
33-
Self(Mutex::new(f))
32+
struct FetchRequest {
33+
fut: Mutex<CancelableResponseFuture>,
34+
abort_tx_rid: ResourceId,
35+
abort_rx_rid: ResourceId,
36+
}
37+
impl tauri::Resource for FetchRequest {}
38+
39+
struct AbortSender(Sender<()>);
40+
impl tauri::Resource for AbortRecveiver {}
41+
42+
impl AbortSender {
43+
fn abort(self) {
44+
let _ = self.0.send(());
3445
}
3546
}
3647

37-
impl tauri::Resource for FetchRequest {}
38-
impl tauri::Resource for ReqwestResponse {}
48+
struct AbortRecveiver(Receiver<()>);
49+
impl tauri::Resource for AbortSender {}
50+
51+
trait AddRequest {
52+
fn add_request(&mut self, fut: CancelableResponseFuture) -> ResourceId;
53+
}
54+
55+
impl AddRequest for ResourceTable {
56+
fn add_request(&mut self, fut: CancelableResponseFuture) -> ResourceId {
57+
let (tx, rx) = channel::<()>();
58+
let (tx, rx) = (AbortSender(tx), AbortRecveiver(rx));
59+
let req = FetchRequest {
60+
fut: Mutex::new(fut),
61+
abort_tx_rid: self.add(tx),
62+
abort_rx_rid: self.add(rx),
63+
};
64+
self.add(req)
65+
}
66+
}
3967

4068
#[derive(Serialize)]
4169
#[serde(rename_all = "camelCase")]
@@ -239,9 +267,9 @@ pub async fn fetch<R: Runtime>(
239267
request = request.body(data);
240268
}
241269

242-
let fut = async move { Ok(request.send().await.map_err(Into::into)) };
270+
let fut = async move { request.send().await.map_err(Into::into) };
243271
let mut resources_table = webview.resources_table();
244-
let rid = resources_table.add(FetchRequest::new(Box::pin(fut)));
272+
let rid = resources_table.add_request(Box::pin(fut));
245273

246274
Ok(rid)
247275
} else {
@@ -260,24 +288,23 @@ pub async fn fetch<R: Runtime>(
260288
.header(header::CONTENT_TYPE, data_url.mime_type().to_string())
261289
.body(reqwest::Body::from(body))?;
262290

263-
let fut = async move { Ok(Ok(reqwest::Response::from(response))) };
291+
let fut = async move { Ok(reqwest::Response::from(response)) };
264292
let mut resources_table = webview.resources_table();
265-
let rid = resources_table.add(FetchRequest::new(Box::pin(fut)));
293+
let rid = resources_table.add_request(Box::pin(fut));
266294
Ok(rid)
267295
}
268296
_ => Err(Error::SchemeNotSupport(scheme.to_string())),
269297
}
270298
}
271299

272300
#[command]
273-
pub async fn fetch_cancel<R: Runtime>(webview: Webview<R>, rid: ResourceId) -> crate::Result<()> {
274-
let req = {
275-
let resources_table = webview.resources_table();
276-
resources_table.get::<FetchRequest>(rid)?
277-
};
278-
let mut req = req.0.lock().await;
279-
*req = Box::pin(async { Err(Error::RequestCanceled) });
280-
301+
pub fn fetch_cancel<R: Runtime>(webview: Webview<R>, rid: ResourceId) -> crate::Result<()> {
302+
let mut resources_table = webview.resources_table();
303+
let req = resources_table.get::<FetchRequest>(rid)?;
304+
let abort_tx = resources_table.take::<AbortSender>(req.abort_tx_rid)?;
305+
if let Some(abort_tx) = Arc::into_inner(abort_tx) {
306+
abort_tx.abort();
307+
}
281308
Ok(())
282309
}
283310

@@ -286,14 +313,26 @@ pub async fn fetch_send<R: Runtime>(
286313
webview: Webview<R>,
287314
rid: ResourceId,
288315
) -> crate::Result<FetchResponse> {
289-
let req = {
316+
let (req, abort_rx) = {
290317
let mut resources_table = webview.resources_table();
291-
resources_table.take::<FetchRequest>(rid)?
318+
let req = resources_table.get::<FetchRequest>(rid)?;
319+
let abort_rx = resources_table.take::<AbortRecveiver>(req.abort_rx_rid)?;
320+
(req, abort_rx)
292321
};
293322

294-
let res = match req.0.lock().await.as_mut().await {
295-
Ok(Ok(res)) => res,
296-
Ok(Err(e)) | Err(e) => return Err(e),
323+
let Some(abort_rx) = Arc::into_inner(abort_rx) else {
324+
return Err(Error::RequestCanceled);
325+
};
326+
327+
let mut fut = req.fut.lock().await;
328+
329+
let res = tokio::select! {
330+
res = fut.as_mut() => res?,
331+
_ = abort_rx.0 => {
332+
let mut resources_table = webview.resources_table();
333+
resources_table.close(rid)?;
334+
return Err(Error::RequestCanceled);
335+
}
297336
};
298337

299338
let status = res.status();

0 commit comments

Comments
 (0)