Skip to content

Commit 9897ec2

Browse files
Checkpointing websocket server.
1 parent d567760 commit 9897ec2

File tree

7 files changed

+146
-32
lines changed

7 files changed

+146
-32
lines changed

Cargo.lock

Lines changed: 45 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

hydroflow/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ serde_json = "1.0.115"
5959
slotmap = "1.0.0"
6060
smallvec = "1.6.1"
6161
tokio-stream = { version = "0.1.3", default-features = false, features = [ "time", "io-util", "sync" ] }
62+
tokio-tungstenite = "0.23.1"
6263
tracing = "0.1.37"
6364
variadics = { path = "../variadics", version = "^0.0.6" }
6465
web-time = "1.0.0"

hydroflow/examples/chat/main.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,17 +49,18 @@ async fn main() {
4949
let opts = Opts::parse();
5050

5151
match opts.role {
52-
Role::Client => {
53-
run_client(opts).await;
54-
}
52+
// Role::Client => {
53+
// run_client(opts).await;
54+
// }
5555
Role::Server => {
5656
run_server(opts).await;
5757
}
58-
Role::GossipingServer1
59-
| Role::GossipingServer2
60-
| Role::GossipingServer3
61-
| Role::GossipingServer4
62-
| Role::GossipingServer5 => run_gossiping_server(opts).await,
58+
// Role::GossipingServer1
59+
// | Role::GossipingServer2
60+
// | Role::GossipingServer3
61+
// | Role::GossipingServer4
62+
// | Role::GossipingServer5 => run_gossiping_server(opts).await,
63+
_ => unimplemented!(),
6364
}
6465
}
6566

hydroflow/examples/chat/server.rs

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use hydroflow::hydroflow_syntax;
22
use hydroflow::scheduled::graph::Hydroflow;
3-
use hydroflow::util::bind_udp_bytes;
3+
use hydroflow::util::{bind_udp_bytes, bind_websocket};
44

