Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 18 additions & 17 deletions src/AsyncEventSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,23 +192,24 @@ AsyncEventSourceClient::AsyncEventSourceClient(AsyncWebServerRequest *request, A

AsyncEventSourceClient::~AsyncEventSourceClient() {
#ifdef ESP32
// Protect message queue access (size checks and modifications) which is not thread-safe.
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif
_messageQueue.clear();
close();
}

bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) {
#ifdef ESP32
// Protect message queue access (size checks and modifications) which is not thread-safe.
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif

if (_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES) {
async_ws_log_w("Event message queue overflow: discard message");
return false;
}

#ifdef ESP32
// length() is not thread-safe, thus acquiring the lock before this call..
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif

if (_client) {
_messageQueue.emplace_back(message, len);
} else {
Expand All @@ -230,16 +231,16 @@ bool AsyncEventSourceClient::_queueMessage(const char *message, size_t len) {
}

bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {
#ifdef ESP32
// Protect message queue access (size checks and modifications) which is not thread-safe.
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif

if (_messageQueue.size() >= SSE_MAX_QUEUED_MESSAGES) {
async_ws_log_w("Event message queue overflow: discard message");
return false;
}

#ifdef ESP32
// length() is not thread-safe, thus acquiring the lock before this call..
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif

if (_client) {
_messageQueue.emplace_back(std::move(msg));
} else {
Expand All @@ -261,7 +262,7 @@ bool AsyncEventSourceClient::_queueMessage(AsyncEvent_SharedData_t &&msg) {

void AsyncEventSourceClient::_onAck(size_t len __attribute__((unused)), uint32_t time __attribute__((unused))) {
#ifdef ESP32
// Same here, acquiring the lock early
// Protect message queue access (size checks and modifications) which is not thread-safe.
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif

Expand All @@ -288,11 +289,11 @@ void AsyncEventSourceClient::_onAck(size_t len __attribute__((unused)), uint32_t
}

void AsyncEventSourceClient::_onPoll() {
if (_messageQueue.size()) {
#ifdef ESP32
// Same here, acquiring the lock early
std::lock_guard<std::recursive_mutex> lock(_lockmq);
// Protect message queue access (size checks and modifications) which is not thread-safe.
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif
if (_messageQueue.size()) {
_runQueue();
}
}
Expand Down Expand Up @@ -379,12 +380,12 @@ void AsyncEventSource::_addClient(AsyncEventSourceClient *client) {
}

void AsyncEventSource::_handleDisconnect(AsyncEventSourceClient *client) {
if (_disconnectcb) {
_disconnectcb(client);
}
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_client_queue_lock);
#endif
if (_disconnectcb) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change may be incorrect. Client callbacks shouldn't be called while holding the lock -- if the client does something like "hmm, I'm done with this server now, lets destruct it" we'll have a bad time.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same issue existed here:

void AsyncEventSource::_addClient(AsyncEventSourceClient *client) {
if (!client) {
return;
}
#ifdef ESP32
std::lock_guardstd::recursive_mutex lock(_client_queue_lock);
#endif
_clients.emplace_back(client);
if (_connectcb) {
_connectcb(client);
}

_adjust_inflight_window();
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added 2 commits in #415 to fix the 2 situations.

_disconnectcb(client);
}
for (auto i = _clients.begin(); i != _clients.end(); ++i) {
if (i->get() == client) {
_clients.erase(i);
Expand Down
3 changes: 3 additions & 0 deletions src/AsyncEventSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,9 @@ class AsyncEventSourceClient {
return _lastId;
}
size_t packetsWaiting() const {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_lockmq);
#endif
return _messageQueue.size();
};

Expand Down
37 changes: 35 additions & 2 deletions src/AsyncWebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -309,12 +309,12 @@ void AsyncWebSocketClient::_clearQueue() {
void AsyncWebSocketClient::_onAck(size_t len, uint32_t time) {
_lastMessageTime = millis();

async_ws_log_v("[%s][%" PRIu32 "] START ACK(%u, %" PRIu32 ") Q:%u", _server->url(), _clientId, len, time, _messageQueue.size());

#ifdef ESP32
std::unique_lock<std::recursive_mutex> lock(_lock);
#endif

async_ws_log_v("[%s][%" PRIu32 "] START ACK(%u, %" PRIu32 ") Q:%u", _server->url(), _clientId, len, time, _messageQueue.size());

if (!_controlQueue.empty()) {
auto &head = _controlQueue.front();
if (head.finished()) {
Expand Down Expand Up @@ -988,6 +988,9 @@ void AsyncWebSocket::_handleEvent(AsyncWebSocketClient *client, AwsEventType typ
}

AsyncWebSocketClient *AsyncWebSocket::_newClient(AsyncWebServerRequest *request) {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
_clients.emplace_back(request, this);
// we've just detached AsyncTCP client from AsyncWebServerRequest
_handleEvent(&_clients.back(), WS_EVT_CONNECT, request, NULL, 0);
Expand All @@ -997,6 +1000,9 @@ AsyncWebSocketClient *AsyncWebSocket::_newClient(AsyncWebServerRequest *request)
}

void AsyncWebSocket::_handleDisconnect(AsyncWebSocketClient *client) {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
const auto client_id = client->id();
const auto iter = std::find_if(std::begin(_clients), std::end(_clients), [client_id](const AsyncWebSocketClient &c) {
return c.id() == client_id;
Expand All @@ -1007,12 +1013,18 @@ void AsyncWebSocket::_handleDisconnect(AsyncWebSocketClient *client) {
}

bool AsyncWebSocket::availableForWriteAll() {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
return std::none_of(std::begin(_clients), std::end(_clients), [](const AsyncWebSocketClient &c) {
return c.queueIsFull();
});
}

bool AsyncWebSocket::availableForWrite(uint32_t id) {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
const auto iter = std::find_if(std::begin(_clients), std::end(_clients), [id](const AsyncWebSocketClient &c) {
return c.id() == id;
});
Expand All @@ -1023,12 +1035,18 @@ bool AsyncWebSocket::availableForWrite(uint32_t id) {
}

size_t AsyncWebSocket::count() const {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
return std::count_if(std::begin(_clients), std::end(_clients), [](const AsyncWebSocketClient &c) {
return c.status() == WS_CONNECTED;
});
}

AsyncWebSocketClient *AsyncWebSocket::client(uint32_t id) {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
const auto iter = std::find_if(_clients.begin(), _clients.end(), [id](const AsyncWebSocketClient &c) {
return c.id() == id && c.status() == WS_CONNECTED;
});
Expand All @@ -1046,6 +1064,9 @@ void AsyncWebSocket::close(uint32_t id, uint16_t code, const char *message) {
}

void AsyncWebSocket::closeAll(uint16_t code, const char *message) {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
for (auto &c : _clients) {
if (c.status() == WS_CONNECTED) {
c.close(code, message);
Expand All @@ -1054,6 +1075,9 @@ void AsyncWebSocket::closeAll(uint16_t code, const char *message) {
}

void AsyncWebSocket::cleanupClients(uint16_t maxClients) {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
const size_t c = count();
if (c > maxClients) {
async_ws_log_v("[%s] CLEANUP %" PRIu32 " (%u/%" PRIu16 ")", _url.c_str(), _clients.front().id(), c, maxClients);
Expand All @@ -1074,6 +1098,9 @@ bool AsyncWebSocket::ping(uint32_t id, const uint8_t *data, size_t len) {
}

AsyncWebSocket::SendStatus AsyncWebSocket::pingAll(const uint8_t *data, size_t len) {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
size_t hit = 0;
size_t miss = 0;
for (auto &c : _clients) {
Expand Down Expand Up @@ -1182,6 +1209,9 @@ AsyncWebSocket::SendStatus AsyncWebSocket::textAll(AsyncWebSocketMessageBuffer *
}

AsyncWebSocket::SendStatus AsyncWebSocket::textAll(AsyncWebSocketSharedBuffer buffer) {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
size_t hit = 0;
size_t miss = 0;
for (auto &c : _clients) {
Expand Down Expand Up @@ -1271,6 +1301,9 @@ AsyncWebSocket::SendStatus AsyncWebSocket::binaryAll(AsyncWebSocketMessageBuffer
return status;
}
AsyncWebSocket::SendStatus AsyncWebSocket::binaryAll(AsyncWebSocketSharedBuffer buffer) {
#ifdef ESP32
std::lock_guard<std::recursive_mutex> lock(_lock);
#endif
size_t hit = 0;
size_t miss = 0;
for (auto &c : _clients) {
Expand Down
2 changes: 1 addition & 1 deletion src/AsyncWebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ class AsyncWebSocket : public AsyncWebHandler {
AwsHandshakeHandler _handshakeHandler;
bool _enabled;
#ifdef ESP32
mutable std::mutex _lock;
mutable std::recursive_mutex _lock;
#endif

public:
Expand Down
Loading