Skip to content

Commit 520b526

Browse files
committed
ESPNow cleanup: use BufferRTOS and Semaphore for confirmations
1 parent d78cd1c commit 520b526

File tree

2 files changed

+84
-97
lines changed

2 files changed

+84
-97
lines changed

src/AudioTools/Communication/ESPNowStream.h

Lines changed: 82 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -2,36 +2,15 @@
22
#include <WiFiUdp.h>
33
#include <esp_now.h>
44

5-
#include "AudioTools/CoreAudio/BaseStream.h"
65
#include "AudioTools/CoreAudio/AudioBasic/StrView.h"
7-
#include "AudioTools/CoreAudio/Buffers.h"
8-
6+
#include "AudioTools/CoreAudio/BaseStream.h"
7+
#include "AudioTools/Concurrency/RTOS.h"
98

109
namespace audio_tools {
1110

12-
1311
// forward declarations
1412
class ESPNowStream;
15-
ESPNowStream *ESPNowStreamSelf = nullptr;
16-
17-
/**
18-
* @brief A simple RIA locking class for the ESP32 using _lock_t
19-
* @author Phil Schatzmann
20-
* @copyright GPLv3
21-
*/
22-
class Lock {
23-
public:
24-
Lock(_lock_t &lock) {
25-
this->p_lock = &lock;
26-
_lock_acquire(p_lock);
27-
}
28-
~Lock() { _lock_release(p_lock); }
29-
30-
protected:
31-
_lock_t *p_lock = nullptr;
32-
};
33-
34-
13+
static ESPNowStream *ESPNowStreamSelf = nullptr;
3514

3615
/**
3716
* @brief Configuration for ESP-NOW protocolö.W
@@ -45,11 +24,10 @@ struct ESPNowStreamConfig {
4524
const char *ssid = nullptr;
4625
const char *password = nullptr;
4726
bool use_send_ack = true; // we wait for
48-
uint16_t delay_after_write_ms = 2;
4927
uint16_t delay_after_failed_write_ms = 2000;
5028
uint16_t buffer_size = ESP_NOW_MAX_DATA_LEN;
5129
uint16_t buffer_count = 400;
52-
int write_retry_count = -1; // -1 endless
30+
int write_retry_count = -1; // -1 endless
5331
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 0, 0)
5432
void (*recveive_cb)(const esp_now_recv_info *info, const uint8_t *data,
5533
int data_len) = nullptr;
@@ -65,7 +43,7 @@ struct ESPNowStreamConfig {
6543
};
6644

6745
/**
68-
* @brief ESPNow as Arduino Stream
46+
* @brief ESPNow as Arduino Stream.
6947
* @ingroup communications
7048
* @author Phil Schatzmann
7149
* @copyright GPLv3
@@ -75,7 +53,7 @@ class ESPNowStream : public BaseStream {
7553
ESPNowStream() { ESPNowStreamSelf = this; };
7654

7755
~ESPNowStream() {
78-
if (p_buffer != nullptr) delete p_buffer;
56+
if (xSemaphore != nullptr) vSemaphoreDelete(xSemaphore);
7957
}
8058

8159
ESPNowStreamConfig defaultConfig() {
@@ -84,9 +62,9 @@ class ESPNowStream : public BaseStream {
8462
}
8563

8664
/// Returns the mac address of the current ESP32
87-
const char *macAddress() {
88-
static const char* result = WiFi.macAddress().c_str();
89-
return result;
65+
const char *macAddress() {
66+
static const char *result = WiFi.macAddress().c_str();
67+
return result;
9068
}
9169

9270
/// Defines an alternative send callback
@@ -112,7 +90,8 @@ class ESPNowStream : public BaseStream {
11290
LOGE("Could not set mac address");
11391
return false;
11492
}
115-
delay(500); // On some boards calling macAddress to early leads to a race condition.
93+
delay(500); // On some boards calling macAddress to early leads to a race
94+
// condition.
11695
// checking if address has been updated
11796
const char *addr = macAddress();
11897
if (strcmp(addr, cfg.mac_address) != 0) {
@@ -132,8 +111,7 @@ class ESPNowStream : public BaseStream {
132111

133112
#if ESP_IDF_VERSION < ESP_IDF_VERSION_VAL(5, 0, 0)
134113
LOGI("Setting ESP-NEW rate");
135-
if (esp_wifi_config_espnow_rate(getInterface(), cfg.rate) !=
136-
ESP_OK) {
114+
if (esp_wifi_config_espnow_rate(getInterface(), cfg.rate) != ESP_OK) {
137115
LOGW("Could not set rate");
138116
}
139117
#endif
@@ -146,10 +124,13 @@ class ESPNowStream : public BaseStream {
146124

147125
/// DeInitialization
148126
void end() {
149-
if (esp_now_deinit() != ESP_OK) {
150-
LOGE("esp_now_deinit");
127+
if (is_init){
128+
if (esp_now_deinit() != ESP_OK) {
129+
LOGE("esp_now_deinit");
130+
}
131+
if (buffer.size()>0) buffer.resize(0);
132+
is_init = false;
151133
}
152-
is_init = false;
153134
}
154135

155136
/// Adds a peer to which we can send info or from which we can receive info
@@ -189,7 +170,7 @@ class ESPNowStream : public BaseStream {
189170
peer.ifidx = getInterface();
190171
peer.encrypt = false;
191172

192-
if (StrView(address).equals(cfg.mac_address)){
173+
if (StrView(address).equals(cfg.mac_address)) {
193174
LOGW("Did not add own address as peer");
194175
return true;
195176
}
@@ -208,42 +189,31 @@ class ESPNowStream : public BaseStream {
208189

209190
/// Writes the data - sends it to all the peers
210191
size_t write(const uint8_t *data, size_t len) override {
192+
setupSemaphore();
211193
int open = len;
212194
size_t result = 0;
213-
int retry_count = 0;
195+
int retry_count = cfg.write_retry_count;
214196
while (open > 0) {
215-
if (available_to_write > 0) {
216-
resetAvailableToWrite();
217-
size_t send_len = min(open, ESP_NOW_MAX_DATA_LEN);
218-
esp_err_t rc = esp_now_send(nullptr, data + result, send_len);
219-
// wait for confirmation
220-
if (cfg.use_send_ack) {
221-
while (available_to_write == 0) {
222-
delay(1);
223-
}
224-
} else {
225-
is_write_ok = true;
226-
}
227-
// check status
228-
if (rc == ESP_OK && is_write_ok) {
229-
open -= send_len;
230-
result += send_len;
231-
} else {
232-
LOGW("Write failed - retrying again");
233-
retry_count++;
234-
if (cfg.write_retry_count>0 && retry_count>=cfg.write_retry_count){
235-
LOGE("Write error after %d retries", cfg.write_retry_count);
236-
// break loop
237-
return 0;
238-
}
239-
}
240-
// if we do have no partner to write we stall and retry later
241-
} else {
242-
delay(cfg.delay_after_write_ms);
197+
resetAvailableToWrite();
198+
// wait for confirmation
199+
if (cfg.use_send_ack) {
200+
xSemaphoreTake(xSemaphore, portMAX_DELAY);
243201
}
244-
245-
// Wait some time before we retry
246-
if (!is_write_ok) {
202+
size_t send_len = min(open, ESP_NOW_MAX_DATA_LEN);
203+
esp_err_t rc = esp_now_send(nullptr, data + result, send_len);
204+
// check status
205+
if (rc == ESP_OK) {
206+
open -= send_len;
207+
result += send_len;
208+
retry_count = 0;
209+
} else {
210+
LOGW("Write failed - retrying again");
211+
retry_count++;
212+
if (retry_count-- < 0 ) {
213+
LOGE("Write error after %d retries", cfg.write_retry_count);
214+
// break loop
215+
return 0;
216+
}
247217
delay(cfg.delay_after_failed_write_ms);
248218
}
249219
}
@@ -252,34 +222,51 @@ class ESPNowStream : public BaseStream {
252222

253223
/// Reeds the data from the peers
254224
size_t readBytes(uint8_t *data, size_t len) override {
255-
if (p_buffer == nullptr) return 0;
256-
Lock lock(write_lock);
257-
return p_buffer->readArray(data, len);
225+
if (buffer.size()==0) return 0;
226+
return buffer.readArray(data, len);
258227
}
259228

260229
int available() override {
261-
return p_buffer == nullptr ? 0 : p_buffer->available();
230+
if (!buffer) return 0;
231+
return buffer.size() == 0? 0 : buffer.available();
262232
}
263233

264234
int availableForWrite() override {
235+
if (!buffer) return 0;
265236
return cfg.use_send_ack ? available_to_write : cfg.buffer_size;
266237
}
267238

239+
/// provides how much the receive buffer is filled (in percent)
240+
float getBufferPercent() {
241+
int size = buffer.size();
242+
// prevent div by 0
243+
if (size==0) return 0.0;
244+
// calculate percent
245+
return 100.0 * buffer.available() / size;
246+
}
247+
268248
protected:
269249
ESPNowStreamConfig cfg;
270-
BaseBuffer<uint8_t> *p_buffer = nullptr;
250+
BufferRTOS<uint8_t> buffer{0};
271251
esp_now_recv_cb_t receive = default_recv_cb;
272252
esp_now_send_cb_t send = default_send_cb;
273-
volatile size_t available_to_write;
253+
volatile size_t available_to_write = 0;
274254
bool is_init = false;
275-
bool is_write_ok = false;
276-
_lock_t write_lock;
255+
SemaphoreHandle_t xSemaphore = nullptr;
277256

278-
inline void setupReceiveBuffer(){
257+
inline void setupSemaphore() {
258+
// use semaphore for confirmations
259+
if (cfg.use_send_ack && xSemaphore==nullptr) {
260+
xSemaphore = xSemaphoreCreateBinary();
261+
xSemaphoreGive(xSemaphore);
262+
}
263+
}
264+
265+
inline void setupReceiveBuffer() {
279266
// setup receive buffer
280-
if (p_buffer == nullptr && cfg.buffer_count > 0) {
281-
// p_buffer = new NBuffer<uint8_t>(cfg.buffer_size , cfg.buffer_count);
282-
p_buffer = new RingBuffer<uint8_t>(cfg.buffer_size * cfg.buffer_count);
267+
if (!buffer) {
268+
LOGI("setupReceiveBuffer: %d", cfg.buffer_size * cfg.buffer_count);
269+
buffer.resize(cfg.buffer_size * cfg.buffer_count);
283270
}
284271
}
285272

@@ -352,26 +339,24 @@ class ESPNowStream : public BaseStream {
352339
}
353340

354341
static int bufferAvailableForWrite() {
355-
Lock lock(ESPNowStreamSelf->write_lock);
356-
return ESPNowStreamSelf->p_buffer->availableForWrite();
342+
return ESPNowStreamSelf->buffer.availableForWrite();
357343
}
358344

359345
#if ESP_IDF_VERSION >= ESP_IDF_VERSION_VAL(5, 0, 0)
360-
static void default_recv_cb(const esp_now_recv_info *info, const uint8_t *data, int data_len)
346+
static void default_recv_cb(const esp_now_recv_info *info,
347+
const uint8_t *data, int data_len)
361348
#else
362-
static void default_recv_cb(const uint8_t *mac_addr, const uint8_t *data, int data_len)
349+
static void default_recv_cb(const uint8_t *mac_addr, const uint8_t *data,
350+
int data_len)
363351
#endif
364-
{
352+
{
365353
LOGD("rec_cb: %d", data_len);
366-
// make sure that the receive buffer is available - moved from begin to make sure that it is only allocated when needed
354+
// make sure that the receive buffer is available - moved from begin to make
355+
// sure that it is only allocated when needed
367356
ESPNowStreamSelf->setupReceiveBuffer();
368357
// blocking write
369-
while (bufferAvailableForWrite() < data_len) {
370-
delay(2);
371-
}
372-
Lock lock(ESPNowStreamSelf->write_lock);
373-
size_t result = ESPNowStreamSelf->p_buffer->writeArray(data, data_len);
374-
if (result!=data_len){
358+
size_t result = ESPNowStreamSelf->buffer.writeArray(data, data_len);
359+
if (result != data_len) {
375360
LOGE("writeArray %d -> %d", data_len, result);
376361
}
377362
}
@@ -391,12 +376,12 @@ class ESPNowStream : public BaseStream {
391376
if (strncmp((char *)mac_addr, (char *)first_mac, ESP_NOW_KEY_LEN) == 0) {
392377
ESPNowStreamSelf->available_to_write = ESPNowStreamSelf->cfg.buffer_size;
393378
if (status == ESP_NOW_SEND_SUCCESS) {
394-
ESPNowStreamSelf->is_write_ok = true;
379+
xSemaphoreGive(ESPNowStreamSelf->xSemaphore);
395380
} else {
396-
ESPNowStreamSelf->is_write_ok = false;
381+
LOGE("Send Error!");
397382
}
398383
}
399384
}
400385
};
401386

402-
}
387+
} // namespace audio_tools

src/AudioTools/Concurrency/RTOS/BufferRTOS.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ class BufferRTOS : public BaseBuffer<T> {
151151

152152
size_t size() { return current_size_bytes / sizeof(T); }
153153

154+
operator bool() { return xStreamBuffer != nullptr && size()>0;}
155+
154156
protected:
155157
StreamBufferHandle_t xStreamBuffer = nullptr;
156158
StaticStreamBuffer_t static_stream_buffer;

0 commit comments

Comments
 (0)