From 7309449b42669256988cdb60443c25d664ce2fb0 Mon Sep 17 00:00:00 2001 From: saipubw Date: Wed, 11 Jun 2025 18:56:09 +0800 Subject: [PATCH] FutureAwaiter Support Cancellation --- async_simple/coro/FutureAwaiter.h | 60 +++++++++++++++++-- async_simple/coro/test/FutureAwaiterTest.cpp | 40 +++++++++++++ ...41\347\232\204\345\217\226\346\266\210.md" | 9 ++- docs/docs.en/SignalAndCancellation.md | 5 +- 4 files changed, 102 insertions(+), 12 deletions(-) diff --git a/async_simple/coro/FutureAwaiter.h b/async_simple/coro/FutureAwaiter.h index b8ac96dab..d17dde2f7 100644 --- a/async_simple/coro/FutureAwaiter.h +++ b/async_simple/coro/FutureAwaiter.h @@ -17,12 +17,15 @@ #define ASYNC_SIMPLE_CORO_FUTURE_AWAITER_H #ifndef ASYNC_SIMPLE_USE_MODULES +#include +#include +#include +#include #include "async_simple/Future.h" +#include "async_simple/Signal.h" #include "async_simple/coro/Lazy.h" #include "async_simple/experimental/coroutine.h" -#include - #endif // ASYNC_SIMPLE_USE_MODULES namespace async_simple { @@ -31,11 +34,52 @@ namespace coro::detail { template struct FutureAwaiter { Future future_; - bool await_ready() { return future_.hasResult(); } template - void await_suspend(CoroHandle continuation) { + bool await_suspend_cancellable(CoroHandle continuation, + Executor* ex, Executor::Context ctx, + Slot* cancellationSlot) { + auto future = std::make_unique>(std::move(future_)); + future_ = makeReadyFuture( + std::make_exception_ptr(async_simple::SignalException( + SignalType::Terminate, + "FutureAwaiter is only allowed to be called by Lazy"))); + auto state = std::make_shared>(true); + + if (!signalHelper{Terminate}.tryEmplace( + cancellationSlot, + [continuation, ex, ctx, state](auto&&...) mutable { + if (!state->exchange(false)) { + return; + } + if (ex != nullptr) { + ex->checkin(continuation, ctx); + } else { + continuation.resume(); + } + })) { + return false; + } + auto future_ptr = future.get(); + future_ptr->setContinuation( + [this, continuation, ex, ctx, state = std::move(state), + future = std::move(future)](auto&&...) mutable { + if (!state->exchange(false)) { + return; + } + this->future_ = std::move(*future); + if (ex != nullptr) { + ex->checkin(continuation, ctx); + } else { + continuation.resume(); + } + }); + return true; + } + + template + bool await_suspend(CoroHandle continuation) { static_assert(std::is_base_of_v, "FutureAwaiter is only allowed to be called by Lazy"); Executor* ex = continuation.promise()._executor; @@ -43,6 +87,13 @@ struct FutureAwaiter { if (ex != nullptr) { ctx = ex->checkout(); } + if (continuation.promise()._lazy_local) { + if (auto cancellationSlot = + continuation.promise()._lazy_local->getSlot()) { + return await_suspend_cancellable(continuation, ex, ctx, + cancellationSlot); + } + } future_.setContinuation([continuation, ex, ctx](Try&& t) mutable { if (ex != nullptr) { ex->checkin(continuation, ctx); @@ -50,6 +101,7 @@ struct FutureAwaiter { continuation.resume(); } }); + return true; } auto await_resume() { if constexpr (!std::is_same_v) diff --git a/async_simple/coro/test/FutureAwaiterTest.cpp b/async_simple/coro/test/FutureAwaiterTest.cpp index 1dcf71dd1..0c0bed7af 100644 --- a/async_simple/coro/test/FutureAwaiterTest.cpp +++ b/async_simple/coro/test/FutureAwaiterTest.cpp @@ -17,8 +17,11 @@ #include #include "async_simple/Promise.h" +#include "async_simple/Signal.h" +#include "async_simple/coro/Collect.h" #include "async_simple/coro/FutureAwaiter.h" #include "async_simple/coro/Lazy.h" +#include "async_simple/coro/Sleep.h" #include "async_simple/coro/SyncAwait.h" #include "async_simple/executors/SimpleExecutor.h" #include "async_simple/test/unittest.h" @@ -76,6 +79,43 @@ TEST_F(FutureAwaiterTest, testWithFuture) { syncAwait(lazy3().via(&ex2)); } +TEST_F(FutureAwaiterTest, testWithFutureCancel) { + async_simple::executors::SimpleExecutor ex1(2); + auto lazy = [&]() -> Lazy<> { + Promise pr; + auto fut = pr.getFuture(); + sum(1, 1, [pr = std::move(pr)](int val) mutable { + std::this_thread::sleep_for(std::chrono::seconds::max()); + pr.setValue(val); + }); + async_simple::SignalType type = None; + try { + co_await std::move(fut); + } catch (const async_simple::SignalException& e) { + type = e.value(); + } catch (...) { + } + EXPECT_EQ(type, async_simple::Terminate); + }; + syncAwait(collectAll( + lazy().via(&ex1), + async_simple::coro::sleep(std::chrono::microseconds{10}).via(&ex1))); + auto lazy2 = [&]() -> Lazy<> { + Promise pr; + auto fut = pr.getFuture(); + sum(1, 1, [pr = std::move(pr)](int val) mutable { pr.setValue(val); }); + int ret = 0; + try { + ret = co_await std::move(fut); + } catch (...) { + } + EXPECT_EQ(ret, 2); + }; + syncAwait(collectAll( + lazy2().via(&ex1), + async_simple::coro::sleep(std::chrono::seconds::max()).via(&ex1))); +} + namespace detail { static_assert(HasGlobalCoAwaitOperator>); diff --git "a/docs/docs.cn/\344\277\241\345\217\267\344\270\216\344\273\273\345\212\241\347\232\204\345\217\226\346\266\210.md" "b/docs/docs.cn/\344\277\241\345\217\267\344\270\216\344\273\273\345\212\241\347\232\204\345\217\226\346\266\210.md" index 39599c113..9749e94ff 100644 --- "a/docs/docs.cn/\344\277\241\345\217\267\344\270\216\344\273\273\345\212\241\347\232\204\345\217\226\346\266\210.md" +++ "b/docs/docs.cn/\344\277\241\345\217\267\344\270\216\344\273\273\345\212\241\347\232\204\345\217\226\346\266\210.md" @@ -257,14 +257,14 @@ syncAwait(Task().setLazyLocal(signal.get()).via(ex)); ### 支持取消操作与信号转发的对象与函数 -除了手动判断取消信号是否被触发,async-simple许多可能挂起的函数都为取消操作提供了支持。此外,`collect*`函数支持将外部收到的信号转发给由`collect*`函数启动的协程。 +除了手动判断取消信号是否被触发,async-simple许多可能挂起的函数都支持取消操作。此外,`collect*`函数支持将外部收到的信号转发给由`collect*`函数启动的协程。 -以下各对象与函数支持取消操作,响应信号的协程可能抛出异常`async_simple::SignalException`,调用`value()`应返回信号类型(对于取消操作为`async_simple::terminate`)。 -此外,下列IO函数在挂起/恢复协程时都会自动插入两个检查点判断任务是否被取消。 +以下各函数支持取消操作,响应信号的协程可能抛出异常`async_simple::SignalException`,调用该异常的`value()`会返回信号类型(通常为`async_simple::terminate`)。此外,这些函数在挂起/恢复协程时都会自动插入两个检查点判断任务是否被取消。 1. CollectAny:CollectAny会将信号转发给所有子任务,如果收到取消信号,会抛出异常立即返回。 2. CollectAll:CollectAny会将信号转发给所有子任务,即使收到取消信号,自身依然会等待所有子任务执行完毕后正常返回。 -3. Yield/SpinLock:如果被取消,会抛出异常。目前暂不支持取消在调度器中排队的任务。 +3. Yield/SpinLock上锁:如果被取消,会抛出异常。目前暂不支持取消在调度器中排队的任务。 +4. Future:返回值Try中将包含异常。 4. Sleep: 依赖于调度器是否重写了虚函数`void schedule(Func func, Duration dur, uint64_t schedule_info, Slot *slot = nullptr)`,并正确实现了取消功能。如果未重写该函数,默认实现支持取消Sleep。 以下IO对象与函数暂未支持取消操作,有待后续完善。 @@ -272,7 +272,6 @@ syncAwait(Task().setLazyLocal(signal.get()).via(ex)); 2. ConditionVariable 3. SharedMutex 4. Latch -5. Promise/Future 6. CountingSemaphore ### 自定义Awaiter如何支持取消 diff --git a/docs/docs.en/SignalAndCancellation.md b/docs/docs.en/SignalAndCancellation.md index 4a23a24fa..dbee8444c 100644 --- a/docs/docs.en/SignalAndCancellation.md +++ b/docs/docs.en/SignalAndCancellation.md @@ -255,12 +255,12 @@ syncAwait(Task().setLazyLocal(signal.get()).via(ex)); In addition to manually checking for triggered cancellation signals, many potentially suspendable functions in `async-simple` support cancellation operations. Additionally, `collect*` functions support forwarding signals received externally to coroutines initiated by `collect*`. -The following objects and functions support cancellation operations. Coroutines responding to signals may throw the `async_simple::SignalException` exception, and calling `value()` should return the signal type (for cancellation operations, this is `async_simple::terminate`). -Also, the following IO functions will automatically insert two checkpoints to check if the task has been canceled during the suspension/resumption of the coroutine. +The following objects and functions support cancellation operations. Coroutines responding to signals may throw the `async_simple::SignalException` exception, and calling `value()` should return the signal type (for cancellation operations, this is `async_simple::terminate`). Also, they will automatically insert two checkpoints to check if the task has been canceled during the suspension/resumption of the coroutine. 1. `CollectAny`: Forwards signals to all subtasks. If a cancellation signal is received, an exception is thrown and it returns immediately. 2. `CollectAll`: Forwards signals to all subtasks. Even if a cancellation signal is received, it waits for all subtasks to complete before normally returning. 3. `Yield/SpinLock`: Throws an exception if canceled. Currently, canceling tasks queued in the scheduler is not supported. +3. `Future`: co_await a future could be canceled. It will return `Try` which contains a exception. 4. `Sleep`: Depends on whether the scheduler overrides the virtual function `void schedule(Func func, Duration dur, uint64_t schedule_info, Slot *slot = nullptr)` and correctly implements the cancellation functionality. If not overridden, the default implementation supports canceling Sleep. The following IO objects and functions do not yet support cancellation operations and await further improvements: @@ -268,7 +268,6 @@ The following IO objects and functions do not yet support cancellation operation 2. `ConditionVariable` 3. `SharedMutex` 4. `Latch` -5. `Promise/Future` 6. `CountingSemaphore` ### Supporting cancellation in custom Awaiters