Skip to content

Commit c77ba34

Browse files
committed
modify message stacked bug
1 parent c2cf1a2 commit c77ba34

File tree

2 files changed

+43
-33
lines changed

2 files changed

+43
-33
lines changed

moqt-server/src/modules/message_handlers/control_message.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,12 @@ pub async fn control_message_handler(
9797
return MessageProcessResult::Fragment;
9898
}
9999

100+
if read_cur.get_ref().len() < (read_cur.position() as usize + payload_length as usize) {
101+
// The length is insufficient, so do nothing. Do not synchronize with the cursor.
102+
tracing::error!("fragmented {}", read_buf.len());
103+
return MessageProcessResult::Fragment;
104+
}
105+
100106
read_buf.advance(read_cur.position() as usize);
101107
let mut payload_buf = read_buf.split_to(payload_length as usize);
102108
let mut write_buf = BytesMut::new();

moqt-server/src/modules/server_processes/control_stream/handler.rs

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -48,42 +48,46 @@ pub(crate) async fn handle_control_stream(
4848
let mut buf = buf.lock().await;
4949
buf.extend_from_slice(&read_buf);
5050

51-
let message_result: MessageProcessResult;
52-
{
53-
let mut client = client.lock().await;
54-
// TODO: Move the implementation of control_message_handler to the server side since it is only used by the server
55-
message_result = control_message_handler(
56-
&mut buf,
57-
UnderlayType::WebTransport,
58-
&mut client,
59-
senders.start_forwarder_txes().clone(),
60-
&mut pubsub_relation_manager,
61-
&mut control_message_dispatcher,
62-
&mut object_cache_storage,
63-
)
64-
.await;
65-
}
51+
while !buf.is_empty() {
52+
let message_result: MessageProcessResult;
53+
{
54+
let mut client = client.lock().await;
55+
// TODO: Move the implementation of control_message_handler to the server side since it is only used by the server
56+
message_result = control_message_handler(
57+
&mut buf,
58+
UnderlayType::WebTransport,
59+
&mut client,
60+
senders.start_forwarder_txes().clone(),
61+
&mut pubsub_relation_manager,
62+
&mut control_message_dispatcher,
63+
&mut object_cache_storage,
64+
)
65+
.await;
66+
}
6667

67-
tracing::debug!("message_result: {:?}", message_result);
68+
tracing::debug!("message_result: {:?}", message_result);
6869

69-
match message_result {
70-
MessageProcessResult::Success(buf) => {
71-
let mut shared_send_stream = shared_send_stream.lock().await;
72-
shared_send_stream.write_all(&buf).await?;
70+
match message_result {
71+
MessageProcessResult::Success(buf) => {
72+
let mut shared_send_stream = shared_send_stream.lock().await;
73+
shared_send_stream.write_all(&buf).await?;
7374

74-
tracing::info!("Message is sent.");
75-
tracing::debug!("sent message: {:x?}", buf.to_vec());
76-
}
77-
MessageProcessResult::SuccessWithoutResponse => {}
78-
MessageProcessResult::Failure(code, message) => {
79-
senders
80-
.close_session_tx()
81-
.send((u8::from(code) as u64, message))
82-
.await?;
83-
break;
84-
}
85-
MessageProcessResult::Fragment => (),
86-
};
75+
tracing::info!("Message is sent.");
76+
tracing::debug!("sent message: {:x?}", buf.to_vec());
77+
}
78+
MessageProcessResult::SuccessWithoutResponse => {}
79+
MessageProcessResult::Failure(code, message) => {
80+
senders
81+
.close_session_tx()
82+
.send((u8::from(code) as u64, message))
83+
.await?;
84+
break;
85+
}
86+
MessageProcessResult::Fragment => {
87+
break;
88+
}
89+
};
90+
}
8791
}
8892

8993
senders

0 commit comments

Comments
 (0)