Skip to content

Commit a9f0b17

Browse files
committed
feat: instrument receiving loop
1 parent 1ffdcc9 commit a9f0b17

File tree

1 file changed

+39
-2
lines changed

1 file changed

+39
-2
lines changed

src/lib.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ impl Server {
4747
.await
4848
}
4949

50+
#[tracing::instrument(skip(self))]
5051
async fn recv_batch(&self, src: &PeerInfo) -> Vec<Message> {
5152
let (_, discovery) = self
5253
.get(&src.group_id, &src.peer_id, RESERVED_CONN_ID_DISCOVERY)
@@ -60,6 +61,7 @@ impl Server {
6061

6162
loop {
6263
let mut msg: Option<Message> = None;
64+
tracing::trace!("receiving");
6365
tokio::select! {
6466
m = discovery.recv_async() => {
6567
msg = m.ok();
@@ -72,8 +74,9 @@ impl Server {
7274
_ = poll_timeout.tick() => {}
7375
}
7476

75-
if let Some(msgs) = msg {
76-
set.insert(msgs);
77+
if let Some(msg) = msg {
78+
tracing::trace!("received: {:?}", msg);
79+
set.insert(msg);
7780
} else {
7881
break;
7982
}
@@ -221,6 +224,40 @@ mod test {
221224
assert_msgs(&resp.msgs, &msgs);
222225
}
223226

227+
#[tokio::test]
228+
async fn recv_normal_many() {
229+
let (s, peer1, peer2) = setup();
230+
let msgs = vec![
231+
dummy_msg(peer1.clone(), peer2.clone(), 0),
232+
dummy_msg(peer1.clone(), peer2.clone(), 1),
233+
];
234+
let (send1, send2, recv) = tokio::join!(
235+
s.send(
236+
dummy_ctx(),
237+
SendReq {
238+
msg: Some(msgs[0].clone()),
239+
},
240+
),
241+
s.send(
242+
dummy_ctx(),
243+
SendReq {
244+
msg: Some(msgs[1].clone()),
245+
},
246+
),
247+
s.recv(
248+
dummy_ctx(),
249+
RecvReq {
250+
src: Some(peer2.clone()),
251+
},
252+
)
253+
);
254+
255+
send1.unwrap();
256+
send2.unwrap();
257+
let resp = recv.unwrap();
258+
assert_msgs(&resp.msgs, &msgs);
259+
}
260+
224261
#[tokio::test]
225262
async fn recv_first_then_send() {
226263
let (s, peer1, peer2) = setup();

0 commit comments

Comments
 (0)