Skip to content

Commit 0948167

Browse files
authored
Merge pull request #104 from c-jimenez/fix/websocket_fragmentation
[websocket] Handle fragmented websocket frames
2 parents 23143ce + 5a48d6b commit 0948167

File tree

4 files changed

+188
-7
lines changed

4 files changed

+188
-7
lines changed

src/websockets/libwebsockets/LibWebsocketClient.cpp

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,18 @@ LibWebsocketClient::LibWebsocketClient()
4949
m_wsi(nullptr),
5050
m_retry_policy(),
5151
m_retry_count(0),
52-
m_send_msgs()
52+
m_send_msgs(),
53+
m_fragmented_frame(nullptr),
54+
m_fragmented_frame_size(0),
55+
m_fragmented_frame_index(0)
5356
{
5457
}
5558
/** @brief Destructor */
5659
LibWebsocketClient::~LibWebsocketClient()
5760
{
5861
// To prevent keeping an open connection in background
5962
disconnect();
63+
releaseFragmentedFrame();
6064
}
6165

6266
/** @copydoc bool IWebsocketClient::connect(const std::string&, const std::string&, const Credentials&,
@@ -264,6 +268,38 @@ void LibWebsocketClient::process()
264268
lws_context_destroy(context);
265269
}
266270

271+
/** @brief Prepare the buffer to store a new fragmented frame */
272+
void LibWebsocketClient::beginFragmentedFrame(size_t frame_size)
273+
{
274+
// Release previously allocated data
275+
releaseFragmentedFrame();
276+
277+
// Allocate new buffer
278+
m_fragmented_frame = new uint8_t[frame_size];
279+
m_fragmented_frame_size = frame_size;
280+
}
281+
282+
/** @brief Append data to the fragmented frame */
283+
void LibWebsocketClient::appendFragmentedData(const void* data, size_t size)
284+
{
285+
size_t copy_len = size;
286+
if ((m_fragmented_frame_index + size) >= m_fragmented_frame_size)
287+
{
288+
copy_len = m_fragmented_frame_size - m_fragmented_frame_index;
289+
}
290+
memcpy(&m_fragmented_frame[m_fragmented_frame_index], data, copy_len);
291+
m_fragmented_frame_index += copy_len;
292+
}
293+
294+
/** @brief Release the memory associated with the fragmented frame */
295+
void LibWebsocketClient::releaseFragmentedFrame()
296+
{
297+
delete[] m_fragmented_frame;
298+
m_fragmented_frame = nullptr;
299+
m_fragmented_frame_size = 0;
300+
m_fragmented_frame_index = 0;
301+
}
302+
267303
/** @brief libwebsockets connection callback */
268304
void LibWebsocketClient::connectCallback(struct lws_sorted_usec_list* sul) noexcept
269305
{
@@ -380,8 +416,40 @@ int LibWebsocketClient::eventCallback(struct lws* wsi, enum lws_callback_reasons
380416
break;
381417

382418
case LWS_CALLBACK_CLIENT_RECEIVE:
383-
client->m_listener->wsClientDataReceived(in, len);
384-
break;
419+
{
420+
if (client->m_listener)
421+
{
422+
// Get frame info
423+
bool is_first = (lws_is_first_fragment(wsi) == 1);
424+
bool is_last = (lws_is_final_fragment(wsi) == 1);
425+
size_t remaining_length = lws_remaining_packet_payload(wsi);
426+
if (is_first && is_last)
427+
{
428+
// Notify client
429+
client->m_listener->wsClientDataReceived(in, len);
430+
}
431+
else if (is_first)
432+
{
433+
// Prepare frame bufferization
434+
client->beginFragmentedFrame(len + remaining_length);
435+
client->appendFragmentedData(in, len);
436+
}
437+
else
438+
{
439+
// Bufferize data
440+
client->appendFragmentedData(in, len);
441+
if (is_last)
442+
{
443+
// Notify client
444+
client->m_listener->wsClientDataReceived(client->m_fragmented_frame, client->m_fragmented_frame_size);
445+
446+
// Release resources
447+
client->releaseFragmentedFrame();
448+
}
449+
}
450+
}
451+
}
452+
break;
385453

386454
case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
387455
{

src/websockets/libwebsockets/LibWebsocketClient.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,23 @@ class LibWebsocketClient : public IWebsocketClient
121121
/** @brief Queue of messages to send */
122122
ocpp::helpers::Queue<SendMsg*> m_send_msgs;
123123

124+
/** @brief Buffer to store fragmented frames */
125+
uint8_t* m_fragmented_frame;
126+
/** @brief Size of the fragmented frame */
127+
size_t m_fragmented_frame_size;
128+
/** @brief Current index in the fragmented frame */
129+
size_t m_fragmented_frame_index;
130+
124131
/** @brief Internal thread */
125132
void process();
126133

134+
/** @brief Prepare the buffer to store a new fragmented frame */
135+
void beginFragmentedFrame(size_t frame_size);
136+
/** @brief Append data to the fragmented frame */
137+
void appendFragmentedData(const void* data, size_t size);
138+
/** @brief Release the memory associated with the fragmented frame */
139+
void releaseFragmentedFrame();
140+
127141
/** @brief libwebsockets connection callback */
128142
static void connectCallback(struct lws_sorted_usec_list* sul) noexcept;
129143
/** @brief libwebsockets event callback */

src/websockets/libwebsockets/LibWebsocketServer.cpp

Lines changed: 69 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -511,11 +511,36 @@ int LibWebsocketServer::eventCallback(struct lws* wsi, enum lws_callback_reasons
511511
if (iter_client != server->m_clients.end())
512512
{
513513
Client* client = dynamic_cast<Client*>(iter_client->second.get());
514-
515-
// Notify client
516514
if (client->m_listener)
517515
{
518-
client->m_listener->wsClientDataReceived(in, len);
516+
// Get frame info
517+
bool is_first = (lws_is_first_fragment(wsi) == 1);
518+
bool is_last = (lws_is_final_fragment(wsi) == 1);
519+
size_t remaining_length = lws_remaining_packet_payload(wsi);
520+
if (is_first && is_last)
521+
{
522+
// Notify client
523+
client->m_listener->wsClientDataReceived(in, len);
524+
}
525+
else if (is_first)
526+
{
527+
// Prepare frame bufferization
528+
client->beginFragmentedFrame(len + remaining_length);
529+
client->appendFragmentedData(in, len);
530+
}
531+
else
532+
{
533+
// Bufferize data
534+
client->appendFragmentedData(in, len);
535+
if (is_last)
536+
{
537+
// Notify client
538+
client->m_listener->wsClientDataReceived(client->getFragmentedFrame(), client->getFragmentedFrameSize());
539+
540+
// Release resources
541+
client->releaseFragmentedFrame();
542+
}
543+
}
519544
}
520545
}
521546
}
@@ -530,13 +555,21 @@ int LibWebsocketServer::eventCallback(struct lws* wsi, enum lws_callback_reasons
530555

531556
/** @brief Constructor */
532557
LibWebsocketServer::Client::Client(struct lws* wsi, const char* ip_address)
533-
: m_wsi(wsi), m_ip_address(ip_address), m_connected(true), m_listener(nullptr), m_send_msgs()
558+
: m_wsi(wsi),
559+
m_ip_address(ip_address),
560+
m_connected(true),
561+
m_listener(nullptr),
562+
m_send_msgs(),
563+
m_fragmented_frame(nullptr),
564+
m_fragmented_frame_size(0),
565+
m_fragmented_frame_index(0)
534566
{
535567
}
536568
/** @brief Destructor */
537569
LibWebsocketServer::Client::~Client()
538570
{
539571
disconnect(true);
572+
releaseFragmentedFrame();
540573
}
541574

542575
/** @copydoc const std::string& IClient::ipAddress(bool) const */
@@ -606,5 +639,37 @@ void LibWebsocketServer::Client::registerListener(IClient::IListener& listener)
606639
m_listener = &listener;
607640
}
608641

642+
/** @brief Prepare the buffer to store a new fragmented frame */
643+
void LibWebsocketServer::Client::beginFragmentedFrame(size_t frame_size)
644+
{
645+
// Release previously allocated data
646+
releaseFragmentedFrame();
647+
648+
// Allocate new buffer
649+
m_fragmented_frame = new uint8_t[frame_size];
650+
m_fragmented_frame_size = frame_size;
651+
}
652+
653+
/** @brief Append data to the fragmented frame */
654+
void LibWebsocketServer::Client::appendFragmentedData(const void* data, size_t size)
655+
{
656+
size_t copy_len = size;
657+
if ((m_fragmented_frame_index + size) >= m_fragmented_frame_size)
658+
{
659+
copy_len = m_fragmented_frame_size - m_fragmented_frame_index;
660+
}
661+
memcpy(&m_fragmented_frame[m_fragmented_frame_index], data, copy_len);
662+
m_fragmented_frame_index += copy_len;
663+
}
664+
665+
/** @brief Release the memory associated with the fragmented frame */
666+
void LibWebsocketServer::Client::releaseFragmentedFrame()
667+
{
668+
delete[] m_fragmented_frame;
669+
m_fragmented_frame = nullptr;
670+
m_fragmented_frame_size = 0;
671+
m_fragmented_frame_index = 0;
672+
}
673+
609674
} // namespace websockets
610675
} // namespace ocpp

src/websockets/libwebsockets/LibWebsocketServer.h

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,34 @@ class LibWebsocketServer : public IWebsocketServer
110110
/** @copydoc bool IClient::registerListener(IListener&) */
111111
void registerListener(IClient::IListener& listener) override;
112112

113+
/**
114+
* @brief Get the size in bytes of the fragmented frame
115+
* @return Size in bytes of the fragmented frame
116+
*/
117+
size_t getFragmentedFrameSize() const { return m_fragmented_frame_size; }
118+
119+
/**
120+
* @brief Get the fragmented frame
121+
* @return Fragmented frame
122+
*/
123+
const void* getFragmentedFrame() const { return m_fragmented_frame; }
124+
125+
/**
126+
* @brief Prepare the buffer to store a new fragmented frame
127+
* @param frame_size Size of the fragmented frame in bytes
128+
*/
129+
void beginFragmentedFrame(size_t frame_size);
130+
131+
/**
132+
* @brief Append data to the fragmented frame
133+
* @param data Data to append
134+
* @param size Size of the data in bytes
135+
*/
136+
void appendFragmentedData(const void* data, size_t size);
137+
138+
/** @brief Release the memory associated with the fragmented frame */
139+
void releaseFragmentedFrame();
140+
113141
private:
114142
/** @brief Client socket */
115143
struct lws* m_wsi;
@@ -121,6 +149,12 @@ class LibWebsocketServer : public IWebsocketServer
121149
IClient::IListener* m_listener;
122150
/** @brief Queue of messages to send */
123151
ocpp::helpers::Queue<SendMsg*> m_send_msgs;
152+
/** @brief Buffer to store fragmented frames */
153+
uint8_t* m_fragmented_frame;
154+
/** @brief Size of the fragmented frame */
155+
size_t m_fragmented_frame_size;
156+
/** @brief Current index in the fragmented frame */
157+
size_t m_fragmented_frame_index;
124158
};
125159

126160
/** @brief Listener */

0 commit comments

Comments
 (0)