Skip to content

Commit 0879a53

Browse files
Fea, 添加TaskRunner::AsyncSleep、TaskRunner::AsyncWaitForObject
1 parent ec0dcd7 commit 0879a53

File tree

3 files changed

+260
-4
lines changed

3 files changed

+260
-4
lines changed

UnitTest/TaskRunnerUnitTest.cpp

Lines changed: 72 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ namespace TaskRunnerUnitTest
367367
return false;
368368
});
369369

370-
Assert::IsTrue(((TaskEntry*)_pWait.Get())->Wait(600ul));
370+
Assert::IsTrue(((TaskEntry*)_pWait.Get())->WaitTask(YY::TimeSpan::FromMilliseconds(600ul)));
371371
Assert::AreEqual(DWORD(_uWaitResult), DWORD(WAIT_TIMEOUT));
372372

373373
_pWait = _pTaskRunner->CreateWait(
@@ -379,7 +379,7 @@ namespace TaskRunnerUnitTest
379379
return false;
380380
});
381381
SetEvent(_hEvent);
382-
Assert::IsTrue(((TaskEntry*)_pWait.Get())->Wait(100ul));
382+
Assert::IsTrue(((TaskEntry*)_pWait.Get())->WaitTask(YY::TimeSpan::FromMilliseconds(100ul)));
383383
Assert::AreEqual(DWORD(_uWaitResult), DWORD(WAIT_OBJECT_0));
384384
// CloseHandle(_hEvent);
385385
}
@@ -836,7 +836,7 @@ namespace TaskRunnerUnitTest
836836
return false;
837837
});
838838

839-
Assert::IsTrue(((TaskEntry*)_pWait.Get())->Wait(600ul));
839+
Assert::IsTrue(_pWait->WaitTask(YY::TimeSpan::FromMilliseconds(600ul)));
840840
Assert::AreEqual(DWORD(_uWaitResult), DWORD(WAIT_TIMEOUT));
841841

842842
_pWait = _pTaskRunner->CreateWait(
@@ -848,8 +848,76 @@ namespace TaskRunnerUnitTest
848848
return false;
849849
});
850850
SetEvent(_hEvent);
851-
Assert::IsTrue(((TaskEntry*)_pWait.Get())->Wait(100ul));
851+
Assert::IsTrue((_pWait->WaitTask(YY::TimeSpan::FromMilliseconds(100ul))));
852852
// CloseHandle(_hEvent);
853853
}
854854
};
855+
856+
TEST_CLASS(CommonTaskRunnerUnitTest)
857+
{
858+
public:
859+
#if defined(_HAS_CXX20) && _HAS_CXX20
860+
TEST_METHOD(AsyncSleep)
861+
{
862+
const auto _hrPending = E_PENDING;
863+
864+
auto _pTaskRunner = SequencedTaskRunner::Create();
865+
866+
auto _pHr = std::make_shared<HRESULT>(_hrPending);
867+
std::weak_ptr<HRESULT> _pWeakHr = _pHr;
868+
_pTaskRunner->PostTask(
869+
[_pWeakHr]() -> YY::Coroutine<void>
870+
{
871+
auto _pHr = _pWeakHr.lock();
872+
if (!_pHr)
873+
co_return;
874+
875+
*_pHr = co_await YY::TaskRunner::AsyncSleep(YY::TimeSpan::FromMilliseconds(500));
876+
WakeByAddressAll(_pHr.get());
877+
co_return;
878+
});
879+
880+
::WaitOnAddress((volatile void*)_pHr.get(), (void*)&_hrPending, sizeof(_hrPending), 2000);
881+
882+
Assert::AreEqual(HRESULT(S_OK), HRESULT(*_pHr));
883+
}
884+
#endif
885+
886+
#if defined(_HAS_CXX20) && _HAS_CXX20 && defined(_WIN32)
887+
TEST_METHOD(AsyncWaitForObject)
888+
{
889+
const auto _hrPending = E_PENDING;
890+
891+
auto _pTaskRunner = SequencedTaskRunner::Create();
892+
893+
auto _pHr = std::make_shared<HRESULT>(_hrPending);
894+
std::weak_ptr<HRESULT> _pWeakHr = _pHr;
895+
_pTaskRunner->PostTask(
896+
[_pWeakHr]() -> YY::Coroutine<void>
897+
{
898+
auto _pHr = _pWeakHr.lock();
899+
if (!_pHr)
900+
co_return;
901+
902+
auto _hEvent = CreateEventW(nullptr, FALSE, FALSE, nullptr);
903+
904+
auto _uResult = co_await YY::TaskRunner::AsyncWaitForObject(_hEvent, YY::TimeSpan::FromMilliseconds(100));
905+
906+
Assert::AreEqual(DWORD(WAIT_TIMEOUT), DWORD(_uResult));
907+
908+
SetEvent(_hEvent);
909+
_uResult = co_await YY::TaskRunner::AsyncWaitForObject(_hEvent, YY::TimeSpan::FromMilliseconds(100));
910+
Assert::AreEqual(DWORD(WAIT_OBJECT_0), DWORD(_uResult));
911+
912+
*_pHr.get() = S_OK;
913+
WakeByAddressAll(_pHr.get());
914+
co_return;
915+
});
916+
917+
::WaitOnAddress((volatile void*)_pHr.get(), (void*)&_hrPending, sizeof(_hrPending), 2000);
918+
919+
Assert::AreEqual(HRESULT(S_OK), HRESULT(*_pHr));
920+
}
921+
#endif
922+
};
855923
} // namespace UnitTest

