diff --git a/cloud/storage/core/libs/common/thread_pool.cpp b/cloud/storage/core/libs/common/thread_pool.cpp index e396261f9ab..6b72315c404 100644 --- a/cloud/storage/core/libs/common/thread_pool.cpp +++ b/cloud/storage/core/libs/common/thread_pool.cpp @@ -82,6 +82,7 @@ class TThreadPool final , SpinCycles(DurationToCyclesSafe(SPIN_TIMEOUT)) , MemoryTagScope(std::move(memoryTagScope)) , Workers(numWorkers) + , RunningWorkers(NumWorkers) { size_t i = 1; for (auto& worker: Workers) { @@ -97,8 +98,6 @@ class TThreadPool final void Start() override { - AtomicSet(RunningWorkers, NumWorkers); - for (auto& worker: Workers) { worker.Thread->Start(); } @@ -119,6 +118,8 @@ class TThreadPool final void Enqueue(ITaskPtr task) override { + Y_ABORT_UNLESS(AtomicGet(ShouldStop) == 0); + Queue.Enqueue(std::move(task)); if (AllocateWorker()) { diff --git a/cloud/storage/core/libs/common/thread_pool_ut.cpp b/cloud/storage/core/libs/common/thread_pool_ut.cpp index 8a092d98e89..31c5b33f9cc 100644 --- a/cloud/storage/core/libs/common/thread_pool_ut.cpp +++ b/cloud/storage/core/libs/common/thread_pool_ut.cpp @@ -6,6 +6,9 @@ #include +#include +#include + namespace NCloud { //////////////////////////////////////////////////////////////////////////////// @@ -30,6 +33,36 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest) UNIT_ASSERT_EQUAL(future.GetValue(WaitTimeout), 42); } + + Y_UNIT_TEST(ShouldExecuteTaskEnqueuedBeforeStart) + { + auto threadPool = CreateThreadPool("thread", 1); + + std::latch enqueued{1}; + + std::thread thread( + [&]() mutable + { + enqueued.count_down(); + auto future = threadPool->Execute([] { return 42; }); + + UNIT_ASSERT_EQUAL(future.GetValue(WaitTimeout), 42); + }); + + enqueued.wait(); + + // Sleep to be sure that the thread will call the AllocateWorker + // function before the thread pool starts. + Sleep(TDuration::Seconds(1)); + + threadPool->Start(); + Y_DEFER + { + threadPool->Stop(); + }; + + thread.join(); + } } ////////////////////////////////////////////////////////////////////////////////