forked from kvc0/protosocket
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.rs
More file actions
143 lines (128 loc) · 4.87 KB
/
server.rs
File metadata and controls
143 lines (128 loc) · 4.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
use std::{future::Future, sync::atomic::AtomicUsize};
use futures::{future::BoxFuture, stream::BoxStream, FutureExt, Stream, StreamExt};
use messages::{EchoRequest, EchoResponse, EchoStream, Request, Response, ResponseBehavior};
use protosocket_prost::ProstSerializer;
use protosocket_rpc::{
server::{ConnectionService, RpcKind, SocketService},
ProtosocketControlCode,
};
mod messages;
fn main() -> Result<(), Box<dyn std::error::Error>> {
static I: AtomicUsize = AtomicUsize::new(0);
let runtime = tokio::runtime::Builder::new_multi_thread()
.thread_name_fn(|| {
format!(
"app-{}",
I.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
)
})
.worker_threads(2)
.event_interval(7)
.enable_all()
.build()?;
runtime.block_on(run_main())
}
#[allow(clippy::expect_used)]
async fn run_main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
let mut server = protosocket_rpc::server::SocketRpcServer::new(
std::env::var("HOST")
.unwrap_or_else(|_| "0.0.0.0:9000".to_string())
.parse()?,
DemoRpcSocketService,
4 << 20,
1 << 20,
128,
64 << 10,
)
.await?;
server.set_max_queued_outbound_messages(512);
tokio::spawn(server).await??;
Ok(())
}
/// This is the service that will be used to handle new connections.
/// It doesn't do much; yours might be simple like this too, or it might wire your per-connection
/// ConnectionServices to application-wide state tracking.
struct DemoRpcSocketService;
impl SocketService for DemoRpcSocketService {
type RequestDeserializer = ProstSerializer<Request, Response>;
type ResponseSerializer = ProstSerializer<Request, Response>;
type ConnectionService = DemoRpcConnectionServer;
type Stream = tokio::net::TcpStream;
fn deserializer(&self) -> Self::RequestDeserializer {
Self::RequestDeserializer::default()
}
fn serializer(&self) -> Self::ResponseSerializer {
Self::ResponseSerializer::default()
}
fn new_connection_service(&self, address: std::net::SocketAddr) -> Self::ConnectionService {
log::info!("new connection server {address}");
DemoRpcConnectionServer { address }
}
fn accept_stream(
&self,
stream: tokio::net::TcpStream,
) -> impl Future<Output = std::io::Result<Self::Stream>> + Send + 'static {
std::future::ready(Ok(stream))
}
}
/// This is the entry point for each Connection. State per-connection is tracked, and you
/// get mutable access to the service on each new rpc for state tracking.
struct DemoRpcConnectionServer {
address: std::net::SocketAddr,
}
impl ConnectionService for DemoRpcConnectionServer {
type Request = Request;
type Response = Response;
// Ideally you'd use real Future and Stream types here for performance and debuggability.
// For a demo though, it's fine to use BoxFuture and BoxStream.
type UnaryFutureType = BoxFuture<'static, Response>;
type StreamType = BoxStream<'static, Response>;
fn new_rpc(
&mut self,
initiating_message: Self::Request,
) -> RpcKind<Self::UnaryFutureType, Self::StreamType> {
log::debug!("{} new rpc: {initiating_message:?}", self.address);
let request_id = initiating_message.request_id;
let behavior = initiating_message.response_behavior();
match initiating_message.body {
Some(echo) => match behavior {
ResponseBehavior::Unary => RpcKind::Unary(echo_request(request_id, echo).boxed()),
ResponseBehavior::Stream => {
RpcKind::Streaming(echo_stream(request_id, echo).boxed())
}
},
None => {
// No completion messages will be sent for this message
log::warn!(
"{request_id} no request in rpc body. This may cause a client memory leak."
);
RpcKind::Unknown
}
}
}
}
async fn echo_request(request_id: u64, echo: EchoRequest) -> Response {
Response {
request_id,
code: ProtosocketControlCode::Normal as u32,
kind: Some(messages::EchoResponseKind::Echo(EchoResponse {
message: echo.message,
nanotime: echo.nanotime,
})),
}
}
fn echo_stream(request_id: u64, echo: EchoRequest) -> impl Stream<Item = Response> {
let nanotime = echo.nanotime;
futures::stream::iter(echo.message.into_bytes().into_iter().enumerate().map(
move |(sequence, c)| Response {
request_id,
code: ProtosocketControlCode::Normal as u32,
kind: Some(messages::EchoResponseKind::Stream(EchoStream {
message: (c as char).to_string(),
nanotime,
sequence: sequence as u64,
})),
},
))
}