Skip to content

Commit 910fe86

Browse files
committed
[重构Thread和ThreadPool模块]:改进状态管理和线程安全性
- **Thread类重构**: 将原子状态变量改为互斥锁保护,简化状态管理逻辑,修复竞争条件问题 - **状态转换优化**: 重新设计状态机转换流程,支持在Stopped状态下重新设置任务,提高线程复用性 - **等待机制改进**: 重构waitForFinished方法,支持无限等待和超时等待,修复条件变量使用方式 - **线程池任务类型统一**: 将ThreadPool任务类型统一为Thread::Task,支持stop_token传递,增强任务取消能力 - **线程池关闭逻辑增强**: 改进shutdown和shutdownNow机制,确保任务队列清理和线程安全停止 - **单元测试全面扩展**: 新增大量测试用例,覆盖边界条件、异常处理、性能场景和内存安全性验证 - **停止令牌支持**: 为所有任务添加stop_token参数,实现优雅的任务取消和资源清理机制 - **条件变量使用规范化**: 修复条件变量的等待条件,避免虚假唤醒和竞态条件问题
1 parent e656c85 commit 910fe86

File tree

4 files changed

+617
-264
lines changed

4 files changed

+617
-264
lines changed

src/Thread/thread.hpp

Lines changed: 36 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,9 @@
22

33
#include <utils/object.hpp>
44

5-
#include <atomic>
65
#include <chrono>
76
#include <condition_variable>
87
#include <functional>
9-
#include <future>
108
#include <iostream>
119
#include <mutex>
1210
#include <thread>
@@ -17,7 +15,7 @@ class Thread : noncopyable
1715
public:
1816
using Task = std::function<void(std::stop_token)>;
1917

