13
13
// limitations under the License.
14
14
15
15
//! Sync server of ttrpc.
16
- //!
16
+ //!
17
17
18
18
#[ cfg( unix) ]
19
19
use std:: os:: unix:: io:: { AsRawFd , FromRawFd , RawFd } ;
@@ -23,16 +23,16 @@ use std::collections::HashMap;
23
23
use std:: sync:: atomic:: { AtomicBool , AtomicUsize , Ordering } ;
24
24
use std:: sync:: mpsc:: { channel, sync_channel, Receiver , Sender , SyncSender } ;
25
25
use std:: sync:: { Arc , Mutex } ;
26
+ use std:: thread;
26
27
use std:: thread:: JoinHandle ;
27
- use std:: { thread} ;
28
28
29
29
use super :: utils:: response_to_channel;
30
30
use crate :: context;
31
31
use crate :: error:: { get_status, Error , Result } ;
32
32
use crate :: proto:: { Code , MessageHeader , Request , Response , MESSAGE_TYPE_REQUEST } ;
33
33
use crate :: sync:: channel:: { read_message, write_message} ;
34
+ use crate :: sync:: sys:: { PipeConnection , PipeListener } ;
34
35
use crate :: { MethodHandler , TtrpcContext } ;
35
- use crate :: sync:: sys:: { PipeListener , PipeConnection } ;
36
36
37
37
// poll_queue will create WAIT_THREAD_COUNT_DEFAULT threads in begin.
38
38
// If wait thread count < WAIT_THREAD_COUNT_MIN, create number to WAIT_THREAD_COUNT_DEFAULT.
@@ -64,7 +64,7 @@ struct Connection {
64
64
}
65
65
66
66
impl Connection {
67
- fn close ( & self ) {
67
+ fn close ( & self ) {
68
68
self . connection . close ( ) . unwrap_or ( ( ) ) ;
69
69
}
70
70
@@ -77,7 +77,7 @@ impl Connection {
77
77
}
78
78
79
79
struct ThreadS < ' a > {
80
- connection : & ' a Arc < PipeConnection > ,
80
+ connection : & ' a Arc < PipeConnection > ,
81
81
fdlock : & ' a Arc < Mutex < ( ) > > ,
82
82
wtc : & ' a Arc < AtomicUsize > ,
83
83
quit : & ' a Arc < AtomicBool > ,
@@ -300,7 +300,7 @@ impl Server {
300
300
}
301
301
302
302
let listener = PipeListener :: new_from_fd ( fd) ?;
303
-
303
+
304
304
self . listeners . push ( Arc :: new ( listener) ) ;
305
305
306
306
Ok ( self )
@@ -339,8 +339,6 @@ impl Server {
339
339
340
340
self . listener_quit_flag . store ( false , Ordering :: SeqCst ) ;
341
341
342
-
343
-
344
342
let listener = self . listeners [ 0 ] . clone ( ) ;
345
343
let methods = self . methods . clone ( ) ;
346
344
let default = self . thread_count_default ;
@@ -383,15 +381,13 @@ impl Server {
383
381
let handler = thread:: Builder :: new ( )
384
382
. name ( "listener_loop" . into ( ) )
385
383
. spawn ( move || {
386
- loop {
384
+ loop {
387
385
trace ! ( "listening..." ) ;
388
386
let pipe_connection = match listener. accept ( & listener_quit_flag) {
389
387
Ok ( None ) => {
390
388
continue ;
391
389
}
392
- Ok ( Some ( conn) ) => {
393
- Arc :: new ( conn)
394
- }
390
+ Ok ( Some ( conn) ) => Arc :: new ( conn) ,
395
391
Err ( e) => {
396
392
error ! ( "listener accept got {:?}" , e) ;
397
393
break ;
@@ -505,12 +501,10 @@ impl Server {
505
501
pub fn stop_listen ( mut self ) -> Self {
506
502
self . listener_quit_flag . store ( true , Ordering :: SeqCst ) ;
507
503
508
- self . listeners [ 0 ] . close ( ) . unwrap_or_else ( |e| {
509
- warn ! (
510
- "failed to close connection with error: {}" , e
511
- )
512
- } ) ;
513
-
504
+ self . listeners [ 0 ]
505
+ . close ( )
506
+ . unwrap_or_else ( |e| warn ! ( "failed to close connection with error: {}" , e) ) ;
507
+
514
508
info ! ( "close monitor" ) ;
515
509
if let Some ( handler) = self . handler . take ( ) {
516
510
handler. join ( ) . unwrap ( ) ;
0 commit comments