Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 56 additions & 4 deletions async_simple/coro/FutureAwaiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
#define ASYNC_SIMPLE_CORO_FUTURE_AWAITER_H

#ifndef ASYNC_SIMPLE_USE_MODULES
#include <atomic>
#include <exception>
#include <memory>
#include <type_traits>
#include "async_simple/Future.h"
#include "async_simple/Signal.h"
#include "async_simple/coro/Lazy.h"
#include "async_simple/experimental/coroutine.h"

#include <type_traits>

#endif // ASYNC_SIMPLE_USE_MODULES

namespace async_simple {
Expand All @@ -31,25 +34,74 @@ namespace coro::detail {
template <typename T>
struct FutureAwaiter {
Future<T> future_;

bool await_ready() { return future_.hasResult(); }

template <typename PromiseType>
void await_suspend(CoroHandle<PromiseType> continuation) {
bool await_suspend_cancellable(CoroHandle<PromiseType> continuation,
Executor* ex, Executor::Context ctx,
Slot* cancellationSlot) {
auto future = std::make_unique<Future<T>>(std::move(future_));
future_ = makeReadyFuture<T>(
std::make_exception_ptr(async_simple::SignalException(
SignalType::Terminate,
"FutureAwaiter is only allowed to be called by Lazy")));
auto state = std::make_shared<std::atomic<bool>>(true);

if (!signalHelper{Terminate}.tryEmplace(
cancellationSlot,
[continuation, ex, ctx, state](auto&&...) mutable {
if (!state->exchange(false)) {
return;
}
if (ex != nullptr) {
ex->checkin(continuation, ctx);
} else {
continuation.resume();
}
})) {
return false;
}
auto future_ptr = future.get();
future_ptr->setContinuation(
[this, continuation, ex, ctx, state = std::move(state),
future = std::move(future)](auto&&...) mutable {
if (!state->exchange(false)) {
return;
}
this->future_ = std::move(*future);
if (ex != nullptr) {
ex->checkin(continuation, ctx);
} else {
continuation.resume();
}
});
return true;
}

template <typename PromiseType>
bool await_suspend(CoroHandle<PromiseType> continuation) {
static_assert(std::is_base_of_v<LazyPromiseBase, PromiseType>,
"FutureAwaiter is only allowed to be called by Lazy");
Executor* ex = continuation.promise()._executor;
Executor::Context ctx = Executor::NULLCTX;
if (ex != nullptr) {
ctx = ex->checkout();
}
if (continuation.promise()._lazy_local) {
if (auto cancellationSlot =
continuation.promise()._lazy_local->getSlot()) {
return await_suspend_cancellable(continuation, ex, ctx,
cancellationSlot);
}
}
future_.setContinuation([continuation, ex, ctx](Try<T>&& t) mutable {
if (ex != nullptr) {
ex->checkin(continuation, ctx);
} else {
continuation.resume();
}
});
return true;
}
auto await_resume() {
if constexpr (!std::is_same_v<T, void>)
Expand Down
40 changes: 40 additions & 0 deletions async_simple/coro/test/FutureAwaiterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
#include <thread>

#include "async_simple/Promise.h"
#include "async_simple/Signal.h"
#include "async_simple/coro/Collect.h"
#include "async_simple/coro/FutureAwaiter.h"
#include "async_simple/coro/Lazy.h"
#include "async_simple/coro/Sleep.h"
#include "async_simple/coro/SyncAwait.h"
#include "async_simple/executors/SimpleExecutor.h"
#include "async_simple/test/unittest.h"
Expand Down Expand Up @@ -76,6 +79,43 @@ TEST_F(FutureAwaiterTest, testWithFuture) {
syncAwait(lazy3().via(&ex2));
}

TEST_F(FutureAwaiterTest, testWithFutureCancel) {
async_simple::executors::SimpleExecutor ex1(2);
auto lazy = [&]() -> Lazy<> {
Promise<int> pr;
auto fut = pr.getFuture();
sum(1, 1, [pr = std::move(pr)](int val) mutable {
std::this_thread::sleep_for(std::chrono::seconds::max());
pr.setValue(val);
});
async_simple::SignalType type = None;
try {
co_await std::move(fut);
} catch (const async_simple::SignalException& e) {
type = e.value();
} catch (...) {
}
EXPECT_EQ(type, async_simple::Terminate);
};
syncAwait(collectAll<async_simple::Terminate>(
lazy().via(&ex1),
async_simple::coro::sleep(std::chrono::microseconds{10}).via(&ex1)));
auto lazy2 = [&]() -> Lazy<> {
Promise<int> pr;
auto fut = pr.getFuture();
sum(1, 1, [pr = std::move(pr)](int val) mutable { pr.setValue(val); });
int ret = 0;
try {
ret = co_await std::move(fut);
} catch (...) {
}
EXPECT_EQ(ret, 2);
};
syncAwait(collectAll<async_simple::Terminate>(
lazy2().via(&ex1),
async_simple::coro::sleep(std::chrono::seconds::max()).via(&ex1)));
}

namespace detail {

static_assert(HasGlobalCoAwaitOperator<Future<int>>);
Expand Down
9 changes: 4 additions & 5 deletions docs/docs.cn/信号与任务的取消.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,22 +257,21 @@ syncAwait(Task().setLazyLocal(signal.get()).via(ex));

### 支持取消操作与信号转发的对象与函数

除了手动判断取消信号是否被触发,async-simple许多可能挂起的函数都为取消操作提供了支持。此外,`collect*`函数支持将外部收到的信号转发给由`collect*`函数启动的协程。
除了手动判断取消信号是否被触发,async-simple许多可能挂起的函数都支持取消操作。此外,`collect*`函数支持将外部收到的信号转发给由`collect*`函数启动的协程。

以下各对象与函数支持取消操作,响应信号的协程可能抛出异常`async_simple::SignalException`,调用`value()`应返回信号类型(对于取消操作为`async_simple::terminate`)。
此外,下列IO函数在挂起/恢复协程时都会自动插入两个检查点判断任务是否被取消。
以下各函数支持取消操作,响应信号的协程可能抛出异常`async_simple::SignalException`,调用该异常的`value()`会返回信号类型(通常为`async_simple::terminate`)。此外,这些函数在挂起/恢复协程时都会自动插入两个检查点判断任务是否被取消。

1. CollectAny:CollectAny会将信号转发给所有子任务,如果收到取消信号,会抛出异常立即返回。
2. CollectAll:CollectAny会将信号转发给所有子任务,即使收到取消信号,自身依然会等待所有子任务执行完毕后正常返回。
3. Yield/SpinLock:如果被取消,会抛出异常。目前暂不支持取消在调度器中排队的任务。
3. Yield/SpinLock上锁:如果被取消,会抛出异常。目前暂不支持取消在调度器中排队的任务。
4. Future:返回值Try<T>中将包含异常。
4. Sleep: 依赖于调度器是否重写了虚函数`void schedule(Func func, Duration dur, uint64_t schedule_info, Slot *slot = nullptr)`,并正确实现了取消功能。如果未重写该函数,默认实现支持取消Sleep。

以下IO对象与函数暂未支持取消操作,有待后续完善。
1. Mutex
2. ConditionVariable
3. SharedMutex
4. Latch
5. Promise/Future
6. CountingSemaphore

### 自定义Awaiter如何支持取消
Expand Down
5 changes: 2 additions & 3 deletions docs/docs.en/SignalAndCancellation.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,20 +255,19 @@ syncAwait(Task().setLazyLocal(signal.get()).via(ex));

In addition to manually checking for triggered cancellation signals, many potentially suspendable functions in `async-simple` support cancellation operations. Additionally, `collect*` functions support forwarding signals received externally to coroutines initiated by `collect*`.

The following objects and functions support cancellation operations. Coroutines responding to signals may throw the `async_simple::SignalException` exception, and calling `value()` should return the signal type (for cancellation operations, this is `async_simple::terminate`).
Also, the following IO functions will automatically insert two checkpoints to check if the task has been canceled during the suspension/resumption of the coroutine.
The following objects and functions support cancellation operations. Coroutines responding to signals may throw the `async_simple::SignalException` exception, and calling `value()` should return the signal type (for cancellation operations, this is `async_simple::terminate`). Also, they will automatically insert two checkpoints to check if the task has been canceled during the suspension/resumption of the coroutine.

1. `CollectAny`: Forwards signals to all subtasks. If a cancellation signal is received, an exception is thrown and it returns immediately.
2. `CollectAll`: Forwards signals to all subtasks. Even if a cancellation signal is received, it waits for all subtasks to complete before normally returning.
3. `Yield/SpinLock`: Throws an exception if canceled. Currently, canceling tasks queued in the scheduler is not supported.
3. `Future`: co_await a future could be canceled. It will return `Try<T>` which contains a exception.
4. `Sleep`: Depends on whether the scheduler overrides the virtual function `void schedule(Func func, Duration dur, uint64_t schedule_info, Slot *slot = nullptr)` and correctly implements the cancellation functionality. If not overridden, the default implementation supports canceling Sleep.

The following IO objects and functions do not yet support cancellation operations and await further improvements:
1. `Mutex`
2. `ConditionVariable`
3. `SharedMutex`
4. `Latch`
5. `Promise/Future`
6. `CountingSemaphore`

### Supporting cancellation in custom Awaiters
Expand Down
Loading