diff --git a/bindings/python/src/MessageQueueBindings.cpp b/bindings/python/src/MessageQueueBindings.cpp index d8b1a99276..c2077b7b00 100644 --- a/bindings/python/src/MessageQueueBindings.cpp +++ b/bindings/python/src/MessageQueueBindings.cpp @@ -81,6 +81,7 @@ void MessageQueueBindings::bind(pybind11::module& m, void* pCallstack) { .def("getMaxSize", &MessageQueue::getMaxSize, DOC(dai, MessageQueue, getMaxSize)) .def("getSize", &MessageQueue::getSize, DOC(dai, MessageQueue, getSize)) .def("isFull", &MessageQueue::isFull, DOC(dai, MessageQueue, isFull)) + .def("getFps", &MessageQueue::getFps, DOC(dai, MessageQueue, getFps)) .def("addCallback", addCallbackLambda, py::arg("callback"), DOC(dai, MessageQueue, addCallback)) .def("removeCallback", &MessageQueue::removeCallback, py::arg("callbackId"), DOC(dai, MessageQueue, removeCallback)) .def("has", static_cast(&MessageQueue::has), DOC(dai, MessageQueue, has)) diff --git a/include/depthai/pipeline/MessageQueue.hpp b/include/depthai/pipeline/MessageQueue.hpp index e7a99cfcde..d536fe41db 100644 --- a/include/depthai/pipeline/MessageQueue.hpp +++ b/include/depthai/pipeline/MessageQueue.hpp @@ -3,6 +3,8 @@ // std #include #include +#include +#include // project #include "depthai/pipeline/datatype/ADatatype.hpp" @@ -26,10 +28,13 @@ class MessageQueue : public std::enable_shared_from_this { private: static constexpr auto CLOSED_QUEUE_MESSAGE = "MessageQueue was closed"; + static constexpr size_t FPS_QUEUE_MAX_SIZE = 10; LockingQueue> queue; std::string name; std::mutex callbacksMtx; + std::mutex fpsMtx; std::unordered_map)>> callbacks; + std::deque fpsQueue; CallbackId uniqueCallbackId{0}; void callCallbacks(std::shared_ptr msg); @@ -128,6 +133,13 @@ class MessageQueue : public std::enable_shared_from_this { */ unsigned int isFull() const; + /** + * Gets current FPS of the queue + * + * @returns Current FPS + */ + float getFps(); + /** * Adds a callback on message received * diff --git a/src/pipeline/MessageQueue.cpp b/src/pipeline/MessageQueue.cpp index 5142cd0d3c..f5870b49d3 100644 --- a/src/pipeline/MessageQueue.cpp +++ b/src/pipeline/MessageQueue.cpp @@ -2,6 +2,7 @@ // std #include +#include #include // project @@ -70,6 +71,37 @@ unsigned int MessageQueue::isFull() const { return queue.isFull(); } +float MessageQueue::getFps() { + std::unique_lock lock(fpsMtx); + + // Get current time + auto now = std::chrono::steady_clock::now(); + auto threshold = now - std::chrono::seconds(2); + + // Remove timestamps older than 2 seconds + while (!fpsQueue.empty() && fpsQueue.front() < threshold) { + fpsQueue.pop_front(); + } + + // If fewer than 2 timestamps are in queue, not enough data to compute FPS + if(fpsQueue.size() < 2) { + return 0.0; + } + + auto oldest = fpsQueue.front(); + auto newest = fpsQueue.back(); + auto diff = std::chrono::duration(newest - oldest).count(); // seconds + + // If diff is extremely small, avoid dividing by zero + if(diff <= 0.0) { + return 0.0; + } + // Using (N - 1) frames over 'diff' seconds + // or (N) messages over 'diff' seconds—both approaches are common. + // This calculates how many frames we got over that time window. + return (fpsQueue.size() - 1) / diff; +} + int MessageQueue::addCallback(std::function)> callback) { // Lock first std::unique_lock lock(callbacksMtx); @@ -111,6 +143,16 @@ void MessageQueue::send(const std::shared_ptr& msg) { callCallbacks(msg); auto queueNotClosed = queue.push(msg); if(!queueNotClosed) throw QueueException(CLOSED_QUEUE_MESSAGE); + + // Record the timestamp for FPS calculation + { + std::unique_lock lock(fpsMtx); + auto now = std::chrono::steady_clock::now(); + fpsQueue.push_back(now); + if(fpsQueue.size() > FPS_QUEUE_MAX_SIZE) { + fpsQueue.pop_front(); + } + } } bool MessageQueue::send(const std::shared_ptr& msg, std::chrono::milliseconds timeout) { @@ -119,6 +161,14 @@ bool MessageQueue::send(const std::shared_ptr& msg, std::chrono::mill if(queue.isDestroyed()) { throw QueueException(CLOSED_QUEUE_MESSAGE); } + { + std::unique_lock lock(fpsMtx); + auto now = std::chrono::steady_clock::now(); + fpsQueue.push_back(now); + if(fpsQueue.size() > FPS_QUEUE_MAX_SIZE) { + fpsQueue.pop_front(); + } + } return queue.tryWaitAndPush(msg, timeout); } diff --git a/tests/src/onhost_tests/message_queue_test.cpp b/tests/src/onhost_tests/message_queue_test.cpp index 561447338b..68418eb1ae 100644 --- a/tests/src/onhost_tests/message_queue_test.cpp +++ b/tests/src/onhost_tests/message_queue_test.cpp @@ -395,3 +395,39 @@ TEST_CASE("Multi callbacks", "[MessageQueue]") { REQUIRE(callbackCount1 == 1); REQUIRE(callbackCount2 == 1); } + +TEST_CASE("MessageQueue - FPS Calculation", "[MessageQueue]") { + // Create a non-blocking queue to avoid blocking when the underlying queue is full + MessageQueue queue(10, false); + + // Ensure FPS starts at 0 + REQUIRE(queue.getFps() == 0.0); + + // Send 10 messages with a small delay + constexpr int NUM_MESSAGES = 10; + constexpr int DELAY_MS = 50; // 50ms delay between messages + + for (int i = 0; i < NUM_MESSAGES; ++i) { + auto msg = std::make_shared(); + queue.send(msg); + std::this_thread::sleep_for(std::chrono::milliseconds(DELAY_MS)); + } + + // Compute expected FPS: 10 messages over roughly 450ms ~20 FPS + double fps = queue.getFps(); + REQUIRE(fps > 15.0); // Should be around 20 FPS + REQUIRE(fps < 30.0); // Upper bound check + + // Send one more message, verify FPS updates consistently + auto msg = std::make_shared(); + queue.send(msg); + fps = queue.getFps(); + REQUIRE(fps > 15.0); // Still consistent with recent message rate + + // Wait long enough (more than 2 seconds) so that old timestamps drop out, + // causing the FPS to be recalculated from fewer (or no) messages. + std::this_thread::sleep_for(std::chrono::milliseconds(2100)); + fps = queue.getFps(); + // With fewer than 2 messages in the recent window, getFps() should return 0.0 + REQUIRE(fps == 0.0); +} \ No newline at end of file