Skip to content

Commit 9b5ecf9

Browse files
committed
Wip: use 'push' for send_bytes and send_text
1 parent 4fc0027 commit 9b5ecf9

File tree

4 files changed

+71
-36
lines changed

4 files changed

+71
-36
lines changed

examples/send_bytes/src/main.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,18 +55,15 @@ async fn run_receiver(
5555
println!("Waiting for LED control packets…");
5656
while let Some(event) = rx.recv().await {
5757
match event {
58-
RoomEvent::ByteStreamOpened { reader, topic, participant_identity: _ } => {
59-
if topic != LED_CONTROL_TOPIC {
58+
RoomEvent::BytesReceived { bytes, info, participant_identity: _ } => {
59+
if info.topic != LED_CONTROL_TOPIC {
6060
continue;
6161
};
62-
let Some(reader) = reader.take() else { continue };
63-
64-
let Ok(be_bytes) = reader.read_all().await?[..4].try_into() else {
62+
let Ok(be_bytes) = bytes[..4].try_into() else {
6563
log::warn!("Unexpected packet length");
6664
continue;
6765
};
6866
let packet = LedControlPacket::from(u32::from_be_bytes(be_bytes));
69-
7067
println!("[rx] {}", packet);
7168
}
7269
_ => {}

livekit/src/room/data_stream/outgoing.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ impl OutgoingStreamManager {
348348
pub async fn send_text(
349349
&self,
350350
text: &str,
351-
options: StreamTextOptions,
351+
mut options: StreamTextOptions,
352352
) -> StreamResult<TextStreamInfo> {
353353
let text_header = proto::data_stream::TextHeader {
354354
operation_type: options.operation_type.unwrap_or_default() as i32,
@@ -357,6 +357,8 @@ impl OutgoingStreamManager {
357357
attached_stream_ids: options.attached_stream_ids,
358358
generated: options.generated.unwrap_or_default(),
359359
};
360+
361+
options.attributes.insert("__push".to_owned(), "1".into());
360362
let header = proto::data_stream::Header {
361363
stream_id: options.id.unwrap_or_else(|| create_random_uuid()),
362364
timestamp: Utc::now().timestamp_millis(),
@@ -397,13 +399,13 @@ impl OutgoingStreamManager {
397399
pub async fn send_bytes(
398400
&self,
399401
data: impl AsRef<[u8]>,
400-
options: StreamByteOptions,
402+
mut options: StreamByteOptions,
401403
) -> StreamResult<ByteStreamInfo> {
402404
let bytes = data.as_ref();
403405

404-
let byte_header = proto::data_stream::ByteHeader {
405-
name: options.name.unwrap_or_default(),
406-
};
406+
let byte_header = proto::data_stream::ByteHeader { name: options.name.unwrap_or_default() };
407+
408+
options.attributes.insert("__push".to_owned(), "1".into());
407409
let header = proto::data_stream::Header {
408410
stream_id: options.id.unwrap_or_else(|| create_random_uuid()),
409411
timestamp: Utc::now().timestamp_millis(),

livekit/src/room/mod.rs

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use bmrng::unbounded::UnboundedRequestReceiver;
16+
use bytes::Bytes;
1617
use libwebrtc::{
1718
native::frame_cryptor::EncryptionState,
1819
prelude::{
@@ -186,6 +187,20 @@ pub enum RoomEvent {
186187
topic: String,
187188
participant_identity: ParticipantIdentity,
188189
},
190+
/// A byte buffer has been fully received from a remove participant
191+
/// sent using [`LocalParticipant::send_bytes`].
192+
BytesReceived {
193+
bytes: Bytes,
194+
info: ByteStreamInfo,
195+
participant_identity: ParticipantIdentity,
196+
},
197+
/// A string has been fully received from a remove participant
198+
/// sent using [`LocalParticipant::send_text`].
199+
TextReceived {
200+
text: String,
201+
info: TextStreamInfo,
202+
participant_identity: ParticipantIdentity,
203+
},
189204
#[deprecated(note = "Use high-level data streams API instead.")]
190205
StreamHeaderReceived {
191206
header: proto::data_stream::Header,
@@ -1701,17 +1716,8 @@ async fn incoming_data_stream_task(
17011716
loop {
17021717
tokio::select! {
17031718
Some((reader, identity)) = open_rx.recv() => {
1704-
match reader {
1705-
AnyStreamReader::Byte(reader) => dispatcher.dispatch(&RoomEvent::ByteStreamOpened {
1706-
topic: reader.info().topic.clone(),
1707-
reader: TakeCell::new(reader),
1708-
participant_identity: ParticipantIdentity(identity)
1709-
}),
1710-
AnyStreamReader::Text(reader) => dispatcher.dispatch(&RoomEvent::TextStreamOpened {
1711-
topic: reader.info().topic.clone(),
1712-
reader: TakeCell::new(reader),
1713-
participant_identity: ParticipantIdentity(identity)
1714-
}),
1719+
if let Some(event) = event_for_incoming_stream(reader, identity).await {
1720+
dispatcher.dispatch(&event)
17151721
}
17161722
},
17171723
_ = close_rx.recv() => {
@@ -1721,6 +1727,44 @@ async fn incoming_data_stream_task(
17211727
}
17221728
}
17231729

1730+
async fn event_for_incoming_stream(reader: AnyStreamReader, identity: String) -> Option<RoomEvent> {
1731+
let participant_identity = ParticipantIdentity(identity);
1732+
match reader {
1733+
AnyStreamReader::Byte(reader) => {
1734+
if reader.info().attributes.contains_key("__push") {
1735+
// TODO: avoid cloning info
1736+
let info = reader.info().clone();
1737+
let Ok(bytes) = reader.read_all().await else {
1738+
log::error!("Failed to read pushed bytes for stream '{}'", info.id);
1739+
return None;
1740+
};
1741+
return Some(RoomEvent::BytesReceived { bytes, info, participant_identity });
1742+
}
1743+
Some(RoomEvent::ByteStreamOpened {
1744+
topic: reader.info().topic.clone(),
1745+
reader: TakeCell::new(reader),
1746+
participant_identity,
1747+
})
1748+
}
1749+
AnyStreamReader::Text(reader) => {
1750+
if reader.info().attributes.contains_key("__push") {
1751+
// TODO: avoid cloning info
1752+
let info = reader.info().clone();
1753+
let Ok(text) = reader.read_all().await else {
1754+
log::error!("Failed to read pushed text for stream '{}'", info.id);
1755+
return None;
1756+
};
1757+
return Some(RoomEvent::TextReceived { text, info, participant_identity });
1758+
}
1759+
Some(RoomEvent::TextStreamOpened {
1760+
topic: reader.info().topic.clone(),
1761+
reader: TakeCell::new(reader),
1762+
participant_identity,
1763+
})
1764+
}
1765+
}
1766+
}
1767+
17241768
/// Receives packets from the outgoing stream manager and send them.
17251769
async fn outgoing_data_stream_task(
17261770
mut packet_rx: UnboundedRequestReceiver<proto::DataPacket, Result<(), EngineError>>,

livekit/tests/data_stream_test.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,16 +40,12 @@ async fn test_send_bytes() -> Result<()> {
4040
};
4141
let receive_text = async move {
4242
while let Some(event) = receiving_event_rx.recv().await {
43-
let RoomEvent::ByteStreamOpened { reader, topic, participant_identity } = event else {
43+
let RoomEvent::BytesReceived { bytes, info, participant_identity } = event else {
4444
continue;
4545
};
46-
assert_eq!(topic, "some-topic");
46+
assert_eq!(info.topic, "some-topic");
4747
assert_eq!(participant_identity, sender_identity);
48-
49-
let Some(reader) = reader.take() else {
50-
return Err(anyhow!("Failed to take reader"));
51-
};
52-
assert_eq!(reader.read_all().await?, BYTES_TO_SEND);
48+
assert_eq!(bytes, BYTES_TO_SEND);
5349
break;
5450
}
5551
Ok(())
@@ -85,16 +81,12 @@ async fn test_send_text() -> Result<()> {
8581
};
8682
let receive_text = async move {
8783
while let Some(event) = receiving_event_rx.recv().await {
88-
let RoomEvent::TextStreamOpened { reader, topic, participant_identity } = event else {
84+
let RoomEvent::TextReceived { text, info, participant_identity } = event else {
8985
continue;
9086
};
91-
assert_eq!(topic, "some-topic");
87+
assert_eq!(info.topic, "some-topic");
9288
assert_eq!(participant_identity, sender_identity);
93-
94-
let Some(reader) = reader.take() else {
95-
return Err(anyhow!("Failed to take reader"));
96-
};
97-
assert_eq!(reader.read_all().await?, TEXT_TO_SEND);
89+
assert_eq!(text, TEXT_TO_SEND);
9890
break;
9991
}
10092
Ok(())

0 commit comments

Comments
 (0)