Skip to content

Add an example tcp_echo_server_non_blocking #32

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 4 additions & 8 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,10 @@ clap.workspace = true
futures-lite.workspace = true
humantime.workspace = true
serde_json.workspace = true
futures-concurrency.workspace = true

[workspace]
members = [
"macro",
"test-programs",
"test-programs/artifacts",
]
members = ["macro", "test-programs", "test-programs/artifacts"]
resolver = "2"

[workspace.package]
Expand All @@ -56,6 +53,7 @@ cargo_metadata = "0.18.1"
clap = { version = "4.5.26", features = ["derive"] }
futures-core = "0.3.19"
futures-lite = "1.12.0"
futures-concurrency = "7.6.2"
humantime = "2.1.0"
heck = "0.5"
http = "1.1"
Expand All @@ -77,6 +75,4 @@ wstd = { path = "." }
wstd-macro = { path = "macro", version = "=0.5.1" }

[package.metadata.docs.rs]
targets = [
"wasm32-wasip2"
]
targets = ["wasm32-wasip2"]
55 changes: 55 additions & 0 deletions examples/tcp_echo_server_non_blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use futures_concurrency::future::FutureGroup;
use futures_lite::{FutureExt, StreamExt};
use std::{
cell::RefCell,
future::Future,
pin::{pin, Pin},
rc::Rc,
task::Poll,
};
use wstd::io;
use wstd::iter::AsyncIterator;
use wstd::net::TcpListener;

type StreamTasks = Rc<RefCell<FutureGroup<Pin<Box<dyn Future<Output = io::Result<()>>>>>>>;

#[wstd::main]
async fn main() -> io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8080").await?;
println!("Listening on {}", listener.local_addr()?);
println!("type `nc localhost 8080` to create a TCP client");

let stream_tasks: StreamTasks = StreamTasks::default();
let mut listening_task = pin!(start_listening(listener, stream_tasks.clone()));

futures_lite::future::poll_fn(|cx| {
if let Poll::Ready(_) = listening_task.as_mut().poll(cx) {
return Poll::Ready(());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to return as Ready until both the Listener is ready and the stream tasks are all complete.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I didn't quite understand what you meant. Could you please tell me your thoughts directly?

};

let mut stream_tasks_ref = stream_tasks.borrow_mut();
if let Poll::Ready(Some(res)) = stream_tasks_ref.poll_next(cx) {
println!("Task finished: {:?}", res);
println!("Tasks len: {}", stream_tasks_ref.len());
}

Poll::Pending
})
.await;
Ok(())
}

async fn start_listening(listener: TcpListener, stream_tasks: StreamTasks) -> io::Result<()> {
let mut incoming = listener.incoming();
while let Some(stream) = incoming.next().await {
let stream = stream?;
println!("Accepted from: {}", stream.peer_addr()?);

let stream_task = async move { io::copy(&stream, &stream).await }.boxed_local();

stream_tasks.borrow_mut().insert(stream_task);
println!("Task added");
println!("Tasks len: {}", stream_tasks.borrow().len());
}
Ok(())
}
1 change: 1 addition & 0 deletions test-programs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ publish = false
futures-lite.workspace = true
serde_json.workspace = true
wstd.workspace = true
futures-concurrency.workspace = true