include/YY/Base/Threading/TaskRunner.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -333,6 +333,30 @@ namespace YY
333333

334334
virtual TaskRunnerStyle __YYAPI GetStyle() const noexcept = 0;
335335

336+
#if defined(_HAS_CXX20) && _HAS_CXX20
337+
/// <summary>
338+
/// 返回一个用于异步睡眠的等待器,该等待器在指定时间间隔后完成并提供 HRESULT 结果。
339+
/// </summary>
340+
/// <param name="_uAfter">要等待的时间间隔(TimeSpan),表示异步睡眠的持续时间。</param>
341+
/// <returns>一个 TaskAwaiter<HRESULT>,可用于等待异步睡眠操作完成并获取表示操作结果的 HRESULT。</returns>
342+
static TaskAwaiter<HRESULT> __YYAPI AsyncSleep(_In_ TimeSpan _uAfter);
343+
#endif
344+
345+
#if defined(_HAS_CXX20) && _HAS_CXX20 && defined(_WIN32)
346+
/// <summary>
347+
/// 异步等待指定对象并返回表示等待结果的 awaiter。
348+
/// </summary>
349+
/// <param name="_hHandle">要等待的对象句柄。</param>
350+
/// <param name="_iWaitTimeOut">等待超时时间;可选,默认为 TimeSpan::GetMax()(表示最大/无限超时)。</param>
351+
/// <returns>返回一个 TaskAwaiter<DWORD>,用于异步等待并在完成后获取等待结果,其结果可能值为:
352+
/// * WAIT_ABANDONED
353+
/// * WAIT_OBJECT_0
354+
/// * WAIT_TIMEOUT
355+
/// * WAIT_FAILED
356+
/// </returns>
357+
static TaskAwaiter<DWORD> __YYAPI AsyncWaitForObject(_In_ HANDLE _hHandle, _In_ TimeSpan _iWaitTimeOut = TimeSpan::GetMax());
358+
#endif
359+
336360
/// <summary>
337361
/// 将任务异步执行。
338362
/// </summary>

src/YY/Base/Threading/TaskRunner.cpp

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,170 @@ namespace YY
162162
return TaskRunnerDispatch::Get()->StartIo();
163163
}
164164