20-
enum class State {
18+
enum class State : int {
2119
Idle, // 初始状态,未启动
2220
Starting, // 正在启动
2321
Running, // 正在运行
@@ -36,7 +34,7 @@ class Thread : noncopyable
3634
void setTask(Task task)
3735
{
3836
std::lock_guard<std::mutex> lock(m_mutex);
39-
if (getStateUnsafe() != State::Idle) {
37+
if (m_state != State::Idle && m_state != State::Stopped) {
4038
throw std::runtime_error("Cannot set task while thread is not idle");
4139
}
4240
m_task = std::move(task);
@@ -46,31 +44,17 @@ class Thread : noncopyable
4644
{
4745
std::unique_lock<std::mutex> lock(m_mutex);
4846

49-
// 检查状态
50-
if (getStateUnsafe() != State::Idle) {
47+
if (m_state != State::Idle || !m_task) {
5148
return false;
5249
}
5350

54-
// 检查任务
55-
if (!m_task) {
56-
return false;
57-
}
58-
59-
// 设置状态为启动中
60-
m_state.store(State::Starting, std::memory_order_release);
51+
m_state = State::Starting;
6152

6253
try {
63-
// 创建线程
6454
m_jthread = std::jthread([this](std::stop_token token) { threadMain(token); });
65-
66-
// 等待线程真正启动 - 修复:使用正确的条件变量等待方式
67-
m_condState.wait(lock, [this] {
68-
return getStateUnsafe() == State::Running || getStateUnsafe() == State::Stopped;
69-
});
70-
71-
return getStateUnsafe() == State::Running;
55+
return true;
7256
} catch (...) {
73-
m_state.store(State::Stopped, std::memory_order_release);
57+
m_state = State::Stopped;
7458
m_condState.notify_all();
7559
return false;
7660
}
@@ -87,56 +71,47 @@ class Thread : noncopyable
8771
}
8872
}
8973

90-
bool waitForFinished()
74+
bool waitForFinished(std::chrono::milliseconds timeout = std::chrono::milliseconds::max())
9175
{
9276
std::unique_lock<std::mutex> lock(m_mutex);
9377

94-
// 如果已经是停止状态,直接返回
95-
if (getStateUnsafe() == State::Stopped) {
96-
return true;
97-
}
98-
99-
// 如果还没有启动,也直接返回
100-
if (getStateUnsafe() == State::Idle) {
78+
// 如果已经是停止状态或空闲状态,直接返回
79+
if (m_state == State::Stopped || m_state == State::Idle) {
10180
return true;
10281
}
10382

104-
// 等待状态变为 Stopped - 修复:使用正确的条件变量等待方式
105-
m_condState.wait(lock, [this] { return getStateUnsafe() == State::Stopped; });
106-
107-
return true;
108-
}
109-
110-
bool waitForFinished(std::chrono::milliseconds timeout)
111-
{
112-
std::unique_lock<std::mutex> lock(m_mutex);
113-
114-
if (getStateUnsafe() == State::Stopped || getStateUnsafe() == State::Idle) {
83+
// 等待状态变为 Stopped
84+
if (timeout == std::chrono::milliseconds::max()) {
85+
m_condState.wait(lock, [this] { return m_state == State::Stopped; });
11586
return true;
87+
} else {
88+
return m_condState.wait_for(lock, timeout, [this] { return m_state == State::Stopped; });
11689
}
117-
118-
return m_condState.wait_for(lock, timeout, [this] {
119-
return getStateUnsafe() == State::Stopped;
120-
});
12190
}
12291

12392
[[nodiscard]] bool isRunning() const
12493
{
125-
auto state = m_state.load(std::memory_order_acquire);
126-
return state == State::Running || state == State::Starting;
94+
std::lock_guard<std::mutex> lock(m_mutex);
95+
return m_state == State::Running || m_state == State::Starting;
12796
}
12897

12998
[[nodiscard]] bool isStopped() const
13099
{
131-
return m_state.load(std::memory_order_acquire) == State::Stopped;
100+
std::lock_guard<std::mutex> lock(m_mutex);
101+
return m_state == State::Stopped;
132102
}
133103

134104
[[nodiscard]] bool isIdle() const
135105
{
136-
return m_state.load(std::memory_order_acquire) == State::Idle;
106+
std::lock_guard<std::mutex> lock(m_mutex);
107+
return m_state == State::Idle;
137108
}
138109

139-
[[nodiscard]] State getState() const { return m_state.load(std::memory_order_acquire); }
110+
[[nodiscard]] State getState() const
111+
{
112+
std::lock_guard<std::mutex> lock(m_mutex);
113+
return m_state;
114+
}
140115

141116
[[nodiscard]] bool isJoinable() const { return m_jthread.joinable(); }
142117

@@ -182,20 +157,10 @@ class Thread : noncopyable
182157
{
183158
{
184159
std::lock_guard<std::mutex> lock(m_mutex);
185-
186-
// 检查是否在启动过程中被停止
187-
if (token.stop_requested()) {
188-
m_state.store(State::Stopped, std::memory_order_release);
189-
m_condState.notify_all();
190-
return;
191-
}
192-
193-
// 标记为运行状态
194-
m_state.store(State::Running, std::memory_order_release);
195-
m_condState.notify_all();
160+
m_state = State::Running;
196161
}
162+
m_condState.notify_all();
197163

198-
// 执行用户任务
199164
try {
200165
if (m_task) {
201166
m_task(token);
@@ -206,48 +171,40 @@ class Thread : noncopyable
206171
std::cerr << "Unknown thread exception" << std::endl;
207172
}
208173

209-
// 标记线程结束
210174
{
211175
std::lock_guard<std::mutex> lock(m_mutex);
212-
m_state.store(State::Stopped, std::memory_order_release);
213-
m_condState.notify_all();
176+
m_state = State::Stopped;
214177
}
178+
m_condState.notify_all();
215179
}
216180

217181
void stopInternal(bool useTimeout,
218182
std::chrono::milliseconds timeout = std::chrono::milliseconds(0))
219183
{
220-
State currentState = m_state.load(std::memory_order_acquire);
221-
222-
// 如果已经是停止状态或空闲状态,直接返回
223-
if (currentState == State::Stopped || currentState == State::Idle) {
224-
return;
184+
{
185+
std::lock_guard<std::mutex> lock(m_mutex);
186+
if (m_state == State::Stopped || m_state == State::Idle) {
187+
return;
188+
}
225189
}
226190

227-
// 请求停止
228-
if (m_jthread.joinable()) {
229-
m_jthread.request_stop();
230-
}
191+
requestStop();
231192

232-
// 等待线程停止
233193
if (useTimeout) {
234194
waitForFinished(timeout);
235195
} else {
236196
waitForFinished();
237197
}
238198

239-
// 如果线程仍然可连接,强制join
240199
if (!useTimeout && m_jthread.joinable()) {
241200
m_jthread.join();
242201
}
243202
}
244203

245-
State getStateUnsafe() const { return m_state.load(std::memory_order_relaxed); }
246-
247204
private:
248205
std::jthread m_jthread;
249206
mutable std::mutex m_mutex;
250207
std::condition_variable m_condState;
251-
std::atomic<State> m_state{State::Idle};
208+
State m_state{State::Idle};
252209
Task m_task;
253210
};

0 commit comments

Comments
 (0)