Skip to content

Commit b72767d

Browse files
authored
FutureAwaiter Support Cancellation (#428)
1 parent c9521fc commit b72767d

File tree

4 files changed

+102
-12
lines changed

4 files changed

+102
-12
lines changed

async_simple/coro/FutureAwaiter.h

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
#define ASYNC_SIMPLE_CORO_FUTURE_AWAITER_H
1818

1919
#ifndef ASYNC_SIMPLE_USE_MODULES
20+
#include <atomic>
21+
#include <exception>
22+
#include <memory>
23+
#include <type_traits>
2024
#include "async_simple/Future.h"
25+
#include "async_simple/Signal.h"
2126
#include "async_simple/coro/Lazy.h"
2227
#include "async_simple/experimental/coroutine.h"
2328

24-
#include <type_traits>
25-
2629
#endif // ASYNC_SIMPLE_USE_MODULES
2730

2831
namespace async_simple {
@@ -31,25 +34,74 @@ namespace coro::detail {
3134
template <typename T>
3235
struct FutureAwaiter {
3336
Future<T> future_;
34-
3537
bool await_ready() { return future_.hasResult(); }
3638

3739
template <typename PromiseType>
38-
void await_suspend(CoroHandle<PromiseType> continuation) {
40+
bool await_suspend_cancellable(CoroHandle<PromiseType> continuation,
41+
Executor* ex, Executor::Context ctx,
42+
Slot* cancellationSlot) {
43+
auto future = std::make_unique<Future<T>>(std::move(future_));
44+
future_ = makeReadyFuture<T>(
45+
std::make_exception_ptr(async_simple::SignalException(
46+
SignalType::Terminate,
47+
"FutureAwaiter is only allowed to be called by Lazy")));
48+
auto state = std::make_shared<std::atomic<bool>>(true);
49+
50+
if (!signalHelper{Terminate}.tryEmplace(
51+
cancellationSlot,
52+
[continuation, ex, ctx, state](auto&&...) mutable {
53+
if (!state->exchange(false)) {
54+
return;
55+
}
56+
if (ex != nullptr) {
57+
ex->checkin(continuation, ctx);
58+
} else {
59+
continuation.resume();
60+
}
61+
})) {
62+
return false;
63+
}
64+
auto future_ptr = future.get();
65+
future_ptr->setContinuation(
66+
[this, continuation, ex, ctx, state = std::move(state),
67+
future = std::move(future)](auto&&...) mutable {
68+
if (!state->exchange(false)) {
69+
return;
70+
}
71+
this->future_ = std::move(*future);
72+
if (ex != nullptr) {
73+
ex->checkin(continuation, ctx);
74+
} else {
75+
continuation.resume();
76+
}
77+
});
78+
return true;
79+
}
80+
81+
template <typename PromiseType>
82+
bool await_suspend(CoroHandle<PromiseType> continuation) {
3983
static_assert(std::is_base_of_v<LazyPromiseBase, PromiseType>,
4084
"FutureAwaiter is only allowed to be called by Lazy");
4185
Executor* ex = continuation.promise()._executor;
4286
Executor::Context ctx = Executor::NULLCTX;
4387
if (ex != nullptr) {
4488
ctx = ex->checkout();
4589
}
90+
if (continuation.promise()._lazy_local) {
91+
if (auto cancellationSlot =
92+
continuation.promise()._lazy_local->getSlot()) {
93+
return await_suspend_cancellable(continuation, ex, ctx,
94+
cancellationSlot);
95+
}
96+
}
4697
future_.setContinuation([continuation, ex, ctx](Try<T>&& t) mutable {
4798
if (ex != nullptr) {
4899
ex->checkin(continuation, ctx);
49100
} else {
50101
continuation.resume();
51102
}
52103
});
104+
return true;
53105
}
54106
auto await_resume() {
55107
if constexpr (!std::is_same_v<T, void>)

async_simple/coro/test/FutureAwaiterTest.cpp

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717
#include <thread>
1818

1919
#include "async_simple/Promise.h"
20+
#include "async_simple/Signal.h"
21+
#include "async_simple/coro/Collect.h"
2022
#include "async_simple/coro/FutureAwaiter.h"
2123
#include "async_simple/coro/Lazy.h"
24+
#include "async_simple/coro/Sleep.h"
2225
#include "async_simple/coro/SyncAwait.h"
2326
#include "async_simple/executors/SimpleExecutor.h"
2427
#include "async_simple/test/unittest.h"
@@ -76,6 +79,43 @@ TEST_F(FutureAwaiterTest, testWithFuture) {
7679
syncAwait(lazy3().via(&ex2));
7780
}
7881

82+
TEST_F(FutureAwaiterTest, testWithFutureCancel) {
83+
async_simple::executors::SimpleExecutor ex1(2);
84+
auto lazy = [&]() -> Lazy<> {
85+
Promise<int> pr;
86+
auto fut = pr.getFuture();
87+
sum(1, 1, [pr = std::move(pr)](int val) mutable {
88+
std::this_thread::sleep_for(std::chrono::seconds::max());
89+
pr.setValue(val);
90+
});
91+
async_simple::SignalType type = None;
92+
try {
93+
co_await std::move(fut);
94+
} catch (const async_simple::SignalException& e) {
95+
type = e.value();
96+
} catch (...) {
97+
}
98+
EXPECT_EQ(type, async_simple::Terminate);
99+
};
100+
syncAwait(collectAll<async_simple::Terminate>(
101+
lazy().via(&ex1),
102+
async_simple::coro::sleep(std::chrono::microseconds{10}).via(&ex1)));
103+
auto lazy2 = [&]() -> Lazy<> {
104+
Promise<int> pr;
105+
auto fut = pr.getFuture();
106+
sum(1, 1, [pr = std::move(pr)](int val) mutable { pr.setValue(val); });
107+
int ret = 0;
108+
try {
109+
ret = co_await std::move(fut);
110+
} catch (...) {
111+
}
112+
EXPECT_EQ(ret, 2);
113+
};
114+
syncAwait(collectAll<async_simple::Terminate>(
115+
lazy2().via(&ex1),
116+
async_simple::coro::sleep(std::chrono::seconds::max()).via(&ex1)));
117+
}
118+
79119
namespace detail {
80120

81121
static_assert(HasGlobalCoAwaitOperator<Future<int>>);

docs/docs.cn/信号与任务的取消.md

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -257,22 +257,21 @@ syncAwait(Task().setLazyLocal(signal.get()).via(ex));
257257
258258
### 支持取消操作与信号转发的对象与函数
259259
260-
除了手动判断取消信号是否被触发,async-simple许多可能挂起的函数都为取消操作提供了支持。此外,`collect*`函数支持将外部收到的信号转发给由`collect*`函数启动的协程。
260+
除了手动判断取消信号是否被触发,async-simple许多可能挂起的函数都支持取消操作。此外,`collect*`函数支持将外部收到的信号转发给由`collect*`函数启动的协程。
261261
262-
以下各对象与函数支持取消操作,响应信号的协程可能抛出异常`async_simple::SignalException`,调用`value()`应返回信号类型(对于取消操作为`async_simple::terminate`)。
263-
此外,下列IO函数在挂起/恢复协程时都会自动插入两个检查点判断任务是否被取消。
262+
以下各函数支持取消操作,响应信号的协程可能抛出异常`async_simple::SignalException`,调用该异常的`value()`会返回信号类型(通常为`async_simple::terminate`)。此外,这些函数在挂起/恢复协程时都会自动插入两个检查点判断任务是否被取消。
264263
265264
1. CollectAny:CollectAny会将信号转发给所有子任务,如果收到取消信号,会抛出异常立即返回。
266265
2. CollectAll:CollectAny会将信号转发给所有子任务,即使收到取消信号,自身依然会等待所有子任务执行完毕后正常返回。
267-
3. Yield/SpinLock:如果被取消,会抛出异常。目前暂不支持取消在调度器中排队的任务。
266+
3. Yield/SpinLock上锁:如果被取消,会抛出异常。目前暂不支持取消在调度器中排队的任务。
267+
4. Future:返回值Try<T>中将包含异常。
268268
4. Sleep: 依赖于调度器是否重写了虚函数`void schedule(Func func, Duration dur, uint64_t schedule_info, Slot *slot = nullptr)`,并正确实现了取消功能。如果未重写该函数,默认实现支持取消Sleep。
269269
270270
以下IO对象与函数暂未支持取消操作,有待后续完善。
271271
1. Mutex
272272
2. ConditionVariable
273273
3. SharedMutex
274274
4. Latch
275-
5. Promise/Future
276275
6. CountingSemaphore
277276
278277
### 自定义Awaiter如何支持取消

docs/docs.en/SignalAndCancellation.md

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,20 +255,19 @@ syncAwait(Task().setLazyLocal(signal.get()).via(ex));
255255
256256
In addition to manually checking for triggered cancellation signals, many potentially suspendable functions in `async-simple` support cancellation operations. Additionally, `collect*` functions support forwarding signals received externally to coroutines initiated by `collect*`.
257257
258-
The following objects and functions support cancellation operations. Coroutines responding to signals may throw the `async_simple::SignalException` exception, and calling `value()` should return the signal type (for cancellation operations, this is `async_simple::terminate`).
259-
Also, the following IO functions will automatically insert two checkpoints to check if the task has been canceled during the suspension/resumption of the coroutine.
258+
The following objects and functions support cancellation operations. Coroutines responding to signals may throw the `async_simple::SignalException` exception, and calling `value()` should return the signal type (for cancellation operations, this is `async_simple::terminate`). Also, they will automatically insert two checkpoints to check if the task has been canceled during the suspension/resumption of the coroutine.
260259
261260
1. `CollectAny`: Forwards signals to all subtasks. If a cancellation signal is received, an exception is thrown and it returns immediately.
262261
2. `CollectAll`: Forwards signals to all subtasks. Even if a cancellation signal is received, it waits for all subtasks to complete before normally returning.
263262
3. `Yield/SpinLock`: Throws an exception if canceled. Currently, canceling tasks queued in the scheduler is not supported.
263+
3. `Future`: co_await a future could be canceled. It will return `Try<T>` which contains a exception.
264264
4. `Sleep`: Depends on whether the scheduler overrides the virtual function `void schedule(Func func, Duration dur, uint64_t schedule_info, Slot *slot = nullptr)` and correctly implements the cancellation functionality. If not overridden, the default implementation supports canceling Sleep.
265265
266266
The following IO objects and functions do not yet support cancellation operations and await further improvements:
267267
1. `Mutex`
268268
2. `ConditionVariable`
269269
3. `SharedMutex`
270270
4. `Latch`
271-
5. `Promise/Future`
272271
6. `CountingSemaphore`
273272
274273
### Supporting cancellation in custom Awaiters

0 commit comments

Comments
 (0)