Skip to content

Commit 5da5061

Browse files
committed
Add proto.rs to include protocol related.
Move the MessageHeader from `common.rs` to the `proto.rs` file. Move the `compiled::ttrpc` from `common.rs` to `proto.rs` file and re-export. Signed-off-by: wllenyj <[email protected]>
1 parent 5def440 commit 5da5061

File tree

11 files changed

+144
-101
lines changed

11 files changed

+144
-101
lines changed

src/asynchronous/client.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,9 @@ use std::collections::HashMap;
99
use std::os::unix::io::RawFd;
1010
use std::sync::{Arc, Mutex};
1111

12-
use crate::common::{client_connect, MESSAGE_TYPE_RESPONSE};
12+
use crate::common::client_connect;
1313
use crate::error::{Error, Result};
14-
use crate::proto::{Code, Request, Response};
14+
use crate::proto::{Code, Request, Response, MESSAGE_TYPE_RESPONSE};
1515

1616
use crate::asynchronous::stream::{receive, to_req_buf};
1717
use crate::r#async::utils;
@@ -163,6 +163,7 @@ impl Client {
163163
Client { req_tx }
164164
}
165165

166+
/// Requsts a unary request and returns with response.
166167
pub async fn request(&self, req: Request) -> Result<Response> {
167168
let mut buf = Vec::with_capacity(req.compute_size() as usize);
168169
{

src/asynchronous/server.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,11 @@ use std::time::Duration;
1313

1414
use crate::asynchronous::stream::{receive, respond, respond_with_status};
1515
use crate::asynchronous::unix_incoming::UnixIncoming;
16-
use crate::common::{self, Domain, MESSAGE_TYPE_REQUEST};
16+
use crate::common::{self, Domain};
1717
use crate::context;
1818
use crate::error::{get_status, Error, Result};
19-
use crate::proto::{Code, Status};
19+
use crate::proto::{Code, MessageHeader, Status, MESSAGE_TYPE_REQUEST};
2020
use crate::r#async::{MethodHandler, TtrpcContext};
21-
use crate::MessageHeader;
2221
use futures::stream::Stream;
2322
use futures::StreamExt as _;
2423
use std::marker::Unpin;

src/asynchronous/stream.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
// SPDX-License-Identifier: Apache-2.0
44
//
55

6-
use crate::common::{MESSAGE_HEADER_LENGTH, MESSAGE_LENGTH_MAX, MESSAGE_TYPE_RESPONSE};
76
use crate::error::{get_rpc_status, sock_error_msg, Error, Result};
8-
use crate::proto::{Code, Response, Status};
7+
use crate::proto::{
8+
Code, Response, Status, MESSAGE_HEADER_LENGTH, MESSAGE_LENGTH_MAX, MESSAGE_TYPE_RESPONSE,
9+
};
910
use crate::r#async::utils;
1011
use crate::MessageHeader;
1112
use protobuf::Message;

src/asynchronous/utils.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
// SPDX-License-Identifier: Apache-2.0
44
//
55

6-
use crate::common::{MessageHeader, MESSAGE_TYPE_REQUEST, MESSAGE_TYPE_RESPONSE};
76
use crate::error::{get_status, Result};
8-
use crate::proto::{Code, Request, Status};
7+
use crate::proto::{
8+
Code, MessageHeader, Request, Status, MESSAGE_TYPE_REQUEST, MESSAGE_TYPE_RESPONSE,
9+
};
910
use async_trait::async_trait;
1011
use protobuf::{CodedInputStream, Message};
1112
use std::collections::HashMap;

src/common.rs

Lines changed: 0 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
//! Common functions and macros.
77
88
use crate::error::{Error, Result};
9-
use byteorder::{BigEndian, ByteOrder};
109
#[cfg(any(feature = "async", not(target_os = "linux")))]
1110
use nix::fcntl::FdFlag;
1211
use nix::fcntl::{fcntl, FcntlArg, OFlag};
@@ -20,53 +19,6 @@ pub(crate) enum Domain {
2019
Vsock,
2120
}
2221

23-
/// Message header of ttrpc.
24-
#[derive(Default, Debug)]
25-
pub struct MessageHeader {
26-
pub length: u32,
27-
pub stream_id: u32,
28-
pub type_: u8,
29-
pub flags: u8,
30-
}
31-
32-
impl<T> From<T> for MessageHeader
33-
where
34-
T: AsRef<[u8]>,
35-
{
36-
fn from(buf: T) -> Self {
37-
let buf = buf.as_ref();
38-
debug_assert!(buf.len() >= MESSAGE_HEADER_LENGTH);
39-
Self {
40-
length: BigEndian::read_u32(&buf[..4]),
41-
stream_id: BigEndian::read_u32(&buf[4..8]),
42-
type_: buf[8],
43-
flags: buf[9],
44-
}
45-
}
46-
}
47-
48-
impl From<MessageHeader> for Vec<u8> {
49-
fn from(mh: MessageHeader) -> Self {
50-
let mut buf = vec![0u8; MESSAGE_HEADER_LENGTH];
51-
mh.into_buf(&mut buf);
52-
buf
53-
}
54-
}
55-
56-
impl MessageHeader {
57-
pub(crate) fn into_buf(self, mut buf: impl AsMut<[u8]>) {
58-
let buf = buf.as_mut();
59-
debug_assert!(buf.len() >= MESSAGE_HEADER_LENGTH);
60-
61-
let covbuf: &mut [u8] = &mut buf[..4];
62-
BigEndian::write_u32(covbuf, self.length);
63-
let covbuf: &mut [u8] = &mut buf[4..8];
64-
BigEndian::write_u32(covbuf, self.stream_id);
65-
buf[8] = self.type_;
66-
buf[9] = self.flags;
67-
}
68-
}
69-
7022
pub(crate) fn do_listen(listener: RawFd) -> Result<()> {
7123
if let Err(e) = fcntl(listener, FcntlArg::F_SETFL(OFlag::O_NONBLOCK)) {
7224
return Err(Error::Others(format!(
@@ -238,12 +190,6 @@ macro_rules! cfg_async {
238190
}
239191
}
240192

241-
pub const MESSAGE_HEADER_LENGTH: usize = 10;
242-
pub const MESSAGE_LENGTH_MAX: usize = 4 << 20;
243-
244-
pub const MESSAGE_TYPE_REQUEST: u8 = 0x1;
245-
pub const MESSAGE_TYPE_RESPONSE: u8 = 0x2;
246-
247193
#[cfg(test)]
248194
mod tests {
249195
use super::*;
@@ -306,23 +252,4 @@ mod tests {
306252
}
307253
}
308254
}
309-
310-
#[test]
311-
fn message_header() {
312-
let buf = vec![
313-
0x10, 0x0, 0x0, 0x0, // length
314-
0x0, 0x0, 0x0, 0x03, // stream_id
315-
0x2, // type_
316-
0xef, // flags
317-
];
318-
let mh = MessageHeader::from(&buf);
319-
assert_eq!(mh.length, 0x1000_0000);
320-
assert_eq!(mh.stream_id, 0x3);
321-
assert_eq!(mh.type_, MESSAGE_TYPE_RESPONSE);
322-
assert_eq!(mh.flags, 0xef);
323-
324-
let mut buf2 = vec![0; MESSAGE_HEADER_LENGTH];
325-
mh.into_buf(&mut buf2);
326-
assert_eq!(&buf, &buf2);
327-
}
328255
}

src/lib.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,20 +48,15 @@ extern crate log;
4848
pub mod error;
4949
#[macro_use]
5050
mod common;
51-
#[allow(soft_unstable, clippy::type_complexity, clippy::too_many_arguments)]
52-
mod compiled {
53-
include!(concat!(env!("OUT_DIR"), "/mod.rs"));
54-
}
55-
pub use compiled::ttrpc as proto;
5651

5752
pub mod context;
5853

54+
pub mod proto;
5955
#[doc(inline)]
60-
pub use crate::common::MessageHeader;
56+
pub use self::proto::{Code, MessageHeader, Request, Response, Status};
57+
6158
#[doc(inline)]
6259
pub use crate::error::{get_status, Error, Result};
63-
#[doc(inline)]
64-
pub use proto::{Code, Request, Response, Status};
6560

6661
cfg_sync! {
6762
pub mod sync;

src/proto.rs

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
// Copyright 2022 Alibaba Cloud. All rights reserved.
2+
// Copyright (c) 2020 Ant Financial
3+
//
4+
// SPDX-License-Identifier: Apache-2.0
5+
//
6+
7+
#[allow(soft_unstable, clippy::type_complexity, clippy::too_many_arguments)]
8+
mod compiled {
9+
include!(concat!(env!("OUT_DIR"), "/mod.rs"));
10+
}
11+
pub use compiled::ttrpc::*;
12+
13+
use byteorder::{BigEndian, ByteOrder};
14+
15+
pub const MESSAGE_HEADER_LENGTH: usize = 10;
16+
pub const MESSAGE_LENGTH_MAX: usize = 4 << 20;
17+
18+
pub const MESSAGE_TYPE_REQUEST: u8 = 0x1;
19+
pub const MESSAGE_TYPE_RESPONSE: u8 = 0x2;
20+
21+
/// Message header of ttrpc.
22+
#[derive(Default, Debug)]
23+
pub struct MessageHeader {
24+
pub length: u32,
25+
pub stream_id: u32,
26+
pub type_: u8,
27+
pub flags: u8,
28+
}
29+
30+
impl<T> From<T> for MessageHeader
31+
where
32+
T: AsRef<[u8]>,
33+
{
34+
fn from(buf: T) -> Self {
35+
let buf = buf.as_ref();
36+
debug_assert!(buf.len() >= MESSAGE_HEADER_LENGTH);
37+
Self {
38+
length: BigEndian::read_u32(&buf[..4]),
39+
stream_id: BigEndian::read_u32(&buf[4..8]),
40+
type_: buf[8],
41+
flags: buf[9],
42+
}
43+
}
44+
}
45+
46+
impl From<MessageHeader> for Vec<u8> {
47+
fn from(mh: MessageHeader) -> Self {
48+
let mut buf = vec![0u8; MESSAGE_HEADER_LENGTH];
49+
mh.into_buf(&mut buf);
50+
buf
51+
}
52+
}
53+
54+
impl MessageHeader {
55+
/// Creates a request MessageHeader from stream_id and len.
56+
/// Use the default message type MESSAGE_TYPE_REQUEST, and default flags 0.
57+
pub fn new_request(stream_id: u32, len: u32) -> Self {
58+
Self {
59+
length: len,
60+
stream_id,
61+
type_: MESSAGE_TYPE_REQUEST,
62+
flags: 0,
63+
}
64+
}
65+
66+
/// Creates a response MessageHeader from stream_id and len.
67+
/// Use the default message type MESSAGE_TYPE_REQUEST, and default flags 0.
68+
pub fn new_response(stream_id: u32, len: u32) -> Self {
69+
Self {
70+
length: len,
71+
stream_id,
72+
type_: MESSAGE_TYPE_RESPONSE,
73+
flags: 0,
74+
}
75+
}
76+
77+
/// Set the flags of message using the given flags.
78+
pub fn set_flags(&mut self, flags: u8) {
79+
self.flags = flags;
80+
}
81+
82+
/// Add a new flags to the message.
83+
pub fn add_flags(&mut self, flags: u8) {
84+
self.flags |= flags;
85+
}
86+
87+
pub(crate) fn into_buf(self, mut buf: impl AsMut<[u8]>) {
88+
let buf = buf.as_mut();
89+
debug_assert!(buf.len() >= MESSAGE_HEADER_LENGTH);
90+
91+
let covbuf: &mut [u8] = &mut buf[..4];
92+
BigEndian::write_u32(covbuf, self.length);
93+
let covbuf: &mut [u8] = &mut buf[4..8];
94+
BigEndian::write_u32(covbuf, self.stream_id);
95+
buf[8] = self.type_;
96+
buf[9] = self.flags;
97+
}
98+
}
99+
100+
#[cfg(test)]
101+
mod tests {
102+
use super::*;
103+
104+
#[test]
105+
fn message_header() {
106+
let buf = vec![
107+
0x10, 0x0, 0x0, 0x0, // length
108+
0x0, 0x0, 0x0, 0x03, // stream_id
109+
0x2, // type_
110+
0xef, // flags
111+
];
112+
let mh = MessageHeader::from(&buf);
113+
assert_eq!(mh.length, 0x1000_0000);
114+
assert_eq!(mh.stream_id, 0x3);
115+
assert_eq!(mh.type_, MESSAGE_TYPE_RESPONSE);
116+
assert_eq!(mh.flags, 0xef);
117+
118+
let mut buf2 = vec![0; MESSAGE_HEADER_LENGTH];
119+
mh.into_buf(&mut buf2);
120+
assert_eq!(&buf, &buf2);
121+
}
122+
}

src/sync/channel.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@
1515
use nix::sys::socket::*;
1616
use std::os::unix::io::RawFd;
1717

18-
use crate::common::{MESSAGE_HEADER_LENGTH, MESSAGE_LENGTH_MAX};
1918
use crate::error::{get_rpc_status, sock_error_msg, Error, Result};
20-
use crate::proto::Code;
21-
use crate::MessageHeader;
19+
use crate::proto::{Code, MessageHeader, MESSAGE_HEADER_LENGTH, MESSAGE_LENGTH_MAX};
2220

2321
fn retryable(e: nix::Error) -> bool {
2422
use ::nix::Error;

src/sync/client.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ use std::{io, thread};
2525

2626
#[cfg(target_os = "macos")]
2727
use crate::common::set_fd_close_exec;
28-
use crate::common::{client_connect, MESSAGE_TYPE_REQUEST, MESSAGE_TYPE_RESPONSE, SOCK_CLOEXEC};
28+
use crate::common::{client_connect, SOCK_CLOEXEC};
2929
use crate::error::{Error, Result};
30-
use crate::proto::{Code, Request, Response};
30+
use crate::proto::{
31+
Code, MessageHeader, Request, Response, MESSAGE_TYPE_REQUEST, MESSAGE_TYPE_RESPONSE,
32+
};
3133
use crate::sync::channel::{read_message, write_message};
32-
use crate::MessageHeader;
3334
use std::time::Duration;
3435

3536
type Sender = mpsc::Sender<(Vec<u8>, mpsc::SyncSender<Result<Vec<u8>>>)>;

src/sync/server.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,13 @@ use std::thread::JoinHandle;
2626
use std::{io, thread};
2727

2828
use super::utils::response_to_channel;
29+
use crate::common;
2930
#[cfg(not(target_os = "linux"))]
3031
use crate::common::set_fd_close_exec;
31-
use crate::common::{self, MESSAGE_TYPE_REQUEST};
3232
use crate::context;
3333
use crate::error::{get_status, Error, Result};
34-
use crate::proto::{Code, Request, Response};
34+
use crate::proto::{Code, MessageHeader, Request, Response, MESSAGE_TYPE_REQUEST};
3535
use crate::sync::channel::{read_message, write_message};
36-
use crate::MessageHeader;
3736
use crate::{MethodHandler, TtrpcContext};
3837

3938
// poll_queue will create WAIT_THREAD_COUNT_DEFAULT threads in begin.

0 commit comments

Comments
 (0)