@@ -15,7 +15,7 @@ namespace audio_tools {
15
15
/* *
16
16
* @brief Buffer implementation which is based on a RP2040 queue. This
17
17
* class is intended to be used to exchange data between the 2 different
18
- * cores.
18
+ * cores. Multi-core and IRQ safe queue implementation!
19
19
* @ingroup buffers
20
20
* @ingroup concurrency
21
21
* @author Phil Schatzmann
@@ -38,8 +38,10 @@ class BufferRP2040T : public BaseBuffer<T> {
38
38
if (buffer_size_total_bytes != buffer_size_req_bytes) {
39
39
LOGI (" resize %d -> %d" , buffer_size_total_bytes, buffer_size_req_bytes);
40
40
assert (buffer_size > 0 );
41
- write_buffer.resize (buffer_size);
42
- read_buffer.resize (buffer_size * 2 );
41
+ if (is_blocking_write){
42
+ write_buffer.resize (buffer_size);
43
+ read_buffer.resize (buffer_size * 2 );
44
+ }
43
45
// release existing queue
44
46
if (buffer_size_total_bytes > 0 ) {
45
47
queue_free (&queue);
@@ -71,6 +73,10 @@ class BufferRP2040T : public BaseBuffer<T> {
71
73
// reads multiple values
72
74
int readArray (T data[], int len) override {
73
75
LOGD (" readArray: %d" , len);
76
+ if (!is_blocking_write && read_buffer.size ()==0 ){
77
+ // make sure that the read buffer is big enough
78
+ read_buffer.resize (len + buffer_size);
79
+ }
74
80
// handle unalloc;ated queue
75
81
if (buffer_size_total_bytes == 0 ) return 0 ;
76
82
if (isEmpty () && read_buffer.isEmpty ()) return 0 ;
@@ -94,24 +100,17 @@ class BufferRP2040T : public BaseBuffer<T> {
94
100
95
101
int writeArray (const T data[], int len) override {
96
102
LOGD (" writeArray: %d" , len);
103
+ int result = 0 ;
97
104
// make sure that we have the data allocated
98
105
resize (buffer_size_req_bytes);
99
106
100
- // blocking write: wait for available space
101
- while (availableForWrite ()<=len){
102
- delay (5 );
103
- };
104
-
105
- // fill the write buffer and when it is full flush it to the queue
106
- for (int j = 0 ; j < len; j++) {
107
- write_buffer.write (data[j]);
108
- if (write_buffer.isFull ()) {
109
- LOGD (" queue_add_blocking" );
110
- queue_add_blocking (&queue, write_buffer.data ());
111
- write_buffer.reset ();
112
- }
107
+ if (is_blocking_write) {
108
+ result = writeBlocking (data, len);
109
+ } else {
110
+ result = writeNonBlocking (data, len);
113
111
}
114
- return len;
112
+
113
+ return result;
115
114
}
116
115
117
116
// checks if the buffer is full
@@ -150,13 +149,52 @@ class BufferRP2040T : public BaseBuffer<T> {
150
149
151
150
size_t size () { return buffer_size_req_bytes / sizeof (T); }
152
151
152
+ // / When we use a non blocking write, the write size must be identical with the buffer size
153
+ void setBlockingWrite (bool flag){
154
+ is_blocking_write = flag;
155
+ }
156
+
153
157
protected:
154
158
queue_t queue;
155
159
int buffer_size_total_bytes = 0 ;
156
160
int buffer_size_req_bytes = 0 ;
157
161
int buffer_size = 0 ;
158
162
SingleBuffer<T> write_buffer{0 };
159
163
audio_tools::RingBuffer<T> read_buffer{0 };
164
+ bool is_blocking_write = true ;
165
+
166
+ int writeBlocking (const T data[], int len) {
167
+ LOGD (" writeArray: %d" , len);
168
+
169
+ if (len > buffer_size){
170
+ LOGE (" write %d too big for buffer_size: %d" , len, buffer_size);
171
+ return 0 ;
172
+ }
173
+
174
+ // fill the write buffer and when it is full flush it to the queue
175
+ for (int j = 0 ; j < len; j++) {
176
+ write_buffer.write (data[j]);
177
+ if (write_buffer.isFull ()) {
178
+ LOGD (" queue_add_blocking" );
179
+ queue_add_blocking (&queue, write_buffer.data ());
180
+ write_buffer.reset ();
181
+ }
182
+ }
183
+ return len;
184
+ }
185
+
186
+ int writeNonBlocking (const T data[], int len) {
187
+ if (len != buffer_size){
188
+ LOGE (" write %d must be buffer_size: %d" , len, buffer_size);
189
+ return 0 ;
190
+ }
191
+
192
+ if (queue_try_add (&queue, write_buffer.data ())){
193
+ return len;
194
+ }
195
+ return 0 ;
196
+ }
197
+
160
198
};
161
199
162
200
using BufferRP2040 = BufferRP2040T<uint8_t >;
0 commit comments