Skip to content

Commit b2f4a58

Browse files
tyler-albert authored and
copybara-github committed
fix: Refactor MediaApiAudioDeviceModule to use worker thread and is_playing_ flag
PiperOrigin-RevId: 725881463
1 parent f5d5b27 commit b2f4a58

File tree

6 files changed

+263
-178
lines changed

6 files changed

+263
-178
lines changed

native_with_state/internal/BUILD

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,7 @@ cc_library(
8080
srcs = ["media_api_audio_device_module.cc"],
8181
hdrs = ["media_api_audio_device_module.h"],
8282
deps = [
83-
"@com_google_absl//absl/base:core_headers",
84-
"@com_google_absl//absl/log",
85-
"@com_google_absl//absl/synchronization",
83+
"@com_google_absl//absl/log:check",
8684
"@webrtc",
8785
],
8886
)

native_with_state/internal/conference_media_tracks.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,6 @@
3535

3636
namespace meet {
3737
// Meet uses this magic number to indicate the loudest speaker.
38-
//
39-
// Packets that contain this CSRC should be ignored, as they are not part of the
40-
// audio stream and are only used for signalling the loudest speaker.
4138
inline constexpr int kLoudestSpeakerCsrc = 42;
4239

4340
// Adapter class for webrtc::AudioTrackSinkInterface that converts

native_with_state/internal/media_api_audio_device_module.cc

Lines changed: 45 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -19,89 +19,87 @@
1919
#include <algorithm>
2020
#include <cstddef>
2121
#include <cstdint>
22-
#include <memory>
2322
#include <vector>
2423

25-
#include "absl/log/log.h"
26-
#include "absl/synchronization/mutex.h"
24+
#include "absl/log/check.h"
2725
#include "webrtc/api/audio/audio_device_defines.h"
26+
#include "webrtc/api/task_queue/pending_task_safety_flag.h"
2827
#include "webrtc/api/units/time_delta.h"
2928
#include "webrtc/rtc_base/thread.h"
3029
#include "webrtc/rtc_base/time_utils.h"
3130

3231
namespace meet {
33-
namespace {
34-
35-
// Audio is sampled at 48000 Hz (cycles per second)
36-
constexpr int kAudioSampleRatePerMillisecond = 48;
37-
constexpr int kSamplingIntervalMillis = 10;
38-
// This should be configurable but we will produce a mono output for now.
39-
constexpr int kNumberOfAudioChannels = 1;
40-
constexpr int kBytesPerSample = sizeof(int16_t);
41-
42-
} // namespace
4332

4433
int32_t MediaApiAudioDeviceModule::RegisterAudioCallback(
4534
webrtc::AudioTransport* callback) {
46-
absl::MutexLock lock(&mutex_);
35+
DCHECK(worker_thread_.IsCurrent());
4736
audio_callback_ = callback;
4837
return 0;
4938
}
5039

51-
int32_t MediaApiAudioDeviceModule::InitPlayout() {
52-
if (task_thread_ != nullptr) {
53-
LOG(WARNING) << "Init called on an already initialized "
54-
"MediaApiAudioDeviceModule.";
55-
return -1;
40+
int32_t MediaApiAudioDeviceModule::StartPlayout() {
41+
DCHECK(worker_thread_.IsCurrent());
42+
if (is_playing_) {
43+
return 0;
5644
}
45+
is_playing_ = true;
5746

58-
task_thread_ = rtc::Thread::Create();
59-
task_thread_->SetName("media_api_audio_thread", nullptr);
60-
task_thread_->Start();
61-
task_thread_->PostTask([this]() { ProcessPlayData(); });
47+
worker_thread_.PostTask(
48+
SafeTask(safety_flag_, [this]() { ProcessPlayData(); }));
6249
return 0;
6350
}
6451

6552
int32_t MediaApiAudioDeviceModule::StopPlayout() {
66-
if (task_thread_ == nullptr) {
67-
return 0;
68-
}
53+
DCHECK(worker_thread_.IsCurrent());
54+
is_playing_ = false;
55+
return 0;
56+
}
57+
58+
bool MediaApiAudioDeviceModule::Playing() const {
59+
DCHECK(worker_thread_.IsCurrent());
60+
return is_playing_;
61+
}
6962

70-
task_thread_->Stop();
71-
task_thread_.reset();
63+
int32_t MediaApiAudioDeviceModule::Terminate() {
64+
DCHECK(worker_thread_.IsCurrent());
65+
safety_flag_->SetNotAlive();
7266
return 0;
7367
}
7468

7569
void MediaApiAudioDeviceModule::ProcessPlayData() {
70+
DCHECK(worker_thread_.IsCurrent());
71+
if (!is_playing_) {
72+
return;
73+
}
74+
7675
int64_t process_start_time = rtc::TimeMillis();
7776
const size_t number_of_samples = kAudioSampleRatePerMillisecond *
78-
kSamplingIntervalMillis *
77+
sampling_interval_.ms() *
7978
kNumberOfAudioChannels;
8079
std::vector<int16_t> sample_buffer(number_of_samples);
8180
size_t samples_out = 0;
8281
int64_t elapsed_time_ms = -1;
8382
int64_t ntp_time_ms = -1;
8483

85-
{
86-
absl::MutexLock lock(&mutex_);
87-
if (audio_callback_ != nullptr) {
88-
audio_callback_->NeedMorePlayData(
89-
number_of_samples, kBytesPerSample, kNumberOfAudioChannels,
90-
// Sampling rate in Hz
91-
kAudioSampleRatePerMillisecond * 1000, sample_buffer.data(),
92-
samples_out, &elapsed_time_ms, &ntp_time_ms);
93-
}
84+
if (audio_callback_ != nullptr) {
85+
audio_callback_->NeedMorePlayData(
86+
number_of_samples, kBytesPerSample, kNumberOfAudioChannels,
87+
// Sampling rate in samples per second (i.e. Hz).
88+
kAudioSampleRatePerMillisecond * 1000, sample_buffer.data(),
89+
samples_out, &elapsed_time_ms, &ntp_time_ms);
9490
}
91+
int64_t process_end_time = rtc::TimeMillis();
9592

96-
task_thread_->PostDelayedHighPrecisionTask(
97-
[this]() { ProcessPlayData(); },
98-
// Delay the next sampling for either:
99-
// 1. (sampling interval) - (time to process current sample)
100-
// 2. No delay if current processing took longer than the desired 10ms
101-
std::max(webrtc::TimeDelta::Millis(
102-
(process_start_time + kSamplingIntervalMillis) -
103-
rtc::TimeMillis()),
104-
webrtc::TimeDelta::Zero()));
93+
// Delay the next sampling for either:
94+
// 1. (sampling interval) - (time to process current sample)
95+
// 2. No delay if current processing took longer than the sampling interval
96+
// TODO: Improve testing around this computation.
97+
webrtc::TimeDelta delay = std::max(
98+
webrtc::TimeDelta::Millis((process_start_time + sampling_interval_.ms()) -
99+
process_end_time),
100+
webrtc::TimeDelta::Zero());
101+
worker_thread_.PostDelayedHighPrecisionTask(
102+
SafeTask(safety_flag_, [this]() { ProcessPlayData(); }), delay);
105103
}
106104

107105
} // namespace meet

native_with_state/internal/media_api_audio_device_module.h

Lines changed: 75 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,27 @@
1717
#ifndef NATIVE_WITH_STATE_INTERNAL_MEDIA_API_AUDIO_DEVICE_MODULE_H_
1818
#define NATIVE_WITH_STATE_INTERNAL_MEDIA_API_AUDIO_DEVICE_MODULE_H_
1919

20+
#include <stdbool.h>
21+
2022
#include <cstdint>
21-
#include <memory>
23+
#include <utility>
2224

23-
#include "absl/base/thread_annotations.h"
24-
#include "absl/synchronization/mutex.h"
2525
#include "webrtc/api/audio/audio_device.h"
2626
#include "webrtc/api/audio/audio_device_defines.h"
27+
#include "webrtc/api/scoped_refptr.h"
28+
#include "webrtc/api/task_queue/pending_task_safety_flag.h"
29+
#include "webrtc/api/units/time_delta.h"
2730
#include "webrtc/modules/audio_device/include/audio_device_default.h"
2831
#include "webrtc/rtc_base/thread.h"
2932

3033
namespace meet {
3134

35+
// Audio is sampled at 48000Hz.
36+
constexpr int kAudioSampleRatePerMillisecond = 48;
37+
// Produce mono audio (i.e. 1 channel).
38+
constexpr int kNumberOfAudioChannels = 1;
39+
constexpr int kBytesPerSample = sizeof(int16_t);
40+
3241
// Very simple implementation of an AudioDeviceModule.
3342
//
3443
// WebRTC has platform dependent implementations. However they are not fully
@@ -49,26 +58,80 @@ class MediaApiAudioDeviceModule
4958
: public webrtc::webrtc_impl::AudioDeviceModuleDefault<
5059
webrtc::AudioDeviceModule> {
5160
public:
52-
MediaApiAudioDeviceModule() = default;
61+
// Constructor for production use, with the default sampling interval.
62+
//
63+
// In production, audio should be sampled at 48000 Hz every 10ms.
64+
explicit MediaApiAudioDeviceModule(rtc::Thread& worker_thread)
65+
: MediaApiAudioDeviceModule(worker_thread,
66+
webrtc::TimeDelta::Millis(10)) {}
5367

54-
~MediaApiAudioDeviceModule() override { StopPlayout(); };
68+
// Constructor for testing with configurable sampling interval; the default
69+
// sampling interval of 10ms is too small to write non-flaky tests with.
70+
MediaApiAudioDeviceModule(rtc::Thread& worker_thread,
71+
webrtc::TimeDelta sampling_interval)
72+
: worker_thread_(worker_thread),
73+
sampling_interval_(std::move(sampling_interval)) {
74+
safety_flag_ = webrtc::PendingTaskSafetyFlag::CreateAttachedToTaskQueue(
75+
/*alive=*/true, &worker_thread_);
76+
};
5577

56-
int32_t InitPlayout() override;
57-
int32_t StopPlayout() override final;
5878
int32_t RegisterAudioCallback(webrtc::AudioTransport* callback) override;
59-
bool Playing() const override { return task_thread_ != nullptr; };
79+
int32_t StartPlayout() override;
80+
int32_t StopPlayout() override;
81+
int32_t Terminate() override;
82+
bool Playing() const override;
6083

6184
private:
6285
// Periodically calls the registered audio callback, registered by WebRTC
6386
// internals, to provide audio data. It is to be invoked every 10ms with a
6487
// sampling rate of 48000 Hz. If this is not done, no audio will be provided
65-
// to the registered audio sinks with the RTPReceiver of the RTPTransceiver
88+
// to the audio sinks registered with the RTPReceiver of the RTPTransceiver
6689
// that remote audio is being received on.
6790
void ProcessPlayData();
6891

69-
mutable absl::Mutex mutex_;
70-
webrtc::AudioTransport* audio_callback_ ABSL_GUARDED_BY(mutex_) = nullptr;
71-
std::unique_ptr<rtc::Thread> task_thread_;
92+
// Note that this MUST be the same worker thread used when creating the peer
93+
// connection.
94+
//
95+
// Not only does this remove the need for synchronization in this class (as
96+
// all methods are called on the worker thread by WebRTC), it also prevents
97+
// a deadlock when closing the peer connection:
98+
//
99+
// When audio data is passed to `ConferenceAudioTrack::OnData()`, it is
100+
// called on whatever thread `audio_callback_` is called on. When attempting
101+
// to read the audio csrcs and ssrcs from
102+
// `RtpReceiverInterface::GetSources()`, a blocking call will be made to the
103+
// worker thread (via the rtp receiver proxy layer) if the current thread is
104+
// NOT the worker thread.
105+
//
106+
// `ConferenceAudioTrack::OnData()` is called while holding a mutex in
107+
// WebRTC's `AudioMixerImpl::Mix()` method (also running on whatever thread
108+
// `audio_callback_` is called on).
109+
//
110+
// At the same time, when closing the peer connection,
111+
// `AudioMixerImpl::RemoveSource()` is called on the worker thread and
112+
// attempts to acquire the mutex held by `AudioMixerImpl::Mix()`, blocking
113+
// the worker thread.
114+
//
115+
// Therefore, it is possible for the worker thread to be blocked while
116+
// waiting for the `AudioMixerImpl` mutex, while
117+
// `ConferenceAudioTrack::OnData()` is blocked waiting for the worker thread
118+
// to read the audio csrcs and ssrcs.
119+
//
120+
// By ensuring that this class is always called on the worker thread, this
121+
// deadlock is avoided, as:
122+
// 1. The worker thread is a task queue, and task queue operations are
123+
// executed sequentially.
124+
// 2. `ConferenceAudioTrack::OnData()` is called on the worker thread and
125+
// therefore does not need to switch to the worker thread to read the
126+
// audio csrcs and ssrcs.
127+
rtc::Thread& worker_thread_;
128+
// Used to ensure that tasks are not posted after `Terminate()` is called,
129+
// since this class does not own the worker thread.
130+
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> safety_flag_;
131+
webrtc::TimeDelta sampling_interval_;
132+
133+
webrtc::AudioTransport* audio_callback_ = nullptr;
134+
bool is_playing_ = false;
72135
};
73136

74137
} // namespace meet

0 commit comments

Comments
 (0)