Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 67 additions & 16 deletions ch05/d-fibers-closure/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
/// FIX #31:
/// Inline assembly blocks inside naked functions now need to use
/// the `naked_asm` macro instead of the good old `asm` macro.
/// The `noreturn` option is implicitly set by the `naked_asm`
/// macro so there is no need to set that.
///
/// See: https://github.com/PacktPublishing/Asynchronous-Programming-in-Rust/issues/31
/// for more information.
#![feature(naked_functions)]
use std::{arch::{asm, naked_asm}};
use std::{
arch::{asm, naked_asm},
thread,
time::{self, Duration},
};

const DEFAULT_STACK_SIZE: usize = 1024 * 1024 * 2;
const MAX_THREADS: usize = 4;
const MAX_THREADS: usize = 5;
static mut RUNTIME: usize = 0;

pub struct Runtime {
Expand Down Expand Up @@ -56,6 +52,16 @@ impl Thread {
task: None,
}
}

fn new_wit(id: usize, stack_size: usize) -> Self {
Thread {
id, // changed
stack: vec![0_u8; stack_size],
ctx: ThreadContext::default(),
state: State::Available,
task: None,
}
}
}

impl Runtime {
Expand Down Expand Up @@ -120,15 +126,14 @@ impl Runtime {
let old_pos = self.current;
self.current = pos;

unsafe {
let old: *mut ThreadContext = &mut self.threads[old_pos].ctx;
let new: *const ThreadContext = &self.threads[pos].ctx;
asm!("call switch", in("rdi") old, in("rsi") new, clobber_abi("C"));
}
let old: *mut ThreadContext = &mut self.threads[old_pos].ctx;
let new: *const ThreadContext = &self.threads[pos].ctx;
swap_ctx(old, new);
self.threads.len() > 0
}

pub fn spawn<F: FnOnce() + 'static>(f: F) { // changed
pub fn spawn<F: FnOnce() + 'static>(f: F) {
// changed
unsafe {
let rt_ptr = RUNTIME as *mut Runtime;
let available = (*rt_ptr)
Expand All @@ -149,6 +154,39 @@ impl Runtime {
available.state = State::Ready;
}
}

pub fn sleep(dur: Duration) {
unsafe {
let rt_ptr = RUNTIME as *mut Runtime;
let rt = &mut *rt_ptr;
let cur = rt.current;
// commenting out the next line causes the Runtime to wait the sleep
rt.t_yield(); // yield to the next available thread
let mut available = Thread::new_wit(rt.threads.len(), 1024);
let size = available.stack.len();
let s_ptr = available.stack.as_mut_ptr().offset(size as isize);
let s_ptr = (s_ptr as usize & !15) as *mut u8;
let deadline = time::Instant::now() + dur;
let old_ctx: *mut ThreadContext = &mut available.ctx;
let new_ctx: *const ThreadContext = &rt.threads[cur].ctx;
let f = move || {
while deadline > time::Instant::now() {
// looping is necessary to avoid blocking
thread::sleep(Duration::from_millis(1));
}

swap_ctx(old_ctx, new_ctx); // swap back the old fiber
};
available.task = Some(Box::new(f));
available.ctx.thread_ptr = &available as *const Thread as u64;
std::ptr::write(s_ptr.offset(-16) as *mut u64, guard as u64);
std::ptr::write(s_ptr.offset(-24) as *mut u64, skip as u64);
std::ptr::write(s_ptr.offset(-32) as *mut u64, call as u64); // changed
available.ctx.rsp = s_ptr.offset(-32) as u64;
available.state = State::Ready;
swap_ctx(&mut rt.threads[cur].ctx, &available.ctx); // swap to the new fiber
}
}
}

// This function is new
Expand All @@ -174,6 +212,12 @@ fn guard() {
};
}

fn swap_ctx(old: *mut ThreadContext, new: *const ThreadContext) {
unsafe {
asm!("call switch", in("rdi") old, in("rsi") new, clobber_abi("C"));
}
}

pub fn yield_thread() {
unsafe {
let rt_ptr = RUNTIME as *mut Runtime;
Expand Down Expand Up @@ -209,6 +253,12 @@ unsafe extern "C" fn switch() {
pub fn main() {
let mut runtime = Runtime::new();
runtime.init();
Runtime::spawn(|| {
let secs = 3;
println!("About to sleep for {secs} seconds");
Runtime::sleep(Duration::from_secs(3));
println!("Done sleeping");
});
Runtime::spawn(|| {
println!("I haven't implemented a timer in this example.");
yield_thread();
Expand All @@ -220,6 +270,7 @@ pub fn main() {
println!("...like this!");
})
});

runtime.run();
}
#[cfg(windows)]
Expand Down