Skip to content
This repository was archived by the owner on Oct 23, 2024. It is now read-only.

Commit 1b74d3e

Browse files
authored
Fix internal connection issue under high transmit rate (#1091)
1 parent 802a126 commit 1b74d3e

File tree

2 files changed

+32
-6
lines changed

2 files changed

+32
-6
lines changed

source/core/owt_base/internal/TransportBase.cpp

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,8 @@ uint32_t TransportMessage::fillData(const uint8_t* data, uint32_t length)
7474
}
7575
if (data) {
7676
memcpy(m_buffer.get() + m_receivedBytes, data, length);
77+
} else {
78+
length = 0;
7779
}
7880
m_receivedBytes += length;
7981
return length;
@@ -157,10 +159,19 @@ void TransportSession::sendData(TransportData data)
157159
return;
158160
}
159161
auto self(shared_from_this());
160-
m_service->post(boost::bind(&TransportSession::sendHandler, self, data));
162+
m_service->post(boost::bind(&TransportSession::prepareSend, self, data));
163+
}
164+
165+
void TransportSession::prepareSend(TransportData data)
166+
{
167+
// Only access m_sendQueue in IO service thread.
168+
m_sendQueue.push(data);
169+
if (m_sendQueue.size() == 1) {
170+
sendHandler();
171+
}
161172
}
162173

163-
void TransportSession::sendHandler(TransportData data)
174+
void TransportSession::sendHandler()
164175
{
165176
if (m_isClosed) {
166177
return;
@@ -169,7 +180,11 @@ void TransportSession::sendHandler(TransportData data)
169180
ELOG_WARN("sendHandler: socket is not open");
170181
return;
171182
}
183+
if (m_sendQueue.empty()) {
184+
return;
185+
}
172186

187+
TransportData data = m_sendQueue.front();
173188
TransportMessage toSend(data.buffer.get(), data.length);
174189
TransportData wrappedData{toSend.messageData(),
175190
toSend.messageLength()};
@@ -197,13 +212,18 @@ void TransportSession::writeHandler(
197212
const boost::system::error_code& ec,
198213
std::size_t bytes)
199214
{
215+
assert(m_sendQueue.size() > 0);
216+
m_sendQueue.pop();
200217
if (ec) {
201218
ELOG_DEBUG("Error writing data: %s", ec.message().c_str());
202219
if (!m_isClosed) {
203220
m_isClosed = true;
204221
// Notify the listener about the socket error if the listener is not closing me.
205222
m_listener->onClose(m_id);
206223
}
224+
} else {
225+
ELOG_DEBUG("Wrote data: %zu", bytes);
226+
sendHandler();
207227
}
208228
}
209229

@@ -254,6 +274,7 @@ void TransportSession::receiveData()
254274

255275
uint32_t bytesToRead = m_receivedMessage.missingBytes();
256276
assert(bytesToRead > 0);
277+
257278
if (bytesToRead > m_receivedBufferSize) {
258279
// Double the received buffer size
259280
m_receivedBufferSize = std::max(m_receivedBufferSize * 2, bytesToRead);
@@ -286,7 +307,10 @@ void TransportSession::readHandler(
286307
if (!ec || ec.value() == boost::asio::error::message_size) {
287308
uint32_t bytesToRead = m_receivedMessage.missingBytes();
288309
assert(bytesToRead >= bytes);
289-
m_receivedMessage.fillData(m_receivedBuffer.get(), bytes);
310+
uint32_t filled = m_receivedMessage.fillData(m_receivedBuffer.get(), bytes);
311+
if (filled != bytes) {
312+
ELOG_WARN("Message fill length %u, %zu\n", filled, bytes);
313+
}
290314
receiveData();
291315
} else {
292316
if (ec.value() != boost::system::errc::operation_canceled &&

source/core/owt_base/internal/TransportBase.h

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
#include "IOService.h"
1717
#include <memory>
18+
#include <queue>
1819

1920
namespace owt_base {
2021

@@ -102,17 +103,18 @@ class TransportSession
102103

103104
private:
104105
void receiveData();
105-
void sendHandler(TransportData data);
106-
void writeHandler(const boost::system::error_code&, std::size_t);
107106
void readHandler(const boost::system::error_code&, std::size_t);
108-
107+
void prepareSend(TransportData data);
108+
void sendHandler();
109+
void writeHandler(const boost::system::error_code&, std::size_t);
109110

110111
uint32_t m_id;
111112
std::shared_ptr<IOService> m_service;
112113
boost::asio::ip::tcp::socket m_socket;
113114
std::shared_ptr<SSLSocket> m_sslSocket;
114115
TransportMessage m_receivedMessage;
115116
boost::shared_array<uint8_t> m_receivedBuffer;
117+
std::queue<TransportData> m_sendQueue;
116118
uint32_t m_receivedBufferSize;
117119
bool m_isClosed;
118120
Listener* m_listener;

0 commit comments

Comments
 (0)