Skip to content

Commit 822f5bf

Browse files
Fea, 为所有协程添加CancellationToken参数,避免再对象释放后,继续恢复
1 parent 7684b7f commit 822f5bf

File tree

6 files changed

+168
-13
lines changed

6 files changed

+168
-13
lines changed

include/YY/Base/IO/File.h

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,16 +249,19 @@ namespace YY
249249
/// <param name="_uOffset">读取文件的偏移。</param>
250250
/// <param name="_pBuffer">输入缓冲区。请确保读取期间,_pBuffer处于有效状态。</param>
251251
/// <param name="_cbBufferToRead">要读取的最大字节数。</param>
252+
/// <param name="_pCancellationToken">可选的取消令牌,用于取消异步操作。</param>
252253
/// <returns>返回实际读取的字节数。
253254
/// 如果实际读取字节数为 0,那么请额外检查 GetLastError()。</returns>
254255
TaskAwaiter<uint32_t> __YYAPI AsyncRead(
255256
_In_ uint64_t _uOffset,
256257
_Out_writes_bytes_(_cbBufferToRead) void* _pBuffer,
257-
_In_ uint32_t _cbBufferToRead) noexcept
258+
_In_ uint32_t _cbBufferToRead,
259+
_In_opt_ RefPtr<CancellationToken> _pCancellationToken = nullptr) noexcept
258260
{
259261
auto _pAsyncTaskEntry = RefPtr<CoroutineReadWriteTask>::Create();
260262
if (_pAsyncTaskEntry)
261263
{
264+
_pAsyncTaskEntry->pCancellationToken = std::move(_pCancellationToken);
262265
auto _lStatus = AsyncReadInternal(_uOffset, _pBuffer, _cbBufferToRead, _pAsyncTaskEntry);
263266
if (_lStatus != ERROR_SUCCESS)
264267
{
@@ -282,17 +285,20 @@ namespace YY
282285
/// <param name="_uOffset">写入文件的偏移。</param>
283286
/// <param name="_pBuffer">需要写入的数据缓冲区。</param>
284287
/// <param name="_cbBufferToWrite">要写入的字节数</param>
288+
/// <param name="_pCancellationToken">可选的取消令牌,用于取消异步操作。</param>
285289
/// <returns>返回实际写入的字节数。
286290
/// 如果实际写入字节数为 0,那么请额外检查 GetLastError()。
287291
/// </returns>
288292
TaskAwaiter<uint32_t> __YYAPI AsyncWrite(
289293
_In_ uint64_t _uOffset,
290294
_In_reads_bytes_(_cbBufferToWrite) const void* _pBuffer,
291-
_In_ uint32_t _cbBufferToWrite) noexcept
295+
_In_ uint32_t _cbBufferToWrite,
296+
_In_opt_ RefPtr<CancellationToken> _pCancellationToken = nullptr) noexcept
292297
{
293298
auto _pAsyncTaskEntry = RefPtr<CoroutineReadWriteTask>::Create();
294299
if (_pAsyncTaskEntry)
295300
{
301+
_pAsyncTaskEntry->pCancellationToken = std::move(_pCancellationToken);
296302
auto _lStatus = AsyncWriteInternal(_uOffset, _pBuffer, _cbBufferToWrite, _pAsyncTaskEntry);
297303
if (_lStatus != ERROR_SUCCESS)
298304
{
@@ -504,7 +510,12 @@ namespace YY
504510
}
505511

506512
#if defined(_HAS_CXX20) && _HAS_CXX20
507-
TaskAwaiter<LSTATUS> AsyncConnect()
513+
/// <summary>
514+
/// 异步链接管道。
515+
/// </summary>
516+
/// <param name="_pCancellationToken">可选的取消令牌,用于取消异步操作。。</param>
517+
/// <returns>协程等待器,用于接受操作结果。</returns>
518+
TaskAwaiter<LSTATUS> AsyncConnect(_In_opt_ RefPtr<CancellationToken> _pCancellationToken = nullptr)
508519
{
509520
struct CoroutineConnectTask
510521
: public IoTaskEntry
@@ -556,6 +567,7 @@ namespace YY
556567
auto _pConnectTask = RefPtr<CoroutineConnectTask>::Create();
557568
if (_pConnectTask)
558569
{
570+
_pConnectTask->pCancellationToken = std::move(_pCancellationToken);
559571
auto _lStatus = AsyncConnectIntetnal(_pConnectTask);
560572
if (_lStatus != ERROR_SUCCESS)
561573
{
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
#pragma once
2+
#include <YY/Base/YY.h>
3+
#include <YY/Base/Memory/RefPtr.h>
4+
#include <YY/Base/Memory/WeakPtr.h>
5+
#include <YY/Base/Memory/ObserverPtr.h>
6+
#include <YY/Base/Exception.h>
7+
#include <memory>
8+
9+
#pragma pack(push, __YY_PACKING)
10+
11+
namespace YY
12+
{
13+
namespace Base
14+
{
15+
namespace Threading
16+
{
17+
class CancellationToken : public RefValue
18+
{
19+
public:
20+
/// <summary>
21+
/// 指示是否已请求取消操作。
22+
/// </summary>
23+
/// <returns>如果已请求取消则返回 true,否则返回 false。</returns>
24+
virtual bool __YYAPI IsCancellationRequested() const = 0;
25+
26+
void __YYAPI ThrowIfCancellationRequested() const
27+
{
28+
if (IsCancellationRequested())
29+
{
30+
throw OperationCanceledException();
31+
}
32+
}
33+
34+
template<typename T>
35+
static RefPtr<CancellationToken> __YYAPI FormPtr(ObserverPtr<T> _pObserverPtr)
36+
{
37+
class CancellationTokenImpl : public CancellationToken
38+
{
39+
private:
40+
ObserverPtr<T> pObserverPtr;
41+
42+
public:
43+
CancellationTokenImpl(ObserverPtr<T> _pObserverPtr) noexcept
44+
: pObserverPtr(std::move(_pObserverPtr))
45+
{
46+
}
47+
48+
bool __YYAPI IsCancellationRequested() const override
49+
{
50+
return pObserverPtr.IsExpired();
51+
}
52+
};
53+
54+
return RefPtr<CancellationToken>(new CancellationTokenImpl(std::move(_pObserverPtr)));
55+
}
56+
57+
template<typename T>
58+
static RefPtr<CancellationToken> __YYAPI FormPtr(WeakPtr<T> _pWeakPtr)
59+
{
60+
class CancellationTokenImpl : public CancellationToken
61+
{
62+
private:
63+
WeakPtr<T> pWeakPtr;
64+
65+
public:
66+
CancellationTokenImpl(WeakPtr<T> _pWeakPtr) noexcept
67+
: pWeakPtr(std::move(_pWeakPtr))
68+
{
69+
}
70+
71+
bool __YYAPI IsCancellationRequested() const override
72+
{
73+
return pWeakPtr.IsExpired();
74+
}
75+
};
76+
77+
return RefPtr<CancellationToken>(new CancellationTokenImpl(std::move(_pWeakPtr)));
78+
}
79+
80+
template<typename T>
81+
static RefPtr<CancellationToken> __YYAPI FormPtr(std::weak_ptr<T> _pWeakPtr)
82+
{
83+
class CancellationTokenImpl : public CancellationToken
84+
{
85+
private:
86+
std::weak_ptr<T> pWeakPtr;
87+
88+
public:
89+
CancellationTokenImpl(std::weak_ptr<T> pWeakPtr) noexcept
90+
: pWeakPtr(std::move(_pWeakPtr))
91+
{
92+
}
93+
94+
bool __YYAPI IsCancellationRequested() const override
95+
{
96+
return pWeakPtr.expired();
97+
}
98+
};
99+
100+
return RefPtr<CancellationToken>(new CancellationTokenImpl(std::move(_pWeakPtr)));
101+
}
102+
};
103+
}
104+
}
105+
106+
using namespace Base::Threading;
107+
}
108+
109+
#pragma pack(pop)

include/YY/Base/Threading/TaskRunner.h

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include <YY/Base/Time/TimeSpan.h>
1313
#include <YY/Base/Threading/Coroutine.h>
1414
#include <YY/Base/Strings/String.h>
15+
#include <YY/Base/Threading/CancellationToken.h>
1516

1617
#pragma pack(push, __YY_PACKING)
1718

@@ -58,6 +59,9 @@ namespace YY
5859
// 这个任务完成后重新回到的 TaskRunner
5960
WeakPtr<TaskRunner> pResumeTaskRunnerWeak;
6061

62+
// 取消令牌
63+
RefPtr<CancellationToken> pCancellationToken;
64+
6165
TaskEntry() = default;
6266

6367
TaskEntry(const TaskEntry&) = delete;
@@ -96,7 +100,7 @@ namespace YY
96100

97101
bool __YYAPI IsCanceled() const noexcept
98102
{
99-
return HasFlags(fStyle, TaskEntryStyle::Canceled);
103+
return HasFlags(fStyle, TaskEntryStyle::Canceled) || (pCancellationToken && pCancellationToken->IsCancellationRequested());
100104
}
101105

102106
virtual bool __YYAPI Cancel();
@@ -222,8 +226,20 @@ namespace YY
222226
/// 返回一个用于异步睡眠的等待器,该等待器在指定时间间隔后完成并提供 HRESULT 结果。
223227
/// </summary>
224228
/// <param name="_uAfter">要等待的时间间隔(TimeSpan),表示异步睡眠的持续时间。</param>
225-
/// <returns>一个 TaskAwaiter<HRESULT>,可用于等待异步睡眠操作完成并获取表示操作结果的 HRESULT。</returns>
226-
static TaskAwaiter<HRESULT> __YYAPI AsyncSleep(_In_ TimeSpan _uAfter);
229+
/// <param name="_pCancellationToken">可选的取消令牌,用于取消异步操作。</param>
230+
/// <returns>一个协程Awaiter对象,可用于等待异步睡眠操作完成并获取表示操作结果的 HRESULT。</returns>
231+
static TaskAwaiter<HRESULT> __YYAPI AsyncSleep(_In_ TimeSpan _uAfter, _In_opt_ RefPtr<CancellationToken> _pCancellationToken = nullptr);
232+
233+
/// <summary>
234+
/// 返回一个用于异步睡眠的等待器,该等待器在指定时间间隔后完成并提供 HRESULT 结果。
235+
/// </summary>
236+
/// <param name="_uAfter">要等待的时间间隔(TimeSpan),表示异步睡眠的持续时间。</param>
237+
/// <param name="_pCancellationToken">可选的取消令牌,用于取消异步操作。</param>
238+
/// <returns>一个协程Awaiter对象,可用于等待异步睡眠操作完成并获取表示操作结果的 HRESULT。</returns>
239+
static TaskAwaiter<HRESULT> __YYAPI Delay(_In_ TimeSpan _uAfter, _In_opt_ RefPtr<CancellationToken> _pCancellationToken = nullptr)
240+
{
241+
return AsyncSleep(_uAfter, std::move(_pCancellationToken));
242+
}
227243
#endif
228244

229245
#if defined(_HAS_CXX20) && _HAS_CXX20 && defined(_WIN32)
@@ -232,13 +248,14 @@ namespace YY
232248
/// </summary>
233249
/// <param name="_hHandle">要等待的对象句柄。</param>
234250
/// <param name="_iWaitTimeOut">等待超时时间;可选,默认为 TimeSpan::GetMax()(表示最大/无限超时)。</param>
251+
/// <param name="_pCancellationToken">可选的取消令牌,用于取消异步操作。</param>
235252
/// <returns>返回一个 TaskAwaiter<DWORD>,用于异步等待并在完成后获取等待结果,其结果可能值为:
236253
/// * WAIT_ABANDONED
237254
/// * WAIT_OBJECT_0
238255
/// * WAIT_TIMEOUT
239256
/// * WAIT_FAILED
240257
/// </returns>
241-
static TaskAwaiter<DWORD> __YYAPI AsyncWaitForObject(_In_ HANDLE _hHandle, _In_ TimeSpan _iWaitTimeOut = TimeSpan::GetMax());
258+
static TaskAwaiter<DWORD> __YYAPI AsyncWaitForObject(_In_ HANDLE _hHandle, _In_ TimeSpan _iWaitTimeOut = TimeSpan::GetMax(), _In_opt_ RefPtr<CancellationToken> _pCancellationToken = nullptr);
242259
#endif
243260

244261
/// <summary>
@@ -263,17 +280,26 @@ namespace YY
263280
/// </summary>
264281
/// <param name="_uAfter">需要延迟执行的时间</param>
265282
/// <param name="_pfnLambdaCallback">需要异步执行的 Lambda 表达式</param>
283+
/// <param name="_pCancellationToken">可选的取消令牌,用于取消异步操作。</param>
266284
/// <returns>TaskAwaiter<void></returns>
267285
TaskAwaiter<void> __YYAPI AsyncDelayTask(
268286
_In_ TimeSpan _uAfter,
269-
_In_ std::function<void(void)>&& _pfnTaskCallback);
287+
_In_ std::function<void(void)>&& _pfnTaskCallback,
288+
_In_opt_ RefPtr<CancellationToken> _pCancellationToken = nullptr);
270289
#endif
271290

272291
#if defined(_HAS_CXX20) && _HAS_CXX20
292+
/// <summary>
293+
/// 创建一个异步可 co_await 任务。
294+
/// </summary>
295+
/// <param name="_pfnTaskCallback">要执行的回调函数。以右值引用传入,函数对象会被移动到任务内部并在异步上下文中调用。</param>
296+
/// <param name="_pCancellationToken">可选的取消令牌指针;用于在任务执行前或执行中请求取消。默认值为 nullptr,表示不使用取消。</param>
297+
/// <returns>返回一个协程Awaiter对象。</returns>
273298
TaskAwaiter<void> __YYAPI AsyncTask(
274-
_In_ std::function<void(void)>&& _pfnTaskCallback)
299+
_In_ std::function<void(void)>&& _pfnTaskCallback,
300+
_In_opt_ RefPtr<CancellationToken> _pCancellationToken = nullptr)
275301
{
276-
return AsyncDelayTask(TimeSpan(), std::move(_pfnTaskCallback));
302+
return AsyncDelayTask(TimeSpan(), std::move(_pfnTaskCallback), std::move(_pCancellationToken));
277303
}
278304
#endif
279305

src/YY.Base.vcxitems

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
</ClCompile>
6262
</ItemGroup>
6363
<ItemGroup>
64+
<ClInclude Include="$(MSBuildThisFileDirectory)..\include\YY\Base\Threading\CancellationToken.h" />
6465
<ClInclude Include="$(MSBuildThisFileDirectory)..\include\YY\Base\Time\TimeZone.h" />
6566
<ClInclude Include="$(MSBuildThisFileDirectory)..\include\YY\Base\Utils\AutoCleanup.h" />
6667
<ClInclude Include="$(MSBuildThisFileDirectory)..\include\YY\Base\Utils\Handle.h" />

src/YY.Base.vcxitems.filters

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -305,6 +305,9 @@
305305
<ClInclude Include="$(MSBuildThisFileDirectory)..\include\YY\Base\Utils\Handle.h">
306306
<Filter>头文件\YY\Base\Utils</Filter>
307307
</ClInclude>
308+
<ClInclude Include="$(MSBuildThisFileDirectory)..\include\YY\Base\Threading\CancellationToken.h">
309+
<Filter>头文件\YY\Base\Threading</Filter>
310+
</ClInclude>
308311
</ItemGroup>
309312
<ItemGroup>
310313
<Natvis Include="$(MSBuildThisFileDirectory)\YY.Base.natvis">

src/YY/Base/Threading/TaskRunner.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ namespace YY
163163
}
164164

165165
#if defined(_HAS_CXX20) && _HAS_CXX20
166-
TaskAwaiter<HRESULT>__YYAPI TaskRunner::AsyncSleep(TimeSpan _uAfter)
166+
TaskAwaiter<HRESULT>__YYAPI TaskRunner::AsyncSleep(TimeSpan _uAfter, RefPtr<CancellationToken> _pCancellationToken)
167167
{
168168
const auto _uExpire = TickCount::GetNow() + _uAfter;
169169

@@ -217,6 +217,8 @@ namespace YY
217217
throw Exception();
218218

219219
_pAsyncTaskEntry->uExpire = _uExpire;
220+
_pAsyncTaskEntry->pCancellationToken = std::move(_pCancellationToken);
221+
220222
HRESULT _hr = S_OK;
221223
do
222224
{
@@ -242,7 +244,7 @@ namespace YY
242244
#endif
243245

244246
#if defined(_HAS_CXX20) && _HAS_CXX20 && defined(_WIN32)
245-
TaskAwaiter<DWORD>__YYAPI TaskRunner::AsyncWaitForObject(HANDLE _hHandle, TimeSpan _iWaitTimeOut)
247+
TaskAwaiter<DWORD>__YYAPI TaskRunner::AsyncWaitForObject(HANDLE _hHandle, TimeSpan _iWaitTimeOut, RefPtr<CancellationToken> _pCancellationToken)
246248
{
247249
struct AsyncTaskEntry
248250
: public Wait
@@ -292,6 +294,7 @@ namespace YY
292294
throw Exception();
293295

294296
_pAsyncTaskEntry->hHandle = _hHandle;
297+
_pAsyncTaskEntry->pCancellationToken = std::move(_pCancellationToken);
295298

296299
// >= UINT32_MAX 时认为是无限等待。
297300
if (_iWaitTimeOut >= TimeSpan::FromMilliseconds(UINT32_MAX))
@@ -363,7 +366,7 @@ namespace YY
363366
}
364367

365368
#if defined(_HAS_CXX20) && _HAS_CXX20
366-
TaskAwaiter<void> __YYAPI TaskRunner::AsyncDelayTask(TimeSpan _uAfter, std::function<void(void)>&& _pfnTaskCallback)
369+
TaskAwaiter<void> __YYAPI TaskRunner::AsyncDelayTask(TimeSpan _uAfter, std::function<void(void)>&& _pfnTaskCallback, RefPtr<CancellationToken> _pCancellationToken)
367370
{
368371
struct AsyncTaskEntry
369372
: public Timer
@@ -423,6 +426,7 @@ namespace YY
423426
throw Exception();
424427

425428
_pAsyncTaskEntry->pfnTaskCallback = std::move(_pfnTaskCallback);
429+
_pAsyncTaskEntry->pCancellationToken = std::move(_pCancellationToken);
426430
_pAsyncTaskEntry->pResumeTaskRunnerWeak = _pCurrent;
427431

428432
HRESULT _hr;

0 commit comments

Comments
 (0)