Skip to content

Commit 06a07ad

Browse files
authored
Merge pull request #241 from Libvisual/pulseaudio-asynchronous-api
libvisual-plugins: Migrate PulseAudio input plugin to PulseAudio's Asynchronous API (fixes #33)
2 parents 3624cfa + a98e463 commit 06a07ad

File tree

2 files changed

+155
-98
lines changed

2 files changed

+155
-98
lines changed

libvisual-plugins/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ IF(ENABLE_NEBULUS)
261261
ENDIF()
262262

263263
IF(ENABLE_PULSEAUDIO)
264-
PKG_CHECK_MODULES(PULSE libpulse>=${PULSE_REQUIRED_VERSION} libpulse-simple>=${PULSE_REQUIRED_VERSION} IMPORTED_TARGET)
264+
PKG_CHECK_MODULES(PULSE libpulse>=${PULSE_REQUIRED_VERSION} IMPORTED_TARGET)
265265
IF(NOT PULSE_FOUND)
266266
MESSAGE(WARNING "PulseAudio >= ${PULSE_REQUIRED_VERSION} not found. The PulseAudio input plugin will not be built.")
267267
SET(ENABLE_PULSEAUDIO no)

libvisual-plugins/plugins/input/pulseaudio/input_pulseaudio.c

Lines changed: 154 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,53 @@
1515
*/
1616

1717
#include <libvisual/libvisual.h>
18-
#include "gettext.h"
19-
#include <stdlib.h>
20-
#include <string.h>
21-
#include <pulse/simple.h>
22-
#include <pulse/error.h>
18+
#include <pulse/mainloop.h>
19+
#include <pulse/stream.h>
20+
#include <pulse/thread-mainloop.h>
21+
#include <assert.h>
2322

2423
VISUAL_PLUGIN_API_VERSION_VALIDATOR
2524

26-
#define SAMPLES 1024
27-
#define BUFFERS 2
25+
#define SAMPLE_RATE 44100
26+
#define SAMPLE_RATE_TYPE_LV VISUAL_AUDIO_SAMPLE_RATE_44100
27+
#define CHANNELS 2
28+
#define CHANNELS_TYPE_LV VISUAL_AUDIO_SAMPLE_CHANNEL_STEREO
29+
#define SAMPLE_TYPE int16_t
30+
#define SAMPLE_FORMAT_PA PA_SAMPLE_S16LE
31+
#define SAMPLE_FORMAT_LV VISUAL_AUDIO_SAMPLE_FORMAT_S16
32+
33+
#define FRAMES 512
34+
#define CHUNK_SIZE_BYTES (FRAMES * CHANNELS * sizeof(SAMPLE_TYPE))
35+
#define CHUNKS (2 * SAMPLE_RATE / FRAMES + 1) // i.e. 2+ seconds of audio
36+
37+
#if defined(__clang__) || defined(__GNUC__)
38+
# define ATOMIC_CONSUMER_LOAD_64(source, target) __atomic_load(&source, &target, __ATOMIC_ACQUIRE)
39+
# define ATOMIC_PRODUCER_STORE_64(target, source) __atomic_store(&target, &source, __ATOMIC_RELEASE)
40+
#else
41+
# error We need GCC or Clang for __atomic_load and __atomic_store
42+
#endif
2843

2944
pa_sample_spec sample_spec = {
30-
.format = PA_SAMPLE_S16LE,
31-
.rate = 44100,
32-
.channels = 2
45+
.format = SAMPLE_FORMAT_PA,
46+
.rate = SAMPLE_RATE,
47+
.channels = CHANNELS
3348
};
3449

3550
typedef struct {
36-
pa_simple *simple;
37-
int16_t pcm_data[SAMPLES*2];
51+
pa_threaded_mainloop * mainloop;
52+
pa_context * context;
53+
pa_stream * input_stream;
54+
55+
SAMPLE_TYPE pcm_data[CHUNKS][FRAMES * CHANNELS]; // ringbuffer of chunks
56+
uint64_t chunks_written;
57+
uint64_t chunk_write_offset_bytes;
58+
uint64_t chunks_read;
3859
} pulseaudio_priv_t;
3960

