Skip to content

Commit 762fe2a

Browse files
committed
support restart server
1. split shutdown to shutdown_listener and shutdown_connection 2. add start_listen Signed-off-by: quanweiZhou <[email protected]>
1 parent 59381ad commit 762fe2a

File tree

1 file changed

+77
-38
lines changed

1 file changed

+77
-38
lines changed

src/sync/server.rs

Lines changed: 77 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,11 @@ type MessageReceiver = Receiver<(MessageHeader, Vec<u8>)>;
4949
pub struct Server {
5050
listeners: Vec<RawFd>,
5151
monitor_fd: (RawFd, RawFd),
52-
quit: Arc<AtomicBool>,
52+
listener_quit_flag: Arc<AtomicBool>,
5353
connections: Arc<Mutex<HashMap<RawFd, Connection>>>,
5454
methods: Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>,
5555
handler: Option<JoinHandle<()>>,
56+
reaper: Option<(Sender<i32>, JoinHandle<()>)>,
5657
thread_count_default: usize,
5758
thread_count_min: usize,
5859
thread_count_max: usize,
@@ -251,14 +252,14 @@ fn check_method_handler_threads(ts: &ThreadS) {
251252

252253
impl Default for Server {
253254
fn default() -> Self {
254-
let (rfd, wfd) = pipe2(OFlag::O_CLOEXEC).unwrap();
255255
Server {
256256
listeners: Vec::with_capacity(1),
257-
monitor_fd: (rfd, wfd),
258-
quit: Arc::new(AtomicBool::new(false)),
257+
monitor_fd: (-1, -1),
258+
listener_quit_flag: Arc::new(AtomicBool::new(false)),
259259
connections: Arc::new(Mutex::new(HashMap::new())),
260260
methods: Arc::new(HashMap::new()),
261261
handler: None,
262+
reaper: None,
262263
thread_count_default: DEFAULT_WAIT_THREAD_COUNT_DEFAULT,
263264
thread_count_min: DEFAULT_WAIT_THREAD_COUNT_MIN,
264265
thread_count_max: DEFAULT_WAIT_THREAD_COUNT_MAX,
@@ -315,40 +316,31 @@ impl Server {
315316
self
316317
}
317318

318-
pub fn start(&mut self) -> Result<()> {
319-
if self.thread_count_default >= self.thread_count_max {
320-
return Err(Error::Others(
321-
"thread_count_default should smaller than thread_count_max".to_string(),
322-
));
323-
}
324-
if self.thread_count_default <= self.thread_count_min {
325-
return Err(Error::Others(
326-
"thread_count_default should biger than thread_count_min".to_string(),
327-
));
328-
}
329-
319+
pub fn start_listener(&mut self) -> Result<()> {
330320
let connections = self.connections.clone();
331321

332322
if self.listeners.is_empty() {
333323
return Err(Error::Others("ttrpc-rust not bind".to_string()));
334324
}
335325

326+
self.listener_quit_flag.store(false, Ordering::SeqCst);
327+
let (rfd, wfd) = pipe2(OFlag::O_CLOEXEC).unwrap();
328+
self.monitor_fd = (rfd, wfd);
329+
336330
let listener = self.listeners[0];
337331

338332
let methods = self.methods.clone();
339333
let default = self.thread_count_default;
340334
let min = self.thread_count_min;
341335
let max = self.thread_count_max;
342-
let service_quit = self.quit.clone();
336+
let listener_quit_flag = self.listener_quit_flag.clone();
343337
let monitor_fd = self.monitor_fd.0;
344338

345-
let handler = thread::Builder::new()
346-
.name("listener_loop".into())
347-
.spawn(move || {
348-
let (reaper_tx, reaper_rx) = channel();
339+
let reaper_tx = match self.reaper.take() {
340+
None => {
349341
let reaper_connections = connections.clone();
350-
351-
let reaper = thread::Builder::new()
342+
let (reaper_tx, reaper_rx) = channel();
343+
let reaper_handler = thread::Builder::new()
352344
.name("reaper".into())
353345
.spawn(move || {
354346
for fd in reaper_rx.iter() {
@@ -360,11 +352,25 @@ impl Server {
360352
cn.handler.take().map(|handler| handler.join().unwrap())
361353
});
362354
}
355+
info!("reaper thread exited");
363356
})
364357
.unwrap();
358+
self.reaper = Some((reaper_tx.clone(), reaper_handler));
359+
reaper_tx
360+
}
361+
Some(r) => {
362+
let reaper_tx = r.0.clone();
363+
self.reaper = Some(r);
364+
reaper_tx
365+
}
366+
};
365367

368+
let handler = thread::Builder::new()
369+
.name("listener_loop".into())
370+
.spawn(move || {
366371
loop {
367-
if service_quit.load(Ordering::SeqCst) {
372+
if listener_quit_flag.load(Ordering::SeqCst) {
373+
info!("listener shutdown for quit flag");
368374
break;
369375
}
370376

@@ -384,6 +390,7 @@ impl Server {
384390
if e == nix::Error::from(nix::errno::Errno::EINTR) {
385391
continue;
386392
} else {
393+
error!("failed to select error {:?}", e);
387394
break;
388395
}
389396
}
@@ -393,15 +400,18 @@ impl Server {
393400
continue;
394401
}
395402

396-
if service_quit.load(Ordering::SeqCst) {
403+
if listener_quit_flag.load(Ordering::SeqCst) {
404+
info!("listener shutdown for quit flag");
397405
break;
398406
}
399407

400408
let fd = match accept4(listener, SockFlag::SOCK_CLOEXEC) {
401409
Ok(fd) => fd,
402-
Err(_e) => break,
410+
Err(e) => {
411+
error!("failed to accept error {:?}", e);
412+
break;
413+
}
403414
};
404-
405415
let methods = methods.clone();
406416
let quit = Arc::new(AtomicBool::new(false));
407417
let child_quit = quit.clone();
@@ -418,7 +428,7 @@ impl Server {
418428
for r in res_rx.iter() {
419429
trace!("response thread get {:?}", r);
420430
if let Err(e) = write_message(fd, r.0, r.1) {
421-
info!("write_message got {:?}", e);
431+
error!("write_message got {:?}", e);
422432
quit_res.store(true, Ordering::SeqCst);
423433
break;
424434
}
@@ -449,7 +459,6 @@ impl Server {
449459
break;
450460
}
451461
}
452-
453462
// drop the res_tx, thus the res_rx would get terminated notification.
454463
drop(res_tx);
455464
handler.join().unwrap_or(());
@@ -473,38 +482,68 @@ impl Server {
473482

474483
// notify reaper thread to exit.
475484
drop(reaper_tx);
476-
reaper.join().unwrap();
477-
info!("ttrpc server stopped");
485+
info!("ttrpc server listener stopped");
478486
})
479487
.unwrap();
480488

481489
self.handler = Some(handler);
482-
490+
info!("server listen started");
483491
Ok(())
484492
}
485493

486-
pub fn shutdown(mut self) {
487-
let connections = self.connections.lock().unwrap();
494+
pub fn start(&mut self) -> Result<()> {
495+
if self.thread_count_default >= self.thread_count_max {
496+
return Err(Error::Others(
497+
"thread_count_default should smaller than thread_count_max".to_string(),
498+
));
499+
}
500+
if self.thread_count_default <= self.thread_count_min {
501+
return Err(Error::Others(
502+
"thread_count_default should biger than thread_count_min".to_string(),
503+
));
504+
}
505+
self.start_listener()?;
506+
info!("server started");
507+
Ok(())
508+
}
488509

489-
self.quit.store(true, Ordering::SeqCst);
510+
pub fn shutdown_listen(mut self) -> Self {
511+
self.listener_quit_flag.store(true, Ordering::SeqCst);
490512
close(self.monitor_fd.1).unwrap_or_else(|e| {
491513
warn!(
492514
"failed to close notify fd: {} with error: {}",
493515
self.monitor_fd.1, e
494516
)
495517
});
518+
info!("close monitor");
519+
if let Some(handler) = self.handler.take() {
520+
handler.join().unwrap();
521+
}
522+
info!("listener thread stopped");
523+
self
524+
}
525+
526+
pub fn shutdown_connection(mut self) {
527+
info!("begin to shutdown connection");
528+
let connections = self.connections.lock().unwrap();
496529

497530
for (_fd, c) in connections.iter() {
498531
c.close();
499532
}
500-
501533
// release connections's lock, since the following handler.join()
502534
// would wait on the other thread's exit in which would take the lock.
503535
drop(connections);
536+
info!("connections closed");
504537

505-
if let Some(handler) = self.handler.take() {
506-
handler.join().unwrap();
538+
if let Some(r) = self.reaper.take() {
539+
drop(r.0);
540+
r.1.join().unwrap();
507541
}
542+
info!("reaper thread stopped");
543+
}
544+
545+
pub fn shutdown(self) {
546+
self.shutdown_listen().shutdown_connection();
508547
}
509548
}
510549

0 commit comments

Comments
 (0)