Skip to content

Commit ed14956

Browse files
committed
Impl websocket
1 parent 7c6cfc7 commit ed14956

File tree

10 files changed

+844
-54
lines changed

10 files changed

+844
-54
lines changed

Cargo.toml

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
[package]
22
name = "routerify-websocket"
3-
version = "0.1.0"
4-
description = "An websocket extension for Routerify."
3+
version = "1.0.0"
4+
description = "The websocket support for the Routerify library."
55
homepage = "https://github.com/routerify/routerify-websocket"
66
repository = "https://github.com/routerify/routerify-websocket"
77
keywords = ["routerify", "hyper-rs", "hyper", "websocket", "ws"]
8-
categories = ["asynchronous","web-programming","web-programming::http-server"]
8+
categories = ["asynchronous", "web-programming", "web-programming::websocket"]
99
authors = ["Rousan Ali <[email protected]>"]
1010
readme = "README.md"
1111
license = "MIT"
@@ -19,17 +19,24 @@ features = ["all"]
1919

2020
[features]
2121
default = []
22-
all = []
22+
all = ["json"]
23+
json = ["serde", "serde_json"]
2324

2425
[dependencies]
2526
log = "0.4"
2627
derive_more = "0.99"
2728
routerify = "1.1"
2829
hyper = "0.13"
29-
tokio-tungstenite = { version = "0.10", default-features = false, optional = true }
30-
headers = { version = "0.3", optional = true }
31-
futures = { version = "0.3", default-features = false, optional = true }
32-
tokio = { version = "0.2", features = ["rt-core"], optional = true}
30+
headers = "0.3"
31+
tokio-tungstenite = { version = "0.10", default-features = false }
32+
futures = { version = "0.3", default-features = false }
33+
tokio = { version = "0.2", features = ["rt-core"] }
34+
35+
serde = { version = "1.0", optional = true }
36+
serde_json = { version = "1.0", optional = true }
3337

3438
[dev-dependencies]
35-
tokio = { version = "0.2", features = ["full"] }
39+
tokio = { version = "0.2", features = ["full"] }
40+
stream-body = "0.1"
41+
serde = { version = "1.0", features = ["derive"] }
42+
tokio-tungstenite = { version = "0.10", features = ["tls"] }

README.md

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
# routerify-websocket
77

