Skip to content

Commit 57db5bc

Browse files
committed
feat: push streaming to sfu through websocket signaling
1 parent 066c9ef commit 57db5bc

File tree

10 files changed

+306
-14
lines changed

10 files changed

+306
-14
lines changed

src/args.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ struct Args {
2121
bool use_libcamera = false;
2222
bool use_mqtt = false;
2323
bool use_whep = false;
24+
bool use_websocket = false;
2425
bool fixed_resolution = false;
2526
uint32_t format = V4L2_PIX_FMT_MJPEG;
2627
std::string v4l2_format = "mjpeg";
@@ -40,6 +41,16 @@ struct Args {
4041

4142
// http signaling
4243
uint16_t http_port = 8080;
44+
45+
// websocket signaling
46+
int ws_port = 8080;
47+
std::string ws_host = "192.168.4.21";
48+
std::string ws_token =
49+
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9."
50+
"eyJleHAiOjE3NzQ3NDU2MzksImlzcyI6IkFQSVduUVRzNHRtVVp2QSIsIm5iZiI6MTc0MzIwOTYzOSwic3ViIjoiZj"
51+
"JkNzRiOTUtMmYxNi00ODRiLTg3NjctYThjNWY3NzFlZWY2IiwidmlkZW8iOnsiY2FuUHVibGlzaCI6dHJ1ZSwiY2Fu"
52+
"UHVibGlzaERhdGEiOnRydWUsImNhblN1YnNjcmliZSI6ZmFsc2UsInJvb20iOiJkZXZpY2UtMSIsInJvb21Kb2luIj"
53+
"p0cnVlfX0.o7e-gjkqfMpeDjATwaWWLKUWPa8RlaoIOcuw3p8FxQk";
4354
};
4455

4556
#endif // ARGS_H_

src/conductor.cpp

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,17 +79,15 @@ void Conductor::AddTracks(rtc::scoped_refptr<webrtc::PeerConnectionInterface> pe
7979
return;
8080
}
8181

82-
std::string stream_id = "test_stream_id";
83-
8482
if (audio_track_) {
85-
auto audio_res = peer_connection->AddTrack(audio_track_, {stream_id});
83+
auto audio_res = peer_connection->AddTrack(audio_track_, {args.uid});
8684
if (!audio_res.ok()) {
8785
ERROR_PRINT("Failed to add audio track, %s", audio_res.error().message());
8886
}
8987
}
9088

9189
if (video_track_) {
92-
auto video_res = peer_connection->AddTrack(video_track_, {stream_id});
90+
auto video_res = peer_connection->AddTrack(video_track_, {args.uid});
9391
if (!video_res.ok()) {
9492
ERROR_PRINT("Failed to add video track, %s", video_res.error().message());
9593
}
@@ -101,8 +99,7 @@ void Conductor::AddTracks(rtc::scoped_refptr<webrtc::PeerConnectionInterface> pe
10199
}
102100
}
103101

104-
rtc::scoped_refptr<RtcPeer> Conductor::CreatePeerConnection(PeerConfig peer_config) {
105-
webrtc::PeerConnectionInterface::RTCConfiguration config;
102+
rtc::scoped_refptr<RtcPeer> Conductor::CreatePeerConnection(PeerConfig config) {
106103
config.sdp_semantics = webrtc::SdpSemantics::kUnifiedPlan;
107104
webrtc::PeerConnectionInterface::IceServer server;
108105
server.uri = args.stun_url;
@@ -116,8 +113,8 @@ rtc::scoped_refptr<RtcPeer> Conductor::CreatePeerConnection(PeerConfig peer_conf
116113
config.servers.push_back(turn_server);
117114
}
118115

119-
peer_config.timeout = args.peer_timeout;
120-
auto peer = RtcPeer::Create(std::move(peer_config));
116+
config.timeout = args.peer_timeout;
117+
auto peer = RtcPeer::Create(std::move(config));
121118
auto result = peer_connection_factory_->CreatePeerConnectionOrError(
122119
config, webrtc::PeerConnectionDependencies(peer.get()));
123120

@@ -127,6 +124,11 @@ rtc::scoped_refptr<RtcPeer> Conductor::CreatePeerConnection(PeerConfig peer_conf
127124
}
128125

129126
peer->SetPeer(result.MoveValue());
127+
128+
if (!config.is_publisher) {
129+
return peer;
130+
}
131+
130132
peer->CreateDataChannel();
131133
peer->OnSnapshot([this](std::shared_ptr<DataChannelSubject> datachannel, std::string msg) {
132134
OnSnapshot(datachannel, msg);

src/main.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include "recorder/recorder_manager.h"
99
#include "signaling/http_service.h"
1010
#include "signaling/mqtt_service.h"
11+
#include "signaling/websocket_service.h"
1112

1213
int main(int argc, char *argv[]) {
1314
Args args;
@@ -33,6 +34,10 @@ int main(int argc, char *argv[]) {
3334
services.push_back(HttpService::Create(args, conductor, ioc_));
3435
}
3536

37+
if (args.use_websocket) {
38+
services.push_back(WebsocketService::Create(args, conductor, ioc_));
39+
}
40+
3641
if (args.use_mqtt) {
3742
services.push_back(MqttService::Create(args, conductor));
3843
}

src/parser.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ void Parser::ParseArgs(int argc, char *argv[], Args &args) {
5959
("mqtt_password", bpo::value<std::string>()->default_value(args.mqtt_password),
6060
"Mqtt server password")
6161
("http_port", bpo::value<uint16_t>()->default_value(args.http_port), "Http server port")
62+
("ws_host", bpo::value<std::string>()->default_value(args.ws_host),
63+
"Websocket server host")
64+
("ws_port", bpo::value<int>()->default_value(args.ws_port), "Websocket server port")
65+
("ws_token", bpo::value<std::string>()->default_value(args.ws_token),
66+
"Websocket server token")
6267
("record_path", bpo::value<std::string>()->default_value(args.record_path),
6368
"The path to save the recording video files. The recorder won't start if it's empty")
6469
("hw_accel", bpo::bool_switch()->default_value(args.hw_accel),
@@ -67,6 +72,8 @@ void Parser::ParseArgs(int argc, char *argv[], Args &args) {
6772
"Use mqtt to exchange sdp and ice candidates")
6873
("use_whep", bpo::bool_switch()->default_value(args.use_whep),
6974
"Use whep to exchange sdp and ice candidates")
75+
("use_websocket", bpo::bool_switch()->default_value(args.use_websocket),
76+
"Use websocket to exchange sdp and ice candidates")
7077
("v4l2_format", bpo::value<std::string>()->default_value(args.v4l2_format),
7178
"Set v4l2 camera capture format to `i420`, `mjpeg`, `h264`. The `h264` can pass "
7279
"packets into mp4 without encoding to reduce cpu usage."
@@ -106,13 +113,17 @@ void Parser::ParseArgs(int argc, char *argv[], Args &args) {
106113
SetIfExists(vm, "mqtt_username", args.mqtt_username);
107114
SetIfExists(vm, "mqtt_password", args.mqtt_password);
108115
SetIfExists(vm, "http_port", args.http_port);
116+
SetIfExists(vm, "ws_host", args.ws_host);
117+
SetIfExists(vm, "ws_port", args.ws_port);
118+
SetIfExists(vm, "ws_token", args.ws_token);
109119
SetIfExists(vm, "record_path", args.record_path);
110120

111121
args.fixed_resolution = vm["fixed_resolution"].as<bool>();
112122
args.no_audio = vm["no_audio"].as<bool>();
113123
args.hw_accel = vm["hw_accel"].as<bool>();
114124
args.use_mqtt = vm["use_mqtt"].as<bool>();
115125
args.use_whep = vm["use_whep"].as<bool>();
126+
args.use_websocket = vm["use_websocket"].as<bool>();
116127

117128
if (!args.stun_url.empty() && args.stun_url.substr(0, 4) != "stun") {
118129
std::cout << "Stun url should not be empty and start with \"stun:\"" << std::endl;

src/rtc_peer.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,14 @@ RtcPeer::~RtcPeer() {
1818
DEBUG_PRINT("peer connection (%s) was destroyed!", id_.c_str());
1919
}
2020

21+
void RtcPeer::CreateOffer() {
22+
if (signaling_state_ == webrtc::PeerConnectionInterface::SignalingState::kHaveLocalOffer) {
23+
return;
24+
}
25+
26+
peer_connection_->CreateOffer(this, webrtc::PeerConnectionInterface::RTCOfferAnswerOptions());
27+
}
28+
2129
void RtcPeer::Terminate() {
2230
is_connected_.store(false);
2331
is_complete_.store(true);
@@ -105,6 +113,10 @@ void RtcPeer::OnCameraOption(OnCommand func) {
105113
}
106114

107115
void RtcPeer::SubscribeCommandChannel(CommandType type, OnCommand func) {
116+
if (!data_channel_subject_) {
117+
ERROR_PRINT("Data channel is not created!");
118+
return;
119+
}
108120
auto observer = data_channel_subject_->AsObservable(type);
109121
observer->Subscribe([this, func](std::string message) {
110122
if (!message.empty()) {
@@ -114,6 +126,7 @@ void RtcPeer::SubscribeCommandChannel(CommandType type, OnCommand func) {
114126
}
115127

116128
void RtcPeer::OnSignalingChange(webrtc::PeerConnectionInterface::SignalingState new_state) {
129+
signaling_state_ = new_state;
117130
auto state = webrtc::PeerConnectionInterface::AsString(new_state);
118131
DEBUG_PRINT("OnSignalingChange => %s", std::string(state).c_str());
119132
if (new_state == webrtc::PeerConnectionInterface::SignalingState::kHaveRemoteOffer) {

src/rtc_peer.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@
1212
#include "common/logging.h"
1313
#include "data_channel_subject.h"
1414

15-
struct PeerConfig {
15+
struct PeerConfig : public webrtc::PeerConnectionInterface::RTCConfiguration {
1616
int timeout = 10;
17+
bool is_publisher = true;
1718
bool has_candidates_in_sdp = false;
1819
};
1920

@@ -82,6 +83,7 @@ class RtcPeer : public webrtc::PeerConnectionObserver,
8283

8384
RtcPeer(PeerConfig config);
8485
~RtcPeer();
86+
void CreateOffer();
8587
void Terminate();
8688
bool IsConnected() const;
8789
std::string GetId() const;
@@ -129,6 +131,7 @@ class RtcPeer : public webrtc::PeerConnectionObserver,
129131

130132
std::string modified_sdp_;
131133
webrtc::SdpParseError *modified_desc_error_;
134+
webrtc::PeerConnectionInterface::SignalingState signaling_state_;
132135
std::unique_ptr<webrtc::SessionDescriptionInterface> modified_desc_;
133136

134137
std::shared_ptr<DataChannelSubject> data_channel_subject_;

src/signaling/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ endif()
88
add_library(${PROJECT_NAME}
99
mqtt_service.cpp
1010
http_service.cpp
11+
websocket_service.cpp
1112
)
1213

1314
target_link_libraries(${PROJECT_NAME} PUBLIC ${MOSQUITTO_LIBS})

src/signaling/signaling_service.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
class SignalingService {
99
public:
1010
SignalingService(std::shared_ptr<Conductor> conductor, bool has_candidates_in_sdp = false)
11-
: conductor_(conductor),
11+
: conductor(conductor),
1212
has_candidates_in_sdp_(has_candidates_in_sdp) {}
1313

1414
void Start() {
@@ -20,11 +20,10 @@ class SignalingService {
2020
Connect();
2121
}
2222

23-
rtc::scoped_refptr<RtcPeer> CreatePeer() {
24-
PeerConfig config;
23+
rtc::scoped_refptr<RtcPeer> CreatePeer(PeerConfig config = PeerConfig{}) {
2524
config.has_candidates_in_sdp = has_candidates_in_sdp_;
2625

27-
auto peer = conductor_->CreatePeerConnection(std::move(config));
26+
auto peer = conductor->CreatePeerConnection(std::move(config));
2827
peer_map_[peer->GetId()] = peer;
2928
return peer;
3029
}
@@ -58,10 +57,11 @@ class SignalingService {
5857
virtual void Connect() = 0;
5958
virtual void Disconnect() = 0;
6059

60+
std::shared_ptr<Conductor> conductor;
61+
6162
private:
6263
bool has_candidates_in_sdp_;
6364
std::unique_ptr<Worker> worker_;
64-
std::shared_ptr<Conductor> conductor_;
6565
std::unordered_map<std::string, rtc::scoped_refptr<RtcPeer>> peer_map_;
6666
};
6767

0 commit comments

Comments
 (0)