Skip to content

Commit af3501a

Browse files
committed
[#26731] docdb: enable test for process supervisor
Summary: Reworks a regression test for the bug #26731 so it can be enabled. The original bug as a seg fault in `process_supervisor` when starting a process failed. 0f4ec65 fixed this issue and added a unit test. However the unit test reproduced the failure to spawn a process by exhausting the system's thread count, spawning threads in a loop. This test destabilized the machine it ran on so it was disabled. This diff reworks the test, using a test fake to reproduce the desired failure to spawn a new process. The test runs in under a second and it reproduces the original bug when the prod fix is removed. Jira: DB-16106 Test Plan: ``` ./yb_build.sh release --cxx-test process_wrapper-test --gtest_filter 'ProcessWrapperTest.ProcessWrapperStartFails' -n 100 --stop-at-failure ``` This test fails w/ a segfault [0] when the fix is removed (see diff at [1]). [0] ``` I0923 14:00:57.549199 1832153088 process_wrapper.cc:104] Restarting test sleep process process W0923 14:00:57.549203 1832153088 process_wrapper.cc:107] Failed trying to start test sleep process process: Illegal state (yb/yql/process_wrapper/process_wrapper-test.cc:164): Failed to spawn sleep process due to count hit, waiting a bit *** Aborted at 1758650458 (unix time) try "date -d @1758650458" if you are using GNU date *** PC: @ 0x0 perftools_pthread_specific_vals *** SIGSEGV (@0xe0) received by PID 33927 (TID 0x16d347000) stack trace: *** @ 0x1906bb584 _sigtramp @ 0x1031d89d8 yb::ProcessSupervisor::RunThread() @ 0x1031d89d8 yb::ProcessSupervisor::RunThread() @ 0x10750e408 yb::Thread::SuperviseThread() @ 0x19068af94 _pthread_start @ 0x190685d34 thread_start ``` [1] ``` % git diff diff --git a/src/yb/yql/process_wrapper/process_wrapper.cc b/src/yb/yql/process_wrapper/process_wrapper.cc index 7498ffb..01b172a63b 100644 --- a/src/yb/yql/process_wrapper/process_wrapper.cc +++ b/src/yb/yql/process_wrapper/process_wrapper.cc @@ -66,7 +66,7 @@ void ProcessSupervisor::RunThread() { }); std::string process_name = GetProcessName(); while (true) { - if (process_wrapper_) { + // if (process_wrapper_) { Result<int> wait_result = process_wrapper_->Wait(); if (wait_result.ok()) { int ret_code = *wait_result; @@ -87,7 +87,7 @@ void ProcessSupervisor::RunThread() { continue; } } - } + // } { UniqueLock lock(mtx_); ``` Reviewers: timur Reviewed By: timur Subscribers: rthallam, ybase, yql Differential Revision: https://phorge.dev.yugabyte.com/D46953
1 parent 9fa72e7 commit af3501a

File tree

1 file changed

+89
-72
lines changed

1 file changed

+89
-72
lines changed

src/yb/yql/process_wrapper/process_wrapper-test.cc

Lines changed: 89 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
// under the License.
1212

1313
#include <chrono>
14+
#include <latch>
1415
#include <optional>
1516
#include <thread>
1617

@@ -27,94 +28,57 @@ using namespace std::literals;
2728

2829
namespace yb {
2930

30-
class TestSleepProcessWrapper : public ProcessWrapper {
31+
class FailStartProcessWrapper : public ProcessWrapper {
3132
public:
32-
explicit TestSleepProcessWrapper(int sleep) : sleep_(sleep) {}
33+
explicit FailStartProcessWrapper(bool succeed_on_start);
3334

34-
Status PreflightCheck() override {
35-
return Status::OK();
36-
}
35+
Status PreflightCheck() override;
36+
Status ReloadConfig() override;
3737

38-
Status ReloadConfig() override {
39-
return Status::OK();
40-
}
38+
Status UpdateAndReloadConfig() override;
4139

42-
Status UpdateAndReloadConfig() override {
43-
return Status::OK();
44-
}
40+
Status Start() override;
4541

46-
Status Start() override {
47-
std::vector<std::string> argv{"/usr/bin/sleep", AsString(sleep_)};
48-
proc_.emplace(argv[0], argv);
49-
RETURN_NOT_OK(proc_->Start());
50-
LOG(INFO) << "Started pid: " << proc_->pid();
51-
return Status::OK();
52-
}
42+
Status WaitForStart();
5343

5444
private:
55-
int sleep_;
45+
bool succeed_on_start_;
46+
Status start_status_;
47+
std::latch started_;
5648
};
5749

58-
class TestSleepProcessSupervisor : public ProcessSupervisor {
50+
class FailStartProcessSupervisor : public ProcessSupervisor {
5951
public:
60-
explicit TestSleepProcessSupervisor(int sleep): sleep_(sleep) {}
61-
~TestSleepProcessSupervisor() {
62-
LOG(INFO) << "~TestSleepProcessSupervisor";
63-
}
64-
65-
std::shared_ptr<ProcessWrapper> CreateProcessWrapper() override {
66-
return std::make_shared<TestSleepProcessWrapper>(sleep_);
67-
}
52+
FailStartProcessSupervisor();
6853

69-
std::string GetProcessName() override {
70-
return "test sleep process";
71-
}
72-
private:
73-
int sleep_;
74-
};
54+
std::shared_ptr<ProcessWrapper> CreateProcessWrapper() override;
7555

76-
// Test is disabled because it will cause test server instability.
77-
TEST(ProcessWrapperTest, YB_DISABLE_TEST(HitThreadsLimit)) {
78-
std::vector<std::unique_ptr<TestSleepProcessSupervisor>> supervisors;
56+
std::string GetProcessName() override;
7957

80-
// This process is supposed to restart in a loop to reproduce crash after hitting limit on
81-
// number of threads.
82-
TestSleepProcessSupervisor supervisor(/* sleep = */ 1);
83-
ASSERT_OK(supervisor.Start());
58+
std::shared_ptr<FailStartProcessWrapper> WaitForProcessSpawn(uint64_t count);
8459

85-
std::atomic<bool> stop{false};
86-
std::vector<scoped_refptr<Thread>> threads;
87-
88-
auto deadline = CoarseMonoClock::now() + 120s;
89-
bool hit_threads_limit = false;
90-
while (CoarseMonoClock::now() < deadline) {
91-
scoped_refptr<Thread> thread;
92-
auto status = Thread::Create(
93-
"termination_monitor", "sigterm_loop",
94-
[&stop]() {
95-
while (!stop.load(std::memory_order_acquire)) {
96-
std::this_thread::sleep_for(50ms);
97-
}
98-
},
99-
&thread);
100-
if (!status.ok()) {
101-
LOG(INFO) << "Failed to start thread: " << status;
102-
if (!hit_threads_limit) {
103-
// Give process time to attempt restarts and reproduce the bug.
104-
deadline = CoarseMonoClock::now() + 3s;
105-
hit_threads_limit = true;
106-
}
107-
continue;
108-
}
109-
threads.push_back(thread);
110-
YB_LOG_EVERY_N_SECS(INFO, 5) << "Threads running: " << threads.size();
111-
}
60+
void SetSucceedProcessStart(bool succeed_process_start);
11261

113-
stop.store(true);
114-
for (auto& thread : threads) {
115-
thread->Join();
116-
}
62+
private:
63+
uint64_t start_count_{0};
64+
std::mutex mutex_;
65+
std::condition_variable cv_;
66+
std::atomic_bool succeed_process_start_;
67+
std::shared_ptr<FailStartProcessWrapper> last_spawned_proc_;
68+
};
11769

70+
TEST(ProcessWrapperTest, ProcessWrapperStartFails) {
71+
FailStartProcessSupervisor supervisor;
72+
ASSERT_OK(supervisor.Start());
73+
auto first_proc = supervisor.WaitForProcessSpawn(1);
74+
ASSERT_OK(first_proc->WaitForStart());
75+
// Now fail the next process start.
76+
supervisor.SetSucceedProcessStart(false);
77+
ASSERT_OK(supervisor.Restart());
78+
// Give 2 go rounds to ensure the process runner thread has dealt with the Start() failure.
79+
auto last_failed_proc = supervisor.WaitForProcessSpawn(3);
80+
ASSERT_NOK(last_failed_proc->WaitForStart());
81+
supervisor.SetSucceedProcessStart(true);
11882
supervisor.Stop();
11983
}
12084

@@ -184,6 +148,59 @@ TEST(TestProcessSupervisor, RestartOnUnStarted) {
184148
ASSERT_TRUE(s.IsIllegalState());
185149
}
186150

151+
FailStartProcessWrapper::FailStartProcessWrapper(bool succeed_on_start)
152+
: succeed_on_start_(succeed_on_start), started_(1) {}
153+
154+
Status FailStartProcessWrapper::PreflightCheck() { return Status::OK(); }
155+
156+
Status FailStartProcessWrapper::ReloadConfig() { return Status::OK(); }
157+
158+
Status FailStartProcessWrapper::UpdateAndReloadConfig() { return Status::OK(); }
159+
160+
Status FailStartProcessWrapper::Start() {
161+
if (!succeed_on_start_) {
162+
start_status_ = STATUS(IllegalState, "Failed to spawn sleep process due to count hit");
163+
started_.count_down();
164+
return start_status_;
165+
}
166+
std::vector<std::string> argv{"cat"};
167+
proc_.emplace("/bin/cat", argv);
168+
start_status_ = proc_->Start();
169+
started_.count_down();
170+
return start_status_;
171+
}
172+
173+
Status FailStartProcessWrapper::WaitForStart() {
174+
started_.wait();
175+
return start_status_;
176+
}
177+
178+
FailStartProcessSupervisor::FailStartProcessSupervisor() : succeed_process_start_(true) {}
179+
180+
std::shared_ptr<ProcessWrapper> FailStartProcessSupervisor::CreateProcessWrapper() {
181+
auto proc = std::make_shared<FailStartProcessWrapper>(succeed_process_start_);
182+
{
183+
std::lock_guard<std::mutex> lock(mutex_);
184+
start_count_++;
185+
last_spawned_proc_ = proc;
186+
}
187+
cv_.notify_all();
188+
return proc;
189+
}
190+
191+
std::string FailStartProcessSupervisor::GetProcessName() { return "test_process"; }
192+
193+
std::shared_ptr<FailStartProcessWrapper> FailStartProcessSupervisor::WaitForProcessSpawn(
194+
uint64_t count) {
195+
std::unique_lock<std::mutex> lock(mutex_);
196+
cv_.wait(lock, [this, &count] { return start_count_ >= count; });
197+
return last_spawned_proc_;
198+
}
199+
200+
void FailStartProcessSupervisor::SetSucceedProcessStart(bool succeed_process_start) {
201+
succeed_process_start_ = succeed_process_start;
202+
}
203+
187204
Status TestCatProcessWrapper::PreflightCheck() { return Status::OK(); }
188205

189206
Status TestCatProcessWrapper::ReloadConfig() { return Status::OK(); }

0 commit comments

Comments
 (0)