Skip to content

Commit f4f2599

Browse files
committed
feat: relay ipc msg via sfu datachannel
1 parent 8d5fc32 commit f4f2599

19 files changed

+369
-103
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
node_modules/
21
*.o
32
*.tmp
43
*.jpg
@@ -13,5 +12,6 @@ Makefile
1312

1413
# folder
1514
build
15+
proto
1616

1717
!/doc/*.jpg

.vscode/c_cpp_properties.json

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,19 @@
33
{
44
"name": "Linux",
55
"includePath": [
6-
"${workspaceFolder}/**",
6+
"${workspaceFolder}/src",
7+
"${workspaceFolder}/proto",
8+
"${workspaceFolder}/external/livekit-protocol/protobufs",
79
"/usr/include",
810
"/usr/local/include",
9-
"/usr/include/libcamera/**",
10-
"/usr/local/include/webrtc/*",
11-
"/usr/local/include/webrtc/third_party/abseil-cpp/*",
12-
"/usr/local/include/webrtc/third_party/libyuv/include/*",
13-
"/usr/local/include/webrtc/tools/json_schema_compiler/*",
14-
"/usr/local/include/webrtc/third_party/boringssl/src/include/*",
15-
"/usr/include/boost/program_options/*",
16-
"${workspaceFolder}/src"
11+
"/usr/include/libcamera",
12+
"/usr/local/include/webrtc",
13+
"/usr/local/include/webrtc/third_party/abseil-cpp",
14+
"/usr/local/include/webrtc/third_party/libyuv/include",
15+
"/usr/local/include/webrtc/tools/json_schema_compiler",
16+
"/usr/local/include/webrtc/third_party/boringssl/src/include",
17+
"/usr/include/boost/program_options",
18+
"${workspaceFolder}"
1719
],
1820
"defines": [],
1921
"compilerPath": "/usr/bin/clang",

doc/BUILD.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
* Install the lib from official repo [[tutorial](https://repo.mosquitto.org/debian/README.txt)]. (recommended)
99
4. Install essential packages
1010
```bash
11-
sudo apt install cmake clang clang-format mosquitto-dev libboost-program-options-dev libavformat-dev libavcodec-dev libavutil-dev libswscale-dev libpulse-dev libasound2-dev libjpeg-dev libcamera-dev libmosquitto-dev
11+
sudo apt install cmake clang clang-format mosquitto-dev libboost-program-options-dev libavformat-dev libavcodec-dev libavutil-dev libswscale-dev libpulse-dev libasound2-dev libjpeg-dev libcamera-dev libmosquitto-dev protobuf-compiler libprotobuf-dev
1212
```
1313
5. Copy the [nlohmann/json.hpp](https://github.com/nlohmann/json/blob/develop/single_include/nlohmann/json.hpp) to `/usr/local/include`
1414
```bash

example/unix_socket_client.py

100644100755
File mode changed.

scripts/gen_proto.sh

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
#!/bin/bash
2+
3+
PROTO_SRC_DIR=external/livekit-protocol/protobufs
4+
PROTO_OUT_DIR=proto
5+
6+
mkdir -p $PROTO_OUT_DIR
7+
8+
protoc -I=$PROTO_SRC_DIR --cpp_out=$PROTO_OUT_DIR $PROTO_SRC_DIR/*.proto

src/args.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ struct Args {
105105
// ipc
106106
bool enable_ipc = false;
107107
std::string socket_path = "/tmp/pi-webrtc-ipc.sock";
108+
std::string ipc_channel = "both";
109+
int ipc_channel_mode = -1;
108110

109111
// webrtc
110112
int jpeg_quality = 30;

src/parser.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,12 @@ static const std::unordered_map<std::string, int> v4l2_fmt_table = {
1616
{"yuyv", V4L2_PIX_FMT_YUYV},
1717
};
1818

19+
static const std::unordered_map<std::string, int> ipc_mode_table = {
20+
{"both", -1},
21+
{"lossy", ChannelMode::Lossy},
22+
{"reliable", ChannelMode::Reliable},
23+
};
24+
1925
static const std::unordered_map<std::string, int> ae_metering_table = {
2026
{"centre", libcamera::controls::MeteringCentreWeighted},
2127
{"spot", libcamera::controls::MeteringSpot},
@@ -144,6 +150,8 @@ void Parser::ParseArgs(int argc, char *argv[], Args &args) {
144150
"the output resolution will remain fixed regardless of network or device conditions.")
145151
("enable-ipc", bpo::bool_switch(&args.enable_ipc)->default_value(args.enable_ipc),
146152
"Enable IPC relay using a WebRTC DataChannel, lossy (UDP-like) or reliable (TCP-like) based on client preference.")
153+
("ipc-channel", bpo::value<std::string>(&args.ipc_channel)->default_value(args.ipc_channel),
154+
"IPC channel mode: both, lossy, reliable")
147155
("socket-path", bpo::value<std::string>(&args.socket_path)->default_value(args.socket_path),
148156
"Specifies the Unix domain socket path used to bridge messages between "
149157
"the WebRTC DataChannel and local IPC applications.")
@@ -254,6 +262,8 @@ void Parser::ParseArgs(int argc, char *argv[], Args &args) {
254262

255263
args.jpeg_quality = std::clamp(args.jpeg_quality, 0, 100);
256264

265+
args.ipc_channel_mode = ParseEnum(ipc_mode_table, args.ipc_channel);
266+
257267
ParseDevice(args);
258268
}
259269

src/rtc/CMakeLists.txt

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,53 @@
11
project(rtc)
22

3+
find_package(Protobuf REQUIRED)
4+
5+
# path of the proto files
6+
set(PROTO_SRC_DIR "${CMAKE_SOURCE_DIR}/external/livekit-protocol/protobufs")
7+
set(PROTO_OUT_DIR "${CMAKE_SOURCE_DIR}/proto")
8+
9+
file(MAKE_DIRECTORY ${PROTO_OUT_DIR})
10+
11+
# collect all .proto
12+
file(GLOB PROTO_FILES "${PROTO_SRC_DIR}/*.proto")
13+
14+
# generate `*.pb.cc` and `*.pb.h` to `proto/`
15+
foreach(proto_file ${PROTO_FILES})
16+
get_filename_component(proto_name ${proto_file} NAME_WE)
17+
18+
set(proto_cc "${PROTO_OUT_DIR}/${proto_name}.pb.cc")
19+
set(proto_h "${PROTO_OUT_DIR}/${proto_name}.pb.h")
20+
21+
add_custom_command(
22+
OUTPUT ${proto_cc} ${proto_h}
23+
COMMAND ${Protobuf_PROTOC_EXECUTABLE}
24+
ARGS --cpp_out=${PROTO_OUT_DIR}
25+
-I ${PROTO_SRC_DIR}
26+
${proto_file}
27+
DEPENDS ${proto_file}
28+
)
29+
30+
list(APPEND PROTO_SRCS ${proto_cc})
31+
list(APPEND PROTO_HDRS ${proto_h})
32+
endforeach()
33+
334
aux_source_directory(${PROJECT_SOURCE_DIR} RTC_FILES)
435

5-
add_library(${PROJECT_NAME} ${RTC_FILES})
36+
add_library(${PROJECT_NAME}
37+
${RTC_FILES}
38+
${PROTO_SRCS}
39+
)
40+
41+
target_include_directories(${PROJECT_NAME} PUBLIC
42+
${CMAKE_SOURCE_DIR}
43+
${PROTO_OUT_DIR}
44+
)
645

7-
target_link_libraries(${PROJECT_NAME} PUBLIC common track capturer v4l2_codecs ipc)
46+
target_link_libraries(${PROJECT_NAME} PUBLIC
47+
common
48+
track
49+
capturer
50+
v4l2_codecs
51+
ipc
52+
protobuf::libprotobuf
53+
)

src/rtc/conductor.cpp

Lines changed: 77 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -137,17 +137,49 @@ rtc::scoped_refptr<RtcPeer> Conductor::CreatePeerConnection(PeerConfig config) {
137137

138138
peer->SetPeer(result.MoveValue());
139139

140-
if (config.is_sfu_peer) {
141-
if (!config.is_publisher) {
142-
return peer;
140+
InitializeDataChannels(peer);
141+
142+
AddTracks(peer->GetPeer());
143+
144+
DEBUG_PRINT("Peer connection(%s) is created! ", peer->GetId().c_str());
145+
return peer;
146+
}
147+
148+
void Conductor::InitializeDataChannels(rtc::scoped_refptr<RtcPeer> peer) {
149+
if (peer->isSfuPeer() && !peer->isPublisher()) {
150+
peer->SetOnDataChannelCallback([this](std::shared_ptr<RtcChannel> channel) {
151+
DEBUG_PRINT("Remote channel (%s) from sfu subscriber peer [%s]",
152+
channel->label().c_str(), channel->id().c_str());
153+
BindDataChannelToIpcReceiver(channel);
154+
});
155+
return;
156+
}
157+
158+
// Essential data channels(Lossy / Reliable / Command)
159+
auto lossy_channel = peer->CreateDataChannel(ChannelMode::Lossy);
160+
auto reliable_channel = peer->CreateDataChannel(ChannelMode::Reliable);
161+
162+
if (args.enable_ipc) {
163+
switch (args.ipc_channel_mode) {
164+
case ChannelMode::Lossy:
165+
BindIpcToDataChannel(lossy_channel);
166+
break;
167+
case ChannelMode::Reliable:
168+
BindIpcToDataChannel(reliable_channel);
169+
break;
170+
default:
171+
BindIpcToDataChannel(lossy_channel);
172+
BindIpcToDataChannel(reliable_channel);
173+
break;
143174
}
144-
peer->CreateDataChannel(ChannelMode::Lossy);
145-
peer->CreateDataChannel(ChannelMode::Reliable);
146-
} else if (args.enable_ipc) {
147-
SetupIpcDataChannel(peer, ChannelMode::Lossy);
148-
SetupIpcDataChannel(peer, ChannelMode::Reliable);
149175
}
150176

177+
if (!peer->isSfuPeer()) {
178+
InitializeCommandChannel(peer);
179+
}
180+
}
181+
182+
void Conductor::InitializeCommandChannel(rtc::scoped_refptr<RtcPeer> peer) {
151183
auto cmd_channel = peer->CreateDataChannel(ChannelMode::Command);
152184
cmd_channel->RegisterHandler(
153185
CommandType::SNAPSHOT,
@@ -169,15 +201,9 @@ rtc::scoped_refptr<RtcPeer> Conductor::CreatePeerConnection(PeerConfig config) {
169201
[this](std::shared_ptr<RtcChannel> datachannel, const std::string &msg) {
170202
OnCameraOption(datachannel, msg);
171203
});
172-
173-
AddTracks(peer->GetPeer());
174-
175-
DEBUG_PRINT("Peer connection(%s) is created! ", peer->GetId().c_str());
176-
return peer;
177204
}
178205

179-
void Conductor::OnSnapshot(std::shared_ptr<RtcChannel> datachannel,
180-
const std::string &msg) {
206+
void Conductor::OnSnapshot(std::shared_ptr<RtcChannel> datachannel, const std::string &msg) {
181207
try {
182208
std::stringstream ss(msg);
183209
int num;
@@ -193,8 +219,7 @@ void Conductor::OnSnapshot(std::shared_ptr<RtcChannel> datachannel,
193219
}
194220
}
195221

196-
void Conductor::OnMetadata(std::shared_ptr<RtcChannel> datachannel,
197-
const std::string &msg) {
222+
void Conductor::OnMetadata(std::shared_ptr<RtcChannel> datachannel, const std::string &msg) {
198223
DEBUG_PRINT("OnMetadata msg: %s", msg.c_str());
199224
json jsonObj = json::parse(msg.c_str());
200225

@@ -242,8 +267,7 @@ void Conductor::OnRecord(std::shared_ptr<RtcChannel> datachannel, const std::str
242267
}
243268
}
244269

245-
void Conductor::OnCameraOption(std::shared_ptr<RtcChannel> datachannel,
246-
const std::string &msg) {
270+
void Conductor::OnCameraOption(std::shared_ptr<RtcChannel> datachannel, const std::string &msg) {
247271
DEBUG_PRINT("OnCameraControl msg: %s", msg.c_str());
248272
json jsonObj = json::parse(msg.c_str());
249273

@@ -325,19 +349,40 @@ void Conductor::InitializeIpcServer() {
325349
}
326350
}
327351

328-
void Conductor::SetupIpcDataChannel(rtc::scoped_refptr<RtcPeer> peer, ChannelMode mode) {
329-
auto channel = peer->CreateDataChannel(mode);
330-
if (channel && ipc_server_) {
331-
ipc_server_->RegisterPeerCallback(channel->label(), [channel](const std::string &msg) {
332-
channel->Send(msg);
333-
});
334-
335-
channel->OnClosed([this](const std::string &label) {
336-
ipc_server_->UnregisterPeerCallback(label);
337-
});
352+
void Conductor::BindIpcToDataChannel(std::shared_ptr<RtcChannel> channel) {
353+
BindIpcToDataChannelSender(channel);
354+
BindDataChannelToIpcReceiver(channel);
355+
}
338356

339-
channel->RegisterHandler(CommandType::CUSTOM, [this](const std::string &msg) {
340-
ipc_server_->Write(msg);
341-
});
357+
void Conductor::BindIpcToDataChannelSender(std::shared_ptr<RtcChannel> channel) {
358+
if (!channel || !ipc_server_) {
359+
ERROR_PRINT("IPC or DataChannel is not found!");
360+
return;
342361
}
362+
363+
const auto id = channel->id();
364+
const auto label = channel->label();
365+
366+
ipc_server_->RegisterPeerCallback(id, [channel](const std::string &msg) {
367+
channel->Send(msg);
368+
});
369+
DEBUG_PRINT("[%s] DataChannel (%s) registered to IPC server for sending.", id.c_str(),
370+
label.c_str());
371+
372+
channel->OnClosed([this, id, label]() {
373+
ipc_server_->UnregisterPeerCallback(id);
374+
DEBUG_PRINT("[%s] DataChannel (%s) unregistered from IPC server.", id.c_str(),
375+
label.c_str());
376+
});
377+
}
378+
379+
void Conductor::BindDataChannelToIpcReceiver(std::shared_ptr<RtcChannel> channel) {
380+
if (!channel || !ipc_server_)
381+
return;
382+
383+
channel->RegisterHandler(CommandType::CUSTOM, [this](const std::string &msg) {
384+
ipc_server_->Write(msg);
385+
});
386+
DEBUG_PRINT("DataChannel (%s) connected to IPC server for receiving.",
387+
channel->label().c_str());
343388
}

src/rtc/conductor.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,13 @@ class Conductor {
3434
void InitializePeerConnectionFactory();
3535
void InitializeTracks();
3636
void InitializeIpcServer();
37+
void InitializeDataChannels(rtc::scoped_refptr<RtcPeer> peer);
38+
void InitializeCommandChannel(rtc::scoped_refptr<RtcPeer> peer);
39+
40+
void BindIpcToDataChannel(std::shared_ptr<RtcChannel> channel);
41+
void BindIpcToDataChannelSender(std::shared_ptr<RtcChannel> channel);
42+
void BindDataChannelToIpcReceiver(std::shared_ptr<RtcChannel> channel);
3743

38-
void SetupIpcDataChannel(rtc::scoped_refptr<RtcPeer> peer, ChannelMode mode);
3944
void AddTracks(rtc::scoped_refptr<webrtc::PeerConnectionInterface> peer_connection);
4045
void OnSnapshot(std::shared_ptr<RtcChannel> datachannel, const std::string &msg);
4146
void OnMetadata(std::shared_ptr<RtcChannel> datachannel, const std::string &path);

0 commit comments

Comments
 (0)