Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.

Commit 9ec5c2f

Browse files
authored
Parse signaling messages in P2PClient. (#720)
P2PClient scans signaling messages to see if a PeerConnectionChannel is needed to be created or not. However, scaning messages with std::string::find may miss some matches when extra spaces are added, although these spaces don't change the meaning of a JSON message, e.g.: {"type":"chat-closed"} is the same as { "type" : "chat-closed" }. Parsing signaling messages in P2PClient identifies message type more accurate.
1 parent 3ed25c3 commit 9ec5c2f

File tree

5 files changed

+37
-97
lines changed

5 files changed

+37
-97
lines changed

talk/owt/BUILD.gn

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,6 @@ static_library("owt_sdk_p2p") {
414414
"sdk/include/cpp/owt/p2p/p2pclient.h",
415415
"sdk/include/cpp/owt/p2p/p2ppublication.h",
416416
"sdk/include/cpp/owt/p2p/p2psignalingchannelinterface.h",
417-
"sdk/include/cpp/owt/p2p/p2psignalingreceiverinterface.h",
418417
"sdk/include/cpp/owt/p2p/p2psignalingsenderinterface.h",
419418
"sdk/p2p/p2pclient.cc",
420419
"sdk/p2p/p2ppeerconnectionchannel.cc",

talk/owt/sdk/include/cpp/owt/p2p/p2psignalingreceiverinterface.h

Lines changed: 0 additions & 25 deletions
This file was deleted.

talk/owt/sdk/p2p/p2pclient.cc

Lines changed: 33 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,13 @@
2323
using namespace rtc;
2424
namespace owt {
2525
namespace p2p {
26-
enum IcsP2PError : int {
27-
kWebrtcIceGatheringPolicyUnsupported = 2601,
28-
};
26+
27+
const std::string kMessageTypeKey = "type";
28+
const std::string kMessageDataKey = "data";
29+
const std::string kSessionDescriptionTypeKey = "type";
30+
const std::string kChatClosed = "chat-closed";
31+
const std::string kChatSignal = "chat-signal";
32+
const std::string kSdpTypeOffer = "offer";
2933

3034
P2PClient::P2PClient(
3135
P2PClientConfiguration& configuration,
@@ -239,7 +243,7 @@ void P2PClient::SetLocalId(const std::string& local_id) {
239243
}
240244
void P2PClient::OnSignalingMessage(const std::string& message,
241245
const std::string& remote_id) {
242-
RTC_LOG(LS_WARNING) << "Receiving signaling message from remote:" << message;
246+
RTC_LOG(LS_VERBOSE) << "Receiving signaling message from remote:" << message;
243247
std::weak_ptr<P2PClient> weak_this = shared_from_this();
244248
signaling_queue_->PostTask([weak_this, remote_id, message]() {
245249
auto that = weak_this.lock();
@@ -251,79 +255,52 @@ void P2PClient::OnSignalingMessage(const std::string& message,
251255
<< "Chat cannot be setup since the remote user is not allowed.";
252256
return;
253257
}
258+
Json::Reader reader;
259+
Json::Value json_message;
260+
if (!reader.parse(message, json_message)) {
261+
RTC_LOG(LS_WARNING) << "Cannot parse incoming message.";
262+
return;
263+
}
264+
std::string message_type;
265+
rtc::GetStringFromJsonObject(json_message, kMessageTypeKey, &message_type);
254266
if (!that->IsPeerConnectionChannelCreated(remote_id)) {
255-
if (message.find("\"type\":\"chat-closed\"") != std::string::npos) {
267+
if (message_type == kChatClosed) {
256268
RTC_LOG(LS_WARNING) << "Non-existed chat cannot be stopped.";
257269
return;
258270
}
259-
} else if (message.find("\"type\":\"offer\"") != std::string::npos) {
260-
RTC_LOG(LS_ERROR) << "Received offer from remote.";
261-
// If we don't have a PC before we receive an offer.
262-
auto pcc = that->GetPeerConnectionChannel(remote_id);
263-
if (pcc->HaveLocalOffer() && that->local_id_.compare(remote_id) > 0) {
264-
// If our ID is larger than remote, make the remote side as the
265-
// publisher (offerer) In case we already have an offer. So we
266-
// remove current PCC and create answer.
267-
std::shared_ptr<LocalStream> stream = pcc->GetLatestLocalStream();
268-
std::function<void()> success_callback =
269-
pcc->GetLatestPublishSuccessCallback();
270-
std::function<void(std::unique_ptr<Exception>)> failure_callback =
271-
pcc->GetLatestPublishFailureCallback();
272-
pcc->Stop(nullptr, nullptr);
273-
{
274-
// If we already created offer,
275-
const std::lock_guard<std::mutex> lock(that->pc_channels_mutex_);
276-
that->pc_channels_.erase(remote_id);
277-
}
278-
auto new_pcc = that->GetPeerConnectionChannel(remote_id);
279-
new_pcc->OnIncomingSignalingMessage(message);
280-
new_pcc->Publish(stream, success_callback, failure_callback);
281-
return;
282-
}
283-
} else if (message.find("\"type\":\"chat-closed\"") != std::string::npos) {
284-
RTC_LOG(LS_ERROR) << "Handle the situation that PCC is created by "
285-
"received Chat-closed.";
286-
int code = 0;
287-
std::string error = "";
288-
Json::Reader reader;
289-
Json::Value json_message;
290-
if (reader.parse(message, json_message)) {
291-
Json::Value stop_info;
292-
rtc::GetValueFromJsonObject(json_message, "data", &stop_info);
293-
rtc::GetIntFromJsonObject(stop_info, "code", &code);
294-
rtc::GetStringFromJsonObject(stop_info, "message", &error);
295-
296-
if (code == kWebrtcIceGatheringPolicyUnsupported) {
297-
auto pcc = that->GetPeerConnectionChannel(remote_id);
271+
} else if (message_type == kChatSignal){
272+
Json::Value signal;
273+
rtc::GetValueFromJsonObject(json_message, kMessageDataKey, &signal);
274+
std::string sdp_type;
275+
rtc::GetStringFromJsonObject(signal, kSessionDescriptionTypeKey, &sdp_type);
276+
if (sdp_type == kSdpTypeOffer) {
277+
// If we don't have a PC before we receive an offer.
278+
auto pcc = that->GetPeerConnectionChannel(remote_id);
279+
if (pcc->HaveLocalOffer() && that->local_id_.compare(remote_id) > 0) {
280+
// If our ID is larger than remote, make the remote side as the
281+
// publisher (offerer) In case we already have an offer. So we
282+
// remove current PCC and create answer.
298283
std::shared_ptr<LocalStream> stream = pcc->GetLatestLocalStream();
299284
std::function<void()> success_callback =
300285
pcc->GetLatestPublishSuccessCallback();
301286
std::function<void(std::unique_ptr<Exception>)> failure_callback =
302287
pcc->GetLatestPublishFailureCallback();
303-
pcc->SetAbandoned();
288+
pcc->Stop(nullptr, nullptr);
304289
{
290+
// If we already created offer,
305291
const std::lock_guard<std::mutex> lock(that->pc_channels_mutex_);
306292
that->pc_channels_.erase(remote_id);
307293
}
308-
309294
auto new_pcc = that->GetPeerConnectionChannel(remote_id);
295+
new_pcc->OnIncomingSignalingMessage(json_message);
310296
new_pcc->Publish(stream, success_callback, failure_callback);
311297
return;
312298
}
313-
auto pcc = that->GetPeerConnectionChannel(remote_id);
314-
// Don't send stop to remote.
315-
pcc->SetAbandoned();
316-
{
317-
const std::lock_guard<std::mutex> lock(that->pc_channels_mutex_);
318-
that->pc_channels_.erase(remote_id);
319-
}
320-
321-
return;
322299
}
323300
}
324301
// Secondly dispatch the message to pcc.
325302
auto pcc = that->GetPeerConnectionChannel(remote_id);
326-
pcc->OnIncomingSignalingMessage(message);
303+
pcc->OnIncomingSignalingMessage(json_message);
327304
});
328305
}
329306
void P2PClient::OnServerDisconnected() {

talk/owt/sdk/p2p/p2ppeerconnectionchannel.cc

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -375,17 +375,9 @@ void P2PPeerConnectionChannel::SendSignalingMessage(
375375
});
376376
}
377377
void P2PPeerConnectionChannel::OnIncomingSignalingMessage(
378-
const std::string& message) {
378+
const Json::Value& json_message) {
379379
if (ended_)
380380
return;
381-
RTC_LOG(LS_INFO) << "OnIncomingMessage: " << message;
382-
RTC_DCHECK(!message.empty());
383-
Json::Reader reader;
384-
Json::Value json_message;
385-
if (!reader.parse(message, json_message)) {
386-
RTC_LOG(LS_WARNING) << "Cannot parse incoming message.";
387-
return;
388-
}
389381
std::string message_type;
390382
rtc::GetStringFromJsonObject(json_message, kMessageTypeKey, &message_type);
391383
if (message_type.empty()) {

talk/owt/sdk/p2p/p2ppeerconnectionchannel.h

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
#include "talk/owt/sdk/include/cpp/owt/base/stream.h"
1414
#include "talk/owt/sdk/include/cpp/owt/base/exception.h"
1515
#include "talk/owt/sdk/include/cpp/owt/p2p/p2psignalingsenderinterface.h"
16-
#include "talk/owt/sdk/include/cpp/owt/p2p/p2psignalingreceiverinterface.h"
1716
#include "webrtc/sdk/media_constraints.h"
1817
#include "webrtc/rtc_base/strings/json.h"
1918
#include "webrtc/rtc_base/synchronization/mutex.h"
@@ -41,8 +40,7 @@ class P2PPeerConnectionChannelObserver {
4140
};
4241
// An instance of P2PPeerConnectionChannel manages a session for a specified
4342
// remote client.
44-
class P2PPeerConnectionChannel : public P2PSignalingReceiverInterface,
45-
public PeerConnectionChannel {
43+
class P2PPeerConnectionChannel : public PeerConnectionChannel {
4644
public:
4745
explicit P2PPeerConnectionChannel(
4846
PeerConnectionChannelConfiguration configuration,
@@ -66,9 +64,8 @@ class P2PPeerConnectionChannel : public P2PSignalingReceiverInterface,
6664
// Remove a P2PPeerConnectionChannel observer. If the observer doesn't exist,
6765
// it will do nothing.
6866
void RemoveObserver(P2PPeerConnectionChannelObserver* observer);
69-
// Implementation of P2PSignalingReceiverInterface. Handle signaling message
70-
// received from remote side.
71-
void OnIncomingSignalingMessage(const std::string& message) override;
67+
// Handle signaling message received from remote side.
68+
void OnIncomingSignalingMessage(const Json::Value& json_message);
7269
// Publish a local stream to remote user.
7370
void Publish(std::shared_ptr<LocalStream> stream,
7471
std::function<void()> on_success,

0 commit comments

Comments
 (0)