Skip to content

Commit f1cc38e

Browse files
committed
Use new PreparedMessage interface in Precompress
1 parent af39093 commit f1cc38e

File tree

3 files changed

+54
-24
lines changed

3 files changed

+54
-24
lines changed

examples/Precompress.cpp

Lines changed: 13 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,22 @@ int main() {
66
/* Fill with user data */
77
};
88

9-
/* Keeping track of last precompressed message both in original and compressed format */
10-
std::string originalMessage;
11-
std::string compressedMessage;
9+
/* Keeping track of last prepared message */
10+
uWS::PreparedMessage preparedMessage;
1211
std::mutex m;
1312

1413
/* For demo, we create a thread that will update the precompressed message every second */
15-
std::thread t2([&originalMessage, &compressedMessage, &m]() {
16-
uWS::ZlibContext zlibContext;
17-
uWS::DeflationStream compressor(uWS::DEDICATED_COMPRESSOR);
18-
int counter = 0;
19-
20-
while (true) {
21-
counter++;
22-
14+
std::thread t2([&m, &preparedMessage]() {
15+
for (int counter = 1; true; counter++) {
2316
m.lock();
24-
originalMessage = "Hello you are looking at message number " + std::to_string(counter) + " and this text should be precompressed";
25-
compressedMessage = compressor.deflate(&zlibContext, {originalMessage.data(), originalMessage.length()}, true);
26-
m.unlock();
27-
17+
std::string newMessage = "Hello you are looking at message number " + std::to_string(counter) + " and this text should be precompressed";
18+
19+
/* Here the current preparedMessage is updated */
20+
preparedMessage = uWS::Loop::get()->prepareMessage(newMessage, uWS::OpCode::TEXT, true);
21+
22+
m.unlock();
2823
std::this_thread::sleep_for(std::chrono::milliseconds(500));
2924
}
30-
3125
});
3226

3327
uWS::App().ws<PerSocketData>("/*", {
@@ -39,19 +33,14 @@ int main() {
3933
/* Open event here, you may access ws->getUserData() which points to a PerSocketData struct */
4034

4135
},
42-
.message = [&originalMessage, &compressedMessage, &m](auto *ws, std::string_view message, uWS::OpCode opCode) {
36+
.message = [&m, &preparedMessage](auto *ws, std::string_view message, uWS::OpCode opCode) {
4337

4438
/* First respond by echoing what they send us, without compression */
4539
ws->send(message, opCode, false);
4640

47-
/* This should be wrapped up into ws->sendPrepared(PreparedMessage) in the future, experimental for now */
41+
/* Send last prepared message */
4842
m.lock();
49-
if (ws->hasNegotiatedCompression() && compressedMessage.length() < originalMessage.length()) {
50-
std::cout << "Responding with precompressed message saving " << (originalMessage.length() - compressedMessage.length()) << " bytes" << std::endl;
51-
ws->send({compressedMessage.data(), compressedMessage.length()}, uWS::OpCode::TEXT, uWS::CompressFlags::ALREADY_COMPRESSED);
52-
} else {
53-
ws->send({originalMessage.data(), originalMessage.length()}, uWS::OpCode::TEXT);
54-
}
43+
ws->sendPrepared(preparedMessage);
5544
m.unlock();
5645
},
5746
.dropped = [](auto */*ws*/, std::string_view /*message*/, uWS::OpCode /*opCode*/) {

src/Loop.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,15 @@
2525
#include <iostream>
2626

2727
namespace uWS {
28+
29+
/* A prepared message is dependent on the Loop, so it belongs here */
30+
struct PreparedMessage {
31+
/* These should be a single alloation along with the PreparedMessage itself (they are static) */
32+
std::string originalMessage, compressedMessage;
33+
bool compressed;
34+
int opCode;
35+
};
36+
2837
struct Loop {
2938
private:
3039
static void wakeupCb(us_loop_t *loop) {
@@ -106,6 +115,29 @@ struct Loop {
106115
}
107116

108117
public:
118+
119+
/* Preformatted messages need the Loop */
120+
PreparedMessage prepareMessage(std::string_view message, int opCode, bool compress = true) {
121+
/* The message could be formatted right here, but this optimization is not done yet */
122+
PreparedMessage preparedMessage;
123+
preparedMessage.compressed = compress;
124+
preparedMessage.opCode = opCode;
125+
preparedMessage.originalMessage = message;
126+
127+
LoopData *loopData = (LoopData *) us_loop_ext((us_loop_t *) this);
128+
129+
/* Initialize loop's deflate inflate streams */
130+
if (!loopData->zlibContext) {
131+
loopData->zlibContext = new ZlibContext;
132+
loopData->inflationStream = new InflationStream(CompressOptions::DEDICATED_DECOMPRESSOR);
133+
loopData->deflationStream = new DeflationStream(CompressOptions::DEDICATED_COMPRESSOR);
134+
}
135+
136+
preparedMessage.compressedMessage = loopData->deflationStream->deflate(loopData->zlibContext, {preparedMessage.originalMessage.data(), preparedMessage.originalMessage.length()}, true);
137+
138+
return preparedMessage;
139+
}
140+
109141
/* Lazily initializes a per-thread loop and returns it.
110142
* Will automatically free all initialized loops at exit. */
111143
static Loop *get(void *existingNativeLoop = nullptr) {

src/WebSocket.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,15 @@ struct WebSocket : AsyncSocket<SSL> {
100100
return webSocketData->compressionStatus == WebSocketData::ENABLED;
101101
}
102102

103+
/* Experimental */
104+
void sendPrepared(PreparedMessage &preparedMessage) {
105+
if (hasNegotiatedCompression() && preparedMessage.compressedMessage.length() < preparedMessage.originalMessage.length()) {
106+
send({preparedMessage.compressedMessage.data(), preparedMessage.compressedMessage.length()}, (OpCode) preparedMessage.opCode, uWS::CompressFlags::ALREADY_COMPRESSED);
107+
} else {
108+
send({preparedMessage.originalMessage.data(), preparedMessage.originalMessage.length()}, (OpCode) preparedMessage.opCode);
109+
}
110+
}
111+
103112
/* Send or buffer a WebSocket frame, compressed or not. Returns BACKPRESSURE on increased user space backpressure,
104113
* DROPPED on dropped message (due to backpressure) or SUCCCESS if you are free to send even more now. */
105114
SendStatus send(std::string_view message, OpCode opCode = OpCode::BINARY, int compress = false, bool fin = true) {

0 commit comments

Comments
 (0)