22
33#include < nlohmann/json.hpp>
44
5- #include " common/logging.h"
6-
7- const int MAX_RETRIES = 10 ;
8-
9- std::shared_ptr<WebsocketService> WebsocketService::Create (Args args,
10- std::shared_ptr<Conductor> conductor,
11- boost::asio::io_context &ioc) {
5+ std::shared_ptr<WebsocketService>
6+ WebsocketService::Create (Args args, std::shared_ptr<Conductor> conductor, net::io_context &ioc) {
127 return std::make_shared<WebsocketService>(args, conductor, ioc);
138}
149
1510WebsocketService::WebsocketService (Args args, std::shared_ptr<Conductor> conductor,
16- boost::asio ::io_context &ioc)
11+ net ::io_context &ioc)
1712 : SignalingService(conductor),
1813 args_(args),
19- resolver_( ioc),
20- ws_( ioc) {}
14+ ws_(InitWebSocket( ioc) ),
15+ resolver_(net::make_strand( ioc) ) {}
2116
2217WebsocketService::~WebsocketService () { Disconnect (); }
2318
24- void WebsocketService::OnRemoteIce (const std::string &message) {
25- nlohmann::json res = nlohmann::json::parse (message);
26- std::string target = res[" target" ];
27- std::string canditateInit = res[" candidateInit" ];
19+ WebSocketVariant WebsocketService::InitWebSocket (net::io_context &ioc) {
20+ if (args_.use_tls ) {
21+ // The SSL context created via boost::asio::ssl::context uses the underlying BoringSSL
22+ // implementation (when linked with WebRTC or other BoringSSL-based libraries). BoringSSL is
23+ // not a drop-in replacement for OpenSSL and does not implement all OpenSSL APIs. As a
24+ // result, certain methods may be unsupported or behave differently.
25+ // Ensure that only compatible OpenSSL APIs are used when BoringSSL is present.
26+ DEBUG_PRINT (" Using TLS WebSocket, SSL version: %s" , OpenSSL_version (OPENSSL_VERSION));
2827
29- nlohmann::json canditateObj = nlohmann::json::parse (canditateInit);
30- std::string sdp_mid = canditateObj[" sdpMid" ];
31- int sdp_mline_index = canditateObj[" sdpMLineIndex" ];
32- std::string candidate = canditateObj[" candidate" ];
33- DEBUG_PRINT (" Received remote ICE: %s, %d, %s" , sdp_mid.c_str (), sdp_mline_index,
34- candidate.c_str ());
28+ ssl::context ctx (ssl::context::tls);
29+ ctx.set_default_verify_paths ();
30+ ctx.set_verify_mode (ssl::verify_peer);
3531
36- if (target == " PUBLISHER" ) {
37- pub_peer_->SetRemoteIce (sdp_mid, sdp_mline_index, candidate);
38- } else if (target == " SUBSCRIBER" ) {
39- sub_peer_->SetRemoteIce (sdp_mid, sdp_mline_index, candidate);
32+ return websocket::stream<ssl::stream<tcp::socket>>(net::make_strand (ioc), ctx);
33+ } else {
34+ return websocket::stream<tcp::socket>(net::make_strand (ioc));
4035 }
4136}
4237
4338void WebsocketService::Connect () {
39+ auto port = args_.use_tls ? 443 : 80 ;
40+ INFO_PRINT (" Connect to WebSocket %s:%d" , args_.ws_host .c_str (), port);
41+
4442 resolver_.async_resolve (
45- args_.ws_host , std::to_string (args_. ws_port ),
43+ args_.ws_host , std::to_string (port ),
4644 [this ](boost::system::error_code ec, tcp::resolver::results_type results) {
4745 OnResolve (ec, results);
4846 });
4947}
5048
5149void WebsocketService::Disconnect () {
52- if (ws_.is_open ()) {
53- ws_.async_close (websocket::close_code::normal, [this ](boost::system::error_code ec) {
54- if (ec) {
55- ERROR_PRINT (" Close Error: %s" , ec.message ().c_str ());
50+ std::visit (
51+ [](auto &ws) {
52+ if (ws.is_open ()) {
53+ ws.async_close (websocket::close_code::normal, [](boost::system::error_code ec) {
54+ if (ec) {
55+ ERROR_PRINT (" Close Error: %s" , ec.message ().c_str ());
56+ } else {
57+ INFO_PRINT (" WebSocket Closed" );
58+ }
59+ });
5660 } else {
57- INFO_PRINT (" WebSocket Closed " );
61+ INFO_PRINT (" WebSocket already closed " );
5862 }
59- });
60- } else {
61- INFO_PRINT (" WebSocket already closed" );
62- }
63+ },
64+ ws_);
6365}
6466
65- void WebsocketService::OnResolve (boost::system::error_code ec,
66- tcp::resolver::results_type results) {
67- if (!ec) {
68- net::async_connect (ws_.next_layer (), results,
69- [this ](boost::system::error_code ec, tcp::endpoint) {
70- OnConnect (ec);
71- });
72- } else {
67+ void WebsocketService::OnResolve (beast::error_code ec, tcp::resolver::results_type results) {
68+ if (ec) {
7369 ERROR_PRINT (" Failed to resolve: %s" , ec.message ().c_str ());
7470 return ;
7571 }
72+
73+ std::visit (
74+ [this , results](auto &ws) {
75+ net::async_connect (beast::get_lowest_layer (ws), results,
76+ [this , &ws](boost::system::error_code ec, tcp::endpoint) {
77+ OnConnect (ec);
78+ });
79+ },
80+ ws_);
7681}
7782
78- void WebsocketService::OnConnect (boost::system::error_code ec) {
79- if (!ec) {
80- std::string target = " /rtc?token=" + args_.ws_token ;
81- ws_.async_handshake (args_.ws_host , target, [this ](boost::system::error_code ec) {
82- OnHandshake (ec);
83- });
84- } else {
85- ERROR_PRINT (" Connection Error: %s" , ec.message ().c_str ());
83+ void WebsocketService::OnConnect (beast::error_code ec) {
84+ if (ec) {
85+ ERROR_PRINT (" Failed to connect: %s" , ec.message ().c_str ());
86+ return ;
8687 }
88+
89+ std::visit (
90+ [this ](auto &ws) {
91+ OnHandshake (ws);
92+ },
93+ ws_);
8794}
8895
89- void WebsocketService::OnHandshake (boost::system::error_code ec) {
90- if (!ec) {
91- INFO_PRINT (" WebSocket is connected!" );
92- Read ();
93- } else {
94- ERROR_PRINT (" Handshake Error: %s" , ec.message ().c_str ());
96+ void WebsocketService::OnHandshake (websocket::stream<tcp::socket> &ws) {
97+ std::string target = " /rtc?token=" + args_.ws_token ;
98+ ws.async_handshake (args_.ws_host , target, [this ](boost::system::error_code ec) {
99+ OnHandshake (ec);
100+ });
101+ }
102+
103+ void WebsocketService::OnHandshake (websocket::stream<ssl::stream<tcp::socket>> &ws) {
104+ ws.next_layer ().async_handshake (
105+ ssl::stream_base::client, [this , &ws](boost::system::error_code ec) {
106+ if (ec) {
107+ ERROR_PRINT (" Failed to tls handshake: %s" , ec.message ().c_str ());
108+ }
109+ std::string target = " /rtc?token=" + args_.ws_token ;
110+ ws.async_handshake (args_.ws_host , target, [this ](boost::system::error_code ec) {
111+ OnHandshake (ec);
112+ });
113+ });
114+ }
115+
116+ void WebsocketService::OnHandshake (beast::error_code ec) {
117+ if (ec) {
118+ ERROR_PRINT (" Failed to handshake: %s" , ec.message ().c_str ());
119+ return ;
95120 }
121+
122+ Read ();
96123}
97124
98125void WebsocketService::Read () {
99- ws_.async_read (buffer_, [this ](boost::system::error_code ec, std::size_t bytes_transferred) {
100- if (!ec) {
101- std::string req = beast::buffers_to_string (buffer_.data ());
102- OnMessage (req);
103- buffer_.consume (bytes_transferred);
104- Read ();
105- } else {
106- ERROR_PRINT (" Read Error: %s" , ec.message ().c_str ());
107- Disconnect ();
108- }
109- });
126+ std::visit (
127+ [this ](auto &ws) {
128+ ws.async_read (buffer_,
129+ [this ](boost::system::error_code ec, std::size_t bytes_transferred) {
130+ if (ec) {
131+ ERROR_PRINT (" Failed to read: %s" , ec.message ().c_str ());
132+ Disconnect ();
133+ }
134+ std::string req = beast::buffers_to_string (buffer_.data ());
135+ OnMessage (req);
136+ buffer_.consume (bytes_transferred);
137+ Read ();
138+ });
139+ },
140+ ws_);
110141}
111142
112143void WebsocketService::OnMessage (const std::string &req) {
@@ -163,6 +194,25 @@ void WebsocketService::OnMessage(const std::string &req) {
163194 }
164195}
165196
197+ void WebsocketService::OnRemoteIce (const std::string &message) {
198+ nlohmann::json res = nlohmann::json::parse (message);
199+ std::string target = res[" target" ];
200+ std::string canditateInit = res[" candidateInit" ];
201+
202+ nlohmann::json canditateObj = nlohmann::json::parse (canditateInit);
203+ std::string sdp_mid = canditateObj[" sdpMid" ];
204+ int sdp_mline_index = canditateObj[" sdpMLineIndex" ];
205+ std::string candidate = canditateObj[" candidate" ];
206+ DEBUG_PRINT (" Received remote ICE: %s, %d, %s" , sdp_mid.c_str (), sdp_mline_index,
207+ candidate.c_str ());
208+
209+ if (target == " PUBLISHER" ) {
210+ pub_peer_->SetRemoteIce (sdp_mid, sdp_mline_index, candidate);
211+ } else if (target == " SUBSCRIBER" ) {
212+ sub_peer_->SetRemoteIce (sdp_mid, sdp_mline_index, candidate);
213+ }
214+ }
215+
166216void WebsocketService::Write (const std::string &action, const std::string &message) {
167217 nlohmann::json request_json;
168218 request_json[" action" ] = action;
@@ -182,17 +232,22 @@ void WebsocketService::DoWrite() {
182232 if (write_queue_.empty ())
183233 return ;
184234
185- ws_.async_write (boost::asio::buffer (write_queue_.front ()),
186- [this ](boost::system::error_code ec, std::size_t bytes_transferred) {
187- std::lock_guard<std::mutex> lock (write_mutex_);
188- if (!ec) {
189- write_queue_.pop_front ();
190- if (!write_queue_.empty ()) {
191- DoWrite ();
192- }
193- } else {
194- ERROR_PRINT (" Write Error: %s" , ec.message ().c_str ());
195- Disconnect ();
196- }
197- });
235+ std::visit (
236+ [this ](auto &ws) {
237+ ws.async_write (net::buffer (write_queue_.front ()),
238+ [this ](boost::system::error_code ec, std::size_t bytes_transferred) {
239+ std::lock_guard<std::mutex> lock (write_mutex_);
240+ if (ec) {
241+ ERROR_PRINT (" Failed to write: %s" , ec.message ().c_str ());
242+ Disconnect ();
243+ }
244+
245+ write_queue_.pop_front ();
246+
247+ if (!write_queue_.empty ()) {
248+ DoWrite ();
249+ }
250+ });
251+ },
252+ ws_);
198253}
0 commit comments