Skip to content

Commit 9126674

Browse files
committed
Improve Message codec.
Introduce Codec trait for protobuf message encoding/decoding. Signed-off-by: wllenyj <[email protected]>
1 parent 5da5061 commit 9126674

File tree

5 files changed

+248
-91
lines changed

5 files changed

+248
-91
lines changed

src/asynchronous/client.rs

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,26 +4,25 @@
44
//
55

66
use nix::unistd::close;
7-
use protobuf::{CodedInputStream, CodedOutputStream, Message};
87
use std::collections::HashMap;
8+
use std::convert::TryInto;
99
use std::os::unix::io::RawFd;
1010
use std::sync::{Arc, Mutex};
1111

1212
use crate::common::client_connect;
1313
use crate::error::{Error, Result};
14-
use crate::proto::{Code, Request, Response, MESSAGE_TYPE_RESPONSE};
14+
use crate::proto::{Code, Codec, GenMessage, Message, Request, Response, MESSAGE_TYPE_RESPONSE};
1515

16-
use crate::asynchronous::stream::{receive, to_req_buf};
1716
use crate::r#async::utils;
1817
use tokio::{
1918
self,
20-
io::{split, AsyncWriteExt},
19+
io::split,
2120
sync::mpsc::{channel, Receiver, Sender},
2221
sync::Notify,
2322
};
2423

25-
type RequestSender = Sender<(Vec<u8>, Sender<Result<Vec<u8>>>)>;
26-
type RequestReceiver = Receiver<(Vec<u8>, Sender<Result<Vec<u8>>>)>;
24+
type RequestSender = Sender<(GenMessage, Sender<Result<Vec<u8>>>)>;
25+
type RequestReceiver = Receiver<(GenMessage, Sender<Result<Vec<u8>>>)>;
2726

