Skip to content

Commit 0b5abea

Browse files
committed
examples: Add examples about graceful shutdown and server restart
Test graceful shutdown and serrver restart. Signed-off-by: Tim Zhang <[email protected]>
1 parent bc395cb commit 0b5abea

File tree

3 files changed

+37
-21
lines changed

3 files changed

+37
-21
lines changed

example/Cargo.lock

Lines changed: 14 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

example/async-server.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,19 +92,32 @@ async fn main() {
9292
let a = Arc::new(a);
9393
let aservice = agent_ttrpc::create_agent_service(a);
9494

95-
let server = Server::new()
95+
let mut server = Server::new()
9696
.bind("unix:///tmp/1")
9797
.unwrap()
9898
.register_service(hservice)
9999
.register_service(aservice);
100100

101-
let mut stream = signal(SignalKind::hangup()).unwrap();
101+
let mut hangup = signal(SignalKind::hangup()).unwrap();
102+
let mut interrupt = signal(SignalKind::interrupt()).unwrap();
103+
server.start().await.unwrap();
104+
102105
tokio::select! {
103-
_ = stream.recv() => {
104-
println!("signal received")
106+
_ = hangup.recv() => {
107+
// test stop_listen -> start
108+
println!("stop listen");
109+
server.stop_listen().await;
110+
println!("start listen");
111+
server.start().await.unwrap();
112+
113+
// hold some time for the new test connection.
114+
let timeout = tokio::time::delay_for(std::time::Duration::from_secs(100));
115+
timeout.await;
105116
}
106-
_ = server.start() => {
107-
println!("server exit")
117+
_ = interrupt.recv() => {
118+
// test graceful shutdown
119+
println!("graceful shutdown");
120+
server.shutdown().await.unwrap();
108121
}
109122
};
110123
}

src/asynchronous/server.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,13 @@ impl Server {
127127
S: AsyncRead + AsyncWrite + AsRawFd + Send + 'static,
128128
{
129129
let methods = self.methods.clone();
130+
130131
let (disconnect_tx, close_conn_rx) = watch::channel(0);
131132
self.disconnect_tx = Some(disconnect_tx);
132133

133134
let (conn_done_tx, all_conn_done_rx) = channel::<i32>(1);
134-
135135
self.all_conn_done_rx = Some(all_conn_done_rx);
136+
136137
let (stop_listen_tx, mut stop_listen_rx) = channel(1);
137138
self.stop_listen_tx = Some(stop_listen_tx);
138139

@@ -216,6 +217,8 @@ impl Server {
216217
}
217218
fd_tx = stop_listen_rx.recv() => {
218219
if let Some(mut fd_tx) = fd_tx {
220+
// dup fd to keep the listener open
221+
// or the listener will be closed when the incoming was dropped.
219222
let dup_fd = unistd::dup(incoming.as_raw_fd()).unwrap();
220223
common::set_fd_close_exec(dup_fd).unwrap();
221224
drop(incoming);

0 commit comments

Comments
 (0)