Skip to content

Commit abc4931

Browse files
committed
feat: Fiber interruptibility support
1 parent b040ee3 commit abc4931

File tree

6 files changed

+158
-57
lines changed

6 files changed

+158
-57
lines changed

Cargo.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ name = "wreq_ruby"
1717
magnus = { version = "0.8", features = ["bytes"] }
1818
rb-sys = { version = "0.9.110", default-features = false }
1919
tokio = { version = "1", features = ["full"] }
20-
wreq = { version = "6.0.0-rc.23", features = [
20+
tokio-util = { version = "0.7.17", default-features = false }
21+
wreq = { version = "6.0.0-rc.24", features = [
2122
"json",
2223
"socks",
2324
"stream",
@@ -50,6 +51,3 @@ incremental = false
5051
lto = "fat"
5152
opt-level = 3
5253
strip = true
53-
54-
[patch.crates-io]
55-
wreq = { git = "https://github.com/0x676e67/wreq" }

lib/wreq.rb

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,33 @@
1818

1919
unless defined?(Wreq)
2020
module Wreq
21+
# CancellationToken allows Ruby code to cooperatively cancel long-running Rust async tasks.
22+
#
23+
# This class is a binding to a Rust-side cancellation token, enabling Ruby to signal cancellation
24+
# to HTTP requests, streaming, or other operations running in Rust. When `cancel` is called, all
25+
# Rust tasks observing this token will be notified and can abort promptly.
26+
#
27+
# @example
28+
# token = Wreq::CancellationToken.new
29+
# # Pass token to a Wreq request or stream
30+
# token.cancel # Signal cancellation from Ruby
31+
#
32+
# The actual implementation and state are managed by the Rust extension.
33+
class CancellationToken
34+
# Create a new cancellation token.
35+
#
36+
# @return [Wreq::CancellationToken]
37+
def self.new
38+
end
39+
40+
# Signal cancellation to all Rust tasks observing this token.
41+
#
42+
# @return [void]
43+
def cancel
44+
end
45+
end
46+
47+
# This is a placeholder. The actual value is set by the Rust implementation.
2148
VERSION = nil
2249

2350
# Send an HTTP request.

src/gvl.rs

Lines changed: 13 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -4,55 +4,17 @@
44
use std::{ffi::c_void, mem::MaybeUninit, ptr::null_mut};
55

66
use rb_sys::rb_thread_call_without_gvl;
7-
use tokio::sync::watch;
7+
8+
use crate::rt::CancellationToken;
89

910
/// Container for safely passing closure and result through C callback.
1011
struct Args<F, R> {
1112
func: Option<F>,
1213
result: MaybeUninit<R>,
1314
}
1415

15-
/// Cancellation flag for thread interruption support.
16-
#[derive(Clone)]
17-
pub struct CancelFlag {
18-
rx: watch::Receiver<bool>,
19-
}
20-
21-
struct CancelSender {
22-
tx: watch::Sender<bool>,
23-
}
24-
25-
impl CancelSender {
26-
fn new() -> (Self, CancelFlag) {
27-
let (tx, rx) = watch::channel(false);
28-
(Self { tx }, CancelFlag { rx })
29-
}
30-
31-
fn cancel(&self) {
32-
let _ = self.tx.send(true);
33-
}
34-
}
35-
36-
impl CancelFlag {
37-
/// Wait until cancellation is signaled (zero-latency, no polling).
38-
pub async fn cancelled(&self) {
39-
let mut rx = self.rx.clone();
40-
if *rx.borrow_and_update() {
41-
return;
42-
}
43-
loop {
44-
if rx.changed().await.is_err() {
45-
return;
46-
}
47-
if *rx.borrow() {
48-
return;
49-
}
50-
}
51-
}
52-
}
53-
5416
struct UnblockData {
55-
sender: CancelSender,
17+
token: CancellationToken,
5618
}
5719

5820
unsafe extern "C" fn call_without_gvl<F, R>(arg: *mut c_void) -> *mut c_void
@@ -73,7 +35,7 @@ where
7335
unsafe extern "C" fn unblock_func(arg: *mut c_void) {
7436
if !arg.is_null() {
7537
let data = unsafe { &*(arg as *const UnblockData) };
76-
data.sender.cancel();
38+
data.token.cancel();
7739
}
7840
}
7941

@@ -110,31 +72,33 @@ where
11072
/// This results in all Ruby threads being suspended indefinitely.
11173
pub fn nogvl_cancellable<F, R>(func: F) -> R
11274
where
113-
F: FnOnce(CancelFlag) -> R,
75+
F: FnOnce(CancellationToken) -> R,
11476
R: Sized,
11577
{
116-
let (sender, flag) = CancelSender::new();
117-
let unblock_data = UnblockData { sender };
78+
let token = CancellationToken::new();
79+
let unblock_data = UnblockData {
80+
token: token.clone(),
81+
};
11882

11983
struct Wrapper<F, R> {
12084
func: Option<F>,
121-
flag: CancelFlag,
85+
token: CancellationToken,
12286
result: MaybeUninit<R>,
12387
}
12488

12589
let mut wrapper = Wrapper {
12690
func: Some(func),
127-
flag,
91+
token,
12892
result: MaybeUninit::uninit(),
12993
};
13094

13195
unsafe extern "C" fn call_with_flag<F, R>(arg: *mut c_void) -> *mut c_void
13296
where
133-
F: FnOnce(CancelFlag) -> R,
97+
F: FnOnce(CancellationToken) -> R,
13498
{
13599
let wrapper = unsafe { &mut *(arg as *mut Wrapper<F, R>) };
136100
if let Some(func) = wrapper.func.take() {
137-
wrapper.result.write(func(wrapper.flag.clone()));
101+
wrapper.result.write(func(wrapper.token.clone()));
138102
}
139103
null_mut()
140104
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,5 +93,6 @@ fn init(ruby: &Ruby) -> Result<(), Error> {
9393
client::include(ruby, &gem_module)?;
9494
emulation::include(ruby, &gem_module)?;
9595
error::include(ruby);
96+
rt::include(ruby, &gem_module)?;
9697
Ok(())
9798
}

src/rt.rs

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
use std::sync::LazyLock;
22

3+
use magnus::{
4+
DataTypeFunctions, Error, Module, Object, RModule, Ruby, TypedData, function, method,
5+
};
36
use tokio::runtime::{Builder, Runtime};
47

58
use crate::{error::interrupt_error, gvl};
@@ -17,11 +20,11 @@ pub fn try_block_on<F, T>(future: F) -> F::Output
1720
where
1821
F: Future<Output = Result<T, magnus::Error>>,
1922
{
20-
gvl::nogvl_cancellable(|flag| {
23+
gvl::nogvl_cancellable(|token| {
2124
RUNTIME.block_on(async move {
2225
tokio::select! {
2326
biased;
24-
_ = flag.cancelled() => Err(interrupt_error()),
27+
_ = token.cancelled() => Err(interrupt_error()),
2528
result = future => result,
2629
}
2730
})
@@ -35,13 +38,57 @@ pub fn maybe_block_on<F, T>(future: F) -> F::Output
3538
where
3639
F: Future<Output = Option<T>>,
3740
{
38-
gvl::nogvl_cancellable(|flag| {
41+
gvl::nogvl_cancellable(|token| {
3942
RUNTIME.block_on(async move {
4043
tokio::select! {
4144
biased;
42-
_ = flag.cancelled() => None,
45+
_ = token.cancelled() => None,
4346
result = future => result,
4447
}
4548
})
4649
})
4750
}
51+
52+
/// A cancellation token for cooperative cancellation of Rust async tasks from Ruby.
53+
///
54+
/// This type wraps [`tokio_util::sync::CancellationToken`] and is exposed to Ruby as
55+
/// `Wreq::CancellationToken`. It allows Ruby code to signal cancellation to long-running or
56+
/// blocking Rust tasks, enabling graceful interruption.
57+
///
58+
/// Typical usage:
59+
/// - Ruby creates a `Wreq::CancellationToken` and passes it to a Rust-backed async operation.
60+
/// - Rust code checks or awaits the token to detect cancellation and aborts work if cancelled.
61+
/// - Calling `cancel` from Ruby triggers cancellation for all tasks observing this token or its
62+
/// clones.
63+
///
64+
/// This is especially useful for interrupting HTTP requests, streaming, or other operations that
65+
/// may need to be stopped from Ruby.
66+
#[derive(Clone, DataTypeFunctions, TypedData)]
67+
#[magnus(class = "Wreq::CancellationToken", free_immediately, size)]
68+
pub struct CancellationToken(tokio_util::sync::CancellationToken);
69+
70+
impl CancellationToken {
71+
/// Create a new [`CancellationToken`].
72+
#[inline]
73+
pub fn new() -> Self {
74+
Self(tokio_util::sync::CancellationToken::new())
75+
}
76+
77+
/// Signal cancellation to all tasks observing this token.
78+
#[inline]
79+
pub fn cancel(&self) {
80+
self.0.cancel()
81+
}
82+
83+
#[inline]
84+
async fn cancelled(&self) {
85+
self.0.cancelled().await
86+
}
87+
}
88+
89+
pub fn include(ruby: &Ruby, gem_module: &RModule) -> Result<(), Error> {
90+
let headers_class = gem_module.define_class("CancellationToken", ruby.class_object())?;
91+
headers_class.define_singleton_method("new", function!(CancellationToken::new, 0))?;
92+
headers_class.define_method("cancel", method!(CancellationToken::cancel, 0))?;
93+
Ok(())
94+
}

test/interrupt_test.rb

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
require "test_helper"
2+
3+
class ThreadInterruptTest < Minitest::Test
4+
HANGING_URL = "http://10.255.255.1:12345/" # Non-routable, hangs forever
5+
SLOW_BODY_URL = "https://httpbin.io/drip?duration=5&numbytes=5" # 5s slow body
6+
7+
def test_connect_phase_interrupt
8+
thread = Thread.new do
9+
begin
10+
Wreq.get(HANGING_URL)
11+
rescue => e
12+
e
13+
end
14+
end
15+
sleep 2
16+
thread.kill
17+
result = thread.join(5)
18+
assert result, "Thread should be killed and joined"
19+
end
20+
21+
def test_connect_with_timeout_interrupt
22+
thread = Thread.new do
23+
begin
24+
Wreq.get(HANGING_URL, timeout: 60)
25+
rescue => e
26+
e
27+
end
28+
end
29+
sleep 2
30+
thread.kill
31+
result = thread.join(5)
32+
assert result, "Thread should be killed and joined"
33+
end
34+
35+
def test_body_reading_interrupt
36+
thread = Thread.new do
37+
resp = Wreq.get(SLOW_BODY_URL)
38+
begin
39+
resp.text
40+
rescue => e
41+
e
42+
end
43+
end
44+
sleep 2
45+
thread.kill
46+
result = thread.join(5)
47+
assert result, "Thread should be killed and joined"
48+
end
49+
50+
def test_body_streaming_interrupt
51+
thread = Thread.new do
52+
resp = Wreq.get(SLOW_BODY_URL)
53+
begin
54+
resp.chunks { |chunk| chunk }
55+
rescue => e
56+
e
57+
end
58+
end
59+
sleep 2
60+
thread.kill
61+
result = thread.join(5)
62+
assert result, "Thread should be killed and joined"
63+
end
64+
end

0 commit comments

Comments
 (0)