55
use crate::protocol::{Message, MessageWithAddr};
66
use crate::{default_server_address, Opts};
@@ -14,27 +14,20 @@ pub(crate) async fn run_server(opts: Opts) {
1414

1515
println!("Starting server on {:?}", server_address);
1616

17-
let (outbound, inbound, actual_server_addr) = bind_udp_bytes(server_address).await;
17+
let (outbound, inbound, actual_server_addr) = bind_websocket(server_address).await.unwrap();
1818

1919
println!("Server is live! Listening on {:?}", actual_server_addr);
2020

2121
let mut hf: Hydroflow = hydroflow_syntax! {
2222
// Define shared inbound and outbound channels
23-
outbound_chan = union() -> dest_sink_serde(outbound);
24-
inbound_chan = source_stream_serde(inbound)
23+
inbound_chan = source_stream(inbound)
2524
-> map(Result::unwrap)
26-
-> map(|(msg, addr)| MessageWithAddr::from_message(msg, addr))
27-
-> demux_enum::<MessageWithAddr>();
28-
clients = inbound_chan[ConnectRequest] -> map(|(addr,)| addr) -> tee();
29-
inbound_chan[ConnectResponse] -> for_each(|(addr,)| println!("Received unexpected `ConnectResponse` as server from addr {}.", addr));
30-
31-
// Pipeline 1: Acknowledge client connections
32-
clients[0] -> map(|addr| (Message::ConnectResponse, addr)) -> [0]outbound_chan;
33-
34-
// Pipeline 2: Broadcast messages to all clients
35-
inbound_chan[ChatMsg] -> map(|(_addr, nickname, message, ts)| Message::ChatMsg { nickname, message, ts }) -> [0]broadcast;
36-
clients[1] -> [1]broadcast;
37-
broadcast = cross_join::<'tick, 'static>() -> [1]outbound_chan;
25+
-> tee();
26+
27+
inbound_chan -> map(|(msg, addr)| addr) -> [1]broadcast;
28+
inbound_chan -> map(|(msg, addr)| msg) -> [0]broadcast;
29+
30+
broadcast = cross_join::<'tick, 'static>() -> dest_sink(outbound);
3831
};
3932

4033
#[cfg(feature = "debugging")]

hydroflow/examples/echoserver/main.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::net::SocketAddr;
22

33
use clap::{Parser, ValueEnum};
44
use client::run_client;
5-
use hydroflow::util::{bind_udp_bytes, ipv4_resolve};
5+
use hydroflow::util::{bind_udp_bytes, bind_websocket, ipv4_resolve};
66
use server::run_server;
77

88
mod client;
@@ -35,16 +35,14 @@ async fn main() {
3535
.unwrap_or_else(|| ipv4_resolve("localhost:0").unwrap());
3636

3737
// allocate `outbound` sink and `inbound` stream
38-
let (outbound, inbound, addr) = bind_udp_bytes(addr).await;
38+
let (outbound, inbound, addr) = bind_websocket(addr).await.unwrap();
3939
println!("Listening on {:?}", addr);
4040

4141
match opts.role {
4242
Role::Server => {
4343
run_server(outbound, inbound, opts).await;
4444
}
45-
Role::Client => {
46-
run_client(outbound, inbound, opts).await;
47-
}
45+
_ => panic!("Unsupported!")
4846
}
4947
}
5048

hydroflow/src/util/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ pub use socket::*;
2929

3030
#[cfg(feature = "deploy_integration")]
3131
pub mod deploy;
32+
mod websocket;
33+
pub use websocket::*;
3234

3335
use std::io::Read;
3436
use std::net::SocketAddr;

hydroflow/src/util/websocket.rs

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
use std::cell::RefCell;
2+
use std::collections::HashMap;
3+
use std::net::SocketAddr;
4+
use std::pin::pin;
5+
use std::rc::Rc;
6+
use futures::{SinkExt, StreamExt};
7+
use tokio::net::{TcpListener};
8+
use tokio::task::spawn_local;
9+
use tokio_tungstenite::tungstenite::{Error, Message};
10+
use crate::util::unsync::mpsc::{Receiver, Sender};
11+
use crate::util::unsync_channel;
12+
13+
pub async fn bind_websocket(endpoint: SocketAddr) -> Result<(Sender<(Message, SocketAddr)>, Receiver<Result<(Message, SocketAddr), Error>>, SocketAddr), std::io::Error>{
14+
let listener = TcpListener::bind(endpoint).await.unwrap();
15+
16+
let bound_endpoint = listener.local_addr()?;
17+
18+
let (tx_egress, mut rx_egress) = unsync_channel(None);
19+
let (tx_ingress, rx_ingress) = unsync_channel(None);
20+
21+
let clients = Rc::new(RefCell::new(HashMap::new()));
22+
23+
spawn_local({
24+
let clients = clients.clone();
25+
26+
async move {
27+
while let Some((payload, addr)) = rx_egress.next().await {
28+
let client = clients.borrow_mut().remove(&addr);
29+
30+
if let Some(mut sender) = client {
31+
let _ = SinkExt::send(&mut sender, payload).await;
32+
clients.borrow_mut().insert(addr, sender);
33+
}
34+
}
35+
}
36+
});
37+
38+
// Spawn the listener
39+
spawn_local(async move {
40+
loop {
41+
let (stream, peer_addr) = if let Ok((stream, _)) = listener.accept().await {
42+
if let Ok(peer_addr) = stream.peer_addr() {
43+
(stream, peer_addr)
44+
} else {
45+
continue;
46+
}
47+
} else {
48+
continue;
49+
};
50+
51+
// Perform the websocket handshake
52+
let ws_stream = tokio_tungstenite::accept_async(stream)
53+
.await
54+
.expect("Error during the websocket handshake occurred");
55+
56+
// Split the stream into incoming and outgoing
57+
let (outgoing, incoming) = ws_stream.split();
58+
let mut tx_ingress = tx_ingress.clone();
59+
60+
clients.borrow_mut().insert(peer_addr, outgoing);
61+
62+
spawn_local({
63+
let clients = clients.clone();
64+
async move {
65+
let mapped = incoming.map(|x| Ok(x.map(|x| (x, peer_addr))));
66+
let _ = tx_ingress.send_all(&mut pin!(mapped)).await;
67+
68+
clients.borrow_mut().remove(&peer_addr);
69+
}
70+
});
71+
}
72+
73+
});
74+
75+
Ok((tx_egress, rx_ingress, bound_endpoint))
76+
}
77+

0 commit comments

Comments
 (0)