diff --git a/lldb/include/lldb/Host/MainLoopBase.h b/lldb/include/lldb/Host/MainLoopBase.h index 7365ee7a65ee6..be9a2676e7443 100644 --- a/lldb/include/lldb/Host/MainLoopBase.h +++ b/lldb/include/lldb/Host/MainLoopBase.h @@ -13,8 +13,10 @@ #include "lldb/Utility/Status.h" #include "llvm/ADT/DenseMap.h" #include "llvm/Support/ErrorHandling.h" +#include #include #include +#include namespace lldb_private { @@ -38,6 +40,9 @@ class MainLoopBase { class ReadHandle; public: + using TimePoint = std::chrono::time_point; + MainLoopBase() : m_terminate_request(false) {} virtual ~MainLoopBase() = default; @@ -52,7 +57,18 @@ class MainLoopBase { // Add a pending callback that will be executed once after all the pending // events are processed. The callback will be executed even if termination // was requested. - void AddPendingCallback(const Callback &callback); + void AddPendingCallback(const Callback &callback) { + AddCallback(callback, std::chrono::steady_clock::time_point()); + } + + // Add a callback that will be executed after a certain amount of time has + // passed. + void AddCallback(const Callback &callback, std::chrono::nanoseconds delay) { + AddCallback(callback, std::chrono::steady_clock::now() + delay); + } + + // Add a callback that will be executed after a given point in time. + void AddCallback(const Callback &callback, TimePoint point); // Waits for registered events and invoke the proper callbacks. Returns when // all callbacks deregister themselves or when someone requests termination. @@ -69,14 +85,18 @@ class MainLoopBase { virtual void UnregisterReadObject(IOObject::WaitableHandle handle) = 0; - // Interrupt the loop that is currently waiting for events and execute - // the current pending callbacks immediately. - virtual void TriggerPendingCallbacks() = 0; + // Interrupt the loop that is currently waiting for events. + virtual void Interrupt() = 0; + + void ProcessCallbacks(); - void ProcessPendingCallbacks(); + std::optional GetNextWakeupTime(); std::mutex m_callback_mutex; - std::vector m_pending_callbacks; + std::priority_queue, + std::vector>, + llvm::on_first>> + m_callbacks; bool m_terminate_request : 1; private: diff --git a/lldb/include/lldb/Host/posix/MainLoopPosix.h b/lldb/include/lldb/Host/posix/MainLoopPosix.h index 1988dde7c65ae..e9ac798b948df 100644 --- a/lldb/include/lldb/Host/posix/MainLoopPosix.h +++ b/lldb/include/lldb/Host/posix/MainLoopPosix.h @@ -54,7 +54,7 @@ class MainLoopPosix : public MainLoopBase { void UnregisterReadObject(IOObject::WaitableHandle handle) override; void UnregisterSignal(int signo, std::list::iterator callback_it); - void TriggerPendingCallbacks() override; + void Interrupt() override; private: void ProcessReadObject(IOObject::WaitableHandle handle); @@ -88,8 +88,8 @@ class MainLoopPosix : public MainLoopBase { llvm::DenseMap m_read_fds; llvm::DenseMap m_signals; - Pipe m_trigger_pipe; - std::atomic m_triggering; + Pipe m_interrupt_pipe; + std::atomic m_interrupting = false; #if HAVE_SYS_EVENT_H int m_kqueue; #endif diff --git a/lldb/include/lldb/Host/windows/MainLoopWindows.h b/lldb/include/lldb/Host/windows/MainLoopWindows.h index 33e179e6c1286..3937a24645d95 100644 --- a/lldb/include/lldb/Host/windows/MainLoopWindows.h +++ b/lldb/include/lldb/Host/windows/MainLoopWindows.h @@ -34,7 +34,7 @@ class MainLoopWindows : public MainLoopBase { protected: void UnregisterReadObject(IOObject::WaitableHandle handle) override; - void TriggerPendingCallbacks() override; + void Interrupt() override; private: void ProcessReadObject(IOObject::WaitableHandle handle); @@ -45,7 +45,7 @@ class MainLoopWindows : public MainLoopBase { Callback callback; }; llvm::DenseMap m_read_fds; - void *m_trigger_event; + void *m_interrupt_event; }; } // namespace lldb_private diff --git a/lldb/source/Host/common/MainLoopBase.cpp b/lldb/source/Host/common/MainLoopBase.cpp index 030a4f0371681..64a57e65849e9 100644 --- a/lldb/source/Host/common/MainLoopBase.cpp +++ b/lldb/source/Host/common/MainLoopBase.cpp @@ -7,27 +7,43 @@ //===----------------------------------------------------------------------===// #include "lldb/Host/MainLoopBase.h" +#include using namespace lldb; using namespace lldb_private; -void MainLoopBase::AddPendingCallback(const Callback &callback) { +void MainLoopBase::AddCallback(const Callback &callback, TimePoint point) { + bool interrupt_needed; { std::lock_guard lock{m_callback_mutex}; - m_pending_callbacks.push_back(callback); + // We need to interrupt the main thread if this callback is scheduled to + // execute at an earlier time than the earliest callback registered so far. + interrupt_needed = m_callbacks.empty() || point < m_callbacks.top().first; + m_callbacks.emplace(point, callback); } - TriggerPendingCallbacks(); + if (interrupt_needed) + Interrupt(); } -void MainLoopBase::ProcessPendingCallbacks() { - // Move the callbacks to a local vector to avoid keeping m_pending_callbacks - // locked throughout the calls. - std::vector pending_callbacks; - { - std::lock_guard lock{m_callback_mutex}; - pending_callbacks = std::move(m_pending_callbacks); - } +void MainLoopBase::ProcessCallbacks() { + while (true) { + Callback callback; + { + std::lock_guard lock{m_callback_mutex}; + if (m_callbacks.empty() || + std::chrono::steady_clock::now() < m_callbacks.top().first) + return; + callback = std::move(m_callbacks.top().second); + m_callbacks.pop(); + } - for (const Callback &callback : pending_callbacks) callback(*this); + } +} + +std::optional MainLoopBase::GetNextWakeupTime() { + std::lock_guard lock(m_callback_mutex); + if (m_callbacks.empty()) + return std::nullopt; + return m_callbacks.top().first; } diff --git a/lldb/source/Host/posix/MainLoopPosix.cpp b/lldb/source/Host/posix/MainLoopPosix.cpp index 040af480771d4..519347dd0cfad 100644 --- a/lldb/source/Host/posix/MainLoopPosix.cpp +++ b/lldb/source/Host/posix/MainLoopPosix.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -67,6 +68,30 @@ static void SignalHandler(int signo, siginfo_t *info, void *) { assert(bytes_written == 1 || (bytes_written == -1 && errno == EAGAIN)); } +class ToTimeSpec { +public: + explicit ToTimeSpec(std::optional point) { + using namespace std::chrono; + + if (!point) { + m_ts_ptr = nullptr; + return; + } + nanoseconds dur = std::max(*point - steady_clock::now(), nanoseconds(0)); + m_ts_ptr = &m_ts; + m_ts.tv_sec = duration_cast(dur).count(); + m_ts.tv_nsec = (dur % seconds(1)).count(); + } + ToTimeSpec(const ToTimeSpec &) = delete; + ToTimeSpec &operator=(const ToTimeSpec &) = delete; + + operator struct timespec *() { return m_ts_ptr; } + +private: + struct timespec m_ts; + struct timespec *m_ts_ptr; +}; + class MainLoopPosix::RunImpl { public: RunImpl(MainLoopPosix &loop); @@ -99,8 +124,9 @@ Status MainLoopPosix::RunImpl::Poll() { for (auto &fd : loop.m_read_fds) EV_SET(&in_events[i++], fd.first, EVFILT_READ, EV_ADD, 0, 0, 0); - num_events = kevent(loop.m_kqueue, in_events.data(), in_events.size(), - out_events, std::size(out_events), nullptr); + num_events = + kevent(loop.m_kqueue, in_events.data(), in_events.size(), out_events, + std::size(out_events), ToTimeSpec(loop.GetNextWakeupTime())); if (num_events < 0) { if (errno == EINTR) { @@ -144,7 +170,7 @@ Status MainLoopPosix::RunImpl::Poll() { } if (ppoll(read_fds.data(), read_fds.size(), - /*timeout=*/nullptr, + ToTimeSpec(loop.GetNextWakeupTime()), /*sigmask=*/nullptr) == -1 && errno != EINTR) return Status(errno, eErrorTypePOSIX); @@ -165,27 +191,28 @@ void MainLoopPosix::RunImpl::ProcessReadEvents() { } #endif -MainLoopPosix::MainLoopPosix() : m_triggering(false) { - Status error = m_trigger_pipe.CreateNew(/*child_process_inherit=*/false); +MainLoopPosix::MainLoopPosix() { + Status error = m_interrupt_pipe.CreateNew(/*child_process_inherit=*/false); assert(error.Success()); // Make the write end of the pipe non-blocking. - int result = fcntl(m_trigger_pipe.GetWriteFileDescriptor(), F_SETFL, - fcntl(m_trigger_pipe.GetWriteFileDescriptor(), F_GETFL) | + int result = fcntl(m_interrupt_pipe.GetWriteFileDescriptor(), F_SETFL, + fcntl(m_interrupt_pipe.GetWriteFileDescriptor(), F_GETFL) | O_NONBLOCK); assert(result == 0); UNUSED_IF_ASSERT_DISABLED(result); - const int trigger_pipe_fd = m_trigger_pipe.GetReadFileDescriptor(); - m_read_fds.insert({trigger_pipe_fd, [trigger_pipe_fd](MainLoopBase &loop) { - char c; - ssize_t bytes_read = llvm::sys::RetryAfterSignal( - -1, ::read, trigger_pipe_fd, &c, 1); - assert(bytes_read == 1); - UNUSED_IF_ASSERT_DISABLED(bytes_read); - // NB: This implicitly causes another loop iteration - // and therefore the execution of pending callbacks. - }}); + const int interrupt_pipe_fd = m_interrupt_pipe.GetReadFileDescriptor(); + m_read_fds.insert( + {interrupt_pipe_fd, [interrupt_pipe_fd](MainLoopBase &loop) { + char c; + ssize_t bytes_read = + llvm::sys::RetryAfterSignal(-1, ::read, interrupt_pipe_fd, &c, 1); + assert(bytes_read == 1); + UNUSED_IF_ASSERT_DISABLED(bytes_read); + // NB: This implicitly causes another loop iteration + // and therefore the execution of pending callbacks. + }}); #if HAVE_SYS_EVENT_H m_kqueue = kqueue(); assert(m_kqueue >= 0); @@ -196,8 +223,8 @@ MainLoopPosix::~MainLoopPosix() { #if HAVE_SYS_EVENT_H close(m_kqueue); #endif - m_read_fds.erase(m_trigger_pipe.GetReadFileDescriptor()); - m_trigger_pipe.Close(); + m_read_fds.erase(m_interrupt_pipe.GetReadFileDescriptor()); + m_interrupt_pipe.Close(); assert(m_read_fds.size() == 0); assert(m_signals.size() == 0); } @@ -244,11 +271,9 @@ MainLoopPosix::RegisterSignal(int signo, const Callback &callback, sigset_t old_set; // Set signal info before installing the signal handler! - g_signal_info[signo].pipe_fd = m_trigger_pipe.GetWriteFileDescriptor(); + g_signal_info[signo].pipe_fd = m_interrupt_pipe.GetWriteFileDescriptor(); g_signal_info[signo].flag = 0; - // Even if using kqueue, the signal handler will still be invoked, so it's - // important to replace it with our "benign" handler. int ret = sigaction(signo, &new_action, &info.old_action); UNUSED_IF_ASSERT_DISABLED(ret); assert(ret == 0 && "sigaction failed"); @@ -307,8 +332,8 @@ Status MainLoopPosix::Run() { ProcessSignals(); - m_triggering = false; - ProcessPendingCallbacks(); + m_interrupting = false; + ProcessCallbacks(); } return Status(); } @@ -346,13 +371,13 @@ void MainLoopPosix::ProcessSignal(int signo) { } } -void MainLoopPosix::TriggerPendingCallbacks() { - if (m_triggering.exchange(true)) +void MainLoopPosix::Interrupt() { + if (m_interrupting.exchange(true)) return; char c = '.'; size_t bytes_written; - Status error = m_trigger_pipe.Write(&c, 1, bytes_written); + Status error = m_interrupt_pipe.Write(&c, 1, bytes_written); assert(error.Success()); UNUSED_IF_ASSERT_DISABLED(error); assert(bytes_written == 1); diff --git a/lldb/source/Host/windows/MainLoopWindows.cpp b/lldb/source/Host/windows/MainLoopWindows.cpp index c9aa6d339d8f4..0a5a35e9db9dd 100644 --- a/lldb/source/Host/windows/MainLoopWindows.cpp +++ b/lldb/source/Host/windows/MainLoopWindows.cpp @@ -21,14 +21,24 @@ using namespace lldb; using namespace lldb_private; +static DWORD ToTimeout(std::optional point) { + using namespace std::chrono; + + if (!point) + return WSA_INFINITE; + + nanoseconds dur = (std::max)(*point - steady_clock::now(), nanoseconds(0)); + return duration_cast(dur).count(); +} + MainLoopWindows::MainLoopWindows() { - m_trigger_event = WSACreateEvent(); - assert(m_trigger_event != WSA_INVALID_EVENT); + m_interrupt_event = WSACreateEvent(); + assert(m_interrupt_event != WSA_INVALID_EVENT); } MainLoopWindows::~MainLoopWindows() { assert(m_read_fds.empty()); - BOOL result = WSACloseEvent(m_trigger_event); + BOOL result = WSACloseEvent(m_interrupt_event); assert(result == TRUE); UNUSED_IF_ASSERT_DISABLED(result); } @@ -43,10 +53,11 @@ llvm::Expected MainLoopWindows::Poll() { events.push_back(info.event); } - events.push_back(m_trigger_event); + events.push_back(m_interrupt_event); - DWORD result = WSAWaitForMultipleEvents(events.size(), events.data(), FALSE, - WSA_INFINITE, FALSE); + DWORD result = + WSAWaitForMultipleEvents(events.size(), events.data(), FALSE, + ToTimeout(GetNextWakeupTime()), FALSE); for (auto &fd : m_read_fds) { int result = WSAEventSelect(fd.first, WSA_INVALID_EVENT, 0); @@ -54,9 +65,13 @@ llvm::Expected MainLoopWindows::Poll() { UNUSED_IF_ASSERT_DISABLED(result); } - if (result >= WSA_WAIT_EVENT_0 && result <= WSA_WAIT_EVENT_0 + events.size()) + if (result >= WSA_WAIT_EVENT_0 && result < WSA_WAIT_EVENT_0 + events.size()) return result - WSA_WAIT_EVENT_0; + // A timeout is treated as a (premature) signalization of the interrupt event. + if (result == WSA_WAIT_TIMEOUT) + return events.size() - 1; + return llvm::createStringError(llvm::inconvertibleErrorCode(), "WSAWaitForMultipleEvents failed"); } @@ -127,13 +142,11 @@ Status MainLoopWindows::Run() { ProcessReadObject(KV.first); } else { assert(*signaled_event == m_read_fds.size()); - WSAResetEvent(m_trigger_event); + WSAResetEvent(m_interrupt_event); } - ProcessPendingCallbacks(); + ProcessCallbacks(); } return Status(); } -void MainLoopWindows::TriggerPendingCallbacks() { - WSASetEvent(m_trigger_event); -} +void MainLoopWindows::Interrupt() { WSASetEvent(m_interrupt_event); } diff --git a/lldb/unittests/Host/MainLoopTest.cpp b/lldb/unittests/Host/MainLoopTest.cpp index 622a547fa22f0..e7425b737a6da 100644 --- a/lldb/unittests/Host/MainLoopTest.cpp +++ b/lldb/unittests/Host/MainLoopTest.cpp @@ -15,6 +15,7 @@ #include "llvm/Config/llvm-config.h" // for LLVM_ON_UNIX #include "llvm/Testing/Support/Error.h" #include "gtest/gtest.h" +#include #include #include @@ -106,13 +107,9 @@ TEST_F(MainLoopTest, NoSpuriousReads) { error); ASSERT_THAT_ERROR(error.ToError(), llvm::Succeeded()); // Terminate the loop after one second. - std::thread terminate_thread([&loop] { - std::this_thread::sleep_for(std::chrono::seconds(1)); - loop.AddPendingCallback( - [](MainLoopBase &loop) { loop.RequestTermination(); }); - }); + loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); }, + std::chrono::seconds(1)); ASSERT_THAT_ERROR(loop.Run().ToError(), llvm::Succeeded()); - terminate_thread.join(); // Make sure the callback was called only once. ASSERT_EQ(1u, callback_count); @@ -223,6 +220,61 @@ TEST_F(MainLoopTest, ManyPendingCallbacks) { ASSERT_TRUE(loop.Run().Success()); } +TEST_F(MainLoopTest, CallbackWithTimeout) { + MainLoop loop; + loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); }, + std::chrono::seconds(2)); + auto start = std::chrono::steady_clock::now(); + ASSERT_THAT_ERROR(loop.Run().takeError(), llvm::Succeeded()); + EXPECT_GE(std::chrono::steady_clock::now() - start, std::chrono::seconds(2)); +} + +TEST_F(MainLoopTest, TimedCallbacksRunInOrder) { + MainLoop loop; + auto start = std::chrono::steady_clock::now(); + std::chrono::milliseconds epsilon(10); + std::vector order; + auto add_cb = [&](int id) { + loop.AddCallback([&order, id](MainLoopBase &) { order.push_back(id); }, + start + id * epsilon); + }; + add_cb(3); + add_cb(2); + add_cb(4); + add_cb(1); + loop.AddCallback([](MainLoopBase &loop) { loop.RequestTermination(); }, + start + 5 * epsilon); + ASSERT_THAT_ERROR(loop.Run().takeError(), llvm::Succeeded()); + EXPECT_GE(std::chrono::steady_clock::now() - start, 5 * epsilon); + ASSERT_THAT(order, testing::ElementsAre(1, 2, 3, 4)); +} + +TEST_F(MainLoopTest, TimedCallbackShortensSleep) { + MainLoop loop; + auto start = std::chrono::steady_clock::now(); + bool long_callback_called = false; + loop.AddCallback( + [&](MainLoopBase &loop) { + long_callback_called = true; + loop.RequestTermination(); + }, + std::chrono::seconds(30)); + std::future async_run = + std::async(std::launch::async, &MainLoop::Run, std::ref(loop)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + bool short_callback_called = false; + loop.AddCallback( + [&](MainLoopBase &loop) { + short_callback_called = true; + loop.RequestTermination(); + }, + std::chrono::seconds(1)); + ASSERT_THAT_ERROR(async_run.get().takeError(), llvm::Succeeded()); + EXPECT_LT(std::chrono::steady_clock::now() - start, std::chrono::seconds(10)); + EXPECT_TRUE(short_callback_called); + EXPECT_FALSE(long_callback_called); +} + #ifdef LLVM_ON_UNIX TEST_F(MainLoopTest, DetectsEOF) {