Skip to content

Commit 66369d9

Browse files
authored
Merge pull request #137 from wllenyj/message_header
Improve MessageHeader parsing.
2 parents 9c40736 + 6fe12be commit 66369d9

File tree

4 files changed

+63
-51
lines changed

4 files changed

+63
-51
lines changed

src/asynchronous/stream.rs

Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@
33
// SPDX-License-Identifier: Apache-2.0
44
//
55

6-
use byteorder::{BigEndian, ByteOrder};
7-
86
use crate::common::{MESSAGE_HEADER_LENGTH, MESSAGE_LENGTH_MAX, MESSAGE_TYPE_RESPONSE};
97
use crate::error::{get_rpc_status, sock_error_msg, Error, Result};
108
use crate::proto::{Code, Response, Status};
@@ -38,15 +36,7 @@ where
3836
));
3937
}
4038

41-
let mut mh = MessageHeader::default();
42-
let mut covbuf: &[u8] = &buf[..4];
43-
mh.length = byteorder::ReadBytesExt::read_u32::<BigEndian>(&mut covbuf)
44-
.map_err(err_to_rpc_err!(Code::INVALID_ARGUMENT, e, ""))?;
45-
let mut covbuf: &[u8] = &buf[4..8];
46-
mh.stream_id = byteorder::ReadBytesExt::read_u32::<BigEndian>(&mut covbuf)
47-
.map_err(err_to_rpc_err!(Code::INVALID_ARGUMENT, e, ""))?;
48-
mh.type_ = buf[8];
49-
mh.flags = buf[9];
39+
let mh = MessageHeader::from(&buf);
5040

5141
Ok(mh)
5242
}
@@ -82,16 +72,7 @@ where
8272
}
8373

8474
fn header_to_buf(mh: MessageHeader) -> Vec<u8> {
85-
let mut buf = vec![0u8; MESSAGE_HEADER_LENGTH];
86-
87-
let covbuf: &mut [u8] = &mut buf[..4];
88-
BigEndian::write_u32(covbuf, mh.length);
89-
let covbuf: &mut [u8] = &mut buf[4..8];
90-
BigEndian::write_u32(covbuf, mh.stream_id);
91-
buf[8] = mh.type_;
92-
buf[9] = mh.flags;
93-
94-
buf
75+
mh.into()
9576
}
9677

