Skip to content

Commit 0ac8c32

Browse files
committed
aplay/mixer: allow selection of single participant
Allow selection of a single participant to be sent back with a control socket (docuented in wiki).
1 parent d90d283 commit 0ac8c32

File tree

1 file changed

+71
-1
lines changed

1 file changed

+71
-1
lines changed

src/audio/playback/mixer.cpp

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,13 @@
6060
#include "debug.h"
6161
#include "host.h" // for get_commandline_param, uv_argv
6262
#include "lib_common.h"
63+
#include "messaging.h"
6364
#include "module.h"
6465
#include "rtp/rtp.h"
6566
#include "transmit.h"
6667
#include "types.h" // for tx_media_type
6768
#include "utils/audio_buffer.h"
69+
#include "utils/macros.h"
6870
#include "utils/net.h" // for get_sockaddr_addr_str
6971
#include "utils/thread.h"
7072

@@ -255,6 +257,8 @@ struct state_audio_mixer final {
255257
state_audio_mixer(const struct audio_playback_opts *opts) {
256258
parse_opts(opts);
257259

260+
only_sender.ss_family = AF_UNSPEC;
261+
258262
struct audio_codec_state *audio_coder =
259263
audio_codec_init_cfg(audio_codec.c_str(), AUDIO_CODER);
260264
if (!audio_coder) {
@@ -264,26 +268,83 @@ struct state_audio_mixer final {
264268
audio_codec_done(audio_coder);
265269
}
266270

271+
module_init_default(&mod);
272+
mod.cls = MODULE_CLASS_DATA;
273+
module_register(&mod, opts->parent);
274+
267275
thread_id = thread(&state_audio_mixer::worker, this);
268276
}
269277
~state_audio_mixer() {
270278
thread_id.join();
279+
module_done(&mod);
271280
}
272281
bool should_exit = false;
273282
state_audio_mixer(state_audio_mixer const&) = delete;
274283
state_audio_mixer& operator=(state_audio_mixer const&) = delete;
275284
void worker();
285+
void check_messages();
276286

277287
map<sockaddr_storage, am_participant, sockaddr_storage_less> participants;
278288
mutex participants_lock;
279289

280290
struct socket_udp_local *recv_socket{};
281291
string audio_codec{"PCM"};
292+
sockaddr_storage
293+
only_sender; ///< if !AF_UNSPEC, use stream just from this sender
282294
private:
295+
struct module mod;
283296
thread thread_id;
284297
unique_ptr<generic_mix_algo<sample_type_source, sample_type_mixed>> mixing_algorithm{new linear_mix_algo<sample_type_source, sample_type_mixed>()};
285298
};
286299

300+
void
301+
state_audio_mixer::check_messages()
302+
{
303+
struct message *msg = nullptr;
304+
while ((msg = check_message(&mod))) {
305+
auto *msg_univ = reinterpret_cast<struct msg_universal *>(msg);
306+
MSG(VERBOSE, "Received message: %s\n", msg_univ->text);
307+
if (strcmp(msg_univ->text, "help") == 0) {
308+
printf("Syntax:\n"
309+
"\trestrict <addr>\n"
310+
"\trestrict flush\n"
311+
"eg.:\n"
312+
"\trestrict [::ffff:10.0.1.20]:65426\n");
313+
free_message(msg, new_response(RESPONSE_OK, nullptr));
314+
continue;
315+
}
316+
if (strstr(msg_univ->text, "restrict ") != msg_univ->text) {
317+
MSG(ERROR,
318+
"Unknown message: %s!\nSend message \"help\" for "
319+
"syntax.\n",
320+
msg_univ->text);
321+
char resp_msg[sizeof msg_univ->text + 20];
322+
snprintf_ch(resp_msg, "unknown request: %s",
323+
msg_univ->text);
324+
free_message(
325+
msg, new_response(RESPONSE_BAD_REQUEST, resp_msg));
326+
continue;
327+
}
328+
const char *val = msg_univ->text + strlen("restrict ");
329+
if (strcmp(val, "flush") == 0) {
330+
MSG(INFO, "flushing the address restriction (defaulting to mix all)\n");
331+
only_sender.ss_family = AF_UNSPEC;
332+
} else {
333+
MSG(INFO, "restricting mixer to: %s\n", val);
334+
only_sender = get_sockaddr(val, 0);
335+
if (participants.find(only_sender) ==
336+
participants.end()) {
337+
MSG(WARNING,
338+
"The requested participant %s is not yet "
339+
"present...\n",
340+
val);
341+
}
342+
}
343+
344+
free_message(msg, new_response(RESPONSE_OK, nullptr));
345+
}
346+
}
347+
287348
void state_audio_mixer::worker()
288349
{
289350
set_thread_name(__func__);
@@ -304,6 +365,7 @@ void state_audio_mixer::worker()
304365
}
305366

306367
unique_lock<mutex> plk(participants_lock);
368+
check_messages();
307369
// check timeouts
308370
for (auto it = participants.cbegin(); it != participants.cend(); )
309371
{
@@ -433,8 +495,16 @@ static void audio_play_mixer_put_frame(void *state, const struct audio_frame *fr
433495
s->participants.emplace(ss, am_participant{s->recv_socket, &ss, s->audio_codec});
434496
}
435497

436-
audio_buffer_write(s->participants.at(ss).m_buffer, frame->data, frame->data_len);
437498
s->participants.at(ss).last_seen = chrono::steady_clock::now();
499+
500+
// if mixer restricted to a single sender and this isn't me
501+
if (s->only_sender.ss_family != AF_UNSPEC &&
502+
sockaddr_compare((const sockaddr *) &ss,
503+
(const sockaddr *) &s->only_sender) != 0) {
504+
return;
505+
}
506+
507+
audio_buffer_write(s->participants.at(ss).m_buffer, frame->data, frame->data_len);
438508
}
439509

440510
static void audio_play_mixer_done(void *state)

0 commit comments

Comments
 (0)