Skip to content

Commit 7019753

Browse files
committed
Correct Concurrency Tests
1 parent ac05a50 commit 7019753

File tree

7 files changed

+119
-31
lines changed

7 files changed

+119
-31
lines changed

examples/tests/concurrency/synchBufferRTOS/synchBufferRTOS.ino renamed to examples/tests/concurrency/BufferRTOS/BufferRTOS.ino

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,24 @@
1111
#include "AudioTools.h"
1212
#include "AudioLibs/Concurrency.h"
1313

14-
BufferRTOS<int16_t> buffer(1024, 512);
14+
BufferRTOS<int16_t> buffer(512, 10);
1515

1616
Task writeTask("write", 3000, 10, 0);
17+
1718
Task readTask("read", 3000, 10, 1);
1819

1920
void setup() {
2021
Serial.begin(115200);
21-
AudioLogger::instance().begin(Serial, AudioLogger::Info);
2222

2323
writeTask.begin([]() {
2424
int16_t data[512];
2525
for (int j = 0; j < 512; j++) {
2626
data[j] = j;
2727
}
28-
buffer.writeArray(data, 512);
28+
29+
size_t len = buffer.writeArray(data, 512);
30+
delay(1);
31+
2932
});
3033

3134
readTask.begin([]() {
@@ -35,7 +38,9 @@ void setup() {
3538
static int16_t data[512];
3639

3740
// read data
38-
buffer.readArray(data, 512);
41+
size_t read = buffer.readArray(data, 512);
42+
delay(1);
43+
assert(read==512);
3944

4045
// check data
4146
for (int j = 0; j < 512; j++) {

examples/tests/concurrency/NBuffer/NBuffer.ino

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ void setup() {
2525
for (int j = 0; j < 512; j++) {
2626
data[j] = j;
2727
}
28-
buffer.writeArray(data, 512);
28+
assert(buffer.writeArray(data, 512)==512);
29+
delay(1);
30+
2931
});
3032

3133
readTask.begin([]() {
@@ -35,7 +37,10 @@ void setup() {
3537
static int16_t data[512];
3638

3739
// read data
38-
buffer.readArray(data, 512);
40+
size_t read = buffer.readArray(data, 512);
41+
delay(1);
42+
if (read==0) return;
43+
assert(read==512);
3944

4045
// check data
4146
for (int j = 0; j < 512; j++) {
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/**
2+
* @file SynchonizedNBuffer.ino
3+
* @author Phil Schatzmann
4+
* @brief Multitask with SynchronizedBuffer using NBuffer
5+
* @version 0.1
6+
* @date 2022-11-25
7+
*
8+
* @copyright Copyright (c) 2022
9+
*
10+
*/
11+
#include "AudioTools.h"
12+
#include "AudioLibs/Concurrency.h"
13+
14+
SynchronizedNBuffer<int16_t> buffer(1024, 10);
15+
16+
Task writeTask("write", 3000, 10, 0);
17+
18+
Task readTask("read", 3000, 10, 1);
19+
20+
void setup() {
21+
Serial.begin(115200);
22+
23+
writeTask.begin([]() {
24+
int16_t data[512];
25+
for (int j = 0; j < 512; j++) {
26+
data[j] = j;
27+
}
28+
assert(buffer.writeArray(data, 512)==512);
29+
delay(1);
30+
31+
});
32+
33+
readTask.begin([]() {
34+
static uint64_t start = millis();
35+
static size_t total_bytes = 0;
36+
static size_t errors = 0;
37+
static int16_t data[512];
38+
39+
// read data
40+
assert(buffer.readArray(data, 512)==512);
41+
delay(1);
42+
43+
// check data
44+
for (int j = 0; j < 512; j++) {
45+
if (data[j] != j) errors++;
46+
}
47+
// calculate bytes per second
48+
total_bytes += sizeof(int16_t) * 512;
49+
if (total_bytes >= 1024000) {
50+
uint64_t duration = millis() - start;
51+
float mbps = static_cast<float>(total_bytes) / duration / 1000.0;
52+
53+
// print result
54+
Serial.print("Mbytes per second: ");
55+
Serial.print(mbps);
56+
Serial.print(" with ");
57+
Serial.print(errors);
58+
Serial.println(" errors");
59+
60+
start = millis();
61+
errors = 0;
62+
total_bytes = 0;
63+
}
64+
});
65+
}
66+
67+
void loop() { delay(1000); }

examples/tests/concurrency/synchNBuffer/synchNBuffer.ino

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
#include "AudioLibs/Concurrency.h"
1313

1414
audio_tools::Mutex mutex;
15-
NBuffer<int16_t> nbuffer(512, 8);
15+
NBuffer<int16_t> nbuffer(512, 10);
1616
SynchronizedBuffer<int16_t> buffer(nbuffer, mutex);
1717

1818
Task writeTask("write", 3000, 10, 0);
@@ -27,7 +27,9 @@ void setup() {
2727
for (int j = 0; j < 512; j++) {
2828
data[j] = j;
2929
}
30-
buffer.writeArray(data, 512);
30+
assert(buffer.writeArray(data, 512)==512);
31+
delay(1);
32+
3133
});
3234

3335
readTask.begin([]() {
@@ -37,7 +39,8 @@ void setup() {
3739
static int16_t data[512];
3840

3941
// read data
40-
buffer.readArray(data, 512);
42+
assert(buffer.readArray(data, 512)==512);
43+
delay(1);
4144

4245
// check data
4346
for (int j = 0; j < 512; j++) {

examples/tests/concurrency/synchRingBuffer/synchRingBuffer.ino

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,26 @@ Task readTask("read", 3000, 10, 1);
2121

2222
void setup() {
2323
Serial.begin(115200);
24+
2425
writeTask.begin([]() {
2526
int16_t data[512];
2627
for (int j = 0; j < 512; j++) {
2728
data[j] = j;
2829
}
29-
buffer.writeArray(data, 512);
30+
assert(buffer.writeArray(data, 512)==512);
31+
delay(1);
32+
3033
});
34+
3135
readTask.begin([]() {
3236
static uint64_t start = millis();
3337
static size_t total_bytes = 0;
3438
static size_t errors = 0;
3539
static int16_t data[512];
3640

3741
// read data
38-
buffer.readArray(data, 512);
42+
assert(buffer.readArray(data, 512)==512);
43+
delay(1);
3944

4045
// check data
4146
for (int j = 0; j < 512; j++) {

src/Concurrency/BufferRTOS.h

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,18 @@ namespace audio_tools {
2828
template <typename T>
2929
class BufferRTOS : public BaseBuffer<T> {
3030
public:
31-
BufferRTOS(size_t xStreamBufferSizeBytes, size_t xTriggerLevel = 1,
31+
BufferRTOS(size_t streamBufferSize, size_t xTriggerLevel = 1,
3232
TickType_t writeMaxWait = portMAX_DELAY,
3333
TickType_t readMaxWait = portMAX_DELAY,
3434
Allocator &allocator = DefaultAllocator)
3535
: BaseBuffer<T>() {
3636
readWait = readMaxWait;
3737
writeWait = writeMaxWait;
38-
current_size = xStreamBufferSizeBytes;
38+
current_size_bytes = (streamBufferSize+1) * sizeof(T);
3939
trigger_level = xTriggerLevel;
4040
p_allocator = &allocator;
4141

42-
if (current_size > 0) {
42+
if (streamBufferSize > 0) {
4343
setup();
4444
}
4545
}
@@ -49,9 +49,10 @@ class BufferRTOS : public BaseBuffer<T> {
4949
/// Re-Allocats the memory and the queue
5050
bool resize(size_t size) {
5151
bool result = true;
52-
if (current_size != size) {
52+
int req_size_bytes = (size + 1)*sizeof(T);
53+
if (current_size_bytes != req_size_bytes) {
5354
end();
54-
current_size = size;
55+
current_size_bytes = req_size_bytes;
5556
result = setup();
5657
}
5758
return result;
@@ -84,10 +85,10 @@ class BufferRTOS : public BaseBuffer<T> {
8485
#else
8586
portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
8687
#endif
87-
return result;
88+
return result / sizeof(T);
8889
} else {
8990
return xStreamBufferReceive(xStreamBuffer, (void *)data, sizeof(T) * len,
90-
readWait);
91+
readWait) / sizeof(T);
9192
}
9293
}
9394

@@ -103,10 +104,10 @@ class BufferRTOS : public BaseBuffer<T> {
103104
#else
104105
portYIELD_FROM_ISR(xHigherPriorityTaskWoken);
105106
#endif
106-
return result;
107+
return result / sizeof(T);
107108
} else {
108109
return xStreamBufferSend(xStreamBuffer, (void *)data, sizeof(T) * len,
109-
writeWait);
110+
writeWait) / sizeof(T);
110111
}
111112
}
112113

@@ -134,12 +135,12 @@ class BufferRTOS : public BaseBuffer<T> {
134135

135136
// provides the number of entries that are available to read
136137
int available() override {
137-
return xStreamBufferBytesAvailable(xStreamBuffer);
138+
return xStreamBufferBytesAvailable(xStreamBuffer) / sizeof(T);
138139
}
139140

140141
// provides the number of entries that are available to write
141142
int availableForWrite() override {
142-
return xStreamBufferSpacesAvailable(xStreamBuffer);
143+
return xStreamBufferSpacesAvailable(xStreamBuffer) / sizeof(T);
143144
}
144145

145146
// returns the address of the start of the physical read buffer
@@ -148,7 +149,7 @@ class BufferRTOS : public BaseBuffer<T> {
148149
return nullptr;
149150
}
150151

151-
size_t size() { return current_size; }
152+
size_t size() { return current_size_bytes / sizeof(T); }
152153

153154
protected:
154155
StreamBufferHandle_t xStreamBuffer = nullptr;
@@ -160,16 +161,16 @@ class BufferRTOS : public BaseBuffer<T> {
160161
int writeWait = portMAX_DELAY;
161162
bool read_from_isr = false;
162163
bool write_from_isr = false;
163-
size_t current_size = 0;
164+
size_t current_size_bytes = 0;
164165
size_t trigger_level = 0;
165166

166167
/// The allocation has been postponed to be done here, so that we can e.g. use
167168
/// psram
168169
bool setup() {
169-
if (current_size == 0) return true;
170+
if (current_size_bytes == 0) return true;
170171

171172
// allocate data if necessary
172-
int size = (current_size + 1) * sizeof(T);
173+
int size = (current_size_bytes + 1) * sizeof(T);
173174
if (p_data == nullptr) {
174175
p_data = (uint8_t *)p_allocator->allocate(size);
175176
// check allocation
@@ -182,7 +183,7 @@ class BufferRTOS : public BaseBuffer<T> {
182183

183184
// create stream buffer if necessary
184185
if (xStreamBuffer == nullptr) {
185-
xStreamBuffer = xStreamBufferCreateStatic(current_size, trigger_level,
186+
xStreamBuffer = xStreamBufferCreateStatic(current_size_bytes, trigger_level,
186187
p_data, &static_stream_buffer);
187188
}
188189
if (xStreamBuffer == nullptr) {
@@ -198,7 +199,7 @@ class BufferRTOS : public BaseBuffer<T> {
198199
void end() {
199200
if (xStreamBuffer != nullptr) vStreamBufferDelete(xStreamBuffer);
200201
p_allocator->free(p_data);
201-
current_size = 0;
202+
current_size_bytes = 0;
202203
p_data = nullptr;
203204
xStreamBuffer = nullptr;
204205
}

src/Concurrency/SynchronizedBuffers.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@ namespace audio_tools {
3131
template <typename T>
3232
class SynchronizedBuffer : public BaseBuffer<T> {
3333
public:
34-
SynchronizedBuffer(BaseBuffer<T> &buffer, Mutex &mutex) {
34+
SynchronizedBuffer(BaseBuffer<T> &buffer, Mutex &mutex, bool syncAvailable=false) {
3535
p_buffer = &buffer;
3636
p_mutex = &mutex;
37+
is_sync_available = syncAvailable;
3738
}
3839

3940
// reads a single value
@@ -96,14 +97,14 @@ class SynchronizedBuffer : public BaseBuffer<T> {
9697
// provides the number of entries that are available to read
9798
int available() override {
9899
TRACED();
99-
LockGuard guard(p_mutex);
100+
if (is_sync_available) LockGuard guard(p_mutex);
100101
return p_buffer->available();
101102
}
102103

103104
// provides the number of entries that are available to write
104105
int availableForWrite() override {
105106
TRACED();
106-
LockGuard guard(p_mutex);
107+
if (is_sync_available) LockGuard guard(p_mutex);
107108
return p_buffer->availableForWrite();
108109
}
109110

@@ -120,6 +121,7 @@ class SynchronizedBuffer : public BaseBuffer<T> {
120121
protected:
121122
BaseBuffer<T> *p_buffer = nullptr;
122123
Mutex *p_mutex = nullptr;
124+
bool is_sync_available = false;
123125
};
124126

125127
/**

0 commit comments

Comments
 (0)