Skip to content

Commit 56178e3

Browse files
bcherryladvoc
andauthored
Add send_bytes method (#691)
* Add send_bytes * Document integration testing * Add data stream tests * Reorganize E2E tests * Define FFI messages for send bytes * Implement FFI bindings * Create example for send bytes * Warn if total_length option specified --------- Co-authored-by: Jacob Gelman <3182119+ladvoc@users.noreply.github.com>
1 parent 8bcbf55 commit 56178e3

File tree

19 files changed

+3641
-74
lines changed

19 files changed

+3641
-74
lines changed

examples/send_bytes/Cargo.lock

Lines changed: 3082 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/send_bytes/Cargo.toml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
[package]
2+
name = "send_bytes"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
tokio = { version = "1.47.1", features = ["full"] }
8+
livekit = { path = "../../livekit", features = ["native-tls"] }
9+
log = "0.4.28"
10+
env_logger = "0.11.8"
11+
bitfield-struct = "0.11.0"
12+
rand = "0.9.2"
13+
colored = "3.0.0"
14+
15+
[workspace]

examples/send_bytes/README.md

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Send Bytes
2+
3+
Example demonstrating the `send_bytes` method for sending a custom packet to participants in a room.
4+
5+
## Usage
6+
7+
1. Run the example in sender mode:
8+
9+
```sh
10+
export LIVEKIT_URL="..."
11+
export LIVEKIT_TOKEN="<first participant token>"
12+
cargo run -- sender
13+
```
14+
15+
2. In a second terminal, run the example in receiver mode:
16+
17+
```sh
18+
export LIVEKIT_URL="..."
19+
export LIVEKIT_TOKEN="<second participant token>"
20+
cargo run
21+
```
22+
23+
## Custom Packet
24+
25+
This example uses the following hypothetical 4-byte packet structure to teleoperate 16 discrete LED indicators by setting their power states and RGB values:
26+
27+
```mermaid
28+
---
29+
title: "LED Control Packet"
30+
config:
31+
packet:
32+
bitsPerRow: 8
33+
---
34+
packet
35+
+2: "Version"
36+
+5: "Channel"
37+
+1: "On"
38+
+8: "Red"
39+
+8: "Green"
40+
+8: "Blue"
41+
```
42+
43+
The [_bitfield-struct_](https://crates.io/crates/bitfield-struct) crate is used to create a type-safe wrapper for getting and setting the bitfields by name.

examples/send_bytes/src/main.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use livekit::{Room, RoomEvent, RoomOptions, StreamByteOptions, StreamReader};
2+
use packet::LedControlPacket;
3+
use rand::Rng;
4+
use std::{env, error::Error, time::Duration};
5+
use tokio::{sync::mpsc::UnboundedReceiver, time::sleep};
6+
7+
mod packet;
8+
9+
const LED_CONTROL_TOPIC: &str = "led-control";
10+
11+
#[tokio::main]
12+
async fn main() -> Result<(), Box<dyn Error>> {
13+
env_logger::init();
14+
15+
let url = env::var("LIVEKIT_URL").expect("LIVEKIT_URL is not set");
16+
let token = env::var("LIVEKIT_TOKEN").expect("LIVEKIT_TOKEN is not set");
17+
18+
let is_sender = env::args().nth(1).map_or(false, |arg| arg == "sender");
19+
20+
let (room, rx) = Room::connect(&url, &token, RoomOptions::default()).await?;
21+
println!("Connected to room: {} - {}", room.name(), room.sid().await);
22+
23+
if is_sender { run_sender(room).await } else { run_receiver(room, rx).await }
24+
}
25+
26+
async fn run_sender(room: Room) -> Result<(), Box<dyn Error>> {
27+
println!("Running as sender");
28+
let mut rng = rand::rng();
29+
30+
loop {
31+
// Send control packets with randomized channel and color.
32+
let packet = packet::LedControlPacket::new()
33+
.with_version(1)
34+
.with_channel(rng.random_range(0..16))
35+
.with_is_on(true)
36+
.with_red(rng.random())
37+
.with_green(rng.random())
38+
.with_blue(rng.random());
39+
40+
println!("[tx] {}", packet);
41+
42+
let options = StreamByteOptions { topic: LED_CONTROL_TOPIC.into(), ..Default::default() };
43+
let be_bytes = packet.into_bits().to_be_bytes();
44+
room.local_participant().send_bytes(&be_bytes, options).await?;
45+
46+
sleep(Duration::from_millis(500)).await;
47+
}
48+
}
49+
50+
async fn run_receiver(
51+
_room: Room,
52+
mut rx: UnboundedReceiver<RoomEvent>,
53+
) -> Result<(), Box<dyn Error>> {
54+
println!("Running as receiver");
55+
println!("Waiting for LED control packets…");
56+
while let Some(event) = rx.recv().await {
57+
match event {
58+
RoomEvent::ByteStreamOpened { reader, topic, participant_identity: _ } => {
59+
if topic != LED_CONTROL_TOPIC {
60+
continue;
61+
};
62+
let Some(reader) = reader.take() else { continue };
63+
64+
let Ok(be_bytes) = reader.read_all().await?[..4].try_into() else {
65+
log::warn!("Unexpected packet length");
66+
continue;
67+
};
68+
let packet = LedControlPacket::from(u32::from_be_bytes(be_bytes));
69+
70+
println!("[rx] {}", packet);
71+
}
72+
_ => {}
73+
}
74+
}
75+
Ok(())
76+
}

examples/send_bytes/src/packet.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
use bitfield_struct::bitfield;
2+
use colored::Colorize;
3+
use std::fmt::Display;
4+
5+
/// Custom 4-byte packet structure used for controlling LED
6+
/// state through a LiveKit room.
7+
#[bitfield(u32)]
8+
pub struct LedControlPacket {
9+
/// Packet version (0-4).
10+
#[bits(2)]
11+
pub version: u8,
12+
/// Which LED is being controlled (0-15).
13+
#[bits(5)]
14+
pub channel: u8,
15+
/// Whether or not the channel is on.
16+
#[bits(1)]
17+
pub is_on: bool,
18+
/// Red intensity (0-255).
19+
#[bits(8)]
20+
pub red: u8,
21+
/// Green intensity (0-255).
22+
#[bits(8)]
23+
pub green: u8,
24+
/// Blue intensity (0-255).
25+
#[bits(8)]
26+
pub blue: u8,
27+
}
28+
29+
impl Display for LedControlPacket {
30+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31+
let color_display = if colored::control::SHOULD_COLORIZE.should_colorize() {
32+
" ".on_truecolor(self.red(), self.green(), self.blue())
33+
} else {
34+
// Display RGB value if terminal color is disabled.
35+
format!("rgb({:>3}, {:>3}, {:>3})", self.red(), self.green(), self.blue()).into()
36+
};
37+
write!(f, "Channel {:02} => {}", self.channel(), color_display)
38+
}
39+
}
40+
41+
#[cfg(test)]
42+
mod tests {
43+
use super::LedControlPacket;
44+
45+
#[test]
46+
fn test_bit_representation() {
47+
let packet = LedControlPacket::new()
48+
.with_version(1)
49+
.with_channel(4)
50+
.with_is_on(true)
51+
.with_red(31)
52+
.with_green(213)
53+
.with_blue(249);
54+
assert_eq!(packet.into_bits(), 0xF9D51F91);
55+
}
56+
}

livekit-ffi/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ edition = "2021"
55
license = "Apache-2.0"
66
description = "FFI interface for bindings in other languages"
77
repository = "https://github.com/livekit/rust-sdks"
8+
readme = "README.md"
89

910
[features]
1011
default = ["rustls-tls-native-roots"]

livekit-ffi/README.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# LiveKit FFI
2+
3+
Foreign function interface (FFI) bindings for LiveKit, used to support the following client SDKs:
4+
5+
- [Python](https://github.com/livekit/python-sdks)
6+
- [NodeJS](https://github.com/livekit/node-sdks)
7+
- [Unity](https://github.com/livekit/client-sdk-unity)
8+
9+
This crate is compiled as dynamic library, allowing client languages to invoke APIs from the core [_livekit_](https://crates.io/crates/livekit) crate.

livekit-ffi/protocol/data_stream.proto

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,28 @@ message StreamSendFileCallback {
156156
}
157157
}
158158

159+
// MARK: - Send bytes
160+
161+
// Sends bytes over a data stream.
162+
message StreamSendBytesRequest {
163+
required uint64 local_participant_handle = 1;
164+
165+
required StreamByteOptions options = 2;
166+
167+
// Bytes to send.
168+
required bytes bytes = 3;
169+
}
170+
message StreamSendBytesResponse {
171+
required uint64 async_id = 1;
172+
}
173+
message StreamSendBytesCallback {
174+
required uint64 async_id = 1;
175+
oneof result {
176+
ByteStreamInfo info = 2;
177+
StreamError error = 3;
178+
}
179+
}
180+
159181
// MARK: - Send text
160182

161183
// Sends text over a data stream.

livekit-ffi/protocol/ffi.proto

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,9 @@ message FfiRequest {
148148
TextStreamWriterWriteRequest text_stream_write = 65;
149149
TextStreamWriterCloseRequest text_stream_close = 66;
150150

151-
// NEXT_ID: 67
151+
StreamSendBytesRequest send_bytes = 67;
152+
153+
// NEXT_ID: 68
152154
}
153155
}
154156

@@ -243,7 +245,9 @@ message FfiResponse {
243245
TextStreamWriterWriteResponse text_stream_write = 64;
244246
TextStreamWriterCloseResponse text_stream_close = 65;
245247

246-
// NEXT_ID: 66
248+
StreamSendBytesResponse send_bytes = 66;
249+
250+
// NEXT_ID: 67
247251
}
248252
}
249253

@@ -298,6 +302,7 @@ message FfiEvent {
298302
TextStreamWriterWriteCallback text_stream_writer_write = 38;
299303
TextStreamWriterCloseCallback text_stream_writer_close = 39;
300304
StreamSendTextCallback send_text = 40;
305+
StreamSendBytesCallback send_bytes = 41;
301306
}
302307
}
303308

livekit-ffi/src/livekit.proto.rs

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2447,6 +2447,45 @@ pub mod stream_send_file_callback {
24472447
Error(super::StreamError),
24482448
}
24492449
}
2450+
// MARK: - Send bytes
2451+
2452+
/// Sends bytes over a data stream.
2453+
#[allow(clippy::derive_partial_eq_without_eq)]
2454+
#[derive(Clone, PartialEq, ::prost::Message)]
2455+
pub struct StreamSendBytesRequest {
2456+
#[prost(uint64, required, tag="1")]
2457+
pub local_participant_handle: u64,
2458+
#[prost(message, required, tag="2")]
2459+
pub options: StreamByteOptions,
2460+
/// Bytes to send.
2461+
#[prost(bytes="vec", required, tag="3")]
2462+
pub bytes: ::prost::alloc::vec::Vec<u8>,
2463+
}
2464+
#[allow(clippy::derive_partial_eq_without_eq)]
2465+
#[derive(Clone, PartialEq, ::prost::Message)]
2466+
pub struct StreamSendBytesResponse {
2467+
#[prost(uint64, required, tag="1")]
2468+
pub async_id: u64,
2469+
}
2470+
#[allow(clippy::derive_partial_eq_without_eq)]
2471+
#[derive(Clone, PartialEq, ::prost::Message)]
2472+
pub struct StreamSendBytesCallback {
2473+
#[prost(uint64, required, tag="1")]
2474+
pub async_id: u64,
2475+
#[prost(oneof="stream_send_bytes_callback::Result", tags="2, 3")]
2476+
pub result: ::core::option::Option<stream_send_bytes_callback::Result>,
2477+
}
2478+
/// Nested message and enum types in `StreamSendBytesCallback`.
2479+
pub mod stream_send_bytes_callback {
2480+
#[allow(clippy::derive_partial_eq_without_eq)]
2481+
#[derive(Clone, PartialEq, ::prost::Oneof)]
2482+
pub enum Result {
2483+
#[prost(message, tag="2")]
2484+
Info(super::ByteStreamInfo),
2485+
#[prost(message, tag="3")]
2486+
Error(super::StreamError),
2487+
}
2488+
}
24502489
// MARK: - Send text
24512490

