Skip to content

Commit 16a926d

Browse files
committed
WebSockets: Try to handle incomplete sends
1 parent 62f1972 commit 16a926d

File tree

2 files changed

+66
-46
lines changed

2 files changed

+66
-46
lines changed

src/AsyncWebSocket.cpp

Lines changed: 63 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -31,43 +31,32 @@
3131

3232
#define MAX_PRINTF_LEN 64
3333

34-
size_t webSocketSendFrameWindow(AsyncClient *client){
34+
#define MAX_HEADER_SIZE 8
35+
36+
// Return a guess at the maximum packet size we can send
37+
static size_t webSocketSendFrameWindow(AsyncClient *client){
3538
if(!client->canSend())
3639
return 0;
3740
size_t space = client->space();
38-
if(space < 9)
41+
if(space <= MAX_HEADER_SIZE)
3942
return 0;
40-
return space - 8;
43+
// TODO - consider if we have enough contiguous RAM to allocate
44+
return space - MAX_HEADER_SIZE;
45+
}
46+
47+
static size_t webSocketHeaderLength(bool mask, size_t len) {
48+
return ((len < 126)?2:4) + (mask * 4);
4149
}
4250

43-
size_t webSocketSendFrame(AsyncClient *client, bool final, uint8_t opcode, bool mask, uint8_t *data, size_t len){
51+
static size_t webSocketSendFrameHeader(AsyncClient *client, bool final, uint8_t opcode, uint8_t mask_buf[4], size_t len){
4452
if(!client->canSend())
4553
return 0;
46-
size_t space = client->space();
47-
if(space < 2)
48-
return 0;
49-
uint8_t mbuf[4] = {0,0,0,0};
50-
uint8_t headLen = 2;
51-
if(len && mask){
52-
headLen += 4;
53-
mbuf[0] = rand() % 0xFF;
54-
mbuf[1] = rand() % 0xFF;
55-
mbuf[2] = rand() % 0xFF;
56-
mbuf[3] = rand() % 0xFF;
57-
}
58-
if(len > 125)
59-
headLen += 2;
60-
if(space < headLen)
61-
return 0;
62-
space -= headLen;
6354

64-
if(len > space) len = space;
55+
uint8_t buf[8]; // header buffer
56+
uint8_t headLen = webSocketHeaderLength(mask_buf != nullptr, len);
6557

66-
uint8_t *buf = (uint8_t*)malloc(headLen);
67-
if(buf == NULL){
68-
//os_printf("could not malloc %u bytes for frame header\n", headLen);
58+
if(client->space() < headLen)
6959
return 0;
70-
}
7160

7261
buf[0] = opcode & 0x0F;
7362
if(final)
@@ -79,33 +68,41 @@ size_t webSocketSendFrame(AsyncClient *client, bool final, uint8_t opcode, bool
7968
buf[2] = (uint8_t)((len >> 8) & 0xFF);
8069
buf[3] = (uint8_t)(len & 0xFF);
8170
}
82-
if(len && mask){
71+
if(len && mask_buf){
8372
buf[1] |= 0x80;
84-
memcpy(buf + (headLen - 4), mbuf, 4);
73+
74+
for (int i = 0; i < 4; ++i) {
75+
buf[headLen-4+i] = mask_buf[i] = rand() % 0xFF;
76+
}
8577
}
86-
if(client->add((const char *)buf, headLen) != headLen){
78+
79+
size_t sent = client->add((const char *)buf, headLen);
80+
if(sent != headLen){
8781
//os_printf("error adding %lu header bytes\n", headLen);
88-
free(buf);
82+
// we are in BIG trouble as we don't cache the headers...!
83+
// TODO: might be better to close the connection here
84+
}
85+
return sent;
86+
}
87+
88+
static size_t webSocketSendFrame(AsyncClient *client, bool final, uint8_t opcode, bool mask, uint8_t *data, size_t len){
89+
uint8_t mbuf[4];
90+
if (webSocketSendFrameHeader(client, final, opcode, mask ? mbuf : nullptr, len) == 0) {
8991
return 0;
9092
}
91-
free(buf);
9293

94+
size_t added = 0;
9395
if(len){
96+
// Apply the mask
9497
if(len && mask){
9598
size_t i;
9699
for(i=0;i<len;i++)
97100
data[i] = data[i] ^ mbuf[i%4];
98-
}
99-
if(client->add((const char *)data, len) != len){
100-
//os_printf("error adding %lu data bytes\n", len);
101-
return 0;
102-
}
101+
}
102+
added = client->add((const char *)data, len);
103103
}
104-
if(!client->send()){
105-
//os_printf("error sending frame: %lu\n", headLen+len);
106-
return 0;
107-
}
108-
return len;
104+
client->send();
105+
return added;
109106
}
110107

111108

@@ -159,6 +156,7 @@ class AsyncWebSocketControl {
159156

160157
AsyncWebSocketBasicMessage::AsyncWebSocketBasicMessage(const char * data, size_t len, uint8_t opcode, bool mask)
161158
:_len(len)
159+
,_attempted(0)
162160
,_sent(0)
163161
,_ack(0)
164162
,_acked(0)
@@ -177,6 +175,7 @@ AsyncWebSocketBasicMessage::AsyncWebSocketBasicMessage(const char * data, size_t
177175
}
178176
AsyncWebSocketBasicMessage::AsyncWebSocketBasicMessage(uint8_t opcode, bool mask)
179177
:_len(0)
178+
,_attempted(0)
180179
,_sent(0)
181180
,_ack(0)
182181
,_acked(0)
@@ -215,6 +214,13 @@ AsyncWebSocketBasicMessage::~AsyncWebSocketBasicMessage() {
215214
_status = WS_MSG_ERROR;
216215
return 0;
217216
}
217+
if (_sent < _attempted) {
218+
// Frame was truncated
219+
size_t sent = client->write((const char*)(_data + _sent), _attempted - _sent);
220+
_ack += sent;
221+
_sent += sent;
222+
return sent;
223+
}
218224

219225
size_t toSend = _len - _sent;
220226
size_t window = webSocketSendFrameWindow(client);
@@ -223,8 +229,9 @@ AsyncWebSocketBasicMessage::~AsyncWebSocketBasicMessage() {
223229
toSend = window;
224230
}
225231

232+
_attempted += toSend;
226233
_sent += toSend;
227-
_ack += toSend + ((toSend < 126)?2:4) + (_mask * 4);
234+
_ack += toSend + webSocketHeaderLength(_mask, toSend);
228235

229236
bool final = (_sent == _len);
230237
uint8_t* dPtr = (uint8_t*)(_data + (_sent - toSend));
@@ -233,8 +240,9 @@ AsyncWebSocketBasicMessage::~AsyncWebSocketBasicMessage() {
233240
size_t sent = webSocketSendFrame(client, final, opCode, _mask, dPtr, toSend);
234241
_status = WS_MSG_SENDING;
235242
if(toSend && sent != toSend){
236-
_sent -= (toSend - sent);
243+
_attempted -= (toSend - sent);
237244
_ack -= (toSend - sent);
245+
// TODO - what if header never sent
238246
}
239247
return sent;
240248
}
@@ -259,7 +267,8 @@ AsyncWebSocketBasicMessage::~AsyncWebSocketBasicMessage() {
259267

260268

261269
AsyncWebSocketMultiMessage::AsyncWebSocketMultiMessage(AsyncWebSocketSharedBuffer buffer, uint8_t opcode, bool mask)
262-
:_sent(0)
270+
:_attempted(0)
271+
,_sent(0)
263272
,_ack(0)
264273
,_acked(0)
265274
,_WSbuffer(std::move(buffer))
@@ -302,6 +311,13 @@ AsyncWebSocketMultiMessage::~AsyncWebSocketMultiMessage() {
302311
//ets_printf("E: %u > %u\n", _sent, _len);
303312
return 0;
304313
}
314+
if (_sent < _attempted) {
315+
// Frame was truncated
316+
size_t sent = client->write(_WSbuffer.data() + _sent, _attempted - _sent);
317+
_ack += sent;
318+
_sent += sent;
319+
return sent;
320+
}
305321

306322
size_t toSend = _WSbuffer.size() - _sent;
307323
size_t window = webSocketSendFrameWindow(client);
@@ -310,8 +326,9 @@ AsyncWebSocketMultiMessage::~AsyncWebSocketMultiMessage() {
310326
toSend = window;
311327
}
312328

329+
_attempted += toSend;
313330
_sent += toSend;
314-
_ack += toSend + ((toSend < 126)?2:4) + (_mask * 4);
331+
_ack += toSend + webSocketHeaderLength(_mask, toSend);
315332

316333
//ets_printf("W: %u %u\n", _sent - toSend, toSend);
317334

@@ -325,6 +342,7 @@ AsyncWebSocketMultiMessage::~AsyncWebSocketMultiMessage() {
325342
//ets_printf("E: %u != %u\n", toSend, sent);
326343
_sent -= (toSend - sent);
327344
_ack -= (toSend - sent);
345+
// TODO - what if header never sent
328346
}
329347
//ets_printf("S: %u %u\n", _sent, sent);
330348
return sent;

src/AsyncWebSocket.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ class AsyncWebSocketMessage {
137137
class AsyncWebSocketBasicMessage: public AsyncWebSocketMessage {
138138
private:
139139
size_t _len;
140-
size_t _sent;
140+
size_t _attempted;
141+
size_t _sent;
141142
size_t _ack;
142143
size_t _acked;
143144
uint8_t * _data;
@@ -152,6 +153,7 @@ class AsyncWebSocketBasicMessage: public AsyncWebSocketMessage {
152153

153154
class AsyncWebSocketMultiMessage: public AsyncWebSocketMessage {
154155
private:
156+
size_t _attempted;
155157
size_t _sent;
156158
size_t _ack;
157159
size_t _acked;

0 commit comments

Comments
 (0)