@@ -16,6 +16,11 @@ namespace audio_tools {
16
16
* @brief Buffer implementation which is based on a RP2040 queue. This
17
17
* class is intended to be used to exchange data between the 2 different
18
18
* cores. Multi-core and IRQ safe queue implementation!
19
+ *
20
+ * In order to increase the efficiency we to not enqueue individual items
21
+ * but write them into a temporary buffer of bufferSize and write this
22
+ * array to the queue when it is full.
23
+ *
19
24
* @ingroup buffers
20
25
* @ingroup concurrency
21
26
* @author Phil Schatzmann
@@ -25,34 +30,40 @@ namespace audio_tools {
25
30
template <typename T>
26
31
class BufferRP2040T : public BaseBuffer <T> {
27
32
public:
28
- BufferRP2040T (size_t bufferSize, int bufferCount = 2 ) : BaseBuffer<T>() {
29
- buffer_size = bufferSize * sizeof (T);
30
- buffer_size_req_bytes = buffer_size * bufferCount;
33
+ BufferRP2040T (int bufferCount) : BaseBuffer<T>() {
34
+ buffer_size = 1 ;
35
+ buffer_size_bytes = sizeof (T);
36
+ buffer_size_req_bytes = buffer_size_bytes * bufferCount;
37
+ }
38
+
39
+ BufferRP2040T (size_t bufferSize, int bufferCount) : BaseBuffer<T>() {
40
+ buffer_size = bufferSize;
41
+ buffer_size_bytes = bufferSize * sizeof (T);
42
+ buffer_size_req_bytes = buffer_size_bytes * bufferCount;
31
43
}
32
44
33
45
~BufferRP2040T () { reset (); }
34
46
35
- // / Re-Allocats the memory and the queue
47
+ // / Re-Allocats the memory and the queue (size is in entries)
36
48
bool resize (size_t size) {
37
- buffer_size_req_bytes = size;
38
- if (buffer_size_total_bytes != buffer_size_req_bytes) {
39
- LOGI (" resize %d -> %d" , buffer_size_total_bytes, buffer_size_req_bytes);
40
- assert (buffer_size > 0 );
41
- if (is_blocking_write){
42
- write_buffer.resize (buffer_size);
43
- read_buffer.resize (buffer_size * 2 );
44
- }
45
- // release existing queue
46
- if (buffer_size_total_bytes > 0 ) {
47
- queue_free (&queue);
48
- }
49
+ int req_bytes = size * sizeof (T);
50
+ if (buffer_size_alloc_bytes < req_bytes) {
51
+ LOGI (" resize %d -> %d" , buffer_size_alloc_bytes / sizeof (T), size);
52
+ assert (buffer_size_bytes > 0 );
53
+ write_buffer.resize (buffer_size);
54
+ read_buffer.resize (buffer_size * 2 );
49
55
// create new queu
50
- if (buffer_size_req_bytes > 0 ) {
51
- int count = buffer_size_req_bytes / buffer_size;
52
- LOGI (" queue_init(size:%d, count:%d)" , buffer_size, count);
53
- queue_init (&queue, buffer_size, count);
56
+ if (req_bytes > buffer_size_alloc_bytes) {
57
+ // release existing queue
58
+ if (buffer_size_alloc_bytes > 0 ) {
59
+ queue_free (&queue);
60
+ }
61
+
62
+ int count = req_bytes / buffer_size_bytes;
63
+ LOGI (" queue_init(size:%d, count:%d)" , buffer_size_bytes, count);
64
+ queue_init (&queue, buffer_size_bytes, count);
65
+ buffer_size_alloc_bytes = req_bytes;
54
66
}
55
- buffer_size_total_bytes = buffer_size_req_bytes;
56
67
}
57
68
return true ;
58
69
}
@@ -73,13 +84,13 @@ class BufferRP2040T : public BaseBuffer<T> {
73
84
// reads multiple values
74
85
int readArray (T data[], int len) override {
75
86
LOGD (" readArray: %d" , len);
76
- if (!is_blocking_write && read_buffer.size ()==0 ){
77
- // make sure that the read buffer is big enough
78
- read_buffer.resize (len + buffer_size);
79
- }
80
87
// handle unalloc;ated queue
81
- if (buffer_size_total_bytes == 0 ) return 0 ;
82
- if (isEmpty () && read_buffer.isEmpty ()) return 0 ;
88
+ if (buffer_size_alloc_bytes == 0 ) return 0 ;
89
+
90
+ // blocking read
91
+ while (is_blocking_read && read_buffer.available () + available () < len)
92
+ delay (1 );
93
+
83
94
// fill read buffer if necessary
84
95
while (read_buffer.availableForWrite () >= buffer_size) {
85
96
LOGD (" reading %d %d " , buffer_size, read_buffer.availableForWrite ());
@@ -102,7 +113,7 @@ class BufferRP2040T : public BaseBuffer<T> {
102
113
LOGD (" writeArray: %d" , len);
103
114
int result = 0 ;
104
115
// make sure that we have the data allocated
105
- resize (buffer_size_req_bytes);
116
+ resize (buffer_size_req_bytes / sizeof (T) );
106
117
107
118
if (is_blocking_write) {
108
119
result = writeBlocking (data, len);
@@ -115,12 +126,12 @@ class BufferRP2040T : public BaseBuffer<T> {
115
126
116
127
// checks if the buffer is full
117
128
bool isFull () override {
118
- if (buffer_size_total_bytes == 0 ) return false ;
129
+ if (buffer_size_alloc_bytes == 0 ) return false ;
119
130
return queue_is_full (&queue);
120
131
}
121
132
122
133
bool isEmpty () {
123
- if (buffer_size_total_bytes == 0 ) return true ;
134
+ if (buffer_size_alloc_bytes == 0 ) return true ;
124
135
return queue_is_empty (&queue);
125
136
}
126
137
@@ -130,44 +141,55 @@ class BufferRP2040T : public BaseBuffer<T> {
130
141
// clears the buffer
131
142
void reset () override {
132
143
queue_free (&queue);
133
- buffer_size_total_bytes = 0 ;
144
+ buffer_size_alloc_bytes = 0 ;
134
145
}
135
146
136
147
// provides the number of entries that are available to read
137
148
int available () override {
138
- return buffer_size / sizeof (T);
149
+ if (buffer_size_alloc_bytes == 0 ) return 0 ;
150
+ return (queue_get_level (&queue) * buffer_size);
139
151
}
140
152
141
153
// provides the number of entries that are available to write
142
- int availableForWrite () override { return size () - available (); }
154
+ int availableForWrite () override {
155
+ if (buffer_size_alloc_bytes == 0 ) return size ();
156
+ return size () - available (); }
143
157
144
158
// returns the address of the start of the physical read buffer
145
159
T *address () override {
146
160
LOGE (" address() not implemented" );
147
161
return nullptr ;
148
162
}
149
163
150
- size_t size () { return buffer_size_req_bytes / sizeof (T); }
164
+ size_t size () { return buffer_size_alloc_bytes / sizeof (T); }
151
165
152
166
// / When we use a non blocking write, the write size must be identical with the buffer size
153
167
void setBlockingWrite (bool flag){
154
168
is_blocking_write = flag;
155
169
}
156
170
171
+ // / When we use a blockingread, the we wait for the data to be available
172
+ void setBlockingRead (bool flag){
173
+ is_blocking_read = flag;
174
+ }
175
+
176
+
157
177
protected:
158
178
queue_t queue;
159
- int buffer_size_total_bytes = 0 ;
179
+ int buffer_size_alloc_bytes = 0 ;
160
180
int buffer_size_req_bytes = 0 ;
181
+ int buffer_size_bytes = 0 ;
161
182
int buffer_size = 0 ;
162
183
SingleBuffer<T> write_buffer{0 };
163
184
audio_tools::RingBuffer<T> read_buffer{0 };
164
185
bool is_blocking_write = true ;
186
+ bool is_blocking_read = false ;
165
187
166
188
int writeBlocking (const T data[], int len) {
167
189
LOGD (" writeArray: %d" , len);
168
190
169
- if (len > buffer_size ){
170
- LOGE (" write %d too big for buffer_size: %d" , len, buffer_size );
191
+ if (len > buffer_size_bytes ){
192
+ LOGE (" write %d too big for buffer_size: %d" , len, buffer_size_bytes );
171
193
return 0 ;
172
194
}
173
195
@@ -184,8 +206,8 @@ class BufferRP2040T : public BaseBuffer<T> {
184
206
}
185
207
186
208
int writeNonBlocking (const T data[], int len) {
187
- if (len != buffer_size ){
188
- LOGE (" write %d must be buffer_size: %d" , len, buffer_size );
209
+ if (len != buffer_size_bytes ){
210
+ LOGE (" write %d must be buffer_size: %d" , len, buffer_size_bytes );
189
211
return 0 ;
190
212
}
191
213
0 commit comments