From a9dd1ca4335072d418742308018510ba80255e10 Mon Sep 17 00:00:00 2001
From: atobisze
Date: Wed, 9 Jul 2025 15:27:02 +0200
Subject: [PATCH 1/2] Tryout

---
 src/BUILD                    |  1 +
 src/ovinferrequestsqueue.cpp |  3 +-
 src/ovinferrequestsqueue.hpp |  1 +
 src/queue.hpp                | 20 +++++++--
 src/test/openvino_tests.cpp  | 86 +++++++++++++++++++++++++++++++++++-
 5 files changed, 105 insertions(+), 6 deletions(-)

diff --git a/src/BUILD b/src/BUILD
index 915565585d..e956c2b16f 100644
--- a/src/BUILD
+++ b/src/BUILD
@@ -2910,6 +2910,7 @@ cc_library(
         "@com_google_googletest//:gtest",
     ],
     copts = COPTS_TESTS,
+    local_defines = COMMON_LOCAL_DEFINES,
     linkopts = [],
 )
 cc_library(
diff --git a/src/ovinferrequestsqueue.cpp b/src/ovinferrequestsqueue.cpp
index 798d02dbf1..879512e443 100644
--- a/src/ovinferrequestsqueue.cpp
+++ b/src/ovinferrequestsqueue.cpp
@@ -21,7 +21,8 @@

 namespace ovms {
 OVInferRequestsQueue::OVInferRequestsQueue(ov::CompiledModel& compiledModel, int streamsLength) :
-    Queue(streamsLength) {
+    Queue(streamsLength),
+    compiledModel(compiledModel) {
     for (int i = 0; i < streamsLength; ++i) {
         streams[i] = i;
         OV_LOGGER("ov::CompiledModel: {} compiledModel.create_infer_request()", reinterpret_cast<void*>(&compiledModel));
diff --git a/src/ovinferrequestsqueue.hpp b/src/ovinferrequestsqueue.hpp
index 8b5dfb729a..dcbb8f564b 100644
--- a/src/ovinferrequestsqueue.hpp
+++ b/src/ovinferrequestsqueue.hpp
@@ -24,6 +24,7 @@ namespace ovms {
 class OVInferRequestsQueue : public Queue<ov::InferRequest> {
 public:
     OVInferRequestsQueue(ov::CompiledModel& compiledModel, int streamsLength);
+    ov::CompiledModel& compiledModel;
 };
 }  // namespace ovms
diff --git a/src/queue.hpp b/src/queue.hpp
index 3047fa781d..2912d69cbe 100644
--- a/src/queue.hpp
+++ b/src/queue.hpp
@@ -55,6 +55,16 @@ class Queue {
         return idleStreamFuture;
     }

+    void extendQueue() {
+        if (!constructFunc.has_value()) {
+            return;
+        }
+        size_t streamSize = streams.size();
+        streams.push_back(streamSize - 1);
+        inferRequests.reserve(streams.size());
+        inferRequests.push_back(constructFunc.value()());
+    }
+
     std::optional<int> tryToGetIdleStream() {
         // OVMS_PROFILE_FUNCTION();
         int value;
@@ -69,7 +79,6 @@ class Queue {
             return value;
         }
     }
-
     /**
      * @brief Release stream after execution
      */
@@ -95,13 +104,16 @@ class Queue {
     /**
      * @brief Constructor with initialization
      */
-    Queue(int streamsLength) :
-        streams(streamsLength),
+    // change constructor so that it can also accept lambda which returns T. This lambda
+    // is optional but if exists it will be used to construct T objects
+    Queue(int streamsLength, std::optional<std::function<T()>> constructFunc = std::nullopt) : streams(streamsLength),
+        constructFunc(constructFunc),
         front_idx{0},
         back_idx{0} {
         for (int i = 0; i < streamsLength; ++i) {
             streams[i] = i;
         }
+        streams.reserve(50);
     }

     /**
@@ -116,7 +128,7 @@
      * @brief Vector representing circular buffer for infer queue
      */
     std::vector<int> streams;
-
+    std::optional<std::function<T()>> constructFunc = std::nullopt;
     /**
      * @brief Index of the front of the idle streams list
      */
diff --git a/src/test/openvino_tests.cpp b/src/test/openvino_tests.cpp
index b082c525a9..ecc8c4fabc 100644
--- a/src/test/openvino_tests.cpp
+++ b/src/test/openvino_tests.cpp
@@ -50,7 +50,6 @@ TEST_F(OpenVINO, CallbacksTest) {
     Core core;
     auto model = core.read_model("/ovms/src/test/dummy/1/dummy.xml");
     const std::string inputName{"b"};
-    auto input = model->get_parameters().at(0);
     ov::element::Type_t dtype = ov::element::Type_t::f32;
     ov::Shape ovShape;
     ovShape.emplace_back(1);
@@ -92,6 +91,91 @@
     EXPECT_TRUE(outOvTensor.is<ov::Tensor>());
     EXPECT_TRUE(outAutoTensor.is<ov::Tensor>());
 }
+TEST_F(OpenVINO, StressInferTest) {
+    Core core;
+    auto model = core.read_model("/ovms/src/test/dummy/1/dummy.xml");
+    const std::string inputName{"b"};
+    auto input = model->get_parameters().at(0);
+    ov::element::Type_t dtype = ov::element::Type_t::f32;
+    ov::Shape ovShape;
+    ovShape.emplace_back(1);
+    ovShape.emplace_back(100000);
+    std::map<std::string, ov::PartialShape> inputShapes;
+    inputShapes[inputName] = ovShape;
+    model->reshape(inputShapes);
+    auto cpuCompiledModel = core.compile_model(model, "CPU");
+    std::vector<ov::InferRequest> inferRequests;
+    SPDLOG_INFO("Starting vector size:{}, vector capacity:{}", inferRequests.size(), inferRequests.capacity());
+    inferRequests.resize(0);
+    SPDLOG_INFO("Starting vector size:{}, vector capacity:{}", inferRequests.size(), inferRequests.capacity());
+    inferRequests.reserve(2);
+    SPDLOG_INFO("Starting vector size:{}, vector capacity:{}", inferRequests.size(), inferRequests.capacity());
+    //inferRequests.shrink_to_fit();
+    // we want to test workload when we increase number of infer requests vector during workload
+    // so we start with vector of 1, start workload on it
+    // then after 1s we start another thread which will add another infer request to the vector
+    // ideally we ensure that vector does realocate memory so it forces move of the objects inside it
+
+    // first write function that will be done in thread. It will get reference to inferRequests vector
+    // it will create ov::Tensor with passed dtype and ovShape. It will set the content of that vector to i-th
+    // so that we will check content of response each time. It will perform workload until it gets signal by future
+    auto loadFunction = [&cpuCompiledModel, &inferRequests, inputName, dtype, ovShape](size_t i, std::future<void> stopSignal) {
+        SPDLOG_INFO("Starting loadFunction:{}", i);
+        inferRequests.emplace_back(cpuCompiledModel.create_infer_request());
+        SPDLOG_INFO("Starting shrinkToFit:{} vector size:{}, vector capacity:{}", i, inferRequests.size(), inferRequests.capacity());
+        inferRequests.shrink_to_fit();
+        SPDLOG_INFO("After shrinkToFit:{} vector size:{}, vector capacity:{}", i, inferRequests.size(), inferRequests.capacity());
+        auto& inferRequest = inferRequests[i];
+        // prepare ov::Tensor data
+        ov::Tensor inputOvTensor(dtype, ovShape);
+        ov::Tensor outputOvTensor(dtype, ovShape);
+        for (size_t j = 0; j < 100000; j++) {
+            reinterpret_cast<float*>(inputOvTensor.data())[j] = i;
+            reinterpret_cast<float*>(outputOvTensor.data())[j] = (i + 1);
+            if (j<10 || j > 99990) {
+                SPDLOG_ERROR("input data: {}, expected: {}, i:{}, j:{}", reinterpret_cast<float*>(inputOvTensor.data())[j], reinterpret_cast<float*>(outputOvTensor.data())[j], i, j);
+            }
+        }
+
+        // now while loop that stops only if we get stop signal
+        SPDLOG_INFO("Running infer request {}", i);
+        size_t k = 0;
+        while (stopSignal.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout) {
+            inferRequest.set_tensor(inputName, inputOvTensor);
+            inferRequest.start_async();
+            inferRequest.wait();
+            auto outOvTensor = inferRequest.get_tensor("a");
+            for (size_t j = 0; j < 100000; j++) {
+                if (j<10 || j > 99990) {
+                    SPDLOG_ERROR("infReqRef:{} infReq[i]:{} outTensor data: {}, expected: {} i:{} j:{} k:{}", (void*)(&inferRequest), (void*)(&inferRequests[i]),reinterpret_cast<float*>(outOvTensor.data())[j], reinterpret_cast<float*>(outputOvTensor.data())[j], i, j , k);
+                }
+            }
+            ASSERT_EQ(0, std::memcmp(outOvTensor.data(), outputOvTensor.data(), outOvTensor.get_byte_size())) << "i: " << i;
+            ASSERT_EQ(0, std::memcmp(outOvTensor.data(), outputOvTensor.data(), outOvTensor.get_byte_size())) << "i: " << i;
+            k++;
+        }
+    };
+    size_t n = 2;
+    std::vector<std::promise<void>> stopSignal(n);
+    std::vector<std::thread> threads;
+    threads.reserve(n);
+    for (size_t i = 0; i < n; ++i) {
+        // create thread that will run loadFunction
+        SPDLOG_INFO("Starting thread {}", i);
+        threads.emplace_back(loadFunction, i, stopSignal[i].get_future());
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+    std::this_thread::sleep_for(std::chrono::seconds(5));
+    for (size_t i = 0; i < n; ++i) {
+        // create thread that will run loadFunction
+        SPDLOG_INFO("Stopping thread {}", i);
+        stopSignal[i].set_value();
+    }
+    for (size_t i = 0; i < n; ++i) {
+        SPDLOG_INFO("Joining thread {}", i);
+        threads[i].join();
+    }
+}
 TEST_F(OpenVINO, ResetOutputTensors) {
     Core core;
     auto model = core.read_model("/ovms/src/test/dummy/1/dummy.xml");

From 7b9e3271bf0d5a9a0d1fb253595a5e8ac76e6b9e Mon Sep 17 00:00:00 2001
From: atobisze
Date: Wed, 9 Jul 2025 15:49:52 +0200
Subject: [PATCH 2/2] Check map

---
 src/custom_nodes/common/buffersqueue.cpp |  2 +-
 src/ovinferrequestsqueue.cpp             |  5 ++---
 src/ovinferrequestsqueue.hpp             |  1 -
 src/queue.hpp                            | 23 ++++++-----------------
 src/test/openvino_tests.cpp              | 13 +++++++------
 5 files changed, 16 insertions(+), 28 deletions(-)

diff --git a/src/custom_nodes/common/buffersqueue.cpp b/src/custom_nodes/common/buffersqueue.cpp
index d338eac258..5f022cc5e8 100644
--- a/src/custom_nodes/common/buffersqueue.cpp
+++ b/src/custom_nodes/common/buffersqueue.cpp
@@ -26,7 +26,7 @@ BuffersQueue::BuffersQueue(size_t singleBufferSize, int streamsLength) :
     size(singleBufferSize * streamsLength),
     memoryPool(std::make_unique<char[]>(size)) {
     for (int i = 0; i < streamsLength; ++i) {
-        inferRequests.push_back(memoryPool.get() + i * singleBufferSize);
+        inferRequests.insert({i, memoryPool.get() + i * singleBufferSize});
     }
 }

diff --git a/src/ovinferrequestsqueue.cpp b/src/ovinferrequestsqueue.cpp
index 879512e443..e5ea3266ef 100644
--- a/src/ovinferrequestsqueue.cpp
+++ b/src/ovinferrequestsqueue.cpp
@@ -21,12 +21,11 @@

 namespace ovms {
 OVInferRequestsQueue::OVInferRequestsQueue(ov::CompiledModel& compiledModel, int streamsLength) :
-    Queue(streamsLength),
-    compiledModel(compiledModel) {
+    Queue(streamsLength) {
     for (int i = 0; i < streamsLength; ++i) {
         streams[i] = i;
         OV_LOGGER("ov::CompiledModel: {} compiledModel.create_infer_request()", reinterpret_cast<void*>(&compiledModel));
-        inferRequests.push_back(compiledModel.create_infer_request());
+        inferRequests.insert({i, compiledModel.create_infer_request()});
     }
 }
 }  // namespace ovms
diff --git a/src/ovinferrequestsqueue.hpp b/src/ovinferrequestsqueue.hpp
index dcbb8f564b..8b5dfb729a 100644
--- a/src/ovinferrequestsqueue.hpp
+++ b/src/ovinferrequestsqueue.hpp
@@ -24,7 +24,6 @@ namespace ovms {
 class OVInferRequestsQueue : public Queue<ov::InferRequest> {
 public:
     OVInferRequestsQueue(ov::CompiledModel& compiledModel, int streamsLength);
-    ov::CompiledModel& compiledModel;
 };
 }  // namespace ovms
diff --git a/src/queue.hpp b/src/queue.hpp
index 2912d69cbe..285f1be59d 100644
--- a/src/queue.hpp
+++ b/src/queue.hpp
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include <unordered_map>

 // #include "profiler.hpp"

@@ -55,16 +56,6 @@ class Queue {
         return idleStreamFuture;
     }

-    void extendQueue() {
-        if (!constructFunc.has_value()) {
-            return;
-        }
-        size_t streamSize = streams.size();
-        streams.push_back(streamSize - 1);
-        inferRequests.reserve(streams.size());
-        inferRequests.push_back(constructFunc.value()());
-    }
-
     std::optional<int> tryToGetIdleStream() {
         // OVMS_PROFILE_FUNCTION();
         int value;
@@ -69,6 +70,7 @@ class Queue {
             return value;
         }
     }
+
     /**
      * @brief Release stream after execution
      */
@@ -104,16 +96,13 @@ class Queue {
     /**
      * @brief Constructor with initialization
      */
-    // change constructor so that it can also accept lambda which returns T. This lambda
-    // is optional but if exists it will be used to construct T objects
-    Queue(int streamsLength, std::optional<std::function<T()>> constructFunc = std::nullopt) : streams(streamsLength),
-        constructFunc(constructFunc),
+    Queue(int streamsLength) :
+        streams(streamsLength),
         front_idx{0},
         back_idx{0} {
         for (int i = 0; i < streamsLength; ++i) {
             streams[i] = i;
         }
-        streams.reserve(50);
     }

     /**
@@ -128,7 +117,7 @@
      * @brief Vector representing circular buffer for infer queue
      */
     std::vector<int> streams;
-    std::optional<std::function<T()>> constructFunc = std::nullopt;
+
     /**
      * @brief Index of the front of the idle streams list
      */
@@ -147,7 +136,7 @@
     /**
      *
      */
-    std::vector<T> inferRequests;
+    std::unordered_map<int, T> inferRequests;
     std::queue<std::promise<int>> promises;
 };
 }  // namespace ovms
diff --git a/src/test/openvino_tests.cpp b/src/test/openvino_tests.cpp
index ecc8c4fabc..b893f8d023 100644
--- a/src/test/openvino_tests.cpp
+++ b/src/test/openvino_tests.cpp
@@ -92,6 +92,7 @@ TEST_F(OpenVINO, CallbacksTest) {
     EXPECT_TRUE(outAutoTensor.is<ov::Tensor>());
 }
 TEST_F(OpenVINO, StressInferTest) {
+    GTEST_SKIP();
     Core core;
     auto model = core.read_model("/ovms/src/test/dummy/1/dummy.xml");
     const std::string inputName{"b"};
@@ -110,7 +111,7 @@ TEST_F(OpenVINO, StressInferTest) {
     SPDLOG_INFO("Starting vector size:{}, vector capacity:{}", inferRequests.size(), inferRequests.capacity());
     inferRequests.reserve(2);
     SPDLOG_INFO("Starting vector size:{}, vector capacity:{}", inferRequests.size(), inferRequests.capacity());
-    //inferRequests.shrink_to_fit();
+    // inferRequests.shrink_to_fit();
     // we want to test workload when we increase number of infer requests vector during workload
     // so we start with vector of 1, start workload on it
     // then after 1s we start another thread which will add another infer request to the vector
     // ideally we ensure that vector does realocate memory so it forces move of the objects inside it
@@ -132,8 +133,8 @@ TEST_F(OpenVINO, StressInferTest) {
         for (size_t j = 0; j < 100000; j++) {
             reinterpret_cast<float*>(inputOvTensor.data())[j] = i;
             reinterpret_cast<float*>(outputOvTensor.data())[j] = (i + 1);
-            if (j<10 || j > 99990) {
-                SPDLOG_ERROR("input data: {}, expected: {}, i:{}, j:{}", reinterpret_cast<float*>(inputOvTensor.data())[j], reinterpret_cast<float*>(outputOvTensor.data())[j], i, j);
+            if (j < 10 || j > 99990) {
+                SPDLOG_ERROR("input data: {}, expected: {}, i:{}, j:{}", reinterpret_cast<float*>(inputOvTensor.data())[j], reinterpret_cast<float*>(outputOvTensor.data())[j], i, j);
             }
         }

@@ -146,9 +147,9 @@ TEST_F(OpenVINO, StressInferTest) {
             inferRequest.wait();
             auto outOvTensor = inferRequest.get_tensor("a");
             for (size_t j = 0; j < 100000; j++) {
-                if (j<10 || j > 99990) {
-                    SPDLOG_ERROR("infReqRef:{} infReq[i]:{} outTensor data: {}, expected: {} i:{} j:{} k:{}", (void*)(&inferRequest), (void*)(&inferRequests[i]),reinterpret_cast<float*>(outOvTensor.data())[j], reinterpret_cast<float*>(outputOvTensor.data())[j], i, j , k);
-                }
+                if (j < 10 || j > 99990) {
+                    SPDLOG_ERROR("infReqRef:{} infReq[i]:{} outTensor data: {}, expected: {} i:{} j:{} k:{}", (void*)(&inferRequest), (void*)(&inferRequests[i]), reinterpret_cast<float*>(outOvTensor.data())[j], reinterpret_cast<float*>(outputOvTensor.data())[j], i, j, k);
+                }
             }
             ASSERT_EQ(0, std::memcmp(outOvTensor.data(), outputOvTensor.data(), outOvTensor.get_byte_size())) << "i: " << i;
             ASSERT_EQ(0, std::memcmp(outOvTensor.data(), outputOvTensor.data(), outOvTensor.get_byte_size())) << "i: " << i;
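
The container change in PATCH 2/2 hinges on a standard-library property that StressInferTest in PATCH 1/2 provokes: growing a std::vector may reallocate its storage and invalidate references to existing elements (the test reserves capacity for 2 infer requests and then emplaces from two threads while each thread keeps "auto& inferRequest = inferRequests[i]"), whereas inserting into a std::unordered_map leaves references to already-stored values valid. The sketch below is illustrative only and is not part of either patch; it uses plain std::string values in place of ov::InferRequest, assuming only standard-library behavior.

    // Illustrative sketch of container reference stability; not OVMS code.
    #include <cassert>
    #include <string>
    #include <unordered_map>
    #include <vector>

    int main() {
        // std::vector: push_back beyond capacity() reallocates the buffer and
        // invalidates references/pointers to the elements already stored.
        std::vector<std::string> vec;
        vec.reserve(1);
        vec.push_back("stream-0");
        std::string* first = &vec[0];
        vec.push_back("stream-1");  // likely reallocates; 'first' may now dangle
        (void)first;                // dereferencing it here would be undefined behavior

        // std::unordered_map: insert() may rehash (invalidating iterators), but
        // references and pointers to stored values remain valid.
        std::unordered_map<int, std::string> map;
        map.insert({0, "stream-0"});
        std::string& stable = map.at(0);
        for (int i = 1; i < 1000; ++i) {
            map.insert({i, "stream-" + std::to_string(i)});
        }
        assert(&stable == &map.at(0));  // same object before and after growth
        return 0;
    }

Keying the map by the stream id keeps the Queue's id-based lookup working while giving each worker a reference that stays valid for the lifetime of its stream.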