Skip to content

Commit 5183404

Browse files
committed
Cleanup and finalizing compression
1 parent 4444c69 commit 5183404

File tree

5 files changed

+84
-48
lines changed

5 files changed

+84
-48
lines changed

Test/main.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -524,12 +524,12 @@ int main(int argc, char *argv[])
524524
testgetline();
525525
wssautobahntest();
526526
std::this_thread::sleep_for(1s);
527-
/* generaltest();
527+
generaltest();
528528
std::this_thread::sleep_for(1s);
529529
generalTLStest();
530530
std::this_thread::sleep_for(1s);
531531
multithreadtest();
532532
std::this_thread::sleep_for(1s);
533-
multithreadthroughputtest();*/
533+
multithreadthroughputtest();
534534
return 0;
535535
}

include/WS_Lite.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@
22
#include <chrono>
33
#include <functional>
44
#include <memory>
5-
#include <string.h>
65
#include <string>
76
#include <system_error>
87
#include <unordered_map>
9-
#include <vector>
8+
109
typedef struct x509_store_ctx_st X509_STORE_CTX;
1110

1211
#if defined(WINDOWS) || defined(WIN32)
@@ -853,7 +852,6 @@ namespace WS_LITE {
853852
*/
854853
virtual std::error_code set_password_callback(const std::function<std::string(std::size_t, password_purpose)> &callback,
855854
std::error_code &ec) = 0;
856-
857-
}; // namespace WS_LITE
855+
};
858856
} // namespace WS_LITE
859857
} // namespace SL

