Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 115 additions & 85 deletions libsrc/flatbufserver/FlatBufferClient.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "FlatBufferClient.h"
#include <utils/PixelFormat.h>
#include <utils/ColorRgba.h>

// qt
#include <QTcpSocket>
Expand Down Expand Up @@ -28,7 +29,7 @@ FlatBufferClient::FlatBufferClient(QTcpSocket* socket, int timeout, QObject *par
, _processingMessage(false)
{
_imageResampler.setPixelDecimation(1);

// timer setup
_timeoutTimer.reset(new QTimer(this));
_timeoutTimer->setSingleShot(true);
Expand Down Expand Up @@ -57,59 +58,71 @@ void FlatBufferClient::readyRead()
}
}

void FlatBufferClient::processNextMessage()
bool FlatBufferClient::processNextMessageInline()
{
if (_processingMessage) { return; } // Avoid re-entrancy
if (_processingMessage) { return false; } // Avoid re-entrancy

// Wait for at least 4 bytes to read the message size
if (_receiveBuffer.size() < 4) {
return;
return false;
}

_processingMessage = true;

uint32_t messageSize;
memcpy(&messageSize, _receiveBuffer.constData(), sizeof(uint32_t));
messageSize = qFromBigEndian(messageSize);
// Directly read message size (no memcpy)
const uint8_t* raw = reinterpret_cast<const uint8_t*>(_receiveBuffer.constData());
uint32_t messageSize = (raw[0] << 24) | (raw[1] << 16) | (raw[2] << 8) | raw[3];

// Validate message size
if (messageSize == 0 || messageSize > FLATBUFFER_MAX_MSG_LENGTH)
{
Warning(_log, "Invalid message size: %d - dropping received data", messageSize);
_receiveBuffer.clear();
_processingMessage = false;
return;
return true;
}

// Wait for full message
if (_receiveBuffer.size() < static_cast<int>(messageSize + 4))
{
_processingMessage = false;
return;
return false;
}

// Extract the message and remove it from the buffer
_lastMessage = _receiveBuffer.mid(4, messageSize);
_receiveBuffer.remove(0, messageSize + 4);

const uint8_t* msgData = reinterpret_cast<const uint8_t*>(_lastMessage.constData());
// Extract the message and remove it from the buffer (no copying)
const uint8_t* msgData = reinterpret_cast<const uint8_t*>(_receiveBuffer.constData() + 4);
flatbuffers::Verifier verifier(msgData, messageSize);

if (!hyperionnet::VerifyRequestBuffer(verifier)) {
Error(_log, "Invalid FlatBuffer message received");
sendErrorReply("Invalid FlatBuffer message received");
_processingMessage = false;
QMetaObject::invokeMethod(this, &FlatBufferClient::processNextMessage, Qt::QueuedConnection);
return;

// Clear the buffer in case of an invalid message
_receiveBuffer.clear();
return true;
}

// Invoke message handling
QMetaObject::invokeMethod(this, [this]() {
const auto* msgData = reinterpret_cast<const uint8_t*>(_lastMessage.constData());
QMetaObject::invokeMethod(this, [this, msgData, messageSize]() {
handleMessage(hyperionnet::GetRequest(msgData));
_processingMessage = false;
QMetaObject::invokeMethod(this, &FlatBufferClient::processNextMessage, Qt::QueuedConnection);

// Remove the processed message from the buffer (header + body)
_receiveBuffer.remove(0, messageSize + 4); // Clear the processed message + header

// Continue processing the next message
processNextMessage();
});

return true;
}

void FlatBufferClient::processNextMessage()
{
// Run the message processing inline until the buffer is empty or we can't process further
while (processNextMessageInline()) {
// Keep processing as long as we can
}
}

void FlatBufferClient::noDataReceived()
Expand Down Expand Up @@ -201,64 +214,89 @@ void FlatBufferClient::handleRegisterCommand(const hyperionnet::Register *regReq

void FlatBufferClient::handleImageCommand(const hyperionnet::Image *image)
{
Image<ColorRgb> imageRGB;

// extract parameters
int const duration = image->duration();

if (image->data_as_RawImage() != nullptr)
{
const auto* img = static_cast<const hyperionnet::RawImage*>(image->data_as_RawImage());

hyperionnet::RawImageT rawImageNative;
img->UnPackTo(&rawImageNative);

const int width = rawImageNative.width;
const int height = rawImageNative.height;
// Read image properties directly from FlatBuffer
const int width = img->width();
const int height = img->height();
const auto* data = img->data();

if (width <= 0 || height <= 0 || rawImageNative.data.empty())
if (width <= 0 || height <= 0 || data == nullptr || data->size() == 0)
{
sendErrorReply("Invalid width and/or height or no raw image data provided");
return;
}

// check consistency of the size of the received data
int const bytesPerPixel = rawImageNative.data.size() / (width * height);
// Check consistency of image data size
const int dataSize = data->size();
const int bytesPerPixel = dataSize / (width * height);
if (bytesPerPixel != 3 && bytesPerPixel != 4)
{
sendErrorReply("Size of image data does not match with the width and height");
return;
}

imageRGB.resize(width, height);
processRawImage(rawImageNative, bytesPerPixel, _imageResampler, imageRGB);
// Only resize if needed (reuse memory)
if (_imageOutputBuffer.width() != width || _imageOutputBuffer.height() != height)
{
_imageOutputBuffer.resize(width, height);
}

processRawImage(data->data(), width, height, bytesPerPixel, _imageResampler, _imageOutputBuffer);
}
else if (image->data_as_NV12Image() != nullptr)
{
const auto* img = static_cast<const hyperionnet::NV12Image*>(image->data_as_NV12Image());

hyperionnet::NV12ImageT nv12ImageNative;
img->UnPackTo(&nv12ImageNative);

const int width = nv12ImageNative.width;
const int height = nv12ImageNative.height;

if (width <= 0 || height <= 0 || nv12ImageNative.data_y.empty() || nv12ImageNative.data_uv.empty())
{
sendErrorReply("Invalid width and/or height or no complete NV12 image data provided");
return;
}

imageRGB.resize(width, height);
processNV12Image(nv12ImageNative, _imageResampler, imageRGB);
const int width = img->width();
const int height = img->height();
const auto* data_y = img->data_y();
const auto* data_uv = img->data_uv();

if (width <= 0 || height <= 0 || data_y == nullptr || data_uv == nullptr ||
data_y->size() == 0 || data_uv->size() == 0)
{
sendErrorReply("Invalid width and/or height or no complete NV12 image data provided");
return;
}

// Combine Y and UV into one contiguous buffer (reuse class member buffer)
const size_t y_size = data_y->size();
const size_t uv_size = data_uv->size();

size_t required_size = y_size + uv_size;
if (_combinedNv12Buffer.capacity() < required_size)
{
_combinedNv12Buffer.reserve(required_size);
}
std::memcpy(_combinedNv12Buffer.data(), data_y->data(), y_size);
std::memcpy(_combinedNv12Buffer.data() + y_size, data_uv->data(), uv_size);

// Determine stride for Y
const int stride_y = img->stride_y() > 0 ? img->stride_y() : width;

// Resize only when needed
if (_imageOutputBuffer.width() != width || _imageOutputBuffer.height() != height)
{
_imageOutputBuffer.resize(width, height);
}

// Process image
processNV12Image(_combinedNv12Buffer.data(), width, height, stride_y, _imageResampler, _imageOutputBuffer);
}
else
{
sendErrorReply("No or unknown image data provided");
return;
}

emit setGlobalInputImage(_priority, imageRGB, duration);
emit setBufferImage("FlatBuffer", imageRGB);
emit setGlobalInputImage(_priority, _imageOutputBuffer, duration);
emit setBufferImage("FlatBuffer", _imageOutputBuffer);

// send reply
sendSuccessReply();
Expand Down Expand Up @@ -319,49 +357,41 @@ void FlatBufferClient::sendErrorReply(const QString& error)
sendMessage(_builder.GetBufferPointer(), _builder.GetSize());
}

inline void FlatBufferClient::processRawImage(const hyperionnet::RawImageT& raw_image, int bytesPerPixel, ImageResampler& resampler, Image<ColorRgb>& outputImage) {

int const width = raw_image.width;
int const height = raw_image.height;

inline void FlatBufferClient::processRawImage(const uint8_t* buffer,
int width,
int height,
int bytesPerPixel,
ImageResampler& resampler,
Image<ColorRgb>& outputImage)
{
int const lineLength = width * bytesPerPixel;
PixelFormat const pixelFormat = (bytesPerPixel == 4) ? PixelFormat::RGB32 : PixelFormat::RGB24;

// Process the image
resampler.processImage(
raw_image.data.data(), // Raw RGB/RGBA buffer
width, // Image width
height, // Image height
lineLength, // Line length
pixelFormat, // Pixel format (RGB24/RGB32)
outputImage // Output image
);
buffer, // Raw buffer
width,
height,
lineLength,
pixelFormat,
outputImage
);
}

inline void FlatBufferClient::processNV12Image(const hyperionnet::NV12ImageT& nv12_image, ImageResampler& resampler, Image<ColorRgb>& outputImage) {
// Combine data_y and data_uv into a single buffer
int const width = nv12_image.width;
int const height = nv12_image.height;

size_t const y_size = nv12_image.data_y.size();
size_t const uv_size = nv12_image.data_uv.size();
std::vector<uint8_t> combined_buffer(y_size + uv_size);

std::memcpy(combined_buffer.data(), nv12_image.data_y.data(), y_size);
std::memcpy(combined_buffer.data() + y_size, nv12_image.data_uv.data(), uv_size);

// Determine line length (stride_y)
int const lineLength = nv12_image.stride_y > 0 ? nv12_image.stride_y : width;

PixelFormat const pixelFormat = PixelFormat::NV12;
inline void FlatBufferClient::processNV12Image(const uint8_t* nv12_data,
int width,
int height,
int stride_y,
ImageResampler& resampler,
Image<ColorRgb>& outputImage)
{
PixelFormat pixelFormat = PixelFormat::NV12;

// Process the image
resampler.processImage(
combined_buffer.data(), // Combined NV12 buffer
width, // Image width
height, // Image height
lineLength, // Line length for Y plane
pixelFormat, // Pixel format (NV12)
outputImage // Output image
);
nv12_data, // Combined NV12 buffer
width,
height,
stride_y,
pixelFormat,
outputImage
);
}
11 changes: 7 additions & 4 deletions libsrc/flatbufserver/FlatBufferClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ private slots:
/// @brief Is called whenever the socket got new data to read
///
void readyRead();
void processNextMessage();

///
/// @brief Is called when the socket closed the connection, also requests thread exit
Expand Down Expand Up @@ -123,6 +122,9 @@ private slots:
///
void handleNotImplemented();

void processNextMessage();
bool processNextMessageInline();

///
/// Send a message to the connected client
/// @param data to be send
Expand All @@ -142,8 +144,8 @@ private slots:
///
void sendErrorReply(const QString& error);

void processRawImage(const hyperionnet::RawImageT& raw_image, int bytesPerPixel, ImageResampler& resampler, Image<ColorRgb>& outputImage);
void processNV12Image(const hyperionnet::NV12ImageT& nv12_image, ImageResampler& resampler, Image<ColorRgb>& outputImage);
void processRawImage(const uint8_t* buffer, int width, int height, int bytesPerPixel, ImageResampler& resampler, Image<ColorRgb>& outputImage);
void processNV12Image(const uint8_t* nv12_data, int width, int height, int stride_y, ImageResampler& resampler, Image<ColorRgb>& outputImage);

private:
Logger * _log;
Expand All @@ -156,12 +158,13 @@ private slots:

QByteArray _receiveBuffer;

Image<ColorRgb> _imageOutputBuffer;
ImageResampler _imageResampler;
std::vector<uint8_t> _combinedNv12Buffer;

// Flatbuffers builder
flatbuffers::FlatBufferBuilder _builder;
bool _processingMessage;
QByteArray _lastMessage;
};

#endif // FLATBUFFERCLIENT_H
12 changes: 8 additions & 4 deletions libsrc/flatbufserver/FlatBufferConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,21 @@ FlatBufferConnection::FlatBufferConnection(const QString& origin, const QHostAdd
// init connect
connectToRemoteHost();

// start the connection timer
_timer.setInterval(5000);

connect(&_timer, &QTimer::timeout, this, &FlatBufferConnection::connectToRemoteHost);

//Trigger the retry timer when connection dropped
connect(this, &FlatBufferConnection::isDisconnected, &_timer, static_cast<void (QTimer::*)()>(&QTimer::start));
_timer.start();
}

FlatBufferConnection::~FlatBufferConnection()
{
_timer.stop();

//Stop retrying on disconnect
disconnect(this, &FlatBufferConnection::isDisconnected, &_timer, static_cast<void (QTimer::*)()>(&QTimer::start));

Debug(_log, "Closing connection with host: %s, port [%u]", QSTRING_CSTR(_host.toString()), _port);
_socket.close();
}
Expand All @@ -58,7 +63,6 @@ void FlatBufferConnection::onDisconnected()
_isRegistered = false,
Info(_log, "Disconnected from target host: %s, port [%u]", QSTRING_CSTR(_host.toString()), _port);
emit isDisconnected();
_timer.start();
}


Expand Down Expand Up @@ -225,8 +229,8 @@ bool FlatBufferConnection::parseReply(const hyperionnet::Reply *reply)
}
else
{
_timer.stop();
_isRegistered = true;
_timer.stop();
Debug(_log,"Client \"%s\" registered successfully with target host: %s, port [%u]", QSTRING_CSTR(_origin), QSTRING_CSTR(_host.toString()), _port);
emit isReadyToSend();
}
Expand Down
Loading