9778
pub(crate) fn to_req_buf(stream_id: u32, mut body: Vec<u8>) -> Vec<u8> {

src/common.rs

Lines changed: 59 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
//! Common functions and macros.
77
88
use crate::error::{Error, Result};
9+
use byteorder::{BigEndian, ByteOrder};
910
#[cfg(any(feature = "async", not(target_os = "linux")))]
1011
use nix::fcntl::FdFlag;
1112
use nix::fcntl::{fcntl, FcntlArg, OFlag};
@@ -28,6 +29,44 @@ pub struct MessageHeader {
2829
pub flags: u8,
2930
}
3031

32+
impl<T> From<T> for MessageHeader
33+
where
34+
T: AsRef<[u8]>,
35+
{
36+
fn from(buf: T) -> Self {
37+
let buf = buf.as_ref();
38+
debug_assert!(buf.len() >= MESSAGE_HEADER_LENGTH);
39+
Self {
40+
length: BigEndian::read_u32(&buf[..4]),
41+
stream_id: BigEndian::read_u32(&buf[4..8]),
42+
type_: buf[8],
43+
flags: buf[9],
44+
}
45+
}
46+
}
47+
48+
impl From<MessageHeader> for Vec<u8> {
49+
fn from(mh: MessageHeader) -> Self {
50+
let mut buf = vec![0u8; MESSAGE_HEADER_LENGTH];
51+
mh.into_buf(&mut buf);
52+
buf
53+
}
54+
}
55+
56+
impl MessageHeader {
57+
pub(crate) fn into_buf(self, mut buf: impl AsMut<[u8]>) {
58+
let buf = buf.as_mut();
59+
debug_assert!(buf.len() >= MESSAGE_HEADER_LENGTH);
60+
61+
let covbuf: &mut [u8] = &mut buf[..4];
62+
BigEndian::write_u32(covbuf, self.length);
63+
let covbuf: &mut [u8] = &mut buf[4..8];
64+
BigEndian::write_u32(covbuf, self.stream_id);
65+
buf[8] = self.type_;
66+
buf[9] = self.flags;
67+
}
68+
}
69+
3170
pub(crate) fn do_listen(listener: RawFd) -> Result<()> {
3271
if let Err(e) = fcntl(listener, FcntlArg::F_SETFL(OFlag::O_NONBLOCK)) {
3372
return Err(Error::Others(format!(
@@ -207,8 +246,7 @@ pub const MESSAGE_TYPE_RESPONSE: u8 = 0x2;
207246

208247
#[cfg(test)]
209248
mod tests {
210-
use super::parse_sockaddr;
211-
use super::Domain;
249+
use super::*;
212250

213251
#[cfg(target_os = "linux")]
214252
#[test]
@@ -268,4 +306,23 @@ mod tests {
268306
}
269307
}
270308
}
309+
310+
#[test]
311+
fn message_header() {
312+
let buf = vec![
313+
0x10, 0x0, 0x0, 0x0, // length
314+
0x0, 0x0, 0x0, 0x03, // stream_id
315+
0x2, // type_
316+
0xef, // flags
317+
];
318+
let mh = MessageHeader::from(&buf);
319+
assert_eq!(mh.length, 0x1000_0000);
320+
assert_eq!(mh.stream_id, 0x3);
321+
assert_eq!(mh.type_, MESSAGE_TYPE_RESPONSE);
322+
assert_eq!(mh.flags, 0xef);
323+
324+
let mut buf2 = vec![0; MESSAGE_HEADER_LENGTH];
325+
mh.into_buf(&mut buf2);
326+
assert_eq!(&buf, &buf2);
327+
}
271328
}

src/error.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,6 @@ pub fn sock_error_msg(size: usize, msg: String) -> Error {
5959
get_rpc_status(Code::INVALID_ARGUMENT, msg)
6060
}
6161

62-
macro_rules! err_to_rpc_err {
63-
($c: expr, $e: ident, $s: expr) => {
64-
|$e| get_rpc_status($c, $s.to_string() + &$e.to_string())
65-
};
66-
}
67-
6862
macro_rules! err_to_others_err {
6963
($e: ident, $s: expr) => {
7064
|$e| Error::Others($s.to_string() + &$e.to_string())

src/sync/channel.rs

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use byteorder::{BigEndian, ByteOrder, ReadBytesExt};
1615
use nix::sys::socket::*;
1716
use std::os::unix::io::RawFd;
1817

@@ -96,19 +95,7 @@ fn read_message_header(fd: RawFd) -> Result<MessageHeader> {
9695
));
9796
}
9897

99-
let mut mh = MessageHeader::default();
100-
let mut covbuf: &[u8] = &buf[..4];
101-
mh.length =
102-
covbuf
103-
.read_u32::<BigEndian>()
104-
.map_err(err_to_rpc_err!(Code::INVALID_ARGUMENT, e, ""))?;
105-
let mut covbuf: &[u8] = &buf[4..8];
106-
mh.stream_id =
107-
covbuf
108-
.read_u32::<BigEndian>()
109-
.map_err(err_to_rpc_err!(Code::INVALID_ARGUMENT, e, ""))?;
110-
mh.type_ = buf[8];
111-
mh.flags = buf[9];
98+
let mh = MessageHeader::from(&buf);
11299

113100
Ok(mh)
114101
}
@@ -141,14 +128,7 @@ pub fn read_message(fd: RawFd) -> Result<(MessageHeader, Vec<u8>)> {
141128
}
142129

143130
fn write_message_header(fd: RawFd, mh: MessageHeader) -> Result<()> {
144-
let mut buf = [0u8; MESSAGE_HEADER_LENGTH];
145-
146-
let covbuf: &mut [u8] = &mut buf[..4];
147-
BigEndian::write_u32(covbuf, mh.length);
148-
let covbuf: &mut [u8] = &mut buf[4..8];
149-
BigEndian::write_u32(covbuf, mh.stream_id);
150-
buf[8] = mh.type_;
151-
buf[9] = mh.flags;
131+
let buf: Vec<u8> = mh.into();
152132

153133
let size = write_count(fd, &buf, MESSAGE_HEADER_LENGTH)?;
154134
if size != MESSAGE_HEADER_LENGTH {

0 commit comments

Comments
 (0)