Skip to content

Commit 9f97207

Browse files
committed
async: add connection module.
For abstract connections, our connections are handled as one sending task and one receiving task. We can use the same logic to handle it. Signed-off-by: wllenyj <[email protected]>
1 parent 69bd03d commit 9f97207

File tree

2 files changed

+112
-1
lines changed

2 files changed

+112
-1
lines changed

src/asynchronous/connection.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
// Copyright 2022 Alibaba Cloud. All rights reserved.
2+
// Copyright (c) 2020 Ant Financial
3+
//
4+
// SPDX-License-Identifier: Apache-2.0
5+
//
6+
7+
use std::os::unix::io::AsRawFd;
8+
9+
use async_trait::async_trait;
10+
use log::{error, trace};
11+
use tokio::{
12+
io::{split, AsyncRead, AsyncWrite, ReadHalf},
13+
select, task,
14+
};
15+
16+
use crate::error::Error;
17+
use crate::proto::GenMessage;
18+
19+
pub trait Builder {
20+
type Reader;
21+
type Writer;
22+
23+
fn build(&mut self) -> (Self::Reader, Self::Writer);
24+
}
25+
26+
#[async_trait]
27+
pub trait WriterDelegate {
28+
async fn recv(&mut self) -> Option<GenMessage>;
29+
async fn disconnect(&self, msg: &GenMessage, e: Error);
30+
async fn exit(&self);
31+
}
32+
33+
#[async_trait]
34+
pub trait ReaderDelegate {
35+
async fn wait_shutdown(&self);
36+
async fn disconnect(&self, e: Error, task: &mut task::JoinHandle<()>);
37+
async fn exit(&self);
38+
async fn handle_msg(&self, msg: GenMessage);
39+
}
40+
41+
pub struct Connection<S, B: Builder> {
42+
reader: ReadHalf<S>,
43+
writer_task: task::JoinHandle<()>,
44+
reader_delegate: B::Reader,
45+
}
46+
47+
impl<S, B> Connection<S, B>
48+
where
49+
S: AsyncRead + AsyncWrite + AsRawFd + Send + 'static,
50+
B: Builder,
51+
B::Reader: ReaderDelegate + Send + Sync + 'static,
52+
B::Writer: WriterDelegate + Send + Sync + 'static,
53+
{
54+
pub fn new(conn: S, mut builder: B) -> Self {
55+
let (reader, mut writer) = split(conn);
56+
57+
let (reader_delegate, mut writer_delegate) = builder.build();
58+
59+
let writer_task = tokio::spawn(async move {
60+
while let Some(msg) = writer_delegate.recv().await {
61+
trace!("write message: {:?}", msg);
62+
if let Err(e) = msg.write_to(&mut writer).await {
63+
error!("write_message got error: {:?}", e);
64+
writer_delegate.disconnect(&msg, e).await;
65+
}
66+
}
67+
writer_delegate.exit().await;
68+
trace!("Writer task exit.");
69+
});
70+
71+
Self {
72+
reader,
73+
writer_task,
74+
reader_delegate,
75+
}
76+
}
77+
78+
pub async fn run(self) -> std::io::Result<()> {
79+
let Connection {
80+
mut reader,
81+
mut writer_task,
82+
reader_delegate,
83+
} = self;
84+
loop {
85+
select! {
86+
res = GenMessage::read_from(&mut reader) => {
87+
match res {
88+
Ok(msg) => {
89+
trace!("Got Message {:?}", msg);
90+
reader_delegate.handle_msg(msg).await;
91+
}
92+
Err(e) => {
93+
trace!("Read msg err: {:?}", e);
94+
reader_delegate.disconnect(e, &mut writer_task).await;
95+
break;
96+
}
97+
}
98+
}
99+
_v = reader_delegate.wait_shutdown() => {
100+
trace!("Receive shutdown.");
101+
break;
102+
}
103+
}
104+
}
105+
reader_delegate.exit().await;
106+
trace!("Reader task exit.");
107+
108+
Ok(())
109+
}
110+
}

src/asynchronous/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ mod stream;
1111
#[macro_use]
1212
#[doc(hidden)]
1313
mod utils;
14-
mod unix_incoming;
14+
mod connection;
1515
pub mod shutdown;
16+
mod unix_incoming;
1617

1718
#[doc(inline)]
1819
pub use crate::r#async::client::Client;

0 commit comments

Comments
 (0)