Skip to content

Commit 0103540

Browse files
authored
Fixing Flatbuffer Processing (#1854)
* Fix - Retries are not done when connection is destructed * Remove costly unpack and streamline code
1 parent 250ae2a commit 0103540

File tree

3 files changed

+130
-93
lines changed

3 files changed

+130
-93
lines changed

libsrc/flatbufserver/FlatBufferClient.cpp

Lines changed: 115 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include "FlatBufferClient.h"
22
#include <utils/PixelFormat.h>
3+
#include <utils/ColorRgba.h>
34

45
// qt
56
#include <QTcpSocket>
@@ -28,7 +29,7 @@ FlatBufferClient::FlatBufferClient(QTcpSocket* socket, int timeout, QObject *par
2829
, _processingMessage(false)
2930
{
3031
_imageResampler.setPixelDecimation(1);
31-
32+
3233
// timer setup
3334
_timeoutTimer.reset(new QTimer(this));
3435
_timeoutTimer->setSingleShot(true);
@@ -57,59 +58,71 @@ void FlatBufferClient::readyRead()
5758
}
5859
}
5960

60-
void FlatBufferClient::processNextMessage()
61+
bool FlatBufferClient::processNextMessageInline()
6162
{
62-
if (_processingMessage) { return; } // Avoid re-entrancy
63+
if (_processingMessage) { return false; } // Avoid re-entrancy
6364

6465
// Wait for at least 4 bytes to read the message size
6566
if (_receiveBuffer.size() < 4) {
66-
return;
67+
return false;
6768
}
6869

6970
_processingMessage = true;
7071

71-
uint32_t messageSize;
72-
memcpy(&messageSize, _receiveBuffer.constData(), sizeof(uint32_t));
73-
messageSize = qFromBigEndian(messageSize);
72+
// Directly read message size (no memcpy)
73+
const uint8_t* raw = reinterpret_cast<const uint8_t*>(_receiveBuffer.constData());
74+
uint32_t messageSize = (raw[0] << 24) | (raw[1] << 16) | (raw[2] << 8) | raw[3];
7475

7576
// Validate message size
7677
if (messageSize == 0 || messageSize > FLATBUFFER_MAX_MSG_LENGTH)
7778
{
7879
Warning(_log, "Invalid message size: %d - dropping received data", messageSize);
79-
_receiveBuffer.clear();
8080
_processingMessage = false;
81-
return;
81+
return true;
8282
}
8383

8484
// Wait for full message
8585
if (_receiveBuffer.size() < static_cast<int>(messageSize + 4))
8686
{
8787
_processingMessage = false;
88-
return;
88+
return false;
8989
}
9090

91-
// Extract the message and remove it from the buffer
92-
_lastMessage = _receiveBuffer.mid(4, messageSize);
93-
_receiveBuffer.remove(0, messageSize + 4);
94-
95-
const uint8_t* msgData = reinterpret_cast<const uint8_t*>(_lastMessage.constData());
91+
// Extract the message and remove it from the buffer (no copying)
92+
const uint8_t* msgData = reinterpret_cast<const uint8_t*>(_receiveBuffer.constData() + 4);
9693
flatbuffers::Verifier verifier(msgData, messageSize);
9794

9895
if (!hyperionnet::VerifyRequestBuffer(verifier)) {
9996
Error(_log, "Invalid FlatBuffer message received");
10097
sendErrorReply("Invalid FlatBuffer message received");
10198
_processingMessage = false;
102-
QMetaObject::invokeMethod(this, &FlatBufferClient::processNextMessage, Qt::QueuedConnection);
103-
return;
99+
100+
// Clear the buffer in case of an invalid message
101+
_receiveBuffer.clear();
102+
return true;
104103
}
105104

106105
// Invoke message handling
107-
QMetaObject::invokeMethod(this, [this]() {
108-
const auto* msgData = reinterpret_cast<const uint8_t*>(_lastMessage.constData());
106+
QMetaObject::invokeMethod(this, [this, msgData, messageSize]() {
109107
handleMessage(hyperionnet::GetRequest(msgData));
110108
_processingMessage = false;
111-
QMetaObject::invokeMethod(this, &FlatBufferClient::processNextMessage, Qt::QueuedConnection);
109+
110+
// Remove the processed message from the buffer (header + body)
111+
_receiveBuffer.remove(0, messageSize + 4); // Clear the processed message + header
112+
113+
// Continue processing the next message
114+
processNextMessage();
112115
});
116+
117+
return true;
118+
}
119+
120+
void FlatBufferClient::processNextMessage()
121+
{
122+
// Run the message processing inline until the buffer is empty or we can't process further
123+
while (processNextMessageInline()) {
124+
// Keep processing as long as we can
125+
}
113126
}
114127

115128
void FlatBufferClient::noDataReceived()
@@ -201,64 +214,89 @@ void FlatBufferClient::handleRegisterCommand(const hyperionnet::Register *regReq
201214

202215
void FlatBufferClient::handleImageCommand(const hyperionnet::Image *image)
203216
{
204-
Image<ColorRgb> imageRGB;
205-
206217
// extract parameters
207218
int const duration = image->duration();
219+
208220
if (image->data_as_RawImage() != nullptr)
209221
{
210222
const auto* img = static_cast<const hyperionnet::RawImage*>(image->data_as_RawImage());
211223

212-
hyperionnet::RawImageT rawImageNative;
213-
img->UnPackTo(&rawImageNative);
214-
215-
const int width = rawImageNative.width;
216-
const int height = rawImageNative.height;
224+
// Read image properties directly from FlatBuffer
225+
const int width = img->width();
226+
const int height = img->height();
227+
const auto* data = img->data();
217228

218-
if (width <= 0 || height <= 0 || rawImageNative.data.empty())
229+
if (width <= 0 || height <= 0 || data == nullptr || data->size() == 0)
219230
{
220231
sendErrorReply("Invalid width and/or height or no raw image data provided");
221232
return;
222233
}
223234

224-
// check consistency of the size of the received data
225-
int const bytesPerPixel = rawImageNative.data.size() / (width * height);
235+
// Check consistency of image data size
236+
const int dataSize = data->size();
237+
const int bytesPerPixel = dataSize / (width * height);
226238
if (bytesPerPixel != 3 && bytesPerPixel != 4)
227239
{
228240
sendErrorReply("Size of image data does not match with the width and height");
229241
return;
230242
}
231243

232-
imageRGB.resize(width, height);
233-
processRawImage(rawImageNative, bytesPerPixel, _imageResampler, imageRGB);
244+
// Only resize if needed (reuse memory)
245+
if (_imageOutputBuffer.width() != width || _imageOutputBuffer.height() != height)
246+
{
247+
_imageOutputBuffer.resize(width, height);
248+
}
249+
250+
processRawImage(data->data(), width, height, bytesPerPixel, _imageResampler, _imageOutputBuffer);
234251
}
235252
else if (image->data_as_NV12Image() != nullptr)
236253
{
237254
const auto* img = static_cast<const hyperionnet::NV12Image*>(image->data_as_NV12Image());
238255

239-
hyperionnet::NV12ImageT nv12ImageNative;
240-
img->UnPackTo(&nv12ImageNative);
241-
242-
const int width = nv12ImageNative.width;
243-
const int height = nv12ImageNative.height;
244-
245-
if (width <= 0 || height <= 0 || nv12ImageNative.data_y.empty() || nv12ImageNative.data_uv.empty())
246-
{
247-
sendErrorReply("Invalid width and/or height or no complete NV12 image data provided");
248-
return;
249-
}
250-
251-
imageRGB.resize(width, height);
252-
processNV12Image(nv12ImageNative, _imageResampler, imageRGB);
256+
const int width = img->width();
257+
const int height = img->height();
258+
const auto* data_y = img->data_y();
259+
const auto* data_uv = img->data_uv();
260+
261+
if (width <= 0 || height <= 0 || data_y == nullptr || data_uv == nullptr ||
262+
data_y->size() == 0 || data_uv->size() == 0)
263+
{
264+
sendErrorReply("Invalid width and/or height or no complete NV12 image data provided");
265+
return;
266+
}
267+
268+
// Combine Y and UV into one contiguous buffer (reuse class member buffer)
269+
const size_t y_size = data_y->size();
270+
const size_t uv_size = data_uv->size();
271+
272+
size_t required_size = y_size + uv_size;
273+
if (_combinedNv12Buffer.capacity() < required_size)
274+
{
275+
_combinedNv12Buffer.reserve(required_size);
276+
}
277+
std::memcpy(_combinedNv12Buffer.data(), data_y->data(), y_size);
278+
std::memcpy(_combinedNv12Buffer.data() + y_size, data_uv->data(), uv_size);
279+
280+
// Determine stride for Y
281+
const int stride_y = img->stride_y() > 0 ? img->stride_y() : width;
282+
283+
// Resize only when needed
284+
if (_imageOutputBuffer.width() != width || _imageOutputBuffer.height() != height)
285+
{
286+
_imageOutputBuffer.resize(width, height);
287+
}
288+
289+
// Process image
290+
processNV12Image(_combinedNv12Buffer.data(), width, height, stride_y, _imageResampler, _imageOutputBuffer);
253291
}
254292
else
255293
{
256294
sendErrorReply("No or unknown image data provided");
257295
return;
258296
}
259297

260-
emit setGlobalInputImage(_priority, imageRGB, duration);
261-
emit setBufferImage("FlatBuffer", imageRGB);
298+
emit setGlobalInputImage(_priority, _imageOutputBuffer, duration);
299+
emit setBufferImage("FlatBuffer", _imageOutputBuffer);
262300

263301
// send reply
264302
sendSuccessReply();
@@ -319,49 +357,41 @@ void FlatBufferClient::sendErrorReply(const QString& error)
319357
sendMessage(_builder.GetBufferPointer(), _builder.GetSize());
320358
}
321359

322-
inline void FlatBufferClient::processRawImage(const hyperionnet::RawImageT& raw_image, int bytesPerPixel, ImageResampler& resampler, Image<ColorRgb>& outputImage) {
323-
324-
int const width = raw_image.width;
325-
int const height = raw_image.height;
326-
360+
inline void FlatBufferClient::processRawImage(const uint8_t* buffer,
361+
int width,
362+
int height,
363+
int bytesPerPixel,
364+
ImageResampler& resampler,
365+
Image<ColorRgb>& outputImage)
366+
{
327367
int const lineLength = width * bytesPerPixel;
328368
PixelFormat const pixelFormat = (bytesPerPixel == 4) ? PixelFormat::RGB32 : PixelFormat::RGB24;
329369

330-
// Process the image
331370
resampler.processImage(
332-
raw_image.data.data(), // Raw RGB/RGBA buffer
333-
width, // Image width
334-
height, // Image height
335-
lineLength, // Line length
336-
pixelFormat, // Pixel format (RGB24/RGB32)
337-
outputImage // Output image
338-
);
371+
buffer, // Raw buffer
372+
width,
373+
height,
374+
lineLength,
375+
pixelFormat,
376+
outputImage
377+
);
339378
}
340379

341-
inline void FlatBufferClient::processNV12Image(const hyperionnet::NV12ImageT& nv12_image, ImageResampler& resampler, Image<ColorRgb>& outputImage) {
342-
// Combine data_y and data_uv into a single buffer
343-
int const width = nv12_image.width;
344-
int const height = nv12_image.height;
345-
346-
size_t const y_size = nv12_image.data_y.size();
347-
size_t const uv_size = nv12_image.data_uv.size();
348-
std::vector<uint8_t> combined_buffer(y_size + uv_size);
349-
350-
std::memcpy(combined_buffer.data(), nv12_image.data_y.data(), y_size);
351-
std::memcpy(combined_buffer.data() + y_size, nv12_image.data_uv.data(), uv_size);
352-
353-
// Determine line length (stride_y)
354-
int const lineLength = nv12_image.stride_y > 0 ? nv12_image.stride_y : width;
355-
356-
PixelFormat const pixelFormat = PixelFormat::NV12;
380+
inline void FlatBufferClient::processNV12Image(const uint8_t* nv12_data,
381+
int width,
382+
int height,
383+
int stride_y,
384+
ImageResampler& resampler,
385+
Image<ColorRgb>& outputImage)
386+
{
387+
PixelFormat pixelFormat = PixelFormat::NV12;
357388

358-
// Process the image
359389
resampler.processImage(
360-
combined_buffer.data(), // Combined NV12 buffer
361-
width, // Image width
362-
height, // Image height
363-
lineLength, // Line length for Y plane
364-
pixelFormat, // Pixel format (NV12)
365-
outputImage // Output image
366-
);
390+
nv12_data, // Combined NV12 buffer
391+
width,
392+
height,
393+
stride_y,
394+
pixelFormat,
395+
outputImage
396+
);
367397
}

libsrc/flatbufserver/FlatBufferClient.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ private slots:
8181
/// @brief Is called whenever the socket got new data to read
8282
///
8383
void readyRead();
84-
void processNextMessage();
8584

8685
///
8786
/// @brief Is called when the socket closed the connection, also requests thread exit
@@ -123,6 +122,9 @@ private slots:
123122
///
124123
void handleNotImplemented();
125124

125+
void processNextMessage();
126+
bool processNextMessageInline();
127+
126128
///
127129
/// Send a message to the connected client
128130
/// @param data to be send
@@ -142,8 +144,8 @@ private slots:
142144
///
143145
void sendErrorReply(const QString& error);
144146

145-
void processRawImage(const hyperionnet::RawImageT& raw_image, int bytesPerPixel, ImageResampler& resampler, Image<ColorRgb>& outputImage);
146-
void processNV12Image(const hyperionnet::NV12ImageT& nv12_image, ImageResampler& resampler, Image<ColorRgb>& outputImage);
147+
void processRawImage(const uint8_t* buffer, int width, int height, int bytesPerPixel, ImageResampler& resampler, Image<ColorRgb>& outputImage);
148+
void processNV12Image(const uint8_t* nv12_data, int width, int height, int stride_y, ImageResampler& resampler, Image<ColorRgb>& outputImage);
147149

148150
private:
149151
Logger * _log;
@@ -156,12 +158,13 @@ private slots:
156158

157159
QByteArray _receiveBuffer;
158160

161+
Image<ColorRgb> _imageOutputBuffer;
159162
ImageResampler _imageResampler;
163+
std::vector<uint8_t> _combinedNv12Buffer;
160164

161165
// Flatbuffers builder
162166
flatbuffers::FlatBufferBuilder _builder;
163167
bool _processingMessage;
164-
QByteArray _lastMessage;
165168
};
166169

167170
#endif // FLATBUFFERCLIENT_H

libsrc/flatbufserver/FlatBufferConnection.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,16 +30,21 @@ FlatBufferConnection::FlatBufferConnection(const QString& origin, const QHostAdd
3030
// init connect
3131
connectToRemoteHost();
3232

33-
// start the connection timer
3433
_timer.setInterval(5000);
35-
3634
connect(&_timer, &QTimer::timeout, this, &FlatBufferConnection::connectToRemoteHost);
35+
36+
//Trigger the retry timer when connection dropped
37+
connect(this, &FlatBufferConnection::isDisconnected, &_timer, static_cast<void (QTimer::*)()>(&QTimer::start));
3738
_timer.start();
3839
}
3940

4041
FlatBufferConnection::~FlatBufferConnection()
4142
{
4243
_timer.stop();
44+
45+
//Stop retrying on disconnect
46+
disconnect(this, &FlatBufferConnection::isDisconnected, &_timer, static_cast<void (QTimer::*)()>(&QTimer::start));
47+
4348
Debug(_log, "Closing connection with host: %s, port [%u]", QSTRING_CSTR(_host.toString()), _port);
4449
_socket.close();
4550
}
@@ -58,7 +63,6 @@ void FlatBufferConnection::onDisconnected()
5863
_isRegistered = false,
5964
Info(_log, "Disconnected from target host: %s, port [%u]", QSTRING_CSTR(_host.toString()), _port);
6065
emit isDisconnected();
61-
_timer.start();
6266
}
6367

6468

@@ -225,8 +229,8 @@ bool FlatBufferConnection::parseReply(const hyperionnet::Reply *reply)
225229
}
226230
else
227231
{
228-
_timer.stop();
229232
_isRegistered = true;
233+
_timer.stop();
230234
Debug(_log,"Client \"%s\" registered successfully with target host: %s, port [%u]", QSTRING_CSTR(_origin), QSTRING_CSTR(_host.toString()), _port);
231235
emit isReadyToSend();
232236
}

0 commit comments

Comments
 (0)