Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions native_with_state/internal/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,7 @@ cc_library(
srcs = ["media_api_audio_device_module.cc"],
hdrs = ["media_api_audio_device_module.h"],
deps = [
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/log",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/log:check",
"@webrtc",
],
)
Expand Down
3 changes: 0 additions & 3 deletions native_with_state/internal/conference_media_tracks.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,6 @@

namespace meet {
// Meet uses this magic number to indicate the loudest speaker.
//
// Packets that contain this CSRC should be ignored, as they are not part of the
// audio stream and are only used for signalling the loudest speaker.
inline constexpr int kLoudestSpeakerCsrc = 42;

// Adapter class for webrtc::AudioTrackSinkInterface that converts
Expand Down
92 changes: 45 additions & 47 deletions native_with_state/internal/media_api_audio_device_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,89 +19,87 @@
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <vector>

#include "absl/log/log.h"
#include "absl/synchronization/mutex.h"
#include "absl/log/check.h"
#include "webrtc/api/audio/audio_device_defines.h"
#include "webrtc/api/task_queue/pending_task_safety_flag.h"
#include "webrtc/api/units/time_delta.h"
#include "webrtc/rtc_base/thread.h"
#include "webrtc/rtc_base/time_utils.h"

namespace meet {
namespace {

// Audio is sampled at 48000 Hz (cycles per second)
constexpr int kAudioSampleRatePerMillisecond = 48;
constexpr int kSamplingIntervalMillis = 10;
// This should be configurable but we will produce a mono output for now.
constexpr int kNumberOfAudioChannels = 1;
constexpr int kBytesPerSample = sizeof(int16_t);

} // namespace

int32_t MediaApiAudioDeviceModule::RegisterAudioCallback(
webrtc::AudioTransport* callback) {
absl::MutexLock lock(&mutex_);
DCHECK(worker_thread_.IsCurrent());
audio_callback_ = callback;
return 0;
}

int32_t MediaApiAudioDeviceModule::InitPlayout() {
if (task_thread_ != nullptr) {
LOG(WARNING) << "Init called on an already initialized "
"MediaApiAudioDeviceModule.";
return -1;
int32_t MediaApiAudioDeviceModule::StartPlayout() {
DCHECK(worker_thread_.IsCurrent());
if (is_playing_) {
return 0;
}
is_playing_ = true;

task_thread_ = rtc::Thread::Create();
task_thread_->SetName("media_api_audio_thread", nullptr);
task_thread_->Start();
task_thread_->PostTask([this]() { ProcessPlayData(); });
worker_thread_.PostTask(
SafeTask(safety_flag_, [this]() { ProcessPlayData(); }));
return 0;
}

int32_t MediaApiAudioDeviceModule::StopPlayout() {
if (task_thread_ == nullptr) {
return 0;
}
DCHECK(worker_thread_.IsCurrent());
is_playing_ = false;
return 0;
}

bool MediaApiAudioDeviceModule::Playing() const {
DCHECK(worker_thread_.IsCurrent());
return is_playing_;
}

task_thread_->Stop();
task_thread_.reset();
int32_t MediaApiAudioDeviceModule::Terminate() {
DCHECK(worker_thread_.IsCurrent());
safety_flag_->SetNotAlive();
return 0;
}

void MediaApiAudioDeviceModule::ProcessPlayData() {
DCHECK(worker_thread_.IsCurrent());
if (!is_playing_) {
return;
}

int64_t process_start_time = rtc::TimeMillis();
const size_t number_of_samples = kAudioSampleRatePerMillisecond *
kSamplingIntervalMillis *
sampling_interval_.ms() *
kNumberOfAudioChannels;
std::vector<int16_t> sample_buffer(number_of_samples);
size_t samples_out = 0;
int64_t elapsed_time_ms = -1;
int64_t ntp_time_ms = -1;

{
absl::MutexLock lock(&mutex_);
if (audio_callback_ != nullptr) {
audio_callback_->NeedMorePlayData(
number_of_samples, kBytesPerSample, kNumberOfAudioChannels,
// Sampling rate in Hz
kAudioSampleRatePerMillisecond * 1000, sample_buffer.data(),
samples_out, &elapsed_time_ms, &ntp_time_ms);
}
if (audio_callback_ != nullptr) {
audio_callback_->NeedMorePlayData(
number_of_samples, kBytesPerSample, kNumberOfAudioChannels,
// Sampling rate in samples per second (i.e. Hz).
kAudioSampleRatePerMillisecond * 1000, sample_buffer.data(),
samples_out, &elapsed_time_ms, &ntp_time_ms);
}
int64_t process_end_time = rtc::TimeMillis();

task_thread_->PostDelayedHighPrecisionTask(
[this]() { ProcessPlayData(); },
// Delay the next sampling for either:
// 1. (sampling interval) - (time to process current sample)
// 2. No delay if current processing took longer than the desired 10ms
std::max(webrtc::TimeDelta::Millis(
(process_start_time + kSamplingIntervalMillis) -
rtc::TimeMillis()),
webrtc::TimeDelta::Zero()));
// Delay the next sampling for either:
// 1. (sampling interval) - (time to process current sample)
// 2. No delay if current processing took longer than the desired 10ms
// TODO: Improve testing around this computation.
webrtc::TimeDelta delay = std::max(
webrtc::TimeDelta::Millis((process_start_time + sampling_interval_.ms()) -
process_end_time),
webrtc::TimeDelta::Zero());
worker_thread_.PostDelayedHighPrecisionTask(
SafeTask(safety_flag_, [this]() { ProcessPlayData(); }), delay);
}

} // namespace meet
87 changes: 75 additions & 12 deletions native_with_state/internal/media_api_audio_device_module.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,27 @@
#ifndef NATIVE_WITH_STATE_INTERNAL_MEDIA_API_AUDIO_DEVICE_MODULE_H_
#define NATIVE_WITH_STATE_INTERNAL_MEDIA_API_AUDIO_DEVICE_MODULE_H_

#include <stdbool.h>

#include <cstdint>
#include <memory>
#include <utility>

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#include "webrtc/api/audio/audio_device.h"
#include "webrtc/api/audio/audio_device_defines.h"
#include "webrtc/api/scoped_refptr.h"
#include "webrtc/api/task_queue/pending_task_safety_flag.h"
#include "webrtc/api/units/time_delta.h"
#include "webrtc/modules/audio_device/include/audio_device_default.h"
#include "webrtc/rtc_base/thread.h"

namespace meet {

// Audio is sampled at 48000Hz.
constexpr int kAudioSampleRatePerMillisecond = 48;
// Produce mono audio (i.e. 1 channel).
constexpr int kNumberOfAudioChannels = 1;
constexpr int kBytesPerSample = sizeof(int16_t);

// Very simple implementation of an AudioDeviceModule.
//
// WebRTC has platform dependent implementations. However they are not fully
Expand All @@ -49,26 +58,80 @@ class MediaApiAudioDeviceModule
: public webrtc::webrtc_impl::AudioDeviceModuleDefault<
webrtc::AudioDeviceModule> {
public:
MediaApiAudioDeviceModule() = default;
// Default constructor for production use.
//
// In production, audio should be sampled at 48000 Hz every 10ms.
explicit MediaApiAudioDeviceModule(rtc::Thread& worker_thread)
: MediaApiAudioDeviceModule(worker_thread,
webrtc::TimeDelta::Millis(10)) {}

~MediaApiAudioDeviceModule() override { StopPlayout(); };
// Constructor for testing with configurable sampling interval; the default
// sampling interval of 10ms is too small to write non-flaky tests with.
MediaApiAudioDeviceModule(rtc::Thread& worker_thread,
webrtc::TimeDelta sampling_interval)
: worker_thread_(worker_thread),
sampling_interval_(std::move(sampling_interval)) {
safety_flag_ = webrtc::PendingTaskSafetyFlag::CreateAttachedToTaskQueue(
/*alive=*/true, &worker_thread_);
};

int32_t InitPlayout() override;
int32_t StopPlayout() override final;
int32_t RegisterAudioCallback(webrtc::AudioTransport* callback) override;
bool Playing() const override { return task_thread_ != nullptr; };
int32_t StartPlayout() override;
int32_t StopPlayout() override;
int32_t Terminate() override;
bool Playing() const override;

private:
// Periodically calls the registered audio callback, registered by WebRTC
// internals, to provide audio data. It is to be invoked every 10ms with a
// sampling rate of 48000 Hz. If this is not done, no audio will be provided
// to the registered audio sinks with the RTPReceiver of the RTPTransceiver
// to the audio sinks registered with the RTPReceiver of the RTPTransceiver
// that remote audio is being received on.
void ProcessPlayData();

mutable absl::Mutex mutex_;
webrtc::AudioTransport* audio_callback_ ABSL_GUARDED_BY(mutex_) = nullptr;
std::unique_ptr<rtc::Thread> task_thread_;
// Note that this MUST be the same worker thread used when creating the peer
// connection.
//
// Not only does this remove the need for synchronization in this class (as
// all methods are called on the worker thread by WebRTC), it also prevents
// a deadlock when closing the peer connection:
//
// When audio data is passed to `ConferenceAudioTrack::OnData()`, it is
// called on whatever thread `audio_callback_` is called on. When attempting
// to read the audio csrcs and ssrcs from
// `RtpReceiverInterface::GetSources()`, a blocking call will be made to the
// worker thread (via the rtp receiver proxy layer) if the current thread is
// NOT the worker thread.
//
// `ConferenceAudioTrack::OnData()` is called while holding a mutex in
// WebRTC's `AudioMixerImpl::Mix()` method (also running on whatever thread
// `audio_callback_` is called on).
//
// At the same time, when closing the peer connection,
// `AudioMixerImpl::RemoveSource()` is called on the worker thread and
// attempts to acquire the mutex held by `AudioMixerImpl::Mix()`, blocking
// the worker thread.
//
// Therefore, it is possible for the worker thread to be blocked while
// waiting for the `AudioMixerImpl` mutex, while
// `ConferenceAudioTrack::OnData()` is blocked waiting for the worker thread
// to read the audio csrcs and ssrcs.
//
// By ensuring that this class is always called on the worker thread, this
// deadlock is avoided, as:
// 1. The worker thread is a task queue, and task queue operatons are
// executed sequentially.
// 2. `ConferenceAudioTrack::OnData()` is called on the worker thread and
// therefore does not need to switch to the worker thread to read the
// audio csrcs and ssrcs.
rtc::Thread& worker_thread_;
// Used to ensure that tasks are not posted after `Terminate()` is called,
// since this class does not own the worker thread.
rtc::scoped_refptr<webrtc::PendingTaskSafetyFlag> safety_flag_;
webrtc::TimeDelta sampling_interval_;

webrtc::AudioTransport* audio_callback_ = nullptr;
bool is_playing_ = false;
};

} // namespace meet
Expand Down
Loading