165+
#if defined(_HAS_CXX20) && _HAS_CXX20
166+
TaskAwaiter<HRESULT>__YYAPI TaskRunner::AsyncSleep(TimeSpan _uAfter)
167+
{
168+
const auto _uExpire = TickCount::GetNow() + _uAfter;
169+
170+
struct AsyncTaskEntry
171+
: public Timer
172+
, public TaskAwaiter<HRESULT>::RefData
173+
{
174+
HRESULT _hrValue = E_PENDING;
175+
176+
uint32_t __YYAPI AddRef() noexcept override
177+
{
178+
return Timer::AddRef();
179+
}
180+
181+
uint32_t __YYAPI Release() noexcept override
182+
{
183+
return Timer::Release();
184+
}
185+
186+
HRESULT __YYAPI RunTask() override
187+
{
188+
if (IsCanceled())
189+
return YY::Base::HRESULT_From_LSTATUS(ERROR_CANCELLED);
190+
191+
_hrValue = S_OK;
192+
auto _hHandle = (void*)YY::Base::Sync::Exchange(&hCoroutineHandle, /*hReadyHandle*/ (intptr_t)-1);
193+
if (_hHandle)
194+
{
195+
try
196+
{
197+
std::coroutine_handle<>::from_address(_hHandle).resume();
198+
}
199+
catch (const YY::Base::OperationCanceledException& _Exception)
200+
{
201+
_hrValue = YY::Base::HRESULT_From_LSTATUS(ERROR_CANCELLED);
202+
}
203+
}
204+
205+
Wakeup(_hrValue);
206+
return _hrValue;
207+
}
208+
209+
HRESULT __YYAPI Resume() noexcept override
210+
{
211+
return _hrValue;
212+
}
213+
};
214+
215+
auto _pAsyncTaskEntry = RefPtr<AsyncTaskEntry>::Create();
216+
if (!_pAsyncTaskEntry)
217+
throw Exception();
218+
219+
_pAsyncTaskEntry->uExpire = _uExpire;
220+
HRESULT _hr = S_OK;
221+
do
222+
{
223+
auto _pTaskRunner = TaskRunner::GetCurrent();
224+
if (!_pTaskRunner)
225+
{
226+
_hr = E_UNEXPECTED;
227+
break;
228+
}
229+
230+
_hr = _pTaskRunner->SetTimerInternal(_pAsyncTaskEntry);
231+
232+
} while (false);
233+
234+
if (_hr != S_OK)
235+
{
236+
_pAsyncTaskEntry->hCoroutineHandle = (intptr_t)-1;
237+
_pAsyncTaskEntry->_hrValue = _hr;
238+
}
239+
240+
return TaskAwaiter<HRESULT>(std::move(_pAsyncTaskEntry));
241+
}
242+
#endif
243+
244+
#if defined(_HAS_CXX20) && _HAS_CXX20 && defined(_WIN32)
245+
TaskAwaiter<DWORD>__YYAPI TaskRunner::AsyncWaitForObject(HANDLE _hHandle, TimeSpan _iWaitTimeOut)
246+
{
247+
struct AsyncTaskEntry
248+
: public Wait
249+
, public TaskAwaiter<DWORD>::RefData
250+
{
251+
uint32_t __YYAPI AddRef() noexcept override
252+
{
253+
return Wait::AddRef();
254+
}
255+
256+
uint32_t __YYAPI Release() noexcept override
257+
{
258+
return Wait::Release();
259+
}
260+
261+
HRESULT __YYAPI RunTask() override
262+
{
263+
if (IsCanceled())
264+
return YY::Base::HRESULT_From_LSTATUS(ERROR_CANCELLED);
265+
266+
HRESULT _hr = S_OK;
267+
auto _hHandle = (void*)YY::Base::Sync::Exchange(&hCoroutineHandle, /*hReadyHandle*/ (intptr_t)-1);
268+
if (_hHandle)
269+
{
270+
try
271+
{
272+
std::coroutine_handle<>::from_address(_hHandle).resume();
273+
}
274+
catch (const YY::Base::OperationCanceledException& _Exception)
275+
{
276+
_hr = YY::Base::HRESULT_From_LSTATUS(ERROR_CANCELLED);
277+
}
278+
}
279+
280+
Wakeup(_hr);
281+
return _hr;
282+
}
283+
284+
DWORD __YYAPI Resume() noexcept override
285+
{
286+
return uWaitResult;
287+
}
288+
};
289+
290+
auto _pAsyncTaskEntry = RefPtr<AsyncTaskEntry>::Create();
291+
if (!_pAsyncTaskEntry)
292+
throw Exception();
293+
294+
_pAsyncTaskEntry->hHandle = _hHandle;
295+
296+
// >= UINT32_MAX 时认为是无限等待。
297+
if (_iWaitTimeOut >= TimeSpan::FromMilliseconds(UINT32_MAX))
298+
{
299+
_pAsyncTaskEntry->uTimeOut = TickCount::GetMax();
300+
}
301+
else
302+
{
303+
_pAsyncTaskEntry->uTimeOut = TickCount::GetNow() + _iWaitTimeOut;
304+
}
305+
306+
HRESULT _hr = S_OK;
307+
do
308+
{
309+
auto _pTaskRunner = TaskRunner::GetCurrent();
310+
if (!_pTaskRunner)
311+
{
312+
_hr = E_UNEXPECTED;
313+
break;
314+
}
315+
316+
_hr = _pTaskRunner->SetWaitInternal(_pAsyncTaskEntry);
317+
318+
} while (false);
319+
320+
if (_hr != S_OK)
321+
{
322+
_pAsyncTaskEntry->hCoroutineHandle = (intptr_t)-1;
323+
}
324+
325+
return TaskAwaiter<DWORD>(std::move(_pAsyncTaskEntry));
326+
}
327+
#endif
328+
165329
HRESULT __YYAPI TaskRunner::PostDelayTask(TimeSpan _uAfter, std::function<void(void)>&& _pfnTaskCallback)
166330
{
167331
auto _uExpire = TickCount::GetNow() + _uAfter;

0 commit comments

Comments
 (0)