Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit d01407e

Browse files
authored
Use fixed number of task queues in WebRTC stack (#531)
1 parent 546dfa1 commit d01407e

File tree

6 files changed

+256
-3
lines changed

6 files changed

+256
-3
lines changed

source/agent/webrtc/rtcFrame/binding.gyp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
'<(source_rel_dir)/core/rtc_adapter/VideoReceiveAdapter.cc',
6969
'<(source_rel_dir)/core/rtc_adapter/VideoSendAdapter.cc',
7070
'<(source_rel_dir)/core/rtc_adapter/AudioSendAdapter.cc',
71+
'<(source_rel_dir)/core/rtc_adapter/thread/StaticTaskQueueFactory.cc',
7172
'<(source_rel_dir)/core/owt_base/SsrcGenerator.cc',
7273
'<(source_rel_dir)/core/owt_base/AudioUtilitiesNew.cpp',
7374
'<(source_rel_dir)/core/owt_base/TaskRunnerPool.cpp',

source/core/rtc_adapter/RtcAdapter.cc

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,41 @@
77
#include <RtcAdapter.h>
88
#include <VideoReceiveAdapter.h>
99
#include <VideoSendAdapter.h>
10+
#include <thread/ProcessThreadProxy.h>
11+
#include <thread/StaticTaskQueueFactory.h>
1012

1113
#include <memory>
14+
#include <mutex>
15+
16+
#include <system_wrappers/include/clock.h>
1217

1318
namespace rtc_adapter {
1419

20+
class RTCProcessThread {
21+
public:
22+
RTCProcessThread(const char* task_name)
23+
: m_processThread(webrtc::ProcessThread::Create(task_name))
24+
{
25+
m_processThread->Start();
26+
}
27+
~RTCProcessThread()
28+
{
29+
m_processThread->Stop();
30+
}
31+
32+
webrtc::ProcessThread* unwrap()
33+
{
34+
return m_processThread.get();
35+
}
36+
private:
37+
std::unique_ptr<webrtc::ProcessThread> m_processThread;
38+
};
39+
40+
static std::unique_ptr<RTCProcessThread> g_moduleThread
41+
= std::make_unique<RTCProcessThread>("ModuleProcessThread");
42+
static std::unique_ptr<RTCProcessThread> g_pacerThread
43+
= std::make_unique<RTCProcessThread>("PacerThread");
44+
1545
class RtcAdapterImpl : public RtcAdapter,
1646
public CallOwner {
1747
public:
@@ -47,7 +77,7 @@ class RtcAdapterImpl : public RtcAdapter,
4777
};
4878

4979
RtcAdapterImpl::RtcAdapterImpl()
50-
: m_taskQueueFactory(webrtc::CreateDefaultTaskQueueFactory())
80+
: m_taskQueueFactory(createStaticTaskQueueFactory())
5181
, m_taskQueue(std::make_shared<rtc::TaskQueue>(m_taskQueueFactory->CreateTaskQueue(
5282
"CallTaskQueue",
5383
webrtc::TaskQueueFactory::Priority::NORMAL)))
@@ -66,7 +96,15 @@ void RtcAdapterImpl::initCall()
6696
if (!m_call) {
6797
webrtc::Call::Config call_config(m_eventLog.get());
6898
call_config.task_queue_factory = m_taskQueueFactory.get();
69-
m_call.reset(webrtc::Call::Create(call_config));
99+
100+
std::unique_ptr<webrtc::ProcessThread> moduleThreadProxy =
101+
std::make_unique<ProcessThreadProxy>(g_moduleThread->unwrap());
102+
std::unique_ptr<webrtc::ProcessThread> pacerThreadProxy =
103+
std::make_unique<ProcessThreadProxy>(g_pacerThread->unwrap());
104+
m_call.reset(webrtc::Call::Create(
105+
call_config, webrtc::Clock::GetRealTimeClock(),
106+
std::move(moduleThreadProxy),
107+
std::move(pacerThreadProxy)));
70108
}
71109
});
72110
}

source/core/rtc_adapter/VideoReceiveAdapter.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
#include <AdapterInternalDefinitions.h>
99
#include <RtcAdapter.h>
1010

