Commit e5e1dbe

Merge pull request #197 from yuqitao/support-cancel_rx
sync: support cancel_rx in ttrpc context
2 parents f00994f + 100cff9

3 files changed: +84 -43 lines changed


Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -20,6 +20,7 @@ thiserror = "1.0"
 async-trait = { version = "0.1.31", optional = true }
 tokio = { version = "1", features = ["rt", "sync", "io-util", "macros", "time"], optional = true }
 futures = { version = "0.3", optional = true }
+crossbeam = "0.8.0"

 [target.'cfg(windows)'.dependencies]
 windows-sys = {version = "0.48", features = [ "Win32_Foundation", "Win32_Storage_FileSystem", "Win32_System_IO", "Win32_System_Pipes", "Win32_Security", "Win32_System_Threading"]}
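
The new crossbeam dependency is what makes a per-connection cancel channel practical here: a std::sync::mpsc::Receiver cannot be cloned, while crossbeam's can, and every clone observes the Sender being dropped as a disconnect. A minimal standalone sketch of that drop-as-signal pattern (not code from this PR):

    use crossbeam::channel::{unbounded, TryRecvError};

    fn main() {
        // A "cancel" channel that never carries a message: dropping the
        // sender side is the signal itself.
        let (cancel_tx, cancel_rx) = unbounded::<()>();

        // crossbeam receivers are Clone, so each request context can
        // hold its own handle on the same channel.
        let per_request_rx = cancel_rx.clone();

        drop(cancel_tx); // simulate the connection being torn down

        // Every clone observes the disconnect.
        assert!(matches!(cancel_rx.try_recv(), Err(TryRecvError::Disconnected)));
        assert!(matches!(per_request_rx.try_recv(), Err(TryRecvError::Disconnected)));
    }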

src/sync/server.rs

Lines changed: 82 additions & 43 deletions
@@ -13,7 +13,7 @@
 // limitations under the License.

 //! Sync server of ttrpc.
-//!
+//!

 #[cfg(unix)]
 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
@@ -23,16 +23,16 @@ use std::collections::HashMap;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::sync::mpsc::{channel, sync_channel, Receiver, Sender, SyncSender};
 use std::sync::{Arc, Mutex};
+use std::thread;
 use std::thread::JoinHandle;
-use std::{thread};

 use super::utils::response_to_channel;
 use crate::context;
 use crate::error::{get_status, Error, Result};
 use crate::proto::{Code, MessageHeader, Request, Response, MESSAGE_TYPE_REQUEST};
 use crate::sync::channel::{read_message, write_message};
+use crate::sync::sys::{PipeConnection, PipeListener};
 use crate::{MethodHandler, TtrpcContext};
-use crate::sync::sys::{PipeListener, PipeConnection};

 // poll_queue will create WAIT_THREAD_COUNT_DEFAULT threads in begin.
 // If wait thread count < WAIT_THREAD_COUNT_MIN, create number to WAIT_THREAD_COUNT_DEFAULT.
@@ -43,6 +43,8 @@ const DEFAULT_WAIT_THREAD_COUNT_MAX: usize = 5;

 type MessageSender = Sender<(MessageHeader, Vec<u8>)>;
 type MessageReceiver = Receiver<(MessageHeader, Vec<u8>)>;
+type WorkloadSender = crossbeam::channel::Sender<(MessageHeader, Vec<u8>)>;
+type WorkloadReceiver = crossbeam::channel::Receiver<(MessageHeader, Vec<u8>)>;

 /// A ttrpc Server (sync).
 pub struct Server {
@@ -64,7 +66,7 @@ struct Connection {
 }

 impl Connection {
-    fn close (&self) {
+    fn close(&self) {
         self.connection.close().unwrap_or(());
     }

@@ -77,13 +79,14 @@ impl Connection {
 }

 struct ThreadS<'a> {
-    connection: &'a Arc<PipeConnection>,
-    fdlock: &'a Arc<Mutex<()>>,
+    connection: &'a Arc<PipeConnection>,
+    workload_rx: &'a WorkloadReceiver,
     wtc: &'a Arc<AtomicUsize>,
     quit: &'a Arc<AtomicBool>,
     methods: &'a Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>,
     res_tx: &'a MessageSender,
     control_tx: &'a SyncSender<()>,
+    cancel_rx: &'a crossbeam::channel::Receiver<()>,
     default: usize,
     min: usize,
     max: usize,
@@ -92,12 +95,13 @@ struct ThreadS<'a> {
 #[allow(clippy::too_many_arguments)]
 fn start_method_handler_thread(
     connection: Arc<PipeConnection>,
-    fdlock: Arc<Mutex<()>>,
+    workload_rx: WorkloadReceiver,
     wtc: Arc<AtomicUsize>,
     quit: Arc<AtomicBool>,
     methods: Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>,
     res_tx: MessageSender,
     control_tx: SyncSender<()>,
+    cancel_rx: crossbeam::channel::Receiver<()>,
     min: usize,
     max: usize,
 ) {
@@ -109,18 +113,7 @@ fn start_method_handler_thread(
             break;
         }

-        let result;
-        {
-            let _guard = fdlock.lock().unwrap();
-            if quit.load(Ordering::SeqCst) {
-                // notify the connection dealing main thread to stop.
-                control_tx
-                    .send(())
-                    .unwrap_or_else(|err| trace!("Failed to send {:?}", err));
-                break;
-            }
-            result = read_message(&connection);
-        }
+        let result = workload_rx.recv();

         if quit.load(Ordering::SeqCst) {
             // notify the connection dealing main thread to stop.
@@ -146,22 +139,18 @@ fn start_method_handler_thread(
                 buf = y;
             }
             Err(x) => match x {
-                Error::Socket(y) => {
-                    trace!("Socket error {}", y);
+                crossbeam::channel::RecvError => {
+                    trace!("workload_rx recv error");
                     quit.store(true, Ordering::SeqCst);
-                    // the client connection would be closed and
+                    // the workload tx would be dropped and
                     // the connection dealing main thread would
                     // have exited.
                     control_tx
                         .send(())
                         .unwrap_or_else(|err| trace!("Failed to send {:?}", err));
-                    trace!("Socket error send control_tx");
+                    trace!("workload_rx recv error, send control_tx");
                     break;
                 }
-                _ => {
-                    trace!("Others error {:?}", x);
-                    continue;
-                }
             },
         }

@@ -211,6 +200,7 @@ fn start_method_handler_thread(
         };
         let ctx = TtrpcContext {
             fd: connection.id(),
+            cancel_rx: cancel_rx.clone(),
             mh,
             res_tx: res_tx.clone(),
             metadata: context::from_pb(&req.metadata),
@@ -238,12 +228,13 @@ fn start_method_handler_threads(num: usize, ts: &ThreadS) {
         }
         start_method_handler_thread(
             ts.connection.clone(),
-            ts.fdlock.clone(),
+            ts.workload_rx.clone(),
             ts.wtc.clone(),
             ts.quit.clone(),
             ts.methods.clone(),
             ts.res_tx.clone(),
             ts.control_tx.clone(),
+            ts.cancel_rx.clone(),
             ts.min,
             ts.max,
         );
@@ -300,7 +291,7 @@ impl Server {
         }

         let listener = PipeListener::new_from_fd(fd)?;
-
+
         self.listeners.push(Arc::new(listener));

         Ok(self)
@@ -339,8 +330,6 @@ impl Server {

         self.listener_quit_flag.store(false, Ordering::SeqCst);

-
-
         let listener = self.listeners[0].clone();
         let methods = self.methods.clone();
         let default = self.thread_count_default;
@@ -383,15 +372,13 @@ impl Server {
         let handler = thread::Builder::new()
             .name("listener_loop".into())
             .spawn(move || {
-                loop {
+                loop {
                     trace!("listening...");
                     let pipe_connection = match listener.accept(&listener_quit_flag) {
                         Ok(None) => {
                             continue;
                         }
-                        Ok(Some(conn)) => {
-                            Arc::new(conn)
-                        }
+                        Ok(Some(conn)) => Arc::new(conn),
                         Err(e) => {
                             error!("listener accept got {:?}", e);
                             break;
@@ -425,16 +412,68 @@ impl Server {
                     trace!("response thread quit");
                 });

-                let pipe = pipe_connection_child.clone();
                 let (control_tx, control_rx): (SyncSender<()>, Receiver<()>) =
                     sync_channel(0);
+
+                // start read message thread
+                let quit_reader = child_quit.clone();
+                let pipe_reader = pipe_connection_child.clone();
+                let (workload_tx, workload_rx): (WorkloadSender, WorkloadReceiver) =
+                    crossbeam::channel::unbounded();
+                let (cancel_tx, cancel_rx) = crossbeam::channel::unbounded::<()>();
+                let control_tx_reader = control_tx.clone();
+                let reader = thread::spawn(move || {
+                    while !quit_reader.load(Ordering::SeqCst) {
+                        let msg = read_message(&pipe_reader);
+                        match msg {
+                            Ok((x, y)) => {
+                                let res = workload_tx.send((x, y));
+                                match res {
+                                    Ok(_) => {}
+                                    Err(crossbeam::channel::SendError(e)) => {
+                                        error!("Send workload error {:?}", e);
+                                        quit_reader.store(true, Ordering::SeqCst);
+                                        control_tx_reader.send(()).unwrap_or_else(
+                                            |err| trace!("Failed to send {:?}", err),
+                                        );
+                                        break;
+                                    }
+                                }
+                            }
+                            Err(x) => match x {
+                                Error::Socket(y) => {
+                                    trace!("Socket error {}", y);
+                                    drop(cancel_tx);
+                                    quit_reader.store(true, Ordering::SeqCst);
+                                    // the client connection would be closed and
+                                    // the connection dealing main thread would
+                                    // have exited.
+                                    control_tx_reader.send(()).unwrap_or_else(|err| {
+                                        trace!("Failed to send {:?}", err)
+                                    });
+                                    trace!("Socket error send control_tx");
+                                    break;
+                                }
+                                _ => {
+                                    trace!("Other error {:?}", x);
+                                    continue;
+                                }
+                            },
+                        }
+                    }
+
+                    trace!("read message thread quit");
+                });
+
+                let pipe = pipe_connection_child.clone();
                 let ts = ThreadS {
                     connection: &pipe,
-                    fdlock: &Arc::new(Mutex::new(())),
+                    workload_rx: &workload_rx,
                     wtc: &Arc::new(AtomicUsize::new(0)),
                     methods: &methods,
                     res_tx: &res_tx,
                     control_tx: &control_tx,
+                    cancel_rx: &cancel_rx,
                     quit: &child_quit,
                     default,
                     min,
@@ -453,7 +492,9 @@ impl Server {
                 drop(control_rx);
                 // drop the res_tx, thus the res_rx would get terminated notification.
                 drop(res_tx);
+                drop(workload_rx);
                 handler.join().unwrap_or(());
+                reader.join().unwrap_or(());
                 // client_handler should not close fd before exit
                 // , which prevent fd reuse issue.
                 reaper_tx_child.send(pipe.id()).unwrap();
@@ -505,12 +546,10 @@ impl Server {
     pub fn stop_listen(mut self) -> Self {
         self.listener_quit_flag.store(true, Ordering::SeqCst);

-        self.listeners[0].close().unwrap_or_else(|e| {
-            warn!(
-                "failed to close connection with error: {}", e
-            )
-        });
-
+        self.listeners[0]
+            .close()
+            .unwrap_or_else(|e| warn!("failed to close connection with error: {}", e));
+
         info!("close monitor");
         if let Some(handler) = self.handler.take() {
             handler.join().unwrap();
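
Taken together, the server.rs changes replace the old fdlock scheme, where every worker thread competed for a mutex just to call read_message, with a single reader thread that decodes messages and fans them out over an unbounded crossbeam channel; workers block on recv() and treat RecvError, raised once the sender is gone, as the shutdown signal. A standalone sketch of that single-reader/multi-worker shape (not code from this PR), with Vec<u8> payloads standing in for (MessageHeader, Vec<u8>):

    use std::thread;

    use crossbeam::channel::unbounded;

    fn main() {
        let (workload_tx, workload_rx) = unbounded::<Vec<u8>>();

        // Reader: the only thread that touches the "connection".
        let reader = thread::spawn(move || {
            for i in 0..4u8 {
                // stand-in for read_message(&connection)
                workload_tx.send(vec![i]).unwrap();
            }
            // workload_tx drops here; once the channel drains,
            // every worker's recv() returns RecvError and it exits.
        });

        // Workers: block on recv() instead of racing for a read lock.
        let workers: Vec<_> = (0..2)
            .map(|id| {
                let rx = workload_rx.clone();
                thread::spawn(move || {
                    while let Ok(buf) = rx.recv() {
                        println!("worker {id} handled {buf:?}");
                    }
                })
            })
            .collect();

        reader.join().unwrap();
        for w in workers {
            w.join().unwrap();
        }
    }

Disconnection runs in both directions: receivers error once every sender is dropped, and senders error once every receiver is dropped, which is presumably why the connection thread in the diff drops workload_rx before joining the reader, so that a reader blocked in workload_tx.send() fails with SendError and exits too.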

src/sync/utils.rs

Lines changed: 1 addition & 0 deletions
@@ -101,6 +101,7 @@ pub struct TtrpcContext {
     pub fd: std::os::unix::io::RawFd,
     #[cfg(windows)]
     pub fd: i32,
+    pub cancel_rx: crossbeam::channel::Receiver<()>,
     pub mh: MessageHeader,
     pub res_tx: std::sync::mpsc::Sender<(MessageHeader, Vec<u8>)>,
     pub metadata: HashMap<String, Vec<String>>,
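
With cancel_rx now a public field of TtrpcContext, a long-running sync handler can poll it between units of work and bail out once the client has gone away. The sketch below is hypothetical usage, not code from this PR; handle() and do_one_chunk_of_work() are stand-ins for a generated service method and its body:

    use crossbeam::channel::{unbounded, Receiver, TryRecvError};

    // Stand-in for one slice of a long-running handler's work.
    fn do_one_chunk_of_work(_step: u32) {}

    // Hypothetical handler body: in a real service implementation this
    // loop would poll ctx.cancel_rx between chunks.
    fn handle(cancel_rx: &Receiver<()>) -> Result<(), String> {
        for step in 0..100 {
            // Nothing is ever sent on this channel; the server drops the
            // sender on socket error, so Disconnected means "client gone".
            if let Err(TryRecvError::Disconnected) = cancel_rx.try_recv() {
                return Err("cancelled: client connection closed".into());
            }
            do_one_chunk_of_work(step);
        }
        Ok(())
    }

    fn main() {
        let (cancel_tx, cancel_rx) = unbounded::<()>();
        drop(cancel_tx); // simulate the reader thread hitting a socket error
        assert!(handle(&cancel_rx).is_err());
    }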