4061
static int inp_pulseaudio_init (VisPluginData *plugin);
4162
static void inp_pulseaudio_cleanup (VisPluginData *plugin);
4263
static int inp_pulseaudio_upload (VisPluginData *plugin, VisAudio *audio);
43-
static int inp_pulseaudio_events (VisPluginData *plugin, VisEventQueue *events);
64+
static void on_input_stream_data (pa_stream *p, size_t nbytes, void *userdata);
4465

4566
const VisPluginInfo *get_plugin_info( void ) {
4667
static VisInputPlugin input = {
@@ -52,128 +73,164 @@ const VisPluginInfo *get_plugin_info( void ) {
5273

5374
.plugname = "pulseaudio",
5475
.name = "Pulseaudio input plugin",
55-
.author = "Scott Sibley <[email protected]>",
56-
.version = "1.0",
76+
.author = "Scott Sibley <[email protected]>"
77+
", "
78+
"Sebastian Pipping <[email protected]>",
79+
.version = "2.0",
5780
.about = "Use input data from pulseaudio",
5881
.help = "",
59-
.license = VISUAL_PLUGIN_LICENSE_GPL,
82+
.license = VISUAL_PLUGIN_LICENSE_GPL, // v3 or later, see header
6083

6184
.init = inp_pulseaudio_init,
6285
.cleanup = inp_pulseaudio_cleanup,
63-
.events = inp_pulseaudio_events,
6486
.plugin = &input
6587
};
6688

6789
return &info;
6890
}
6991

7092
static int inp_pulseaudio_init( VisPluginData *plugin ) {
71-
72-
#if ENABLE_NLS
73-
bindtextdomain (GETTEXT_PACKAGE, LOCALE_DIR);
74-
#endif
75-
7693
pulseaudio_priv_t *priv = visual_mem_new0(pulseaudio_priv_t, 1);
7794
visual_plugin_set_private(plugin, priv);
7895

79-
VisParamList *params = visual_plugin_get_params (plugin);
80-
visual_param_list_add_many (params,
81-
visual_param_new_string ("device",
82-
N_("Device name"),
83-
"",
84-
NULL),
85-
NULL);
86-
87-
int error;
88-
89-
priv->simple = pa_simple_new(
90-
NULL,
91-
"lv-pulseaudio",
92-
PA_STREAM_RECORD,
93-
NULL,
94-
"record",
95-
&sample_spec, NULL, NULL, &error);
96-
97-
if( priv->simple == NULL ) {
98-
visual_log(VISUAL_LOG_CRITICAL, "pa_simple_new() failed: %s", pa_strerror(error));
99-
return FALSE;
96+
priv->mainloop = pa_threaded_mainloop_new();
97+
visual_return_val_if_fail(priv->mainloop != NULL, FALSE);
98+
99+
pa_mainloop_api *const mainloop_api = pa_threaded_mainloop_get_api(priv->mainloop);
100+
visual_return_val_if_fail(mainloop_api != NULL, FALSE);
101+
102+
priv->context = pa_context_new(mainloop_api, "lv-pulseaudio");
103+
visual_return_val_if_fail(priv->context != NULL, FALSE);
104+
105+
const int connect_res =
106+
pa_context_connect(priv->context, NULL, PA_CONTEXT_NOFLAGS, NULL);
107+
visual_return_val_if_fail(connect_res == 0, FALSE);
108+
109+
// NOTE: Starting the main loop prior to pa_context_connect would get us into
110+
// this situation:
111+
// Assertion 'c->callback' failed at
112+
// ../pulseaudio-16.1/src/pulsecore/socket-client.c:126, function
113+
// do_call(). Aborting.
114+
// https://gitlab.freedesktop.org/pulseaudio/pulseaudio/-/issues/991
115+
const int mainloop_start_ret = pa_threaded_mainloop_start(priv->mainloop);
116+
visual_return_val_if_fail(mainloop_start_ret == 0, FALSE);
117+
while (pa_context_get_state(priv->context) < PA_CONTEXT_READY) {
118+
visual_usleep(1000);
100119
}
101120

121+
priv->input_stream = pa_stream_new(priv->context, "Recording", &sample_spec, NULL);
122+
visual_return_val_if_fail(priv->input_stream != NULL, FALSE);
123+
pa_stream_set_read_callback(priv->input_stream, on_input_stream_data, priv);
124+
125+
const pa_buffer_attr input_buffer_attr = {
126+
.fragsize = CHUNK_SIZE_BYTES,
127+
.maxlength = CHUNK_SIZE_BYTES,
128+
};
129+
const int input_connect_res = pa_stream_connect_record(
130+
priv->input_stream, NULL, &input_buffer_attr, PA_STREAM_ADJUST_LATENCY);
131+
visual_return_val_if_fail(input_connect_res == 0, FALSE);
132+
102133
return TRUE;
103134
}
104135

105136
static void inp_pulseaudio_cleanup( VisPluginData *plugin )
106137
{
107138
pulseaudio_priv_t *priv = visual_plugin_get_private(plugin);
139+
visual_return_if_fail(priv != NULL);
108140

109-
pa_simple_free(priv->simple);
141+
pa_stream_disconnect(priv->input_stream);
142+
pa_stream_unref(priv->input_stream);
143+
pa_context_disconnect(priv->context);
144+
pa_context_unref(priv->context);
145+
pa_threaded_mainloop_stop(priv->mainloop);
146+
pa_threaded_mainloop_free(priv->mainloop);
110147

111148
visual_mem_free (priv);
112149
}
113150

114-
static int inp_pulseaudio_events (VisPluginData *plugin, VisEventQueue *events)
151+
static int inp_pulseaudio_upload( VisPluginData *plugin, VisAudio *audio )
115152
{
116-
pulseaudio_priv_t *priv = visual_plugin_get_private (plugin);
117-
VisEvent ev;
118-
VisParam *param;
119-
const char *tmp;
120-
int error;
121-
122-
while (visual_event_queue_poll (events, &ev)) {
123-
switch (ev.type) {
124-
case VISUAL_EVENT_PARAM:
125-
param = ev.event.param.param;
126-
127-
if (visual_param_has_name (param, "device")) {
128-
tmp = visual_param_get_value_string (param);
129-
130-
if(priv->simple != NULL)
131-
pa_simple_free(priv->simple);
132-
133-
priv->simple = pa_simple_new(
134-
NULL,
135-
"lv-pulseaudio",
136-
PA_STREAM_RECORD,
137-
tmp,
138-
"record",
139-
&sample_spec, NULL, NULL, &error);
140-
141-
if( priv->simple == NULL ) {
142-
visual_log(VISUAL_LOG_CRITICAL, "pa_simple_new() failed: %s", pa_strerror(error));
143-
return FALSE;
144-
}
145-
146-
}
147-
break;
148-
149-
default:
150-
break;
153+
pulseaudio_priv_t *priv = visual_plugin_get_private(plugin);
154+
visual_return_val_if_fail(priv != NULL, FALSE);
155+
156+
// `priv->chunks_written` is monotonically increasing in another thread in parallel,
157+
// so we make a snapshot to work with a single consistent value below.
158+
// Also, plain reads to 64bit are not atomic on 32bit platforms, so we add protection.
159+
// This is attomic `priv->chunks_written = frozen_chunks_written`.
160+
uint64_t frozen_chunks_written;
161+
ATOMIC_CONSUMER_LOAD_64(priv->chunks_written, frozen_chunks_written);
162+
163+
assert(priv->chunks_read <= frozen_chunks_written);
164+
if (priv->chunks_read == frozen_chunks_written) {
165+
return TRUE;
166+
}
151167

152-
}
168+
// The writing head keeps moving "in parallel" in another thread
169+
// and without locking. So if the reader get too far behind, the writer
170+
// overtakes the reader, in theory. It is not likely to happen because
171+
// (1) the reader drains all available bytes at once, (2) the buffer
172+
// is large enough to protect against temporary jittering, and (3)
173+
// because `.upload` is guaranteed to be called at least at rendering
174+
// FPS frequency.
175+
const uint64_t tolerable_behind_by = CHUNKS / 2;
176+
uint64_t behind_by = frozen_chunks_written - priv->chunks_read;
177+
if (behind_by > tolerable_behind_by) {
178+
priv->chunks_read = frozen_chunks_written - tolerable_behind_by;
179+
behind_by = tolerable_behind_by;
180+
}
181+
const uint64_t target_chunks_read = priv->chunks_read + behind_by;
182+
183+
while (priv->chunks_read < target_chunks_read) {
184+
void *const data = priv->pcm_data[priv->chunks_read % CHUNKS];
185+
VisBuffer * const visbuffer = visual_buffer_new_wrap_data (data, CHUNK_SIZE_BYTES, FALSE);
186+
visual_return_val_if_fail(visbuffer != NULL, FALSE);
187+
visual_audio_input(audio, visbuffer, SAMPLE_RATE_TYPE_LV,
188+
SAMPLE_FORMAT_LV, CHANNELS_TYPE_LV);
189+
priv->chunks_read++;
190+
visual_buffer_unref(visbuffer);
153191
}
154192

155193
return TRUE;
156194
}
157-
int inp_pulseaudio_upload( VisPluginData *plugin, VisAudio *audio )
158-
{
159-
pulseaudio_priv_t *priv = visual_plugin_get_private(plugin);
160195

161-
int error;
196+
static void on_input_stream_data(pa_stream *p, size_t nbytes, void *userdata) {
197+
pulseaudio_priv_t * const priv = (pulseaudio_priv_t *)userdata;
198+
visual_return_if_fail(priv != NULL);
162199

163-
if (pa_simple_read(priv->simple, priv->pcm_data, sizeof(priv->pcm_data), &error) < 0) {
164-
visual_log(VISUAL_LOG_CRITICAL, "pa_simple_read() failed: %s", pa_strerror(error));
165-
return FALSE;
166-
}
200+
const void *source = NULL;
201+
const int peek_res = pa_stream_peek(p, &source, &nbytes);
202+
visual_return_if_fail(peek_res == 0);
203+
visual_return_if_fail(source != 0);
167204

168-
VisBuffer *visbuffer = visual_buffer_new_wrap_data (priv->pcm_data, sizeof(priv->pcm_data), FALSE);
205+
const int drop_res = pa_stream_drop(p);
206+
visual_return_if_fail(drop_res == 0);
169207

170-
visual_audio_input(audio, visbuffer,
171-
VISUAL_AUDIO_SAMPLE_RATE_44100,
172-
VISUAL_AUDIO_SAMPLE_FORMAT_S16,
173-
VISUAL_AUDIO_SAMPLE_CHANNEL_STEREO);
208+
// Copy all readable bytes from `source` to the right place in `priv->pcm_data`.
209+
while (nbytes > 0) {
210+
void *const target = (void *)priv->pcm_data[priv->chunks_written % CHUNKS] + priv->chunk_write_offset_bytes;
211+
size_t round_nbytes = nbytes;
174212

175-
visual_buffer_unref (visbuffer);
213+
// Would a full write overflow the current chunk?
214+
if (priv->chunk_write_offset_bytes + round_nbytes > CHUNK_SIZE_BYTES) {
215+
// Cut down to fit the chunk
216+
round_nbytes = CHUNK_SIZE_BYTES - priv->chunk_write_offset_bytes;
217+
}
176218

177-
return TRUE;
178-
}
219+
visual_mem_copy(target, source, round_nbytes);
179220

221+
// Figure out write offset location for the next round
222+
if (priv->chunk_write_offset_bytes + round_nbytes < CHUNK_SIZE_BYTES) {
223+
// Same chunk but further behind
224+
priv->chunk_write_offset_bytes += round_nbytes;
225+
} else {
226+
// Start of the next chunk
227+
priv->chunk_write_offset_bytes = 0;
228+
229+
// This is atomic `priv->chunks_written++`
230+
uint64_t new_chunks_written = priv->chunks_written + 1;
231+
ATOMIC_PRODUCER_STORE_64(priv->chunks_written, new_chunks_written);
232+
}
233+
234+
nbytes -= round_nbytes;
235+
}
236+
}

0 commit comments

Comments
 (0)