11-
#include <api/task_queue/default_task_queue_factory.h>
1211
#include <api/video_codecs/video_codec.h>
1312
#include <api/video_codecs/video_decoder.h>
1413
#include <api/video_codecs/video_decoder_factory.h>
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright (C) <2020> Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
#ifndef RTC_ADAPTER_THREAD_PROCESS_THREAD_PROXY_
6+
#define RTC_ADAPTER_THREAD_PROCESS_THREAD_PROXY_
7+
8+
#include <modules/utility/include/process_thread.h>
9+
#include <rtc_base/checks.h>
10+
#include <unordered_set>
11+
12+
namespace rtc_adapter {
13+
14+
// ProcessThreadProxy holds a pointer to actual ProcessThread
15+
class ProcessThreadProxy : public webrtc::ProcessThread {
16+
public:
17+
ProcessThreadProxy(webrtc::ProcessThread* processThread)
18+
: m_processThread(processThread)
19+
{
20+
RTC_DCHECK(m_processThread);
21+
}
22+
23+
// Implements ProcessThread
24+
virtual void Start() override {}
25+
26+
// Implements ProcessThread
27+
// Stop() has no effect on proxy
28+
virtual void Stop() override {}
29+
30+
// Implements ProcessThread
31+
// Call actual ProcessThread's WakeUp
32+
virtual void WakeUp(webrtc::Module* module) override
33+
{
34+
m_processThread->WakeUp(module);
35+
}
36+
37+
// Implements ProcessThread
38+
// Call actual ProcessThread's PostTask
39+
virtual void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override
40+
{
41+
m_processThread->PostTask(std::move(task));
42+
}
43+
44+
// Implements ProcessThread
45+
// Call actual ProcessThread's RegisterModule
46+
virtual void RegisterModule(webrtc::Module* module, const rtc::Location& from) override
47+
{
48+
m_processThread->RegisterModule(module, from);
49+
}
50+
51+
// Implements ProcessThread
52+
// Call actual ProcessThread's DeRegisterModule
53+
virtual void DeRegisterModule(webrtc::Module* module) override
54+
{
55+
m_processThread->DeRegisterModule(module);
56+
}
57+
58+
private:
59+
webrtc::ProcessThread* m_processThread;
60+
// std::unordered_set<webrtc::Module*> m_modules;
61+
};
62+
63+
} // namespace rtc_adapter
64+
65+
#endif
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright (C) <2020> Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
#include "StaticTaskQueueFactory.h"
6+
#include <rtc_base/logging.h>
7+
#include <rtc_base/checks.h>
8+
#include <rtc_base/event.h>
9+
#include <rtc_base/task_utils/to_queued_task.h>
10+
#include <api/task_queue/task_queue_base.h>
11+
#include <api/task_queue/default_task_queue_factory.h>
12+
13+
namespace rtc_adapter {
14+
15+
// TaskQueueDummy never execute tasks
16+
class TaskQueueDummy final : public webrtc::TaskQueueBase {
17+
public:
18+
TaskQueueDummy() {}
19+
~TaskQueueDummy() override = default;
20+
21+
// Implements webrtc::TaskQueueBase
22+
void Delete() override {}
23+
void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override {}
24+
void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
25+
uint32_t milliseconds) override {}
26+
};
27+
28+
// QueuedTaskProxy only execute when the owner shared_ptr exists
29+
class QueuedTaskProxy : public webrtc::QueuedTask {
30+
public:
31+
QueuedTaskProxy(std::unique_ptr<webrtc::QueuedTask> task, std::shared_ptr<int> owner)
32+
: m_task(std::move(task)), m_owner(owner) {}
33+
34+
// Implements webrtc::QueuedTask
35+
bool Run() override
36+
{
37+
if (auto owner = m_owner.lock()) {
38+
// Only run when owner exists
39+
return m_task->Run();
40+
}
41+
return true;
42+
}
43+
private:
44+
std::unique_ptr<webrtc::QueuedTask> m_task;
45+
std::weak_ptr<int> m_owner;
46+
};
47+
48+
// TaskQueueProxy holds a TaskQueueBase* and proxy its method without Delete
49+
class TaskQueueProxy : public webrtc::TaskQueueBase {
50+
public:
51+
TaskQueueProxy(webrtc::TaskQueueBase* taskQueue)
52+
: m_taskQueue(taskQueue), m_sp(std::make_shared<int>(1))
53+
{
54+
RTC_CHECK(m_taskQueue);
55+
}
56+
~TaskQueueProxy() override = default;
57+
58+
// Implements webrtc::TaskQueueBase
59+
void Delete() override
60+
{
61+
// Clear the shared_ptr so related tasks won't be run
62+
rtc::Event done;
63+
m_taskQueue->PostTask(webrtc::ToQueuedTask([this, &done] {
64+
m_sp.reset();
65+
done.Set();
66+
}));
67+
done.Wait(rtc::Event::kForever);
68+
}
69+
// Implements webrtc::TaskQueueBase
70+
void PostTask(std::unique_ptr<webrtc::QueuedTask> task) override
71+
{
72+
m_taskQueue->PostTask(
73+
std::make_unique<QueuedTaskProxy>(std::move(task), m_sp));
74+
}
75+
// Implements webrtc::TaskQueueBase
76+
void PostDelayedTask(std::unique_ptr<webrtc::QueuedTask> task,
77+
uint32_t milliseconds) override
78+
{
79+
m_taskQueue->PostDelayedTask(
80+
std::make_unique<QueuedTaskProxy>(std::move(task), m_sp), milliseconds);
81+
}
82+
private:
83+
webrtc::TaskQueueBase* m_taskQueue;
84+
// Use shared_ptr to track its tasks
85+
std::shared_ptr<int> m_sp;
86+
};
87+
88+
// Provide static TaskQueues
89+
class StaticTaskQueueFactory final : public webrtc::TaskQueueFactory {
90+
public:
91+
// Implements webrtc::TaskQueueFactory
92+
std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter> CreateTaskQueue(
93+
absl::string_view name,
94+
webrtc::TaskQueueFactory::Priority priority) const override
95+
{
96+
// Use a pool if following threads take too heavy load
97+
static std::unique_ptr<webrtc::TaskQueueFactory> defaultTaskQueueFactory =
98+
webrtc::CreateDefaultTaskQueueFactory();
99+
static std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter> callTaskQueue =
100+
defaultTaskQueueFactory->CreateTaskQueue(
101+
"CallTaskQueue", webrtc::TaskQueueFactory::Priority::NORMAL);
102+
static std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter> decodingQueue =
103+
defaultTaskQueueFactory->CreateTaskQueue(
104+
"DecodingQueue", webrtc::TaskQueueFactory::Priority::HIGH);
105+
static std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter> rtpSendCtrlQueue =
106+
defaultTaskQueueFactory->CreateTaskQueue(
107+
"rtp_send_controller", webrtc::TaskQueueFactory::Priority::NORMAL);
108+
109+
if (name == absl::string_view("CallTaskQueue")) {
110+
return std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>(
111+
new TaskQueueProxy(callTaskQueue.get()));
112+
} else if (name == absl::string_view("DecodingQueue")) {
113+
return std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>(
114+
new TaskQueueProxy(decodingQueue.get()));
115+
} else if (name == absl::string_view("rtp_send_controller")) {
116+
return std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>(
117+
new TaskQueueProxy(rtpSendCtrlQueue.get()));
118+
} else {
119+
// Return dummy task queue for other names like "IncomingVideoStream"
120+
RTC_DLOG(LS_INFO) << "Dummy TaskQueue for " << name;
121+
return std::unique_ptr<webrtc::TaskQueueBase, webrtc::TaskQueueDeleter>(
122+
new TaskQueueDummy());
123+
}
124+
}
125+
};
126+
127+
std::unique_ptr<webrtc::TaskQueueFactory> createStaticTaskQueueFactory()
128+
{
129+
return std::unique_ptr<webrtc::TaskQueueFactory>(new StaticTaskQueueFactory());
130+
}
131+
132+
} // namespace rtc_adapter
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
// Copyright (C) <2020> Intel Corporation
2+
//
3+
// SPDX-License-Identifier: Apache-2.0
4+
5+
#ifndef RTC_ADAPTER_THREAD_STATIC_TASK_QUEUE_FACTORY_
6+
#define RTC_ADAPTER_THREAD_STATIC_TASK_QUEUE_FACTORY_
7+
8+
#include <memory>
9+
10+
#include "api/task_queue/task_queue_factory.h"
11+
12+
namespace rtc_adapter {
13+
14+
std::unique_ptr<webrtc::TaskQueueFactory> createStaticTaskQueueFactory();
15+
16+
} // namespace webrtc
17+
18+
#endif // RTC_ADAPTER_THREAD_STATIC_TASK_QUEUE_FACTORY_

0 commit comments

Comments
 (0)