diff --git a/src/BUILD b/src/BUILD
index 915565585d..e956c2b16f 100644
--- a/src/BUILD
+++ b/src/BUILD
@@ -2910,6 +2910,7 @@ cc_library(
         "@com_google_googletest//:gtest",
     ],
     copts = COPTS_TESTS,
+    local_defines = COMMON_LOCAL_DEFINES,
     linkopts = [],
 )
 cc_library(
diff --git a/src/custom_nodes/common/buffersqueue.cpp b/src/custom_nodes/common/buffersqueue.cpp
index d338eac258..5f022cc5e8 100644
--- a/src/custom_nodes/common/buffersqueue.cpp
+++ b/src/custom_nodes/common/buffersqueue.cpp
@@ -26,7 +26,7 @@ BuffersQueue::BuffersQueue(size_t singleBufferSize, int streamsLength) :
     size(singleBufferSize * streamsLength),
     memoryPool(std::make_unique<char[]>(size)) {
     for (int i = 0; i < streamsLength; ++i) {
-        inferRequests.push_back(memoryPool.get() + i * singleBufferSize);
+        inferRequests.insert({i, memoryPool.get() + i * singleBufferSize});
     }
 }
 
diff --git a/src/ovinferrequestsqueue.cpp b/src/ovinferrequestsqueue.cpp
index 798d02dbf1..e5ea3266ef 100644
--- a/src/ovinferrequestsqueue.cpp
+++ b/src/ovinferrequestsqueue.cpp
@@ -25,7 +25,7 @@ OVInferRequestsQueue::OVInferRequestsQueue(ov::CompiledModel& compiledModel, int
     for (int i = 0; i < streamsLength; ++i) {
         streams[i] = i;
         OV_LOGGER("ov::CompiledModel: {} compiledModel.create_infer_request()", reinterpret_cast<void*>(&compiledModel));
-        inferRequests.push_back(compiledModel.create_infer_request());
+        inferRequests.insert({i, compiledModel.create_infer_request()});
     }
 }
 }  // namespace ovms
diff --git a/src/queue.hpp b/src/queue.hpp
index 3047fa781d..285f1be59d 100644
--- a/src/queue.hpp
+++ b/src/queue.hpp
@@ -25,6 +25,7 @@
 #include 
 #include 
 #include 
+#include <unordered_map>
 
 // #include "profiler.hpp"
 
@@ -135,7 +136,7 @@ class Queue {
     /**
      * 
      */
-    std::vector<T> inferRequests;
+    std::unordered_map<int, T> inferRequests;
     std::queue<std::promise<int>> promises;
 };
 }  // namespace ovms
diff --git a/src/test/openvino_tests.cpp b/src/test/openvino_tests.cpp
index b082c525a9..b893f8d023 100644
--- a/src/test/openvino_tests.cpp
+++ b/src/test/openvino_tests.cpp
@@ -50,7 +50,6 @@ TEST_F(OpenVINO, CallbacksTest) {
     Core core;
     auto model = core.read_model("/ovms/src/test/dummy/1/dummy.xml");
     const std::string inputName{"b"};
-    auto input = model->get_parameters().at(0);
     ov::element::Type_t dtype = ov::element::Type_t::f32;
     ov::Shape ovShape;
     ovShape.emplace_back(1);
@@ -92,6 +91,92 @@
     EXPECT_TRUE(outOvTensor.is());
     EXPECT_TRUE(outAutoTensor.is());
 }
+TEST_F(OpenVINO, StressInferTest) {
+    GTEST_SKIP();
+    Core core;
+    auto model = core.read_model("/ovms/src/test/dummy/1/dummy.xml");
+    const std::string inputName{"b"};
+    auto input = model->get_parameters().at(0);
+    ov::element::Type_t dtype = ov::element::Type_t::f32;
+    ov::Shape ovShape;
+    ovShape.emplace_back(1);
+    ovShape.emplace_back(100000);
+    std::map<std::string, ov::PartialShape> inputShapes;
+    inputShapes[inputName] = ovShape;
+    model->reshape(inputShapes);
+    auto cpuCompiledModel = core.compile_model(model, "CPU");
+    std::vector<ov::InferRequest> inferRequests;
+    SPDLOG_INFO("Starting vector size:{}, vector capacity:{}", inferRequests.size(), inferRequests.capacity());
+    inferRequests.resize(0);
+    SPDLOG_INFO("Starting vector size:{}, vector capacity:{}", inferRequests.size(), inferRequests.capacity());
+    inferRequests.reserve(2);
+    SPDLOG_INFO("Starting vector size:{}, vector capacity:{}", inferRequests.size(), inferRequests.capacity());
+    // inferRequests.shrink_to_fit();
+    // we want to test the workload when we increase the number of infer requests in the vector during the workload
+    // so we start with a vector of 1 and start the workload on it
+    // then after 1s we start another thread which will add another infer request to the vector
+    // ideally we ensure that the vector does reallocate memory so it forces a move of the objects inside it
+
+    // first write the function that will be run in a thread. It will get a reference to the inferRequests vector
+    // it will create an ov::Tensor with the passed dtype and ovShape. It will set the content of that tensor to i
+    // so that we can check the content of the response each time. It will run the workload until it gets a stop signal via the future
+    auto loadFunction = [&cpuCompiledModel, &inferRequests, inputName, dtype, ovShape](size_t i, std::future<void> stopSignal) {
+        SPDLOG_INFO("Starting loadFunction:{}", i);
+        inferRequests.emplace_back(cpuCompiledModel.create_infer_request());
+        SPDLOG_INFO("Starting shrinkToFit:{} vector size:{}, vector capacity:{}", i, inferRequests.size(), inferRequests.capacity());
+        inferRequests.shrink_to_fit();
+        SPDLOG_INFO("After shrinkToFit:{} vector size:{}, vector capacity:{}", i, inferRequests.size(), inferRequests.capacity());
+        auto& inferRequest = inferRequests[i];
+        // prepare ov::Tensor data
+        ov::Tensor inputOvTensor(dtype, ovShape);
+        ov::Tensor outputOvTensor(dtype, ovShape);
+        for (size_t j = 0; j < 100000; j++) {
+            reinterpret_cast<float*>(inputOvTensor.data())[j] = i;
+            reinterpret_cast<float*>(outputOvTensor.data())[j] = (i + 1);
+            if (j < 10 || j > 99990) {
+                SPDLOG_ERROR("input data: {}, expected: {}, i:{}, j:{}", reinterpret_cast<float*>(inputOvTensor.data())[j], reinterpret_cast<float*>(outputOvTensor.data())[j], i, j);
+            }
+        }
+
+        // now a while loop that stops only when we get the stop signal
+        SPDLOG_INFO("Running infer request {}", i);
+        size_t k = 0;
+        while (stopSignal.wait_for(std::chrono::milliseconds(0)) == std::future_status::timeout) {
+            inferRequest.set_tensor(inputName, inputOvTensor);
+            inferRequest.start_async();
+            inferRequest.wait();
+            auto outOvTensor = inferRequest.get_tensor("a");
+            for (size_t j = 0; j < 100000; j++) {
+                if (j < 10 || j > 99990) {
+                    SPDLOG_ERROR("infReqRef:{} infReq[i]:{} outTensor data: {}, expected: {} i:{} j:{} k:{}", (void*)(&inferRequest), (void*)(&inferRequests[i]), reinterpret_cast<float*>(outOvTensor.data())[j], reinterpret_cast<float*>(outputOvTensor.data())[j], i, j, k);
+                }
+            }
+            ASSERT_EQ(0, std::memcmp(outOvTensor.data(), outputOvTensor.data(), outOvTensor.get_byte_size())) << "i: " << i;
+            ASSERT_EQ(0, std::memcmp(outOvTensor.data(), outputOvTensor.data(), outOvTensor.get_byte_size())) << "i: " << i;
+            k++;
+        }
+    };
+    size_t n = 2;
+    std::vector<std::promise<void>> stopSignal(n);
+    std::vector<std::thread> threads;
+    threads.reserve(n);
+    for (size_t i = 0; i < n; ++i) {
+        // create a thread that will run loadFunction
+        SPDLOG_INFO("Starting thread {}", i);
+        threads.emplace_back(loadFunction, i, stopSignal[i].get_future());
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+    std::this_thread::sleep_for(std::chrono::seconds(5));
+    for (size_t i = 0; i < n; ++i) {
+        // signal each thread to stop
+        SPDLOG_INFO("Stopping thread {}", i);
+        stopSignal[i].set_value();
+    }
+    for (size_t i = 0; i < n; ++i) {
+        SPDLOG_INFO("Joining thread {}", i);
+        threads[i].join();
+    }
+}
 TEST_F(OpenVINO, ResetOutputTensors) {
     Core core;
     auto model = core.read_model("/ovms/src/test/dummy/1/dummy.xml");
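Note (not part of the patch): the queue change above swaps std::vector for std::unordered_map so that a reference taken to a stored infer request stays valid when more entries are added later; growing a vector may reallocate and move its elements, which is the hazard the StressInferTest tries to provoke. Below is a minimal standalone sketch of that difference, assuming a hypothetical Request struct as a stand-in for ov::InferRequest.

// Sketch only: "Request" is a hypothetical stand-in for ov::InferRequest.
#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <vector>

struct Request {
    int id;
};

int main() {
    // std::vector: push_back beyond capacity may reallocate and move elements,
    // so addresses observed earlier can become stale.
    std::vector<Request> vec;
    vec.push_back({0});
    auto before = reinterpret_cast<std::uintptr_t>(vec.data());
    for (int i = 1; i < 100; ++i) {
        vec.push_back({i});
    }
    auto after = reinterpret_cast<std::uintptr_t>(vec.data());
    std::cout << "vector storage moved: " << std::boolalpha << (before != after) << '\n';

    // std::unordered_map: inserting new entries never invalidates references
    // to values already stored (only iterators may be invalidated on rehash).
    std::unordered_map<int, Request> map;
    map.insert({0, Request{0}});
    Request* first = &map.at(0);
    for (int i = 1; i < 100; ++i) {
        map.insert({i, Request{i}});
    }
    std::cout << "map element moved: " << std::boolalpha << (first != &map.at(0)) << '\n';
    return 0;
}

Reference stability on insert is presumably the property the switch to std::unordered_map is after: iterators may be invalidated by a rehash, but pointers and references to the stored infer requests remain valid while new streams are added.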