Skip to content

Commit 100cff9

Browse files
committed
sync: support cancel_rx in ttrpc context
add cancel_rx in ttrpcContext which can help method.handler to realize the client has been closed, like context.Done in golang ttrpc. Signed-off-by: yuqitao <[email protected]>
1 parent 73a6104 commit 100cff9

File tree

3 files changed

+72
-25
lines changed

3 files changed

+72
-25
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ thiserror = "1.0"
2020
async-trait = { version = "0.1.31", optional = true }
2121
tokio = { version = "1", features = ["rt", "sync", "io-util", "macros", "time"], optional = true }
2222
futures = { version = "0.3", optional = true }
23+
crossbeam = "0.8.0"
2324

2425
[target.'cfg(windows)'.dependencies]
2526
windows-sys = {version = "0.48", features = [ "Win32_Foundation", "Win32_Storage_FileSystem", "Win32_System_IO", "Win32_System_Pipes", "Win32_Security", "Win32_System_Threading"]}

src/sync/server.rs

Lines changed: 70 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ const DEFAULT_WAIT_THREAD_COUNT_MAX: usize = 5;
4343

4444
type MessageSender = Sender<(MessageHeader, Vec<u8>)>;
4545
type MessageReceiver = Receiver<(MessageHeader, Vec<u8>)>;
46+
type WorkloadSender = crossbeam::channel::Sender<(MessageHeader, Vec<u8>)>;
47+
type WorkloadReceiver = crossbeam::channel::Receiver<(MessageHeader, Vec<u8>)>;
4648

