Skip to content

expose WebSocket makeBuffer() method to be publically available #8

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: yuboxfixes-0xFEEDC0DE64-cleanup
Choose a base branch
from
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -876,8 +876,11 @@ handler->setCacheControl("max-age=30");
```

### Specifying Date-Modified header
It is possible to specify Date-Modified header to enable the server to return Not-Modified (304) response for requests
with "If-Modified-Since" header with the same value, instead of responding with the actual file content.
Sever sets "Last-Modified" header automatically if FS driver supports file modification timestamps (LittleFS does).
Server returns "Not-Modified" (304) response for requests with "If-Modified-Since" header with _the same_ value as file's mod date, instead of responding with the actual file content. It does not perform date calculations checking if File's mod date is newer or later than in "If-Modified-Since" header.

For FS not supporting file timestamps (like deprecated SPIFFS) it is possible to specify Date-Modified header manually.

```cpp
// Update the date modified string every time files are updated
server.serveStatic("/", SPIFFS, "/www/").setLastModified("Mon, 20 Jun 2016 14:00:00 GMT");
Expand Down
13 changes: 3 additions & 10 deletions library.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,9 @@
"platforms": ["espressif8266", "espressif32"],
"dependencies": [
{
"owner": "me-no-dev",
"name": "ESPAsyncTCP",
"version": "^1.2.2",
"platforms": "espressif8266"
},
{
"owner": "me-no-dev",
"name": "AsyncTCP",
"version": "^1.1.1",
"platforms": "espressif32"
"owner": "yubox-node-org",
"name": "AsyncTCPSock",
"version": "https://github.com/yubox-node-org/AsyncTCPSock"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, I'm curious to understand how this library compare to the ESPHome maintained fork (https://github.com/esphome/AsyncTCP) and what it brings compare to AsyncTCP ?
I am using a fork of yubox-node-org/ESPAsyncWebServer also, but with the ESPHome maintained AsyncTCP.
Thanks!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Who knows... Do not see much activity in this fork recently.
How is it works yubox-node-org/ESPAsyncWebServer with esphome/AsyncTCP?
Is it more stable? My impression was that yubox-node-org/ESPAsyncWebServer must be used with it's own version of AsyncTCPSock.
Have you tried esphome/ESPAsyncWebServer? They have added some fixes there, but yubox fork is almost a full rework.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI I am maintaining a more up to date version of the yubox-node-org fork, which is deployed in PlatformIO registry and Arduino registry.

  • It is compatible, but cleaned up regarding CI, spdiff editor, etc.
  • It is also compatible Arduino Json 7
  • It contains the few improvements made by ESPHome

The ESPHome folks decided to fork the original repo, which is IMO wrong because the yubox-node-org introduces a LOT of fixes regarding concurrency issues when using tasks especially on dual core systems. So they miss all that, especially on the WebSocket part.

I know a lot of projects depending on the yubox-node-org fork instead of the ESPHome one or original one for this reason. It adds a lot of stability and prevents corrupted heap causes by concurrent access to queues in the lib.

The version I maintain is using esphome/AsyncTCP because the ESPHome team does a great job IMO to maintain it. They fixed a couple of issues and introduced IPv6.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I'll take a look into your fork also! If you like I can rebase this PR onto your fork.
I also have some improvements for async webserver that I can share.

Copy link

@mathieucarbou mathieucarbou Jan 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I'll take a look into your fork also! If you like I can rebase this PR onto your fork. I also have some improvements for async webserver that I can share.

Yes! An adapted version of this change would definitely be interesting because sadly the api change in the buffer is not compatible with the original API. But making is compatible is really harder I think.

For the moment I have to workaround that by using feature detection like here:

https://github.com/ayushsharma82/ESP-DASH/pull/195/files#diff-b22c24b3d761e54f8997b5313d563e3b2b58217a99f7e9e95beb68ad19ba6e13R343-R351

// this fork (originally from yubox-node-org), uses another API with shared pointer that better support concurrent use cases then the original project
 #if defined(ASYNCWEBSERVER_FORK_mathieucarbou)
   auto buffer = std::make_shared<std::vector<uint8_t>>(len);
   assert(buffer);
   serializeJson(doc, buffer->data(), len);
 #else
   AsyncWebSocketMessageBuffer* buffer = _ws->makeBuffer(len);
   assert(buffer);
   serializeJson(doc, buffer->get(), len);
 #endif

The problem is that the original API returns a pointer to a buffer, which requires it to be responsible of its destruction:

https://github.com/esphome/ESPAsyncWebServer/blob/master/src/AsyncWebSocket.h#L333-L334

The change using yubox fork with a shared ptr allows a buffer created by the caller to still be referenced until all the clients have used it. His change is well explained in his commit here:

me-no-dev@4963ce9

I don't know how to make the original API compatible with the use of shared ptr.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@proddy : did you find out ? is it possible that this increase is caused by the difference values for the queue sizes ?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be honest I can't remember! I'm pretty sure it's in AsyncTCP. I'm still using the ESPHome fork with IPv6 support with CONFIG_ASYNC_TCP_STACK_SIZE set to 5120 which is enough for ESP32 (2.3k) and ESP32S3 (3.5k)

Copy link

@mathieucarbou mathieucarbou Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be honest I can't remember! I'm pretty sure it's in AsyncTCP. I'm still using the ESPHome fork with IPv6 support with CONFIG_ASYNC_TCP_STACK_SIZE set to 5120 which is enough for ESP32 (2.3k) and ESP32S3 (3.5k)

Ok. Because I ran some tests this morning:

Screenshot 2024-06-04 at 10 54 27

  • 9h57 - 10h05: AsyncTCPSock with default stack size
  • 10h06 - 10h14 : AsyncTCPSock with 4k stack size
  • Before and after: AsyncTCP with 4k stack size.

If you look from 10h35 and after: this is typically what I like to see. There is no big allocations / deallocation on heap, which avoids fragmentation. This is quite stable.

Spikes are app reloads (ESP-DASH PRo + WebSerial Pro).
This app also has mqtt publications each 5 seconds, dashboard refresh each 500ms, and log streamed to web console (websocket) in debug mode.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am running with my forks:

My AsyncTCP fork is based on ESPHome, but supports Arduino 3, Ipv6 for Arduino 3 also, and fixes some issus in the ESPHome implementation of Ipv6 (I've PR-ed them if I remember). It also include a workaround of a bug introduced in Arduino 3 in the IpAddress implementation.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am running with my forks:

My AsyncTCP fork is based on ESPHome, but supports Arduino 3, Ipv6 for Arduino 3 also, and fixes some issus in the ESPHome implementation of Ipv6 (I've PR-ed them if I remember). It also include a workaround of a bug introduced in Arduino 3 in the IpAddress implementation.

nice. I'll do some benchmarking too. Having local copies of the libraries is a nightmare to maintain so it'll be good if I switch to public libraries.

},
{
"name": "Hash",
Expand Down
94 changes: 23 additions & 71 deletions src/AsyncWebSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class AsyncWebSocketControl {
*/


AsyncWebSocketMessage::AsyncWebSocketMessage(std::shared_ptr<std::vector<uint8_t>> buffer, uint8_t opcode, bool mask) :
AsyncWebSocketMessage::AsyncWebSocketMessage(AsyncWebSocketMessageBuffer buffer, uint8_t opcode, bool mask) :
_WSbuffer{buffer},
_opcode(opcode & 0x07),
_mask{mask},
Expand Down Expand Up @@ -643,39 +643,12 @@ size_t AsyncWebSocketClient::printf_P(PGM_P formatP, ...)
}
#endif

namespace {
std::shared_ptr<std::vector<uint8_t>> makeBuffer(const uint8_t *message, size_t len)
AsyncWebSocketMessageBuffer AsyncWebSocketClient::makeBuffer(const uint8_t *message, size_t len)
{
auto buffer = std::make_shared<std::vector<uint8_t>>(len);
std::memcpy(buffer->data(), message, len);
return buffer;
}
}

void AsyncWebSocketClient::text(std::shared_ptr<std::vector<uint8_t>> buffer)
{
_queueMessage(buffer);
}

void AsyncWebSocketClient::text(const uint8_t *message, size_t len)
{
text(makeBuffer(message, len));
}

void AsyncWebSocketClient::text(const char *message, size_t len)
{
text((const uint8_t *)message, len);
}

void AsyncWebSocketClient::text(const char *message)
{
text(message, strlen(message));
}

void AsyncWebSocketClient::text(const String &message)
{
text(message.c_str(), message.length());
}

void AsyncWebSocketClient::text(const __FlashStringHelper *data)
{
Expand All @@ -698,31 +671,6 @@ void AsyncWebSocketClient::text(const __FlashStringHelper *data)
}
}

void AsyncWebSocketClient::binary(std::shared_ptr<std::vector<uint8_t>> buffer)
{
_queueMessage(buffer, WS_BINARY);
}

void AsyncWebSocketClient::binary(const uint8_t *message, size_t len)
{
binary(makeBuffer(message, len));
}

void AsyncWebSocketClient::binary(const char *message, size_t len)
{
binary((const uint8_t *)message, len);
}

void AsyncWebSocketClient::binary(const char *message)
{
binary(message, strlen(message));
}

void AsyncWebSocketClient::binary(const String &message)
{
binary(message.c_str(), message.length());
}

void AsyncWebSocketClient::binary(const __FlashStringHelper *data, size_t len)
{
PGM_P p = reinterpret_cast<PGM_P>(data);
Expand All @@ -737,7 +685,7 @@ void AsyncWebSocketClient::binary(const __FlashStringHelper *data, size_t len)
IPAddress AsyncWebSocketClient::remoteIP() const
{
if (!_client)
return IPAddress(0U);
return IPAddress((uint32_t)0);

return _client->remoteIP();
}
Expand Down Expand Up @@ -850,23 +798,15 @@ void AsyncWebSocket::pingAll(const uint8_t *data, size_t len)
c.ping(data, len);
}

void AsyncWebSocket::text(uint32_t id, AsyncWebSocketMessageBuffer message){
if (AsyncWebSocketClient * c = client(id))
c->message(message);
};
void AsyncWebSocket::text(uint32_t id, const uint8_t *message, size_t len)
{
if (AsyncWebSocketClient * c = client(id))
c->text(makeBuffer(message, len));
}
void AsyncWebSocket::text(uint32_t id, const char *message, size_t len)
{
text(id, (const uint8_t *)message, len);
}
void AsyncWebSocket::text(uint32_t id, const char * message)
{
text(id, message, strlen(message));
}
void AsyncWebSocket::text(uint32_t id, const String &message)
{
text(id, message.c_str(), message.length());
}
void AsyncWebSocket::text(uint32_t id, const __FlashStringHelper *data)
{
PGM_P p = reinterpret_cast<PGM_P>(data);
Expand All @@ -889,7 +829,7 @@ void AsyncWebSocket::text(uint32_t id, const __FlashStringHelper *data)
}
}

void AsyncWebSocket::textAll(std::shared_ptr<std::vector<uint8_t>> buffer)
void AsyncWebSocket::textAll(AsyncWebSocketMessageBuffer buffer)
{
for (auto &c : _clients)
if (c.status() == WS_CONNECTED)
Expand Down Expand Up @@ -932,6 +872,10 @@ void AsyncWebSocket::textAll(const __FlashStringHelper *data)
}
}

void AsyncWebSocket::binary(uint32_t id, AsyncWebSocketMessageBuffer message){
if (AsyncWebSocketClient * c = client(id))
c->binary(message);
};
void AsyncWebSocket::binary(uint32_t id, const uint8_t *message, size_t len)
{
if (AsyncWebSocketClient *c = client(id))
Expand Down Expand Up @@ -961,7 +905,7 @@ void AsyncWebSocket::binary(uint32_t id, const __FlashStringHelper *data, size_t
}
}

void AsyncWebSocket::binaryAll(std::shared_ptr<std::vector<uint8_t>> buffer)
void AsyncWebSocket::binaryAll(AsyncWebSocketMessageBuffer buffer)
{
for (auto &c : _clients)
if (c.status() == WS_CONNECTED)
Expand Down Expand Up @@ -1022,7 +966,7 @@ size_t AsyncWebSocket::printfAll(const char *format, ...)
va_end(arg);
delete[] temp;

std::shared_ptr<std::vector<uint8_t>> buffer = std::make_shared<std::vector<uint8_t>>(len);
AsyncWebSocketMessageBuffer buffer = std::make_shared<std::vector<uint8_t>>(len);

va_start(arg, format);
vsnprintf( (char *)buffer->data(), len + 1, format, arg);
Expand Down Expand Up @@ -1058,7 +1002,7 @@ size_t AsyncWebSocket::printfAll_P(PGM_P formatP, ...)
va_end(arg);
delete[] temp;

std::shared_ptr<std::vector<uint8_t>> buffer = std::make_shared<std::vector<uint8_t>>(len + 1);
AsyncWebSocketMessageBuffer buffer = std::make_shared<std::vector<uint8_t>>(len + 1);

va_start(arg, formatP);
vsnprintf_P((char *)buffer->data(), len + 1, formatP, arg);
Expand Down Expand Up @@ -1141,6 +1085,13 @@ void AsyncWebSocket::handleRequest(AsyncWebServerRequest *request)
request->send(response);
}

AsyncWebSocketMessageBuffer AsyncWebSocket::makeBuffer(const uint8_t *message, size_t len)
{
auto buffer = std::make_shared<std::vector<uint8_t>>(len);
std::memcpy(buffer->data(), message, len);
return buffer;
}

/*
* Response to Web Socket request - sends the authorization and detaches the TCP Client from the web server
* Authentication code from https://github.com/Links2004/arduinoWebSockets/blob/master/src/WebSockets.cpp#L480
Expand Down Expand Up @@ -1208,3 +1159,4 @@ size_t AsyncWebSocketResponse::_ack(AsyncWebServerRequest *request, size_t len,

return 0;
}

78 changes: 61 additions & 17 deletions src/AsyncWebSocket.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@
#define DEFAULT_MAX_WS_CLIENTS 4
#endif

using AsyncWebSocketMessageBuffer = std::shared_ptr<std::vector<uint8_t>>;

class AsyncWebSocket;
class AsyncWebSocketResponse;
class AsyncWebSocketClient;
Expand Down Expand Up @@ -84,6 +86,7 @@ typedef enum { WS_CONTINUATION, WS_TEXT, WS_BINARY, WS_DISCONNECT = 0x08, WS_PIN
typedef enum { WS_MSG_SENDING, WS_MSG_SENT, WS_MSG_ERROR } AwsMessageStatus;
typedef enum { WS_EVT_CONNECT, WS_EVT_DISCONNECT, WS_EVT_PONG, WS_EVT_ERROR, WS_EVT_DATA } AwsEventType;


class AsyncWebSocketMessage
{
private:
Expand Down Expand Up @@ -161,7 +164,26 @@ class AsyncWebSocketClient {
}

//data packets
void message(std::shared_ptr<std::vector<uint8_t>> buffer, uint8_t opcode=WS_TEXT, bool mask=false) { _queueMessage(buffer, opcode, mask); }
/**
* @brief allocate memory buffer owned by shared-pointer and copy provided data
* used to keep the data untill websocket send is complete for single/multiple clients
*
* @param message
* @param len
* @return AsyncWebSocketMessageBuffer
*/
AsyncWebSocketMessageBuffer makeBuffer(const uint8_t *message, size_t len);

/**
* @brief allocate empty memory buffer owned by shared-pointer
* used to keep the data untill websocket send is complete for single/multiple clients
*
* @param len
* @return AsyncWebSocketMessageBuffer
*/
inline AsyncWebSocketMessageBuffer makeBuffer(size_t len){ return std::make_shared<std::vector<uint8_t>>(len); };

void message(AsyncWebSocketMessageBuffer buffer, uint8_t opcode=WS_TEXT, bool mask=false) { _queueMessage(buffer, opcode, mask); }
bool queueIsFull() const;
size_t queueLen() const;

Expand All @@ -170,18 +192,18 @@ class AsyncWebSocketClient {
size_t printf_P(PGM_P formatP, ...) __attribute__ ((format (printf, 2, 3)));
#endif

void text(std::shared_ptr<std::vector<uint8_t>> buffer);
void text(const uint8_t *message, size_t len);
void text(const char *message, size_t len);
void text(const char *message);
void text(const String &message);
inline void text(AsyncWebSocketMessageBuffer buffer){ _queueMessage(buffer); };
inline void text(const uint8_t *message, size_t len){ text(makeBuffer(message, len)); };
inline void text(const char *message, size_t len){ text((const uint8_t *)message, len); };
inline void text(const char *message){ text(message, strlen(message)); };
inline void text(const String &message){ text(message.c_str(), message.length()); };
void text(const __FlashStringHelper *message);

void binary(std::shared_ptr<std::vector<uint8_t>> buffer);
void binary(const uint8_t *message, size_t len);
void binary(const char * message, size_t len);
void binary(const char * message);
void binary(const String &message);
inline void binary(AsyncWebSocketMessageBuffer buffer){ _queueMessage(buffer, WS_BINARY); };
inline void binary(const uint8_t *message, size_t len){ binary(makeBuffer(message, len)); };
inline void binary(const char * message, size_t len){ binary((const uint8_t *)message, len); };
inline void binary(const char * message){ binary(message, strlen(message)); };
inline void binary(const String &message){ binary(message.c_str(), message.length()); };
void binary(const __FlashStringHelper *message, size_t len);

bool canSend() const;
Expand All @@ -205,7 +227,7 @@ class AsyncWebSocket: public AsyncWebHandler {
std::list<AsyncWebSocketClient> _clients;
uint32_t _cNextId;
AwsEventHandler _eventHandler;
AwsHandshakeHandler _handshakeHandler;
AwsHandshakeHandler _handshakeHandler;
bool _enabled;
AsyncWebLock _lock;

Expand All @@ -229,26 +251,48 @@ class AsyncWebSocket: public AsyncWebHandler {
void ping(uint32_t id, const uint8_t *data=NULL, size_t len=0);
void pingAll(const uint8_t *data=NULL, size_t len=0); // done

//data packets
/**
* @brief allocate memory buffer owned by shared-pointer and copy provided data
* used to keep the data untill websocket send is complete for single/multiple clients
*
* @param message
* @param len
* @return AsyncWebSocketMessageBuffer
*/
AsyncWebSocketMessageBuffer makeBuffer(const uint8_t *message, size_t len);

/**
* @brief allocate empty memory buffer owned by shared-pointer
* used to keep the data untill websocket send is complete for single/multiple clients
*
* @param len
* @return AsyncWebSocketMessageBuffer
*/
inline AsyncWebSocketMessageBuffer makeBuffer(size_t len){ return std::make_shared<std::vector<uint8_t>>(len); };

void text(uint32_t id, AsyncWebSocketMessageBuffer message);
void text(uint32_t id, const uint8_t * message, size_t len);
void text(uint32_t id, const char *message, size_t len);
void text(uint32_t id, const char *message);
void text(uint32_t id, const String &message);
inline void text(uint32_t id, const char *message, size_t len){ text(id, (const uint8_t *)message, len); };
inline void text(uint32_t id, const char *message){ text(id, message, strlen(message)); };
inline void text(uint32_t id, const String &message){ text(id, message.c_str(), message.length()); };
void text(uint32_t id, const __FlashStringHelper *message);

void textAll(std::shared_ptr<std::vector<uint8_t>> buffer);
void textAll(AsyncWebSocketMessageBuffer buffer);
void textAll(const uint8_t *message, size_t len);
void textAll(const char * message, size_t len);
void textAll(const char * message);
void textAll(const String &message);
void textAll(const __FlashStringHelper *message); // need to convert

void binary(uint32_t id, AsyncWebSocketMessageBuffer message);
void binary(uint32_t id, const uint8_t *message, size_t len);
void binary(uint32_t id, const char *message, size_t len);
void binary(uint32_t id, const char *message);
void binary(uint32_t id, const String &message);
void binary(uint32_t id, const __FlashStringHelper *message, size_t len);

void binaryAll(std::shared_ptr<std::vector<uint8_t>> buffer);
void binaryAll(AsyncWebSocketMessageBuffer buffer);
void binaryAll(const uint8_t *message, size_t len);
void binaryAll(const char *message, size_t len);
void binaryAll(const char *message);
Expand Down
13 changes: 10 additions & 3 deletions src/WebHandlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
#include "ESPAsyncWebServer.h"
#include "WebHandlerImpl.h"
#include <ctime>

AsyncStaticWebHandler::AsyncStaticWebHandler(const char* uri, FS& fs, const char* path, const char* cache_control)
: _fs(fs), _uri(uri), _path(path), _default_file(F("index.htm")), _cache_control(cache_control), _last_modified(), _callback(nullptr)
Expand Down Expand Up @@ -199,15 +200,21 @@ uint8_t AsyncStaticWebHandler::_countBits(const uint8_t value) const
void AsyncStaticWebHandler::handleRequest(AsyncWebServerRequest *request)
{
// Get the filename from request->_tempObject and free it
String filename = String((char*)request->_tempObject);
String filename((char*)request->_tempObject);
free(request->_tempObject);
request->_tempObject = NULL;
if((_username.length() && _password.length()) && !request->authenticate(_username.c_str(), _password.c_str()))
return request->requestAuthentication();

if (request->_tempFile == true) {
String etag = String(request->_tempFile.size());
if (_last_modified.length() && _last_modified == request->header(F("If-Modified-Since"))) {
time_t lw = request->_tempFile.getLastWrite(); // get last file mod time (if supported by FS)
if (lw) {
char datetime[std::size("Fri, 27 Jan 2023 15:50:27 GMT")];
std::strftime(std::data(datetime), std::size(datetime), "%a, %d %b %Y %H:%M:%S GMT", std::gmtime(&lw));
_last_modified = datetime;
}
String etag(request->_tempFile.size());
if (_last_modified.length() && _last_modified == request->header("If-Modified-Since")) {
request->_tempFile.close();
request->send(304); // Not modified
} else if (_cache_control.length() && request->hasHeader(F("If-None-Match")) && request->header(F("If-None-Match")).equals(etag)) {
Expand Down