Skip to content

Commit 5f696af

Browse files
committed
Adding pending data check so users can throttle sending of data
1 parent 39d3935 commit 5f696af

File tree

4 files changed

+33
-4
lines changed

4 files changed

+33
-4
lines changed

Test/main.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ void generaltest()
116116
auto lastheard = std::chrono::high_resolution_clock::now();
117117

118118
SL::WS_LITE::PortNumber port(3002);
119-
auto listenerct =
119+
auto listenerctx =
120120
SL::WS_LITE::CreateContext(SL::WS_LITE::ThreadCount(1))
121121
->NoTLS()
122122
->CreateListener(port)
@@ -158,6 +158,10 @@ void generaltest()
158158
while (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - lastheard).count() < 2000) {
159159
std::this_thread::sleep_for(200ms);
160160
}
161+
162+
std::this_thread::sleep_for(200ms);
163+
assert(clientctx->get_DataPending() == 0);
164+
assert(listenerctx->get_DataPending() == 0);
161165
}
162166
void generalTLStest()
163167
{
@@ -259,6 +263,9 @@ void generalTLStest()
259263
while (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - lastheard).count() < 2000) {
260264
std::this_thread::sleep_for(200ms);
261265
}
266+
std::this_thread::sleep_for(200ms);
267+
assert(clientctx->get_DataPending() == 0);
268+
assert(listenerctx->get_DataPending() == 0);
262269
}
263270

264271
void multithreadtest()
@@ -321,6 +328,11 @@ void multithreadtest()
321328
while (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - lastheard).count() < 2000) {
322329
std::this_thread::sleep_for(200ms);
323330
}
331+
std::this_thread::sleep_for(200ms);
332+
for (auto &c : clients) {
333+
assert(c->get_DataPending() == 0);
334+
}
335+
assert(listenerctx->get_DataPending() == 0);
324336
}
325337
const auto bufferesize = 1024 * 1024 * 10;
326338
void multithreadthroughputtest()
@@ -392,10 +404,14 @@ void multithreadthroughputtest()
392404
->connect("localhost", port);
393405
clients.push_back(c);
394406
}
395-
396407
while (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::high_resolution_clock::now() - lastheard).count() < 5000) {
397408
std::this_thread::sleep_for(200ms);
398409
}
410+
std::this_thread::sleep_for(200ms);
411+
for (auto &c : clients) {
412+
assert(c->get_DataPending() == 0);
413+
}
414+
assert(listenerctx->get_DataPending() == 0);
399415
std::cout << "Received " << mbsreceived << " bytes" << std::endl;
400416
}
401417
int main(int argc, char *argv[])

include/WS_Lite.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
#include <functional>
44
#include <memory>
55
#include <string>
6-
#include <unordered_map>
76
#include <system_error>
7+
#include <unordered_map>
88
typedef struct x509_store_ctx_st X509_STORE_CTX;
99

1010
#if defined(WINDOWS) || defined(WIN32)
@@ -167,6 +167,8 @@ namespace WS_LITE {
167167
virtual bool is_v4() const = 0;
168168
virtual bool is_v6() const = 0;
169169
virtual bool is_loopback() const = 0;
170+
// this will return the amount of data queued up but has not yet been submitted to the OS to be sent
171+
virtual size_t get_DataPending() const = 0;
170172
virtual void send(const WSMessage &msg, bool compressmessage) = 0;
171173
// send a close message and close the socket
172174
virtual void close(unsigned short code = 1000, const std::string &msg = "") = 0;
@@ -186,6 +188,8 @@ namespace WS_LITE {
186188
virtual void set_WriteTimeout(std::chrono::seconds seconds) = 0;
187189
// get the current write timeout in seconds
188190
virtual std::chrono::seconds get_WriteTimeout() = 0;
191+
// this will return the amount of data queued up but has not yet been submitted to the OS to be sent
192+
virtual size_t get_DataPending() const = 0;
189193
};
190194
class WS_LITE_EXTERN IWSListener_Configuration {
191195
public:

include/internal/DataStructures.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ namespace WS_LITE {
104104
std::chrono::seconds WriteTimeout = std::chrono::seconds(30);
105105
size_t MaxPayload = 1024 * 1024 * 20; // 20 MB
106106
bool TLSEnabled = false;
107+
size_t DataPending = 0;
107108

108109
std::function<void(const std::shared_ptr<IWSocket> &, const std::unordered_map<std::string, std::string> &)> onConnection;
109110
std::function<void(const std::shared_ptr<IWSocket> &, const WSMessage &)> onMessage;
@@ -163,7 +164,7 @@ namespace WS_LITE {
163164
sendclosemessage<isServer>(p, self, code, msg);
164165
}
165166
}
166-
167+
virtual size_t get_DataPending() const override { return DataPending; }
167168
void canceltimers()
168169
{
169170
std::error_code ec;
@@ -188,6 +189,7 @@ namespace WS_LITE {
188189
asio::basic_waitable_timer<std::chrono::steady_clock> write_deadline;
189190
asio::strand strand;
190191
std::deque<SendQueueItem> SendMessageQueue;
192+
size_t DataPending = 0;
191193
};
192194

193195
} // namespace WS_LITE

include/internal/WebSocketProtocol.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,12 +195,15 @@ namespace WS_LITE {
195195
}
196196

197197
socket->strand.post([socket, msg, parent, compressmessage]() {
198+
198199
if (socket->SocketStatus_ == SocketStatus::CONNECTED) {
199200
// update the socket status to reflect it is closing to prevent other messages from being sent.. this is the last valid message
200201
// make sure to do this after a call to write so the write process sends the close message, but no others
201202
if (msg.code == OpCode::CLOSE) {
202203
socket->SocketStatus_ = SocketStatus::CLOSING;
203204
}
205+
socket->DataPending += msg.len;
206+
socket->Parent->DataPending += msg.len;
204207
socket->SendMessageQueue.emplace_back(SendQueueItem{msg, compressmessage});
205208
SL::WS_LITE::startwrite<isServer>(parent, socket);
206209
}
@@ -254,6 +257,8 @@ namespace WS_LITE {
254257
return handleclose(parent, socket, 1002, "write header failed " + ec.message());
255258
}
256259
assert(msg.len == bytes_transferred);
260+
socket->DataPending -= msg.len;
261+
socket->Parent->DataPending -= msg.len;
257262
startwrite<isServer>(parent, socket);
258263

259264
}));
@@ -552,6 +557,7 @@ namespace WS_LITE {
552557
virtual std::chrono::seconds get_ReadTimeout() override;
553558
virtual void set_WriteTimeout(std::chrono::seconds seconds) override;
554559
virtual std::chrono::seconds get_WriteTimeout() override;
560+
virtual size_t get_DataPending() const override { return Impl_->DataPending; }
555561
};
556562
class WSListener : public IWSHub {
557563
std::shared_ptr<WSContextImpl> Impl_;
@@ -565,6 +571,7 @@ namespace WS_LITE {
565571
virtual std::chrono::seconds get_ReadTimeout() override;
566572
virtual void set_WriteTimeout(std::chrono::seconds seconds) override;
567573
virtual std::chrono::seconds get_WriteTimeout() override;
574+
virtual size_t get_DataPending() const override { return Impl_->DataPending; }
568575
};
569576

570577
class WSListener_Configuration : public IWSListener_Configuration {

0 commit comments

Comments
 (0)