Skip to content

Commit 3ef0e4e

Browse files
committed
async ttrpc: Add size checks where packets are created
server: check before sending to client client: check before sending to server Signed-off-by: Tim Zhang <[email protected]>
1 parent 430466d commit 3ef0e4e

File tree

4 files changed

+27
-11
lines changed

4 files changed

+27
-11
lines changed

src/asynchronous/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ impl Client {
6868
let timeout_nano = req.timeout_nano;
6969
let stream_id = self.next_stream_id.fetch_add(2, Ordering::Relaxed);
7070

71-
let msg: GenMessage = Message::new_request(stream_id, req)
71+
let msg: GenMessage = Message::new_request(stream_id, req)?
7272
.try_into()
7373
.map_err(|e: protobuf::Error| Error::Others(e.to_string()))?;
7474

@@ -118,7 +118,7 @@ impl Client {
118118
) -> Result<StreamInner> {
119119
let stream_id = self.next_stream_id.fetch_add(2, Ordering::Relaxed);
120120

121-
let mut msg: GenMessage = Message::new_request(stream_id, req)
121+
let mut msg: GenMessage = Message::new_request(stream_id, req)?
122122
.try_into()
123123
.map_err(|e: protobuf::Error| Error::Others(e.to_string()))?;
124124

src/asynchronous/server.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use async_trait::async_trait;
1818
use futures::stream::Stream;
1919
use futures::StreamExt as _;
2020
use nix::unistd;
21+
use protobuf::Message as _;
2122
use tokio::{
2223
self,
2324
io::{AsyncRead, AsyncWrite},
@@ -35,8 +36,8 @@ use crate::common::{self, Domain};
3536
use crate::context;
3637
use crate::error::{get_status, Error, Result};
3738
use crate::proto::{
38-
Code, Codec, GenMessage, Message, MessageHeader, Request, Response, Status, FLAG_NO_DATA,
39-
FLAG_REMOTE_CLOSED, FLAG_REMOTE_OPEN, MESSAGE_TYPE_DATA, MESSAGE_TYPE_REQUEST,
39+
check_oversize, Code, Codec, GenMessage, Message, MessageHeader, Request, Response, Status,
40+
FLAG_NO_DATA, FLAG_REMOTE_CLOSED, FLAG_REMOTE_OPEN, MESSAGE_TYPE_DATA, MESSAGE_TYPE_REQUEST,
4041
};
4142
use crate::r#async::connection::*;
4243
use crate::r#async::shutdown;
@@ -438,8 +439,13 @@ impl HandlerContext {
438439
match msg.header.type_ {
439440
MESSAGE_TYPE_REQUEST => match self.handle_request(msg).await {
440441
Ok(opt_msg) => match opt_msg {
441-
Some(msg) => {
442-
Self::respond(self.tx.clone(), stream_id, msg)
442+
Some(mut resp) => {
443+
// Server: check size before sending to client
444+
if let Err(e) = check_oversize(resp.compute_size() as usize, true) {
445+
resp = e.into();
446+
}
447+
448+
Self::respond(self.tx.clone(), stream_id, resp)
443449
.await
444450
.map_err(|e| {
445451
error!("respond got error {:?}", e);

src/asynchronous/stream.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,9 @@ impl StreamSender {
418418
header,
419419
payload: buf,
420420
};
421+
422+
msg.check()?;
423+
421424
_send(&self.tx, msg).await?;
422425

423426
Ok(())
@@ -447,6 +450,7 @@ impl StreamReceiver {
447450
return Err(Error::RemoteClosed);
448451
}
449452
let msg = _recv(&mut self.rx).await?;
453+
450454
let payload = match msg.header.type_ {
451455
MESSAGE_TYPE_RESPONSE => {
452456
debug_assert_eq!(self.kind, Kind::Client);

src/proto.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,10 @@ impl GenMessage {
249249
payload: content,
250250
})
251251
}
252+
253+
pub fn check(&self) -> TtResult<()> {
254+
check_oversize(self.header.length as usize, true)
255+
}
252256
}
253257

254258
/// TTRPC codec, only protobuf is supported.
@@ -318,11 +322,13 @@ where
318322
}
319323

320324
impl<C: Codec> Message<C> {
321-
pub fn new_request(stream_id: u32, message: C) -> Self {
322-
Self {
325+
pub fn new_request(stream_id: u32, message: C) -> TtResult<Self> {
326+
check_oversize(message.size() as usize, false)?;
327+
328+
Ok(Self {
323329
header: MessageHeader::new_request(stream_id, message.size()),
324330
payload: message,
325-
}
331+
})
326332
}
327333
}
328334

@@ -451,7 +457,7 @@ mod tests {
451457
#[test]
452458
fn gen_message_to_message() {
453459
let req = new_protobuf_request();
454-
let msg = Message::new_request(3, req);
460+
let msg = Message::new_request(3, req).unwrap();
455461
let msg_clone = msg.clone();
456462
let gen: GenMessage = msg.try_into().unwrap();
457463
let dmsg = Message::<Request>::try_from(gen).unwrap();
@@ -548,7 +554,7 @@ mod tests {
548554
assert_eq!(&msg.payload.metadata[0].value, "test_value1");
549555

550556
let req = new_protobuf_request();
551-
let mut dmsg = Message::new_request(u32::MAX, req);
557+
let mut dmsg = Message::new_request(u32::MAX, req).unwrap();
552558
dmsg.header.set_stream_id(0x123456);
553559
dmsg.header.set_flags(0xe0);
554560
dmsg.header.add_flags(0x0f);

0 commit comments

Comments
 (0)