24522491
/// Sends text over a data stream.
@@ -4889,7 +4928,7 @@ pub struct RpcMethodInvocationEvent {
48894928
#[allow(clippy::derive_partial_eq_without_eq)]
48904929
#[derive(Clone, PartialEq, ::prost::Message)]
48914930
pub struct FfiRequest {
4892-
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 48, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66")]
4931+
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 48, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67")]
48934932
pub message: ::core::option::Option<ffi_request::Message>,
48944933
}
48954934
/// Nested message and enum types in `FfiRequest`.
@@ -5037,13 +5076,15 @@ pub mod ffi_request {
50375076
TextStreamWrite(super::TextStreamWriterWriteRequest),
50385077
#[prost(message, tag="66")]
50395078
TextStreamClose(super::TextStreamWriterCloseRequest),
5079+
#[prost(message, tag="67")]
5080+
SendBytes(super::StreamSendBytesRequest),
50405081
}
50415082
}
50425083
/// This is the output of livekit_ffi_request function.
50435084
#[allow(clippy::derive_partial_eq_without_eq)]
50445085
#[derive(Clone, PartialEq, ::prost::Message)]
50455086
pub struct FfiResponse {
5046-
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 47, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65")]
5087+
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 47, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66")]
50475088
pub message: ::core::option::Option<ffi_response::Message>,
50485089
}
50495090
/// Nested message and enum types in `FfiResponse`.
@@ -5189,6 +5230,8 @@ pub mod ffi_response {
51895230
TextStreamWrite(super::TextStreamWriterWriteResponse),
51905231
#[prost(message, tag="65")]
51915232
TextStreamClose(super::TextStreamWriterCloseResponse),
5233+
#[prost(message, tag="66")]
5234+
SendBytes(super::StreamSendBytesResponse),
51925235
}
51935236
}
51945237
/// To minimize complexity, participant events are not included in the protocol.
@@ -5197,7 +5240,7 @@ pub mod ffi_response {
51975240
#[allow(clippy::derive_partial_eq_without_eq)]
51985241
#[derive(Clone, PartialEq, ::prost::Message)]
51995242
pub struct FfiEvent {
5200-
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40")]
5243+
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41")]
52015244
pub message: ::core::option::Option<ffi_event::Message>,
52025245
}
52035246
/// Nested message and enum types in `FfiEvent`.
@@ -5285,6 +5328,8 @@ pub mod ffi_event {
52855328
TextStreamWriterClose(super::TextStreamWriterCloseCallback),
52865329
#[prost(message, tag="40")]
52875330
SendText(super::StreamSendTextCallback),
5331+
#[prost(message, tag="41")]
5332+
SendBytes(super::StreamSendBytesCallback),
52885333
}
52895334
}
52905335
/// Stop all rooms synchronously (Do we need async here?).

0 commit comments

Comments
 (0)