4749
/// A ttrpc Server (sync).
4850
pub struct Server {
@@ -78,12 +80,13 @@ impl Connection {
7880

7981
struct ThreadS<'a> {
8082
connection: &'a Arc<PipeConnection>,
81-
fdlock: &'a Arc<Mutex<()>>,
83+
workload_rx: &'a WorkloadReceiver,
8284
wtc: &'a Arc<AtomicUsize>,
8385
quit: &'a Arc<AtomicBool>,
8486
methods: &'a Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>,
8587
res_tx: &'a MessageSender,
8688
control_tx: &'a SyncSender<()>,
89+
cancel_rx: &'a crossbeam::channel::Receiver<()>,
8790
default: usize,
8891
min: usize,
8992
max: usize,
@@ -92,12 +95,13 @@ struct ThreadS<'a> {
9295
#[allow(clippy::too_many_arguments)]
9396
fn start_method_handler_thread(
9497
connection: Arc<PipeConnection>,
95-
fdlock: Arc<Mutex<()>>,
98+
workload_rx: WorkloadReceiver,
9699
wtc: Arc<AtomicUsize>,
97100
quit: Arc<AtomicBool>,
98101
methods: Arc<HashMap<String, Box<dyn MethodHandler + Send + Sync>>>,
99102
res_tx: MessageSender,
100103
control_tx: SyncSender<()>,
104+
cancel_rx: crossbeam::channel::Receiver<()>,
101105
min: usize,
102106
max: usize,
103107
) {
@@ -109,18 +113,7 @@ fn start_method_handler_thread(
109113
break;
110114
}
111115

112-
let result;
113-
{
114-
let _guard = fdlock.lock().unwrap();
115-
if quit.load(Ordering::SeqCst) {
116-
// notify the connection dealing main thread to stop.
117-
control_tx
118-
.send(())
119-
.unwrap_or_else(|err| trace!("Failed to send {:?}", err));
120-
break;
121-
}
122-
result = read_message(&connection);
123-
}
116+
let result = workload_rx.recv();
124117

125118
if quit.load(Ordering::SeqCst) {
126119
// notify the connection dealing main thread to stop.
@@ -146,22 +139,18 @@ fn start_method_handler_thread(
146139
buf = y;
147140
}
148141
Err(x) => match x {
149-
Error::Socket(y) => {
150-
trace!("Socket error {}", y);
142+
crossbeam::channel::RecvError => {
143+
trace!("workload_rx recv error");
151144
quit.store(true, Ordering::SeqCst);
152-
// the client connection would be closed and
145+
// the workload tx would be dropped and
153146
// the connection dealing main thread would
154147
// have exited.
155148
control_tx
156149
.send(())
157150
.unwrap_or_else(|err| trace!("Failed to send {:?}", err));
158-
trace!("Socket error send control_tx");
151+
trace!("workload_rx recv error, send control_tx");
159152
break;
160153
}
161-
_ => {
162-
trace!("Others error {:?}", x);
163-
continue;
164-
}
165154
},
166155
}
167156

@@ -211,6 +200,7 @@ fn start_method_handler_thread(
211200
};
212201
let ctx = TtrpcContext {
213202
fd: connection.id(),
203+
cancel_rx: cancel_rx.clone(),
214204
mh,
215205
res_tx: res_tx.clone(),
216206
metadata: context::from_pb(&req.metadata),
@@ -238,12 +228,13 @@ fn start_method_handler_threads(num: usize, ts: &ThreadS) {
238228
}
239229
start_method_handler_thread(
240230
ts.connection.clone(),
241-
ts.fdlock.clone(),
231+
ts.workload_rx.clone(),
242232
ts.wtc.clone(),
243233
ts.quit.clone(),
244234
ts.methods.clone(),
245235
ts.res_tx.clone(),
246236
ts.control_tx.clone(),
237+
ts.cancel_rx.clone(),
247238
ts.min,
248239
ts.max,
249240
);
@@ -421,16 +412,68 @@ impl Server {
421412
trace!("response thread quit");
422413
});
423414

424-
let pipe = pipe_connection_child.clone();
425415
let (control_tx, control_rx): (SyncSender<()>, Receiver<()>) =
426416
sync_channel(0);
417+
418+
// start read message thread
419+
let quit_reader = child_quit.clone();
420+
let pipe_reader = pipe_connection_child.clone();
421+
let (workload_tx, workload_rx): (WorkloadSender, WorkloadReceiver) =
422+
crossbeam::channel::unbounded();
423+
let (cancel_tx, cancel_rx) = crossbeam::channel::unbounded::<()>();
424+
let control_tx_reader = control_tx.clone();
425+
let reader = thread::spawn(move || {
426+
while !quit_reader.load(Ordering::SeqCst) {
427+
let msg = read_message(&pipe_reader);
428+
match msg {
429+
Ok((x, y)) => {
430+
let res = workload_tx.send((x, y));
431+
match res {
432+
Ok(_) => {}
433+
Err(crossbeam::channel::SendError(e)) => {
434+
error!("Send workload error {:?}", e);
435+
quit_reader.store(true, Ordering::SeqCst);
436+
control_tx_reader.send(()).unwrap_or_else(
437+
|err| trace!("Failed to send {:?}", err),
438+
);
439+
break;
440+
}
441+
}
442+
}
443+
Err(x) => match x {
444+
Error::Socket(y) => {
445+
trace!("Socket error {}", y);
446+
drop(cancel_tx);
447+
quit_reader.store(true, Ordering::SeqCst);
448+
// the client connection would be closed and
449+
// the connection dealing main thread would
450+
// have exited.
451+
control_tx_reader.send(()).unwrap_or_else(|err| {
452+
trace!("Failed to send {:?}", err)
453+
});
454+
trace!("Socket error send control_tx");
455+
break;
456+
}
457+
_ => {
458+
trace!("Other error {:?}", x);
459+
continue;
460+
}
461+
},
462+
}
463+
}
464+
465+
trace!("read message thread quit");
466+
});
467+
468+
let pipe = pipe_connection_child.clone();
427469
let ts = ThreadS {
428470
connection: &pipe,
429-
fdlock: &Arc::new(Mutex::new(())),
471+
workload_rx: &workload_rx,
430472
wtc: &Arc::new(AtomicUsize::new(0)),
431473
methods: &methods,
432474
res_tx: &res_tx,
433475
control_tx: &control_tx,
476+
cancel_rx: &cancel_rx,
434477
quit: &child_quit,
435478
default,
436479
min,
@@ -449,7 +492,9 @@ impl Server {
449492
drop(control_rx);
450493
// drop the res_tx, thus the res_rx would get terminated notification.
451494
drop(res_tx);
495+
drop(workload_rx);
452496
handler.join().unwrap_or(());
497+
reader.join().unwrap_or(());
453498
// client_handler should not close fd before exit
454499
// , which prevent fd reuse issue.
455500
reaper_tx_child.send(pipe.id()).unwrap();

src/sync/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pub struct TtrpcContext {
101101
pub fd: std::os::unix::io::RawFd,
102102
#[cfg(windows)]
103103
pub fd: i32,
104+
pub cancel_rx: crossbeam::channel::Receiver<()>,
104105
pub mh: MessageHeader,
105106
pub res_tx: std::sync::mpsc::Sender<(MessageHeader, Vec<u8>)>,
106107
pub metadata: HashMap<String, Vec<String>>,

0 commit comments

Comments
 (0)