Skip to content
2 changes: 2 additions & 0 deletions libs/core/runtime/jsruntime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2230,6 +2230,8 @@ impl JsRuntime {
// ===== Phase 1: Timers =====
// 1a. Fire expired libuv C timers
if let Some(uv_inner_ptr) = context_state.uv_loop_inner.get() {
// Update cached loop time at the start of each tick, matching libuv.
unsafe { (*uv_inner_ptr).update_time() };
unsafe { (*uv_inner_ptr).run_timers() };
}

Expand Down
90 changes: 79 additions & 11 deletions libs/core/uv_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ uv_errno!(UV_EPIPE, libc::EPIPE, -4047);
uv_errno!(UV_EBUSY, libc::EBUSY, -4082);
uv_errno!(UV_ENOBUFS, libc::ENOBUFS, -4060);
uv_errno!(UV_ENOTSUP, libc::ENOTSUP, -4049);
uv_errno!(UV_EALREADY, libc::EALREADY, -4084);
pub const UV_EOF: i32 = -4095;

/// Map a `std::io::Error` to the closest libuv error code.
Expand Down Expand Up @@ -172,10 +173,14 @@ pub(crate) struct UvLoopInner {
waker: RefCell<Option<Waker>>,
closing_handles: RefCell<VecDeque<(*mut uv_handle_t, Option<uv_close_cb>)>>,
time_origin: Instant,
/// Cached loop time in milliseconds. Updated once per tick via
/// `update_time()`, matching libuv's `uv_update_time` semantics.
cached_time_ms: Cell<u64>,
}

impl UvLoopInner {
fn new() -> Self {
let origin = Instant::now();
Self {
timers: RefCell::new(BTreeSet::new()),
next_timer_id: Cell::new(1),
Expand All @@ -187,7 +192,8 @@ impl UvLoopInner {
tty_handles: RefCell::new(Vec::with_capacity(4)),
waker: RefCell::new(None),
closing_handles: RefCell::new(VecDeque::with_capacity(16)),
time_origin: Instant::now(),
time_origin: origin,
cached_time_ms: Cell::new(0),
}
}

Expand Down Expand Up @@ -216,9 +222,19 @@ impl UvLoopInner {
id
}

/// Return the cached loop time. Matches libuv's `uv_now()` which
/// returns the time cached at the start of the current tick.
#[inline]
fn now_ms(&self) -> u64 {
Instant::now().duration_since(self.time_origin).as_millis() as u64
self.cached_time_ms.get()
}

/// Re-read the wall clock and update the cached time.
/// Matches libuv's `uv_update_time()`.
#[inline]
pub(crate) fn update_time(&self) {
let ms = Instant::now().duration_since(self.time_origin).as_millis() as u64;
self.cached_time_ms.set(ms);
}

pub(crate) fn has_alive_handles(&self) -> bool {
Expand Down Expand Up @@ -457,7 +473,8 @@ impl UvLoopInner {
}

// SAFETY: tcp_ptr is valid; checked above.
any_work |= unsafe { tcp::poll_tcp_handle(tcp_ptr, &mut cx) };
let work = unsafe { tcp::poll_tcp_handle(tcp_ptr, &mut cx) };
any_work |= work;
} // end per-tcp-handle loop

let mut j = 0;
Expand Down Expand Up @@ -557,11 +574,24 @@ impl UvLoopInner {
tty.internal_reading = false;
tty.internal_alloc_cb = None;
tty.internal_read_cb = None;
tty.internal_write_queue.clear();
tty.internal_shutdown = None;

// Cancel in-flight write requests with UV_ECANCELED, matching libuv.
while let Some(pw) = tty.internal_write_queue.pop_front() {
if let Some(cb) = pw.cb {
cb(pw.req, UV_ECANCELED);
}
}

// Cancel pending shutdown with UV_ECANCELED.
if let Some(pending) = tty.internal_shutdown.take()
&& let Some(cb) = pending.cb
{
cb(pending.req, UV_ECANCELED);
}

// Drop the reactor (AsyncFd or select fallback) to deregister
// from the reactor, then close the fd.
// Match libuv: do NOT close stdio fds (0, 1, 2).
#[cfg(unix)]
{
// If using the select fallback, shut down the background thread.
Expand All @@ -572,7 +602,7 @@ impl UvLoopInner {
tty::shutdown_select_fallback(s);
}
tty.internal_reactor = None;
if tty.internal_fd >= 0 {
if tty.internal_fd > 2 {
libc::close(tty.internal_fd);
tty.internal_fd = -1;
}
Expand Down Expand Up @@ -611,12 +641,41 @@ impl UvLoopInner {
tcp.internal_alloc_cb = None;
tcp.internal_read_cb = None;
tcp.internal_connection_cb = None;
tcp.internal_connect = None;
tcp.internal_write_queue.clear();
tcp.internal_stream = None;

// Cancel in-flight connect request with UV_ECANCELED, matching libuv.
if let Some(pending) = tcp.internal_connect.take()
&& let Some(cb) = pending.cb
{
cb(pending.req, UV_ECANCELED);
}

// Cancel in-flight write requests with UV_ECANCELED, matching libuv's
// uv__stream_flush_write_queue() called from uv__stream_destroy().
while let Some(pw) = tcp.internal_write_queue.pop_front() {
if let Some(cb) = pw.cb {
cb(pw.req, UV_ECANCELED);
}
}
if let Some(stream) = tcp.internal_stream.take() {
// Match libuv: just close the fd. The OS delivers FIN/RST to the
// peer naturally; the peer's read loop detects EOF via recv()
// returning 0. libuv does NOT manually signal EOF to peer handles.
if let Ok(std_stream) = stream.into_std() {
let _ = std_stream.shutdown(std::net::Shutdown::Both);
}
}
tcp.internal_socket = None;
tcp.internal_delayed_error = 0;
tcp.internal_listener = None;
tcp.internal_backlog.clear();
tcp.internal_shutdown = None;

// Cancel pending shutdown with UV_ECANCELED.
if let Some(pending) = tcp.internal_shutdown.take()
&& let Some(cb) = pending.cb
{
cb(pending.req, UV_ECANCELED);
}

tcp.flags &= !UV_HANDLE_ACTIVE;
}
}
Expand Down Expand Up @@ -764,6 +823,11 @@ pub unsafe extern "C" fn uv_loop_close(loop_: *mut uv_loop_t) -> c_int {
unsafe {
let internal = (*loop_).internal;
if !internal.is_null() {
let inner = &*(internal as *const UvLoopInner);
// Match libuv: return UV_EBUSY if handles or requests are still alive.
if inner.has_alive_handles() {
return UV_EBUSY;
}
drop(Box::from_raw(internal as *mut UvLoopInner));
(*loop_).internal = std::ptr::null_mut();
}
Expand All @@ -783,7 +847,11 @@ pub unsafe extern "C" fn uv_now(loop_: *mut uv_loop_t) -> u64 {
/// ### Safety
/// `_loop_` must be a valid pointer to a `uv_loop_t` initialized by `uv_loop_init`.
#[cfg_attr(feature = "uv_compat_export", unsafe(no_mangle))]
pub unsafe extern "C" fn uv_update_time(_loop_: *mut uv_loop_t) {}
pub unsafe extern "C" fn uv_update_time(loop_: *mut uv_loop_t) {
// SAFETY: Caller guarantees loop_ was initialized by uv_loop_init.
let inner = unsafe { get_inner(loop_) };
inner.update_time();
}

/// ### Safety
/// `loop_` must be initialized by `uv_loop_init`. `handle` must be a valid, writable pointer.
Expand Down
Loading
Loading