Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cloud/storage/core/libs/common/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ class TThreadPool final
worker.Name = TStringBuilder() << threadName << i++;
worker.Thread = std::make_unique<TWorkerThread>(*this, worker);
}
AtomicSet(RunningWorkers, NumWorkers);
}

~TThreadPool() override
Expand All @@ -97,8 +98,6 @@ class TThreadPool final

void Start() override
{
AtomicSet(RunningWorkers, NumWorkers);

for (auto& worker: Workers) {
worker.Thread->Start();
}
Expand Down
33 changes: 33 additions & 0 deletions cloud/storage/core/libs/common/thread_pool_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

#include <util/generic/scope.h>

#include <thread>

namespace NCloud {

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -30,6 +32,37 @@ Y_UNIT_TEST_SUITE(TThreadPoolTest)

UNIT_ASSERT_EQUAL(future.GetValue(WaitTimeout), 42);
}

Y_UNIT_TEST(ShouldExecuteTaskEnqueuedBeforeStart)
{
auto threadPool = CreateThreadPool("thread", 1);

auto promise = NThreading::NewPromise();

auto future = promise.GetFuture();

std::thread thread(
[threadPool, promise = std::move(promise)]() mutable
{
promise.SetValue();
auto future = threadPool->Execute([] { return 42; });

UNIT_ASSERT_EQUAL(future.GetValue(WaitTimeout), 42);
});

future.GetValueSync();

// Sleep to be sure that task will be enqueued before start.
Sleep(TDuration::Seconds(1));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А если вызвать promise.SetValue(); после threadPool->Execute ?

Можно взять latch для синхронизации:

auto threadPool = CreateThreadPool("thread", 1);

std::latch enqueued{1};

std::thread thread([&] {
    auto future = threadPool->Execute([] { return 42; });
    enqueued.count_down();

    UNIT_ASSERT_EQUAL(42, future.GetValue(WaitTimeout));
});

enqueued.wait();

threadPool->Start();
threadPool->Stop();

thread.join();

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

я хотел написать тест который будет крашиться в отсутствии фикса.
Если фикса нет, то threadPool->Execute зависнет пока не вызовется threadPool->Start() т.е. будет дедлок.
Как бы да зафейлится тест но хотелось бы краш
по хорошему надо было бы вставить count_down после AllocateWorker

void Enqueue(ITaskPtr task) override
{
Queue.Enqueue(std::move(task));
if (AllocateWorker()) {
WakeUpWorker();
}
}

но так сделать понятно не получится. Так что не понятно как воспроизвести стабильно краш без слипов и без того чтобы лезть в кишки тред пула

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

enqueued.wait();
// Sleep to make it more likely that ReleaseWorker is called before starting the thread pool
Sleep(1s);
threadPool->Start();


threadPool->Start();
Y_DEFER
{
threadPool->Stop();
};

thread.join();
}
}

////////////////////////////////////////////////////////////////////////////////
Expand Down
Loading