include/internal/WebSocket.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ namespace WS_LITE {
1818
WSMessage msg;
1919
CompressionOptions compressmessage;
2020
};
21-
struct WebSocketContext;
21+
class WebSocketContext;
2222
template <bool isServer, class SOCKETTYPE> class WebSocket final : public IWebSocket {
2323

2424
public:

include/internal/WebSocketContext.h

Lines changed: 58 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -8,50 +8,79 @@
88

99
namespace SL {
1010
namespace WS_LITE {
11-
const size_t LARGE_BUFFER_SIZE = 300 * 1024;
11+
const size_t LARGE_BUFFER_SIZE = 4 * 1024 * 1024; // 4 MB temp buffer
1212
class IWebSocket;
1313
struct HttpHeader;
1414
struct WSMessage;
15-
struct WebSocketContext {
15+
class WebSocketContext {
16+
unsigned char *InflateBuffer = nullptr;
17+
size_t InflateBufferSize = 0;
18+
std::unique_ptr<unsigned char[]> TempInflateBuffer;
19+
z_stream InflationStream = {};
20+
auto returnemptyinflate()
21+
{
22+
unsigned char *p = nullptr;
23+
size_t o = 0;
24+
return std::make_tuple(p, o);
25+
}
1626

17-
WebSocketContext() : inflationBuffer(std::make_unique<char[]>(LARGE_BUFFER_SIZE)) { inflateInit2(&inflationStream, -MAX_WBITS); }
18-
~WebSocketContext() { inflateEnd(&inflationStream); }
19-
auto inflate(unsigned char *data, size_t data_len)
27+
public:
28+
WebSocketContext()
29+
{
30+
TempInflateBuffer = std::make_unique<unsigned char[]>(MaxPayload);
31+
inflateInit2(&InflationStream, -MAX_WBITS);
32+
}
33+
~WebSocketContext() { inflateEnd(&InflationStream); }
34+
auto beginInflate()
35+
{
36+
InflateBufferSize = 0;
37+
free(InflateBuffer);
38+
InflateBuffer = nullptr;
39+
}
40+
auto Inflate(unsigned char *data, size_t data_len)
2041
{
21-
dynamicInflationBuffer.clear();
22-
inflationStream.next_in = (Bytef *)data;
23-
inflationStream.avail_in = data_len;
42+
InflationStream.next_in = (Bytef *)data;
43+
InflationStream.avail_in = data_len;
2444

2545
int err;
2646
do {
27-
inflationStream.next_out = (Bytef *)inflationBuffer.get();
28-
inflationStream.avail_out = LARGE_BUFFER_SIZE;
29-
err = ::inflate(&inflationStream, Z_FINISH);
30-
if (!inflationStream.avail_in) {
47+
InflationStream.next_out = (Bytef *)TempInflateBuffer.get();
48+
InflationStream.avail_out = LARGE_BUFFER_SIZE;
49+
err = ::inflate(&InflationStream, Z_FINISH);
50+
if (!InflationStream.avail_in) {
3151
break;
3252
}
53+
auto growsize = LARGE_BUFFER_SIZE - InflationStream.avail_out;
54+
InflateBufferSize += growsize;
55+
auto beforesize = InflateBufferSize;
56+
InflateBuffer = static_cast<unsigned char *>(realloc(InflateBuffer, InflateBufferSize));
57+
if (!InflateBuffer) {
58+
SL_WS_LITE_LOG(Logging_Levels::ERROR_log_level, "INFLATE MEMORY ALLOCATION ERROR!!! Tried to realloc " << InflateBufferSize);
59+
return returnemptyinflate();
60+
}
61+
memcpy(InflateBuffer + beforesize, TempInflateBuffer.get(), growsize);
62+
} while (err == Z_BUF_ERROR && InflateBufferSize <= MaxPayload);
3363

34-
dynamicInflationBuffer.append(inflationBuffer.get(), LARGE_BUFFER_SIZE - inflationStream.avail_out);
35-
} while (err == Z_BUF_ERROR && dynamicInflationBuffer.length() <= MaxPayload);
36-
37-
inflateReset(&inflationStream);
64+
inflateReset(&InflationStream);
3865

39-
if ((err != Z_BUF_ERROR && err != Z_OK) || dynamicInflationBuffer.length() > MaxPayload) {
40-
unsigned char *p = nullptr;
41-
size_t o = 0;
42-
return std::make_tuple(p, o);
66+
if ((err != Z_BUF_ERROR && err != Z_OK) || InflateBufferSize > MaxPayload) {
67+
return returnemptyinflate();
4368
}
44-
if (!dynamicInflationBuffer.empty()) {
45-
dynamicInflationBuffer.append(inflationBuffer.get(), LARGE_BUFFER_SIZE - inflationStream.avail_out);
46-
return std::make_tuple((unsigned char *)dynamicInflationBuffer.data(), dynamicInflationBuffer.length());
69+
if (InflateBufferSize > 0) {
70+
auto growsize = LARGE_BUFFER_SIZE - InflationStream.avail_out;
71+
InflateBufferSize += growsize;
72+
auto beforesize = InflateBufferSize;
73+
InflateBuffer = static_cast<unsigned char *>(realloc(InflateBuffer, InflateBufferSize));
74+
if (!InflateBuffer) {
75+
SL_WS_LITE_LOG(Logging_Levels::ERROR_log_level, "INFLATE MEMORY ALLOCATION ERROR!!! Tried to realloc " << InflateBufferSize);
76+
return returnemptyinflate();
77+
}
78+
memcpy(InflateBuffer + beforesize, TempInflateBuffer.get(), growsize);
79+
return std::make_tuple(InflateBuffer, InflateBufferSize);
4780
}
48-
49-
return std::make_tuple((unsigned char *)inflationBuffer.get(), LARGE_BUFFER_SIZE - (size_t)inflationStream.avail_out);
81+
return std::make_tuple(TempInflateBuffer.get(), LARGE_BUFFER_SIZE - (size_t)InflationStream.avail_out);
5082
}
51-
std::string dynamicInflationBuffer;
52-
std::unique_ptr<char[]> inflationBuffer;
53-
z_stream inflationStream = {};
54-
83+
auto endInflate() { beginInflate(); }
5584
std::function<void(const std::shared_ptr<IWebSocket> &, const HttpHeader &)> onConnection;
5685
std::function<void(const std::shared_ptr<IWebSocket> &, const WSMessage &)> onMessage;
5786
std::function<void(const std::shared_ptr<IWebSocket> &, unsigned short, const std::string &)> onDisconnection;

include/internal/WebSocketProtocol.h

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,17 @@ namespace WS_LITE {
263263
}
264264
}
265265
}
266-
266+
template <bool isServer, class SOCKETTYPE> inline void ProcessMessageFin(const SOCKETTYPE &socket, const WSMessage &unpacked, OpCode opcode)
267+
{
268+
if (socket->LastOpCode == OpCode::TEXT || opcode == OpCode::TEXT) {
269+
if (!isValidUtf8(unpacked.data, unpacked.len)) {
270+
return sendclosemessage<isServer>(socket, 1007, "Frame not valid utf8");
271+
}
272+
}
273+
if (socket->Parent->onMessage) {
274+
socket->Parent->onMessage(socket, unpacked);
275+
}
276+
}
267277
template <bool isServer, class SOCKETTYPE> inline void ProcessMessage(const SOCKETTYPE &socket, const std::shared_ptr<asio::streambuf> &extradata)
268278
{
269279

@@ -290,20 +300,19 @@ namespace WS_LITE {
290300
else if (socket->LastOpCode == OpCode::INVALID && opcode == OpCode::CONTINUATION) {
291301
return sendclosemessage<isServer>(socket, 1002, "Continuation Received without a previous frame");
292302
}
293-
auto unpacked =
294-
WSMessage{socket->ReceiveBuffer, socket->ReceiveBufferSize, socket->LastOpCode != OpCode::INVALID ? socket->LastOpCode : opcode};
303+
295304
// this could be compressed.... lets check it out
296305
if (socket->FrameCompressed || getrsv1(socket->ReceiveHeader)) { // is this the last of the messages? Decompress!!!
297-
auto[buffer, buffer_length] = socket->Parent->inflate(socket->ReceiveBuffer, socket->ReceiveBufferSize);
298-
unpacked = WSMessage{buffer, buffer_length, socket->LastOpCode != OpCode::INVALID ? socket->LastOpCode : opcode};
306+
socket->Parent->beginInflate();
307+
auto[buffer, buffer_length] = socket->Parent->Inflate(socket->ReceiveBuffer, socket->ReceiveBufferSize);
308+
auto unpacked = WSMessage{buffer, buffer_length, socket->LastOpCode != OpCode::INVALID ? socket->LastOpCode : opcode};
309+
ProcessMessageFin<isServer>(socket, unpacked, opcode);
310+
socket->Parent->endInflate();
299311
}
300-
if (socket->LastOpCode == OpCode::TEXT || opcode == OpCode::TEXT) {
301-
if (!isValidUtf8(unpacked.data, unpacked.len)) {
302-
return sendclosemessage<isServer>(socket, 1007, "Frame not valid utf8");
303-
}
304-
}
305-
if (socket->Parent->onMessage) {
306-
socket->Parent->onMessage(socket, unpacked);
312+
else {
313+
auto unpacked =
314+
WSMessage{socket->ReceiveBuffer, socket->ReceiveBufferSize, socket->LastOpCode != OpCode::INVALID ? socket->LastOpCode : opcode};
315+
ProcessMessageFin<isServer>(socket, unpacked, opcode);
307316
}
308317
ReadHeaderStart<isServer>(socket, extradata);
309318
}

0 commit comments

Comments
 (0)