Skip to content

Commit 842864d

Browse files
authored
Merge pull request #208 from Tim-Zhang/fix-stream-recv-default-data
stream: Fix default data won't be received issue
2 parents 555c412 + e16aa7b commit 842864d

File tree

5 files changed

+53
-11
lines changed

5 files changed

+53
-11
lines changed

example/async-stream-client.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,12 @@ async fn main() {
4141
let sc1 = sc.clone();
4242
let t5 = tokio::spawn(echo_null(sc1));
4343

44-
let t6 = tokio::spawn(echo_null_stream(sc));
44+
let sc1 = sc.clone();
45+
let t6 = tokio::spawn(echo_null_stream(sc1));
46+
47+
let t7 = tokio::spawn(echo_default_value(sc));
4548

46-
let _ = tokio::join!(t1, t2, t3, t4, t5, t6);
49+
let _ = tokio::join!(t1, t2, t3, t4, t5, t6, t7);
4750
}
4851

4952
fn default_ctx() -> Context {
@@ -185,3 +188,16 @@ async fn echo_null_stream(cli: streaming_ttrpc::StreamingClient) {
185188
.unwrap()
186189
.unwrap();
187190
}
191+
192+
#[cfg(unix)]
193+
async fn echo_default_value(cli: streaming_ttrpc::StreamingClient) {
194+
let mut stream = cli
195+
.echo_default_value(default_ctx(), &Default::default()) // send default value to verify #208
196+
.await
197+
.unwrap();
198+
199+
let received = stream.recv().await.unwrap().unwrap();
200+
201+
assert_eq!(received.seq, 0);
202+
assert_eq!(received.msg, "");
203+
}

example/async-stream-server.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use log::{info, LevelFilter};
1313
#[cfg(unix)]
1414
use protocols::r#async::{empty, streaming, streaming_ttrpc};
1515
#[cfg(unix)]
16-
use ttrpc::asynchronous::Server;
16+
use ttrpc::{asynchronous::Server, Error};
1717

1818
#[cfg(unix)]
1919
use async_trait::async_trait;
@@ -134,6 +134,24 @@ impl streaming_ttrpc::Streaming for StreamingService {
134134
}
135135
Ok(())
136136
}
137+
138+
// It verifies PR #208
139+
async fn echo_default_value(
140+
&self,
141+
_ctx: &::ttrpc::r#async::TtrpcContext,
142+
e: streaming::EchoPayload,
143+
s: ::ttrpc::r#async::ServerStreamSender<streaming::EchoPayload>,
144+
) -> ::ttrpc::Result<()> {
145+
if e.seq != 0 || !e.msg.is_empty() {
146+
return Err(Error::Others(
147+
"Expect a request with empty payload to verify #208".to_string(),
148+
));
149+
}
150+
151+
s.send(&e).await.unwrap();
152+
153+
Ok(())
154+
}
137155
}
138156

139157
#[cfg(windows)]

example/protocols/protos/streaming.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ service Streaming {
3232
rpc DivideStream(Sum) returns (stream Part);
3333
rpc EchoNull(stream EchoPayload) returns (google.protobuf.Empty);
3434
rpc EchoNullStream(stream EchoPayload) returns (stream google.protobuf.Empty);
35+
rpc EchoDefaultValue(EchoPayload) returns (stream EchoPayload);
3536
}
3637

3738
message EchoPayload {

src/asynchronous/client.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ use nix::unistd::close;
1515
use tokio::{self, sync::mpsc, task};
1616

1717
use crate::common::client_connect;
18-
use crate::error::{Error, Result};
18+
use crate::error::{get_rpc_status, Error, Result};
1919
use crate::proto::{
20-
Code, Codec, GenMessage, Message, MessageHeader, Request, Response, FLAG_REMOTE_CLOSED,
21-
FLAG_REMOTE_OPEN, MESSAGE_TYPE_DATA, MESSAGE_TYPE_RESPONSE,
20+
Code, Codec, GenMessage, Message, MessageHeader, Request, Response, FLAG_NO_DATA,
21+
FLAG_REMOTE_CLOSED, FLAG_REMOTE_OPEN, MESSAGE_TYPE_DATA, MESSAGE_TYPE_RESPONSE,
2222
};
2323
use crate::r#async::connection::*;
2424
use crate::r#async::shutdown;
@@ -117,13 +117,20 @@ impl Client {
117117
streaming_server: bool,
118118
) -> Result<StreamInner> {
119119
let stream_id = self.next_stream_id.fetch_add(2, Ordering::Relaxed);
120+
let is_req_payload_empty = req.payload.is_empty();
120121

121122
let mut msg: GenMessage = Message::new_request(stream_id, req)?
122123
.try_into()
123124
.map_err(|e: protobuf::Error| Error::Others(e.to_string()))?;
124125

125126
if streaming_client {
126-
msg.header.add_flags(FLAG_REMOTE_OPEN);
127+
if !is_req_payload_empty {
128+
return Err(get_rpc_status(
129+
Code::INVALID_ARGUMENT,
130+
"Creating a ClientStream and sending payload at the same time is not allowed",
131+
));
132+
}
133+
msg.header.add_flags(FLAG_REMOTE_OPEN | FLAG_NO_DATA);
127134
} else {
128135
msg.header.add_flags(FLAG_REMOTE_CLOSED);
129136
}

src/asynchronous/server.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use crate::context;
3737
use crate::error::{get_status, Error, Result};
3838
use crate::proto::{
3939
check_oversize, Code, Codec, GenMessage, Message, MessageHeader, Request, Response, Status,
40-
FLAG_NO_DATA, FLAG_REMOTE_CLOSED, FLAG_REMOTE_OPEN, MESSAGE_TYPE_DATA, MESSAGE_TYPE_REQUEST,
40+
FLAG_NO_DATA, FLAG_REMOTE_CLOSED, MESSAGE_TYPE_DATA, MESSAGE_TYPE_REQUEST,
4141
};
4242
use crate::r#async::connection::*;
4343
use crate::r#async::shutdown;
@@ -606,8 +606,8 @@ impl HandlerContext {
606606
let stream_tx = tx.clone();
607607
self.streams.lock().unwrap().insert(stream_id, tx);
608608

609-
let _remote_close = (req_msg.header.flags & FLAG_REMOTE_CLOSED) == FLAG_REMOTE_CLOSED;
610-
let _remote_open = (req_msg.header.flags & FLAG_REMOTE_OPEN) == FLAG_REMOTE_OPEN;
609+
let no_data = (req_msg.header.flags & FLAG_NO_DATA) == FLAG_NO_DATA;
610+
611611
let si = StreamInner::new(
612612
stream_id,
613613
self.tx.clone(),
@@ -627,7 +627,7 @@ impl HandlerContext {
627627

628628
let task = spawn(async move { stream.handler(ctx, si).await });
629629

630-
if !req.payload.is_empty() {
630+
if !no_data {
631631
// Fake the first data message.
632632
let msg = GenMessage {
633633
header: MessageHeader::new_data(stream_id, req.payload.len() as u32),

0 commit comments

Comments
 (0)