Skip to content

Commit 6ed437d

Browse files
committed
Use a queue of ProxyType<string> instead of a buffer for UDP output to never miss messages
1 parent 03b3f8d commit 6ed437d

File tree

2 files changed

+20
-17
lines changed

2 files changed

+20
-17
lines changed

MessageControl/MessageOutput.cpp

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -131,9 +131,9 @@ namespace Publishers {
131131
//UDPOutput
132132
UDPOutput::Channel::Channel(const Core::NodeId& nodeId)
133133
: Core::SocketDatagram(false, nodeId.Origin(), nodeId, Messaging::MessageUnit::Instance().DataSize(), 0)
134-
, _loaded(0)
134+
, _queue()
135+
, _stringPool(ProxyPoolTypeSize)
135136
{
136-
::memset(_sendBuffer, 0, sizeof(_sendBuffer));
137137
}
138138
UDPOutput::Channel::~Channel()
139139
{
@@ -142,13 +142,21 @@ namespace Publishers {
142142

143143
uint16_t UDPOutput::Channel::SendData(uint8_t* dataFrame, const uint16_t maxSendSize)
144144
{
145+
uint16_t actualByteCount = 0;
146+
145147
_adminLock.Lock();
146148

147-
uint16_t actualByteCount = (_loaded > maxSendSize ? maxSendSize : _loaded);
148-
memcpy(dataFrame, _sendBuffer, actualByteCount);
149-
_loaded = 0;
149+
if (_queue.empty() == true) {
150+
_adminLock.Unlock();
151+
}
152+
else {
153+
Core::ProxyType<string> msg = _queue.front();
154+
_queue.pop();
155+
_adminLock.Unlock();
150156

151-
_adminLock.Unlock();
157+
actualByteCount = std::min<uint16_t>(msg->size(), maxSendSize);
158+
memcpy(dataFrame, msg->c_str(), actualByteCount);
159+
}
152160

153161
return (actualByteCount);
154162
}
@@ -166,14 +174,9 @@ namespace Publishers {
166174
{
167175
_adminLock.Lock();
168176

169-
ASSERT((_loaded + text.length() + 1) < sizeof(_sendBuffer));
170-
171-
if ((_loaded + text.length() + 1) < sizeof(_sendBuffer)) {
172-
Core::FrameType<0> frame(_sendBuffer + _loaded, sizeof(_sendBuffer) - _loaded, sizeof(_sendBuffer) - _loaded);
173-
Core::FrameType<0>::Writer frameWriter(frame, 0);
174-
frameWriter.NullTerminatedText(text);
175-
_loaded += frameWriter.Offset();
176-
}
177+
Core::ProxyType<string> msg = _stringPool.Element();
178+
*msg = text;
179+
_queue.push(msg);
177180

178181
_adminLock.Unlock();
179182

MessageControl/MessageOutput.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ namespace Publishers {
345345

346346
class UDPOutput : public IPublish {
347347
private:
348-
static constexpr uint16_t UDPBufferSize = 4 * 1024;
348+
static constexpr uint32_t ProxyPoolTypeSize = 10;
349349

350350
class Channel : public Core::SocketDatagram {
351351
public:
@@ -364,8 +364,8 @@ namespace Publishers {
364364
uint16_t ReceiveData(uint8_t*, const uint16_t) override;
365365
void StateChange() override;
366366

367-
uint8_t _sendBuffer[UDPBufferSize];
368-
uint16_t _loaded;
367+
std::queue<Core::ProxyType<string>> _queue;
368+
Core::ProxyPoolType<string> _stringPool;
369369
Core::CriticalSection _adminLock;
370370
};
371371

0 commit comments

Comments
 (0)