2827
type ResponseSender = Sender<Result<Vec<u8>>>;
2928
type ResponseReceiver = Receiver<Result<Vec<u8>>>;
@@ -57,17 +56,17 @@ impl Client {
5756
let request_sender = tokio::spawn(async move {
5857
let mut stream_id: u32 = 1;
5958

60-
while let Some((body, resp_tx)) = rx.recv().await {
59+
while let Some((mut msg, resp_tx)) = rx.recv().await {
6160
let current_stream_id = stream_id;
61+
msg.header.set_stream_id(current_stream_id);
6262
stream_id += 2;
6363

6464
{
6565
let mut map = req_map2.lock().unwrap();
6666
map.insert(current_stream_id, resp_tx.clone());
6767
}
6868

69-
let buf = to_req_buf(current_stream_id, body);
70-
if let Err(e) = writer.write_all(&buf).await {
69+
if let Err(e) = msg.write_to(&mut writer).await {
7170
error!("write_message got error: {:?}", e);
7271

7372
{
@@ -97,41 +96,42 @@ impl Client {
9796
_ = notify2.notified() => {
9897
break;
9998
}
100-
res = receive(&mut reader) => {
99+
res = GenMessage::read_from(&mut reader) => {
101100
match res {
102-
Ok((header, body)) => {
101+
Ok(msg) => {
102+
trace!("Got Message body {:?}", msg.payload);
103103
let req_map = req_map.clone();
104104
tokio::spawn(async move {
105105
let resp_tx2;
106106
{
107107
let mut map = req_map.lock().unwrap();
108-
let resp_tx = match map.get(&header.stream_id) {
108+
let resp_tx = match map.get(&msg.header.stream_id) {
109109
Some(tx) => tx,
110110
None => {
111111
debug!(
112-
"Receiver got unknown packet {:?} {:?}",
113-
header, body
112+
"Receiver got unknown packet {:?}",
113+
msg
114114
);
115115
return;
116116
}
117117
};
118118

119119
resp_tx2 = resp_tx.clone();
120-
map.remove(&header.stream_id); // Forget the result, just remove.
120+
map.remove(&msg.header.stream_id); // Forget the result, just remove.
121121
}
122122

123-
if header.type_ != MESSAGE_TYPE_RESPONSE {
123+
if msg.header.type_ != MESSAGE_TYPE_RESPONSE {
124124
resp_tx2
125125
.send(Err(Error::Others(format!(
126-
"Recver got malformed packet {:?} {:?}",
127-
header, body
126+
"Recver got malformed packet {:?}",
127+
msg
128128
))))
129129
.await
130130
.unwrap_or_else(|_e| error!("The request has returned"));
131131
return;
132132
}
133133

134-
resp_tx2.send(Ok(body)).await.unwrap_or_else(|_e| error!("The request has returned"));
134+
resp_tx2.send(Ok(msg.payload)).await.unwrap_or_else(|_e| error!("The request has returned"));
135135
});
136136
}
137137
Err(e) => {
@@ -165,26 +165,24 @@ impl Client {
165165

166166
/// Requsts a unary request and returns with response.
167167
pub async fn request(&self, req: Request) -> Result<Response> {
168-
let mut buf = Vec::with_capacity(req.compute_size() as usize);
169-
{
170-
let mut s = CodedOutputStream::vec(&mut buf);
171-
req.write_to(&mut s).map_err(err_to_others_err!(e, ""))?;
172-
s.flush().map_err(err_to_others_err!(e, ""))?;
173-
}
168+
let timeout_nano = req.timeout_nano;
169+
let msg: GenMessage = Message::new_request(0, req)
170+
.try_into()
171+
.map_err(|e: protobuf::error::ProtobufError| Error::Others(e.to_string()))?;
174172

175173
let (tx, mut rx): (ResponseSender, ResponseReceiver) = channel(100);
176174
self.req_tx
177-
.send((buf, tx))
175+
.send((msg, tx))
178176
.await
179177
.map_err(|e| Error::Others(format!("Send packet to sender error {:?}", e)))?;
180178

181-
let result = if req.timeout_nano == 0 {
179+
let result = if timeout_nano == 0 {
182180
rx.recv()
183181
.await
184182
.ok_or_else(|| Error::Others("Receive packet from receiver error".to_string()))?
185183
} else {
186184
tokio::time::timeout(
187-
std::time::Duration::from_nanos(req.timeout_nano as u64),
185+
std::time::Duration::from_nanos(timeout_nano as u64),
188186
rx.recv(),
189187
)
190188
.await
@@ -193,10 +191,8 @@ impl Client {
193191
};
194192

195193
let buf = result?;
196-
let mut s = CodedInputStream::from_bytes(&buf);
197-
let mut res = Response::new();
198-
res.merge_from(&mut s)
199-
.map_err(err_to_others_err!(e, "Unpack response error "))?;
194+
let res =
195+
Response::decode(&buf).map_err(err_to_others_err!(e, "Unpack response error "))?;
200196

201197
let status = res.get_status();
202198
if status.get_code() != Code::OK {

src/asynchronous/stream.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,6 @@ fn header_to_buf(mh: MessageHeader) -> Vec<u8> {
7676
mh.into()
7777
}
7878

79-
pub(crate) fn to_req_buf(stream_id: u32, mut body: Vec<u8>) -> Vec<u8> {
80-
let header = utils::get_request_header_from_body(stream_id, &body);
81-
let mut buf = header_to_buf(header);
82-
buf.append(&mut body);
83-
84-
buf
85-
}
86-
8779
pub(crate) fn to_res_buf(stream_id: u32, mut body: Vec<u8>) -> Vec<u8> {
8880
let header = utils::get_response_header_from_body(stream_id, &body);
8981
let mut buf = header_to_buf(header);

src/asynchronous/utils.rs

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
use crate::error::{get_status, Result};
77
use crate::proto::{
8-
Code, MessageHeader, Request, Status, MESSAGE_TYPE_REQUEST, MESSAGE_TYPE_RESPONSE,
8+
Code, MessageHeader, Request, Status, MESSAGE_TYPE_RESPONSE,
99
};
1010
use async_trait::async_trait;
1111
use protobuf::{CodedInputStream, Message};
@@ -109,15 +109,6 @@ pub(crate) fn get_response_header_from_body(stream_id: u32, body: &[u8]) -> Mess
109109
}
110110
}
111111

112-
pub(crate) fn get_request_header_from_body(stream_id: u32, body: &[u8]) -> MessageHeader {
113-
MessageHeader {
114-
length: body.len() as u32,
115-
stream_id,
116-
type_: MESSAGE_TYPE_REQUEST,
117-
flags: 0,
118-
}
119-
}
120-
121112
pub(crate) fn new_unix_stream_from_raw_fd(fd: RawFd) -> UnixStream {
122113
let std_stream: std::os::unix::net::UnixStream;
123114
unsafe {

0 commit comments

Comments
 (0)