Skip to content

Commit bedd77b

Browse files
darkprokobaAndrey Petrov
andauthored
Exposed the curl_multi_poll libcurl API behind a new feature flag: po… (#413)
* Exposed the curl_multi_poll libcurl API behind a new feature flag: poll_7_66_0, see #403. See also: https://curl.se/libcurl/c/curl_multi_poll.html * Exposed curl_multi_wakeup as well. * [trivial] Made cargo fmt happy. * Renamed Multi::_timeout_i32 to Multi::timeout_i32 as suggested at #413. * Got rid of the Mutex and moved the raw pointer from the Multi struct to a new RawMulti struct. curl_sys::curl_multi_cleanup is now invoked in RawMulti::drop instead of in Multi::drop. The MultiWaker struct now has a std::sync::Weak reference to the RawMulti struct. * Got rid of the poll_7_66_0 feature as suggested by alexcrichton. * [trivial] Explicitly state the mimimum curl version required for the MultiWaker::wakeup method. * Brought back the feature flag, but since it turned out curl_multi_wakeup actually requires curl 7.68.0, i've adjusted the name of the feature accordingly. * Made cargo fmt happy again. * Documented the newly introduced poll_7_68_0 feature in README.md Co-authored-by: Andrey Petrov <[email protected]>
1 parent b57ef18 commit bedd77b

File tree

6 files changed

+189
-34
lines changed

6 files changed

+189
-34
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ force-system-lib-on-osx = ['curl-sys/force-system-lib-on-osx']
5050
protocol-ftp = ["curl-sys/protocol-ftp"]
5151
zlib-ng-compat = ["curl-sys/zlib-ng-compat", "static-curl"]
5252
upkeep_7_62_0 = ["curl-sys/upkeep_7_62_0"]
53+
poll_7_68_0 = ["curl-sys/poll_7_68_0"]
5354

5455
[[test]]
5556
name = "atexit"

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ with various Cargo features:
129129
- `static-ssl`: Use a bundled OpenSSL version and statically link to it. Only applies on platforms that use OpenSSL. Disabled by default.
130130
- `spnego`: Enable SPNEGO support. Disabled by default.
131131
- `upkeep_7_62_0`: Enable curl_easy_upkeep() support, introduced in curl 7.62.0. Disabled by default.
132+
- `poll_7_68_0`: Enable curl_multi_poll()/curl_multi_wakeup() support, requires curl 7.68.0 or later. Disabled by default.
132133

133134
## Version Support
134135

curl-sys/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,4 @@ force-system-lib-on-osx = []
5454
protocol-ftp = []
5555
zlib-ng-compat = ["libz-sys/zlib-ng", "static-curl"]
5656
upkeep_7_62_0 = []
57+
poll_7_68_0 = []

curl-sys/lib.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,6 +1079,19 @@ extern "C" {
10791079
timeout_ms: c_int,
10801080
ret: *mut c_int,
10811081
) -> CURLMcode;
1082+
1083+
#[cfg(feature = "poll_7_68_0")]
1084+
pub fn curl_multi_poll(
1085+
multi_handle: *mut CURLM,
1086+
extra_fds: *mut curl_waitfd,
1087+
extra_nfds: c_uint,
1088+
timeout_ms: c_int,
1089+
ret: *mut c_int,
1090+
) -> CURLMcode;
1091+
1092+
#[cfg(feature = "poll_7_68_0")]
1093+
pub fn curl_multi_wakeup(multi_handle: *mut CURLM) -> CURLMcode;
1094+
10821095
pub fn curl_multi_perform(multi_handle: *mut CURLM, running_handles: *mut c_int) -> CURLMcode;
10831096
pub fn curl_multi_cleanup(multi_handle: *mut CURLM) -> CURLMcode;
10841097
pub fn curl_multi_info_read(

src/multi.rs

Lines changed: 139 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
use std::fmt;
44
use std::marker;
55
use std::ptr;
6+
use std::sync::Arc;
67
use std::time::Duration;
78

89
use curl_sys;
@@ -29,10 +30,15 @@ use crate::{Error, MultiError};
2930
///
3031
/// [multi tutorial]: https://curl.haxx.se/libcurl/c/libcurl-multi.html
3132
pub struct Multi {
32-
raw: *mut curl_sys::CURLM,
33+
raw: Arc<RawMulti>,
3334
data: Box<MultiData>,
3435
}
3536

37+
#[derive(Debug)]
38+
struct RawMulti {
39+
handle: *mut curl_sys::CURLM,
40+
}
41+
3642
struct MultiData {
3743
socket: Box<dyn FnMut(Socket, SocketEvents, usize) + Send>,
3844
timer: Box<dyn FnMut(Option<Duration>) -> bool + Send>,
@@ -92,6 +98,20 @@ pub struct WaitFd {
9298
inner: curl_sys::curl_waitfd,
9399
}
94100

101+
/// A handle that can be used to wake up a thread that's blocked in [Multi::poll].
102+
/// The handle can be passed to and used from any thread.
103+
#[cfg(feature = "poll_7_68_0")]
104+
#[derive(Debug, Clone)]
105+
pub struct MultiWaker {
106+
raw: std::sync::Weak<RawMulti>,
107+
}
108+
109+
#[cfg(feature = "poll_7_68_0")]
110+
unsafe impl Send for MultiWaker {}
111+
112+
#[cfg(feature = "poll_7_68_0")]
113+
unsafe impl Sync for MultiWaker {}
114+
95115
impl Multi {
96116
/// Creates a new multi session through which multiple HTTP transfers can be
97117
/// initiated.
@@ -101,7 +121,7 @@ impl Multi {
101121
let ptr = curl_sys::curl_multi_init();
102122
assert!(!ptr.is_null());
103123
Multi {
104-
raw: ptr,
124+
raw: Arc::new(RawMulti { handle: ptr }),
105125
data: Box::new(MultiData {
106126
socket: Box::new(|_, _, _| ()),
107127
timer: Box::new(|_| true),
@@ -196,7 +216,7 @@ impl Multi {
196216
pub fn assign(&self, socket: Socket, token: usize) -> Result<(), MultiError> {
197217
unsafe {
198218
cvt(curl_sys::curl_multi_assign(
199-
self.raw,
219+
self.raw.handle,
200220
socket,
201221
token as *mut _,
202222
))?;
@@ -341,15 +361,15 @@ impl Multi {
341361
}
342362

343363
fn setopt_long(&mut self, opt: curl_sys::CURLMoption, val: c_long) -> Result<(), MultiError> {
344-
unsafe { cvt(curl_sys::curl_multi_setopt(self.raw, opt, val)) }
364+
unsafe { cvt(curl_sys::curl_multi_setopt(self.raw.handle, opt, val)) }
345365
}
346366

347367
fn setopt_ptr(
348368
&mut self,
349369
opt: curl_sys::CURLMoption,
350370
val: *const c_char,
351371
) -> Result<(), MultiError> {
352-
unsafe { cvt(curl_sys::curl_multi_setopt(self.raw, opt, val)) }
372+
unsafe { cvt(curl_sys::curl_multi_setopt(self.raw.handle, opt, val)) }
353373
}
354374

355375
/// Add an easy handle to a multi session
@@ -377,7 +397,7 @@ impl Multi {
377397
easy.transfer();
378398

379399
unsafe {
380-
cvt(curl_sys::curl_multi_add_handle(self.raw, easy.raw()))?;
400+
cvt(curl_sys::curl_multi_add_handle(self.raw.handle, easy.raw()))?;
381401
}
382402
Ok(EasyHandle {
383403
easy,
@@ -388,7 +408,7 @@ impl Multi {
388408
/// Same as `add`, but works with the `Easy2` type.
389409
pub fn add2<H>(&self, easy: Easy2<H>) -> Result<Easy2Handle<H>, MultiError> {
390410
unsafe {
391-
cvt(curl_sys::curl_multi_add_handle(self.raw, easy.raw()))?;
411+
cvt(curl_sys::curl_multi_add_handle(self.raw.handle, easy.raw()))?;
392412
}
393413
Ok(Easy2Handle {
394414
easy,
@@ -410,7 +430,7 @@ impl Multi {
410430
pub fn remove(&self, easy: EasyHandle) -> Result<Easy, MultiError> {
411431
unsafe {
412432
cvt(curl_sys::curl_multi_remove_handle(
413-
self.raw,
433+
self.raw.handle,
414434
easy.easy.raw(),
415435
))?;
416436
}
@@ -421,7 +441,7 @@ impl Multi {
421441
pub fn remove2<H>(&self, easy: Easy2Handle<H>) -> Result<Easy2<H>, MultiError> {
422442
unsafe {
423443
cvt(curl_sys::curl_multi_remove_handle(
424-
self.raw,
444+
self.raw.handle,
425445
easy.easy.raw(),
426446
))?;
427447
}
@@ -445,7 +465,7 @@ impl Multi {
445465
let mut queue = 0;
446466
unsafe {
447467
loop {
448-
let ptr = curl_sys::curl_multi_info_read(self.raw, &mut queue);
468+
let ptr = curl_sys::curl_multi_info_read(self.raw.handle, &mut queue);
449469
if ptr.is_null() {
450470
break;
451471
}
@@ -479,7 +499,7 @@ impl Multi {
479499
let mut remaining = 0;
480500
unsafe {
481501
cvt(curl_sys::curl_multi_socket_action(
482-
self.raw,
502+
self.raw.handle,
483503
socket,
484504
events.bits,
485505
&mut remaining,
@@ -507,7 +527,7 @@ impl Multi {
507527
let mut remaining = 0;
508528
unsafe {
509529
cvt(curl_sys::curl_multi_socket_action(
510-
self.raw,
530+
self.raw.handle,
511531
curl_sys::CURL_SOCKET_BAD,
512532
0,
513533
&mut remaining,
@@ -536,7 +556,7 @@ impl Multi {
536556
pub fn get_timeout(&self) -> Result<Option<Duration>, MultiError> {
537557
let mut ms = 0;
538558
unsafe {
539-
cvt(curl_sys::curl_multi_timeout(self.raw, &mut ms))?;
559+
cvt(curl_sys::curl_multi_timeout(self.raw.handle, &mut ms))?;
540560
if ms == -1 {
541561
Ok(None)
542562
} else {
@@ -571,19 +591,72 @@ impl Multi {
571591
/// }
572592
/// ```
573593
pub fn wait(&self, waitfds: &mut [WaitFd], timeout: Duration) -> Result<u32, MultiError> {
574-
let timeout_ms = {
575-
let secs = timeout.as_secs();
576-
if secs > (i32::max_value() / 1000) as u64 {
577-
// Duration too large, clamp at maximum value.
578-
i32::max_value()
579-
} else {
580-
secs as i32 * 1000 + timeout.subsec_nanos() as i32 / 1_000_000
581-
}
582-
};
594+
let timeout_ms = Multi::timeout_i32(timeout);
583595
unsafe {
584596
let mut ret = 0;
585597
cvt(curl_sys::curl_multi_wait(
586-
self.raw,
598+
self.raw.handle,
599+
waitfds.as_mut_ptr() as *mut _,
600+
waitfds.len() as u32,
601+
timeout_ms,
602+
&mut ret,
603+
))?;
604+
Ok(ret as u32)
605+
}
606+
}
607+
608+
fn timeout_i32(timeout: Duration) -> i32 {
609+
let secs = timeout.as_secs();
610+
if secs > (i32::MAX / 1000) as u64 {
611+
// Duration too large, clamp at maximum value.
612+
i32::MAX
613+
} else {
614+
secs as i32 * 1000 + timeout.subsec_nanos() as i32 / 1_000_000
615+
}
616+
}
617+
618+
/// Block until activity is detected or a timeout passes.
619+
///
620+
/// The timeout is used in millisecond-precision. Large durations are
621+
/// clamped at the maximum value curl accepts.
622+
///
623+
/// The returned integer will contain the number of internal file
624+
/// descriptors on which interesting events occurred.
625+
///
626+
/// This function is a simpler alternative to using `fdset()` and `select()`
627+
/// and does not suffer from file descriptor limits.
628+
///
629+
/// While this method is similar to [Multi::wait], with the following
630+
/// distinctions:
631+
/// * If there are no handles added to the multi, poll will honor the
632+
/// provided timeout, while [Multi::wait] returns immediately.
633+
/// * If poll has blocked due to there being no activity on the handles in
634+
/// the Multi, it can be woken up from any thread and at any time before
635+
/// the timeout expires.
636+
///
637+
/// Requires libcurl 7.66.0 or later.
638+
///
639+
/// # Example
640+
///
641+
/// ```
642+
/// use curl::multi::Multi;
643+
/// use std::time::Duration;
644+
///
645+
/// let m = Multi::new();
646+
///
647+
/// // Add some Easy handles...
648+
///
649+
/// while m.perform().unwrap() > 0 {
650+
/// m.poll(&mut [], Duration::from_secs(1)).unwrap();
651+
/// }
652+
/// ```
653+
#[cfg(feature = "poll_7_68_0")]
654+
pub fn poll(&self, waitfds: &mut [WaitFd], timeout: Duration) -> Result<u32, MultiError> {
655+
let timeout_ms = Multi::timeout_i32(timeout);
656+
unsafe {
657+
let mut ret = 0;
658+
cvt(curl_sys::curl_multi_poll(
659+
self.raw.handle,
587660
waitfds.as_mut_ptr() as *mut _,
588661
waitfds.len() as u32,
589662
timeout_ms,
@@ -593,6 +666,13 @@ impl Multi {
593666
}
594667
}
595668

669+
/// Returns a new [MultiWaker] that can be used to wake up a thread that's
670+
/// currently blocked in [Multi::poll].
671+
#[cfg(feature = "poll_7_68_0")]
672+
pub fn waker(&self) -> MultiWaker {
673+
MultiWaker::new(Arc::downgrade(&self.raw))
674+
}
675+
596676
/// Reads/writes available data from each easy handle.
597677
///
598678
/// This function handles transfers on all the added handles that need
@@ -636,7 +716,7 @@ impl Multi {
636716
pub fn perform(&self) -> Result<u32, MultiError> {
637717
unsafe {
638718
let mut ret = 0;
639-
cvt(curl_sys::curl_multi_perform(self.raw, &mut ret))?;
719+
cvt(curl_sys::curl_multi_perform(self.raw.handle, &mut ret))?;
640720
Ok(ret as u32)
641721
}
642722
}
@@ -684,7 +764,11 @@ impl Multi {
684764
let write = write.map(|r| r as *mut _).unwrap_or(ptr::null_mut());
685765
let except = except.map(|r| r as *mut _).unwrap_or(ptr::null_mut());
686766
cvt(curl_sys::curl_multi_fdset(
687-
self.raw, read, write, except, &mut ret,
767+
self.raw.handle,
768+
read,
769+
write,
770+
except,
771+
&mut ret,
688772
))?;
689773
if ret == -1 {
690774
Ok(None)
@@ -710,11 +794,38 @@ impl Multi {
710794

711795
/// Get a pointer to the raw underlying CURLM handle.
712796
pub fn raw(&self) -> *mut curl_sys::CURLM {
713-
self.raw
797+
self.raw.handle
714798
}
799+
}
715800

716-
unsafe fn close_impl(&self) -> Result<(), MultiError> {
717-
cvt(curl_sys::curl_multi_cleanup(self.raw))
801+
impl Drop for RawMulti {
802+
fn drop(&mut self) {
803+
unsafe {
804+
let _ = cvt(curl_sys::curl_multi_cleanup(self.handle));
805+
}
806+
}
807+
}
808+
809+
#[cfg(feature = "poll_7_68_0")]
810+
impl MultiWaker {
811+
/// Creates a new MultiWaker handle.
812+
fn new(raw: std::sync::Weak<RawMulti>) -> Self {
813+
Self { raw }
814+
}
815+
816+
/// Wakes up a thread that is blocked in [Multi::poll]. This method can be
817+
/// invoked from any thread.
818+
///
819+
/// Will return an error if the RawMulti has already been dropped.
820+
///
821+
/// Requires libcurl 7.68.0 or later.
822+
pub fn wakeup(&self) -> Result<(), MultiError> {
823+
if let Some(raw) = self.raw.upgrade() {
824+
unsafe { cvt(curl_sys::curl_multi_wakeup(raw.handle)) }
825+
} else {
826+
// This happens if the RawMulti has already been dropped:
827+
Err(MultiError::new(curl_sys::CURLM_BAD_HANDLE))
828+
}
718829
}
719830
}
720831

@@ -732,12 +843,6 @@ impl fmt::Debug for Multi {
732843
}
733844
}
734845

735-
impl Drop for Multi {
736-
fn drop(&mut self) {
737-
let _ = unsafe { self.close_impl() };
738-
}
739-
}
740-
741846
macro_rules! impl_easy_getters {
742847
() => {
743848
impl_easy_getters! {

0 commit comments

Comments
 (0)