Skip to content
This repository was archived by the owner on Oct 25, 2024. It is now read-only.

Commit eb6cc8c

Browse files
committed
Create a new worker thread for RTP module.
The worker thread needs its own run loop to periodically run some tasks in the background. Main WebAssembly thread (media worker) already has its loop for JavaScript events.
1 parent 24d2cd8 commit eb6cc8c

File tree

6 files changed

+85
-69
lines changed

6 files changed

+85
-69
lines changed

talk/owt/BUILD.gn

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,11 +337,12 @@ if (is_wasm) {
337337
name = "owt"
338338
deps = [
339339
"//third_party/boringssl",
340+
"//third_party/webrtc/api/rtc_event_log:rtc_event_log_factory",
340341
"//third_party/webrtc/call",
342+
"//third_party/webrtc/logging:fake_rtc_event_log",
341343
"//third_party/webrtc/modules/rtp_rtcp:rtp_rtcp",
342344
"//third_party/webrtc/rtc_base:rtc_json",
343345
"//third_party/webrtc/rtc_base:rtc_task_queue_stdlib",
344-
"//third_party/webrtc/api/rtc_event_log:rtc_event_log_factory",
345346
]
346347
sources = [
347348
"sdk/wasm/binding.h",

talk/owt/sdk/wasm/gn/wasm.gni

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -42,19 +42,13 @@ template("wasm_lib") {
4242
_target_ldflags = [
4343
"-s",
4444
"WASM=1",
45-
# "-s",
46-
# "DISABLE_EXCEPTION_CATCHING=1",
4745
"-s",
4846
"INITIAL_MEMORY=33554432",
4947
"-s",
5048
"ALLOW_TABLE_GROWTH=1",
51-
# "-s",
52-
# "WASM_ASYNC_COMPILATION=0",
5349
"-s",
5450
"USE_PTHREADS=1",
5551
"-s",
56-
"PROXY_TO_PTHREAD=1",
57-
"-s",
5852
"LLD_REPORT_UNDEFINED",
5953
"-s",
6054
"EXPORTED_RUNTIME_METHODS=" + _exports,
@@ -74,15 +68,6 @@ template("wasm_lib") {
7468

7569
"--bind",
7670

77-
#"-fno-stack-protector",
78-
79-
# This is to prevent that two different wasm modules end up generating
80-
# JS that overrides the same global variable (var Module = ...)
81-
# "-s",
82-
# "EXPORT_NAME=${target_name}",
83-
84-
# "-lworkerfs.js", # For FS.filesystems.WORKERFS
85-
8671
"-s",
8772
"PTHREAD_POOL_SIZE=8",
8873
"-s",
@@ -92,16 +77,8 @@ template("wasm_lib") {
9277
]
9378
if (is_debug) {
9479
_target_ldflags += [
95-
# "-s",
96-
# "ASSERTIONS=2",
97-
# "-s",
98-
# "SAFE_HEAP=1",
99-
# "-s",
100-
# "STACK_OVERFLOW_CHECK=1",
10180
"-O0",
10281
"-gsource-map",
103-
"--source-map-base",
104-
"http://jianjunz-nuc-ubuntu.sh.intel.com:9900/out/wasm/",
10582
]
10683
} else {
10784
_target_ldflags += [
@@ -127,7 +104,6 @@ template("wasm_lib") {
127104
}
128105

129106
_vars_to_forward = [
130-
#"cflags",
131107
"defines",
132108
"deps",
133109
"includes",

talk/owt/sdk/wasm/media_session.cc

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,19 @@
33
// SPDX-License-Identifier: Apache-2.0
44

55
#include "talk/owt/sdk/wasm/media_session.h"
6+
#include <emscripten/threading.h>
67
#include "third_party/webrtc/api/rtc_event_log/rtc_event_log_factory.h"
78
#include "third_party/webrtc/api/transport/field_trial_based_config.h"
9+
#include "third_party/webrtc/logging/rtc_event_log/fake_rtc_event_log_factory.h"
10+
#include "third_party/webrtc/rtc_base/event.h"
811
#include "third_party/webrtc/rtc_base/task_queue_stdlib.h"
912

1013
namespace owt {
1114
namespace wasm {
1215
MediaSession::MediaSession()
13-
: task_queue_factory_(webrtc::CreateTaskQueueStdlibFactory()),
14-
event_log_factory_(std::make_unique<webrtc::RtcEventLogFactory>(
15-
task_queue_factory_.get())),
16+
: worker_thread_(rtc::Thread::Create()),
17+
task_queue_factory_(webrtc::CreateTaskQueueStdlibFactory()),
18+
event_log_factory_(std::make_unique<webrtc::FakeRtcEventLogFactory>()),
1619
event_log_(event_log_factory_->CreateRtcEventLog(
1720
webrtc::RtcEventLog::EncodingType::NewFormat)),
1821
web_transport_session_(std::make_unique<WebTransportSession>()),
@@ -22,32 +25,34 @@ MediaSession::MediaSession()
2225
receiver_process_thread_(nullptr),
2326
video_receiver_(nullptr),
2427
rtcp_callback_(emscripten::val::null()) {
25-
rtc::ThreadManager::Instance()->WrapCurrentThread();
26-
receiver_process_thread_ =
27-
webrtc::ProcessThread::Create("ReceiverProcessThread");
28-
webrtc::Call::Config config(event_log_.get());
29-
config.task_queue_factory = task_queue_factory_.get();
30-
webrtc::FieldTrialBasedConfig trials;
31-
config.trials = &trials;
32-
call_ = std::unique_ptr<webrtc::Call>(webrtc::Call::Create(config));
28+
RTC_CHECK(worker_thread_);
29+
worker_thread_->Start();
30+
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
31+
receiver_process_thread_ =
32+
webrtc::ProcessThread::Create("ReceiverProcessThread");
33+
webrtc::Call::Config config(event_log_.get());
34+
config.task_queue_factory = task_queue_factory_.get();
35+
webrtc::FieldTrialBasedConfig trials;
36+
config.trials = &trials;
37+
call_ = std::unique_ptr<webrtc::Call>(webrtc::Call::Create(config));
38+
});
3339
}
3440

3541
std::shared_ptr<RtpVideoReceiver> MediaSession::CreateRtpVideoReceiver(
3642
uint32_t remote_ssrc) {
37-
webrtc::VideoReceiveStream::Config config(this);
38-
config.rtp.remote_ssrc = remote_ssrc;
39-
config.rtp.local_ssrc = 1;
40-
// Same as `kNackRtpHistoryMs` in third_party/webrtc/media/engine/webrtc_voice_engine.cc.
41-
config.rtp.nack.rtp_history_ms = 5000;
42-
// TODO: Use call_->worker_thread() when libwebrtc is rolled to a newer version.
43-
webrtc::TaskQueueBase* current = webrtc::TaskQueueBase::Current();
44-
if (!current)
45-
current = rtc::ThreadManager::Instance()->CurrentThread();
46-
RTC_DCHECK(current);
47-
video_receiver_ = std::make_shared<RtpVideoReceiver>(
48-
current, this, &config, receive_statistics_.get(),
49-
receiver_process_thread_.get());
50-
return video_receiver_;
43+
return worker_thread_->Invoke<std::shared_ptr<RtpVideoReceiver>>(
44+
RTC_FROM_HERE, [&] {
45+
webrtc::VideoReceiveStream::Config config(this);
46+
config.rtp.remote_ssrc = remote_ssrc;
47+
config.rtp.local_ssrc = 1;
48+
// Same as `kNackRtpHistoryMs` in
49+
// third_party/webrtc/media/engine/webrtc_voice_engine.cc.
50+
config.rtp.nack.rtp_history_ms = 5000;
51+
video_receiver_ = std::make_shared<RtpVideoReceiver>(
52+
worker_thread_.get(), this, &config, receive_statistics_.get(),
53+
receiver_process_thread_.get());
54+
return video_receiver_;
55+
});
5156
}
5257

5358
void MediaSession::SetRtcpCallback(emscripten::val callback) {
@@ -64,10 +69,20 @@ bool MediaSession::SendRtcp(const uint8_t* packet, size_t length) {
6469
if (!rtcp_callback_) {
6570
return false;
6671
}
67-
emscripten::val buffer(emscripten::typed_memory_view(length, packet));
68-
rtcp_callback_(buffer);
72+
emscripten_sync_run_in_main_runtime_thread(
73+
EM_FUNC_SIG_VIII, MediaSession::RunRtcpCallback, this, packet, length);
6974
return true;
7075
}
7176

77+
void MediaSession::RunRtcpCallback(MediaSession* session,
78+
const uint8_t* packet,
79+
size_t length) {
80+
if (!session->rtcp_callback_) {
81+
return;
82+
}
83+
emscripten::val buffer(emscripten::typed_memory_view(length, packet));
84+
session->rtcp_callback_(buffer);
85+
}
86+
7287
} // namespace wasm
7388
} // namespace owt

talk/owt/sdk/wasm/media_session.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "talk/owt/sdk/wasm/rtp_video_receiver.h"
1111
#include "talk/owt/sdk/wasm/web_transport_session.h"
1212
#include "third_party/webrtc/api/rtc_event_log/rtc_event_log_factory_interface.h"
13+
#include "third_party/webrtc/rtc_base/thread.h"
1314

1415
namespace owt {
1516
namespace wasm {
@@ -29,6 +30,12 @@ class MediaSession : public webrtc::Transport {
2930
bool SendRtcp(const uint8_t* packet, size_t length) override;
3031

3132
private:
33+
// Expected to run on main WebAssembly thread.
34+
static void RunRtcpCallback(MediaSession* session,
35+
const uint8_t* packet,
36+
size_t length);
37+
38+
std::unique_ptr<rtc::Thread> worker_thread_;
3239
std::unique_ptr<webrtc::TaskQueueFactory> task_queue_factory_;
3340
std::unique_ptr<webrtc::RtcEventLogFactoryInterface> event_log_factory_;
3441
std::unique_ptr<webrtc::RtcEventLog> event_log_;

talk/owt/sdk/wasm/rtp_video_receiver.cc

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,25 @@
33
// SPDX-License-Identifier: Apache-2.0
44

55
#include "talk/owt/sdk/wasm/rtp_video_receiver.h"
6+
#include <emscripten/threading.h>
67
#include <cstdint>
8+
#include "third_party/webrtc/rtc_base/location.h"
79
#include "third_party/webrtc/rtc_base/logging.h"
810

911
namespace owt {
1012
namespace wasm {
1113

1214
RtpVideoReceiver::RtpVideoReceiver(
13-
webrtc::TaskQueueBase* current_queue,
15+
rtc::Thread* worker_thread,
1416
webrtc::Transport* transport,
1517
const webrtc::VideoReceiveStream::Config* config,
1618
webrtc::ReceiveStatistics* rtp_receive_statistics,
1719
webrtc::ProcessThread* process_thread)
18-
: receiver_(nullptr), complete_frame_callback_(emscripten::val::null()) {
20+
: worker_thread_(worker_thread),
21+
receiver_(nullptr),
22+
complete_frame_callback_(emscripten::val::null()) {
1923
receiver_ = std::make_unique<webrtc::RtpVideoStreamReceiver2>(
20-
current_queue, webrtc::Clock::GetRealTimeClock(), transport, nullptr,
24+
worker_thread, webrtc::Clock::GetRealTimeClock(), transport, nullptr,
2125
nullptr, config, rtp_receive_statistics, nullptr, nullptr, process_thread,
2226
this, nullptr, this, nullptr, nullptr);
2327
webrtc::VideoCodec codec;
@@ -29,24 +33,33 @@ RtpVideoReceiver::RtpVideoReceiver(
2933
receiver_->StartReceive();
3034
}
3135

32-
bool RtpVideoReceiver::OnRtpPacket(uintptr_t packet_ptr, size_t packet_size) {
33-
const uint8_t* packet = reinterpret_cast<uint8_t*>(packet_ptr);
34-
// TODO: Create `rtp_packet_received` from `packet`.
35-
webrtc::RtpPacketReceived rtp_packet_received;
36-
rtp_packet_received.Parse(packet, packet_size);
37-
receiver_->OnRtpPacket(rtp_packet_received);
38-
return false;
36+
void RtpVideoReceiver::OnRtpPacket(uintptr_t packet_ptr, size_t packet_size) {
37+
worker_thread_->Invoke<void>(RTC_FROM_HERE, [&] {
38+
const uint8_t* packet = reinterpret_cast<uint8_t*>(packet_ptr);
39+
// TODO: Create `rtp_packet_received` from `packet`.
40+
webrtc::RtpPacketReceived rtp_packet_received;
41+
rtp_packet_received.Parse(packet, packet_size);
42+
receiver_->OnRtpPacket(rtp_packet_received);
43+
});
44+
}
45+
46+
void RtpVideoReceiver::RunCompleteFrameCallback(
47+
RtpVideoReceiver* receiver,
48+
uint8_t* buffer,
49+
size_t size) {
50+
emscripten::val value(emscripten::typed_memory_view(size, buffer));
51+
receiver->complete_frame_callback_(value);
3952
}
4053

4154
void RtpVideoReceiver::OnCompleteFrame(
4255
std::unique_ptr<webrtc::video_coding::EncodedFrame> frame) {
43-
if(complete_frame_callback_.isNull()){
56+
if (complete_frame_callback_.isNull()) {
4457
RTC_LOG(LS_WARNING) << "No callback registered for complete frames.";
4558
return;
4659
}
47-
emscripten::val buffer(emscripten::typed_memory_view(
48-
frame->EncodedImage().size(), frame->EncodedImage().data()));
49-
complete_frame_callback_(buffer);
60+
emscripten_sync_run_in_main_runtime_thread(
61+
EM_FUNC_SIG_VIII, RtpVideoReceiver::RunCompleteFrameCallback, this,
62+
frame->EncodedImage().data(), frame->EncodedImage().size());
5063
}
5164

5265
void RtpVideoReceiver::SetCompleteFrameCallback(emscripten::val callback) {
@@ -58,7 +71,6 @@ void RtpVideoReceiver::SendNack(const std::vector<uint16_t>& sequence_numbers,
5871
// Only buffering_allowed == true is supported. Same as
5972
// https://source.chromium.org/chromium/chromium/src/+/main:third_party/webrtc/video/video_receive_stream2.h;l=157;bpv=1;bpt=1
6073
RTC_DCHECK(buffering_allowed);
61-
RTC_LOG(LS_INFO)<<"Request packet rtx";
6274
receiver_->RequestPacketRetransmit(sequence_numbers);
6375
}
6476

talk/owt/sdk/wasm/rtp_video_receiver.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ class RtpVideoReceiver
1414
: public webrtc::RtpVideoStreamReceiver2::OnCompleteFrameCallback,
1515
public webrtc::NackSender {
1616
public:
17-
explicit RtpVideoReceiver(webrtc::TaskQueueBase* current_queue,
17+
explicit RtpVideoReceiver(rtc::Thread* worker_thread,
1818
webrtc::Transport* transport,
1919
const webrtc::VideoReceiveStream::Config* config,
2020
webrtc::ReceiveStatistics* rtp_receive_statistics,
2121
webrtc::ProcessThread* process_thread);
2222
virtual ~RtpVideoReceiver() {}
23-
virtual bool OnRtpPacket(uintptr_t packet_ptr, size_t packet_size);
23+
virtual void OnRtpPacket(uintptr_t packet_ptr, size_t packet_size);
2424

2525
// Overrides webrtc::RtpVideoStreamReceiver2::OnCompleteFrameCallback.
2626
void OnCompleteFrame(
@@ -33,6 +33,11 @@ class RtpVideoReceiver
3333
bool buffering_allowed) override;
3434

3535
private:
36+
// Run on main WebAssembly thread.
37+
static void RunCompleteFrameCallback(RtpVideoReceiver* receiver,
38+
uint8_t* buffer,
39+
size_t size);
40+
rtc::Thread* worker_thread_;
3641
std::unique_ptr<webrtc::RtpVideoStreamReceiver2> receiver_;
3742
emscripten::val complete_frame_callback_;
3843
};

0 commit comments

Comments
 (0)