diff --git a/Cargo.toml b/Cargo.toml index 942d88a..2a9fe21 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,13 +28,10 @@ clap.workspace = true futures-lite.workspace = true humantime.workspace = true serde_json.workspace = true +futures-concurrency.workspace = true [workspace] -members = [ - "macro", - "test-programs", - "test-programs/artifacts", -] +members = ["macro", "test-programs", "test-programs/artifacts"] resolver = "2" [workspace.package] @@ -56,6 +53,7 @@ cargo_metadata = "0.18.1" clap = { version = "4.5.26", features = ["derive"] } futures-core = "0.3.19" futures-lite = "1.12.0" +futures-concurrency = "7.6.2" humantime = "2.1.0" heck = "0.5" http = "1.1" @@ -77,6 +75,4 @@ wstd = { path = "." } wstd-macro = { path = "macro", version = "=0.5.1" } [package.metadata.docs.rs] -targets = [ - "wasm32-wasip2" -] +targets = ["wasm32-wasip2"] diff --git a/examples/tcp_echo_server_non_blocking.rs b/examples/tcp_echo_server_non_blocking.rs new file mode 100644 index 0000000..78f3edf --- /dev/null +++ b/examples/tcp_echo_server_non_blocking.rs @@ -0,0 +1,55 @@ +use futures_concurrency::future::FutureGroup; +use futures_lite::{FutureExt, StreamExt}; +use std::{ + cell::RefCell, + future::Future, + pin::{pin, Pin}, + rc::Rc, + task::Poll, +}; +use wstd::io; +use wstd::iter::AsyncIterator; +use wstd::net::TcpListener; + +type StreamTasks = Rc>>>>>>; + +#[wstd::main] +async fn main() -> io::Result<()> { + let listener = TcpListener::bind("127.0.0.1:8080").await?; + println!("Listening on {}", listener.local_addr()?); + println!("type `nc localhost 8080` to create a TCP client"); + + let stream_tasks: StreamTasks = StreamTasks::default(); + let mut listening_task = pin!(start_listening(listener, stream_tasks.clone())); + + futures_lite::future::poll_fn(|cx| { + if let Poll::Ready(_) = listening_task.as_mut().poll(cx) { + return Poll::Ready(()); + }; + + let mut stream_tasks_ref = stream_tasks.borrow_mut(); + if let Poll::Ready(Some(res)) = stream_tasks_ref.poll_next(cx) { + println!("Task finished: {:?}", res); + println!("Tasks len: {}", stream_tasks_ref.len()); + } + + Poll::Pending + }) + .await; + Ok(()) +} + +async fn start_listening(listener: TcpListener, stream_tasks: StreamTasks) -> io::Result<()> { + let mut incoming = listener.incoming(); + while let Some(stream) = incoming.next().await { + let stream = stream?; + println!("Accepted from: {}", stream.peer_addr()?); + + let stream_task = async move { io::copy(&stream, &stream).await }.boxed_local(); + + stream_tasks.borrow_mut().insert(stream_task); + println!("Task added"); + println!("Tasks len: {}", stream_tasks.borrow().len()); + } + Ok(()) +} diff --git a/test-programs/Cargo.toml b/test-programs/Cargo.toml index 633c274..a56373d 100644 --- a/test-programs/Cargo.toml +++ b/test-programs/Cargo.toml @@ -8,3 +8,4 @@ publish = false futures-lite.workspace = true serde_json.workspace = true wstd.workspace = true +futures-concurrency.workspace = true