8-
An websocket extension for Routerify.
8+
The `websocket` support for the [Routerify](https://github.com/routerify/routerify) library.
99

1010
[Docs](https://docs.rs/routerify-websocket)
1111

@@ -16,16 +16,75 @@ Add this to your `Cargo.toml` file:
1616
```toml
1717
[dependencies]
1818
routerify = "1.1"
19-
routerify-websocket = "0.1.0"
19+
routerify-websocket = "1.0"
2020
```
2121

2222
## Example
2323

2424
```rust
25-
use routerify_websocket;
25+
// Import `SinkExt` and `StreamExt` to send and read websocket messages.
26+
use futures::{SinkExt, StreamExt};
27+
use hyper::{Body, Response, Server};
28+
use routerify::{Router, RouterService};
29+
// Import websocket types.
30+
use routerify_websocket::{upgrade_ws, Message, WebSocket};
31+
use std::{convert::Infallible, net::SocketAddr};
2632

27-
fn main() {
28-
println!("{}", routerify_websocket::add(2, 3));
33+
// A handler for websocket connections.
34+
async fn ws_handler(ws: WebSocket) {
35+
println!("New websocket connection: {}", ws.remote_addr());
36+
37+
// The `WebSocket` implements the `Sink` and `Stream` traits
38+
// to read and write messages.
39+
let (mut tx, mut rx) = ws.split();
40+
41+
// Read messages.
42+
while let Some(msg) = rx.next().await {
43+
let msg = msg.unwrap();
44+
45+
// Check message type and take appropriate actions.
46+
if msg.is_text() {
47+
println!("{}", msg.into_text().unwrap());
48+
} else if msg.is_binary() {
49+
println!("{:?}", msg.into_bytes());
50+
}
51+
52+
// Send a text message.
53+
let send_msg = Message::text("Hello world");
54+
tx.send(send_msg).await.unwrap();
55+
}
56+
}
57+
58+
fn router() -> Router<Body, Infallible> {
59+
// Create a router and specify the path and the handler for new websocket connections.
60+
Router::builder()
61+
// It will accept websocket connections at `/ws` path with any method type.
62+
.any_method("/ws", upgrade_ws(ws_handler))
63+
// It will accept http connections at `/` path.
64+
.get("/", |_req| async move {
65+
Ok(Response::new("I also serve http requests".into()))
66+
})
67+
.build()
68+
.unwrap()
69+
}
70+
71+
#[tokio::main]
72+
async fn main() {
73+
let router = router();
74+
75+
// Create a Service from the router above to handle incoming requests.
76+
let service = RouterService::new(router).unwrap();
77+
78+
// The address on which the server will be listening.
79+
let addr = SocketAddr::from(([127, 0, 0, 1], 3001));
80+
81+
// Create a server by passing the created service to `.serve` method.
82+
let server = Server::bind(&addr).serve(service);
83+
84+
println!("App is running on: {}", addr);
85+
if let Err(err) = server.await {
86+
eprintln!("Server error: {}", err);
87+
}
2988
}
3089
```
3190

examples/simple_example.rs

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
// Import `SinkExt` and `StreamExt` to send and read websocket messages.
2+
use futures::{SinkExt, StreamExt};
3+
use hyper::{Body, Response, Server};
4+
use routerify::{Router, RouterService};
5+
// Import websocket types.
6+
use routerify_websocket::{upgrade_ws, Message, WebSocket};
7+
use std::{convert::Infallible, net::SocketAddr};
8+
9+
// A handler for websocket connections.
10+
async fn ws_handler(ws: WebSocket) {
11+
println!("New websocket connection: {}", ws.remote_addr());
12+
13+
// The `WebSocket` implements the `Sink` and `Stream` traits to read and write messages.
14+
let (mut tx, mut rx) = ws.split();
15+
16+
// Read messages.
17+
while let Some(msg) = rx.next().await {
18+
let msg = msg.unwrap();
19+
20+
// Check message type and take appropriate actions.
21+
if msg.is_text() {
22+
println!("{}", msg.into_text().unwrap());
23+
} else if msg.is_binary() {
24+
println!("{:?}", msg.into_bytes());
25+
}
26+
27+
// Send a text message.
28+
let send_msg = Message::text("Hello world");
29+
tx.send(send_msg).await.unwrap();
30+
}
31+
}
32+
33+
fn router() -> Router<Body, Infallible> {
34+
// Create a router and specify the path and the handler for new websocket connections.
35+
Router::builder()
36+
// It will accept websocket connections at `/ws` path with any method type.
37+
.any_method("/ws", upgrade_ws(ws_handler))
38+
// It will accept http connections at `/` path.
39+
.get("/", |_req| async move {
40+
Ok(Response::new("I also serve http requests".into()))
41+
})
42+
.build()
43+
.unwrap()
44+
}
45+
46+
#[tokio::main]
47+
async fn main() {
48+
let router = router();
49+
50+
// Create a Service from the router above to handle incoming requests.
51+
let service = RouterService::new(router).unwrap();
52+
53+
// The address on which the server will be listening.
54+
let addr = SocketAddr::from(([127, 0, 0, 1], 3001));
55+
56+
// Create a server by passing the created service to `.serve` method.
57+
let server = Server::bind(&addr).serve(service);
58+
59+
println!("App is running on: {}", addr);
60+
if let Err(err) = server.await {
61+
eprintln!("Server error: {}", err);
62+
}
63+
}

examples/test.rs

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,50 @@
1+
use futures::{Sink, SinkExt, StreamExt};
12
use hyper::{Body, Request, Response, Server};
23
use routerify::prelude::*;
34
use routerify::{Middleware, Router, RouterService};
5+
use routerify_websocket::{upgrade_ws, upgrade_ws_with_config, Message, WebSocket, WebSocketConfig};
6+
use serde::{Deserialize, Serialize};
47
use std::{convert::Infallible, net::SocketAddr};
8+
use tokio_tungstenite::{
9+
tungstenite::protocol::{frame::coding::CloseCode, CloseFrame, Message as ClientMessage},
10+
WebSocketStream,
11+
};
512

6-
async fn ws_handler(_: Request<Body>) -> Result<Response<Body>, Infallible> {
7-
Ok(Response::new(Body::from("ws handler")))
13+
#[derive(Serialize, Deserialize, Debug)]
14+
struct User {
15+
name: String,
16+
roll: u64,
17+
}
18+
19+
async fn ws_handler(ws: WebSocket) {
20+
println!("new websocket connection: {}", ws.remote_addr());
21+
22+
let (mut tx, mut rx) = ws.split();
23+
24+
while let Some(msg) = rx.next().await {
25+
let msg = msg.unwrap();
26+
27+
println!("{:?}", msg.close_reason());
28+
println!("{}", String::from_utf8(msg.into_bytes()).unwrap());
29+
}
830
}
931

1032
async fn logger(req: Request<Body>) -> Result<Request<Body>, Infallible> {
1133
println!("{} {} {}", req.remote_addr(), req.method(), req.uri().path());
1234
Ok(req)
1335
}
1436

37+
// A handler for "/about" page.
38+
async fn about_handler(_: Request<Body>) -> Result<Response<Body>, Infallible> {
39+
println!("{:?}", std::thread::current().id());
40+
Ok(Response::new(Body::from("About page")))
41+
}
42+
1543
fn router() -> Router<Body, Infallible> {
1644
Router::builder()
1745
.middleware(Middleware::pre(logger))
18-
.get("/ws/connect", ws_handler)
46+
.get("/about", about_handler)
47+
.any_method("/ws", upgrade_ws(ws_handler))
1948
.build()
2049
.unwrap()
2150
}
@@ -30,8 +59,25 @@ async fn main() {
3059

3160
let server = Server::bind(&addr).serve(service);
3261

62+
tokio::spawn(async move {
63+
tokio::time::delay_for(tokio::time::Duration::from_secs(3)).await;
64+
65+
let (mut ws, resp) = tokio_tungstenite::connect_async("ws://127.0.0.1:3001/ws")
66+
.await
67+
.unwrap();
68+
69+
println!("{:?}", resp.headers());
70+
71+
let msg = ClientMessage::text("hey");
72+
ws.send(msg).await.unwrap();
73+
74+
ws.close(None).await.unwrap();
75+
76+
tokio::time::delay_for(tokio::time::Duration::from_secs(3)).await;
77+
});
78+
3379
println!("App is running on: {}", addr);
3480
if let Err(err) = server.await {
3581
eprintln!("Server error: {}", err);
3682
}
37-
}
83+
}

examples/test_stream_body.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
use futures::{Sink, SinkExt, StreamExt};
2+
use hyper::{Request, Response, Server};
3+
use routerify::{Router, RouterService};
4+
use routerify_websocket::{upgrade_ws, WebSocket};
5+
use std::{convert::Infallible, net::SocketAddr};
6+
use stream_body::StreamBody;
7+
use tokio::prelude::*;
8+
9+
async fn ws_handler(ws: WebSocket) {
10+
println!("new websocket connection: {}", ws.remote_addr());
11+
12+
let (mut tx, mut rx) = ws.split();
13+
14+
while let Some(msg) = rx.next().await {
15+
let msg = msg.unwrap();
16+
17+
println!("{:?}", msg.close_reason());
18+
println!("{}", String::from_utf8(msg.into_bytes()).unwrap());
19+
}
20+
}
21+
22+
fn router() -> Router<StreamBody, Infallible> {
23+
Router::builder()
24+
.any_method("/ws", upgrade_ws(ws_handler))
25+
// Add options handler.
26+
.options(
27+
"/*",
28+
|_req| async move { Ok(Response::new(StreamBody::from("Options"))) },
29+
)
30+
// Add 404 page handler.
31+
.any(|_req| async move { Ok(Response::new(StreamBody::from("Not Found"))) })
32+
// Add an error handler.
33+
.err_handler(|err| async move { Response::new(StreamBody::from(format!("Error: {}", err))) })
34+
.build()
35+
.unwrap()
36+
}
37+
38+
#[tokio::main]
39+
async fn main() {
40+
let router = router();
41+
42+
// Create a Service from the router above to handle incoming requests.
43+
let service = RouterService::new(router).unwrap();
44+
45+
// The address on which the server will be listening.
46+
let addr = SocketAddr::from(([127, 0, 0, 1], 3001));
47+
48+
// Create a server by passing the created service to `.serve` method.
49+
let server = Server::bind(&addr).serve(service);
50+
51+
println!("App is running on: {}", addr);
52+
if let Err(err) = server.await {
53+
eprintln!("Server error: {}", err);
54+
}
55+
}

0 commit comments

Comments
 (0)