Skip to content

Commit 707da49

Browse files
committed
feat: add asio adaptor
1 parent ad6f3d5 commit 707da49

File tree

4 files changed

+783
-0
lines changed

4 files changed

+783
-0
lines changed

.github/workflows/ci.yml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,12 @@ jobs:
8787
steps:
8888
- uses: actions/checkout@v4
8989

90+
- name: Init ASIO
91+
run: git clone https://github.com/chriskohlhoff/asio.git -b asio-1-32-0 --depth=1
92+
9093
- name: Configure ${{ matrix.build_type }} disable_exception:${{ matrix.disable_exception }} sanitize_address:${{ matrix.sanitize_address }} sanitize_thread:${{ matrix.sanitize_thread }}
94+
env:
95+
ASIO_PATH: asio/asio/include
9196
run: |
9297
cmake -S . -B build \
9398
-DCMAKE_BUILD_TYPE=${{ matrix.build_type }} \
@@ -206,3 +211,9 @@ jobs:
206211
working-directory: build
207212
run: |
208213
./coro_coro_local${{ matrix.env.BIN_SUFFIX }}
214+
215+
- name: Test asio_adaptor
216+
if: always()
217+
working-directory: build
218+
run: |
219+
./coro_asio_adaptor${{ matrix.env.BIN_SUFFIX }}

CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,13 @@ if (CORO_BUILD_TEST)
137137
set(TARGET_NAME ${PROJECT_NAME}_coro_local)
138138
add_executable(${TARGET_NAME} test/coro_local.cpp)
139139
target_compile_definitions(${TARGET_NAME} PRIVATE -DCORO_ENABLE_LOCAL_STORAGE)
140+
141+
message(STATUS "ASIO_PATH: $ENV{ASIO_PATH}")
142+
# test asio adaptor (optional, requires asio)
143+
if (NOT "$ENV{ASIO_PATH}" STREQUAL "")
144+
add_definitions(-DASIO_NO_DEPRECATED)
145+
set(TARGET_NAME ${PROJECT_NAME}_asio_adaptor)
146+
add_executable(${TARGET_NAME} test/coro_asio_adaptor.cpp)
147+
target_include_directories(${TARGET_NAME} PRIVATE $ENV{ASIO_PATH})
148+
endif()
140149
endif ()
Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
#pragma once
2+
3+
/// ASIO Adaptor for coro library
4+
/// This header provides interoperability between coro::async<T> and asio::awaitable<T>
5+
///
6+
/// Features:
7+
/// 1. asio_executor: Adapts asio::io_context to coro::executor interface
8+
/// 2. await_asio(): Allows coro coroutines to co_await asio::awaitable<T>
9+
/// 3. await_coro(): Allows asio coroutines to co_await coro::async<T>
10+
///
11+
/// Usage:
12+
/// // In coro coroutine, await asio awaitable:
13+
/// coro::async<void> my_coro_task() {
14+
/// auto result = co_await coro::await_asio(some_asio_awaitable());
15+
/// }
16+
///
17+
/// // In asio coroutine, await coro async:
18+
/// asio::awaitable<void> my_asio_task() {
19+
/// auto result = co_await coro::await_coro(some_coro_async());
20+
/// }
21+
22+
#include <asio.hpp>
23+
#include <cassert>
24+
#include <coroutine>
25+
#include <functional>
26+
#include <memory>
27+
#include <optional>
28+
#include <type_traits>
29+
#include <utility>
30+
31+
#include "../coro.hpp"
32+
#include "../executor.hpp"
33+
34+
namespace coro {
35+
36+
/// asio_executor: Adapts asio::io_context to coro::executor interface
37+
/// This allows coro coroutines to run on an asio event loop
38+
class asio_executor : public executor {
39+
public:
40+
explicit asio_executor(asio::io_context& io_context) : io_context_(io_context) {}
41+
42+
void dispatch(std::function<void()> fn) override {
43+
asio::dispatch(io_context_, std::move(fn));
44+
}
45+
46+
void post(std::function<void()> fn) override {
47+
asio::post(io_context_, std::move(fn));
48+
}
49+
50+
void post_delayed_ns(std::function<void()> fn, uint64_t delay_ns) override {
51+
auto timer = std::make_shared<asio::steady_timer>(io_context_);
52+
timer->expires_after(std::chrono::nanoseconds(delay_ns));
53+
timer->async_wait([timer, fn = std::move(fn)](const asio::error_code& ec) mutable {
54+
timer = nullptr;
55+
if (!ec) {
56+
fn();
57+
}
58+
});
59+
}
60+
61+
void stop() override {
62+
io_context_.stop();
63+
}
64+
65+
asio::io_context& get_io_context() {
66+
return io_context_;
67+
}
68+
69+
private:
70+
asio::io_context& io_context_;
71+
};
72+
73+
namespace detail {
74+
75+
/// Helper to get io_context from executor (works with asio_executor)
76+
inline asio::io_context* get_io_context_from_executor(executor* exec) {
77+
if (auto* asio_exec = dynamic_cast<asio_executor*>(exec)) {
78+
return &asio_exec->get_io_context();
79+
}
80+
return nullptr;
81+
}
82+
83+
/// Awaiter for awaiting asio::awaitable<T> inside coro::async<T>
84+
template <typename T>
85+
struct asio_awaitable_awaiter {
86+
// Shared state to safely communicate between asio callback and coro coroutine
87+
struct shared_state {
88+
std::optional<T> result;
89+
#ifndef CORO_DISABLE_EXCEPTION
90+
std::exception_ptr exception;
91+
#endif
92+
};
93+
94+
asio::awaitable<T> asio_awaitable_;
95+
std::shared_ptr<shared_state> state_;
96+
97+
explicit asio_awaitable_awaiter(asio::awaitable<T> aw) : asio_awaitable_(std::move(aw)), state_(std::make_shared<shared_state>()) {}
98+
99+
bool await_ready() const noexcept {
100+
return false;
101+
}
102+
103+
template <typename Promise>
104+
void await_suspend(std::coroutine_handle<Promise> handle) {
105+
auto* exec = handle.promise().executor_;
106+
assert(exec && "executor must be set");
107+
108+
auto* io_ctx = get_io_context_from_executor(exec);
109+
assert(io_ctx && "asio_executor required for await_asio");
110+
111+
auto state = state_; // Copy shared_ptr for the lambda
112+
113+
// Spawn the asio awaitable and resume when complete
114+
asio::co_spawn(*io_ctx, std::move(asio_awaitable_), [state, handle, io_ctx](std::exception_ptr ep, T value) {
115+
// If io_context is stopped, don't resume the coroutine as it may have been
116+
// destroyed or its captured references may be invalid
117+
if (io_ctx->stopped()) {
118+
return;
119+
}
120+
#ifndef CORO_DISABLE_EXCEPTION
121+
if (ep) {
122+
state->exception = ep;
123+
} else {
124+
state->result = std::move(value);
125+
}
126+
#else
127+
state->result = std::move(value);
128+
(void)ep;
129+
#endif
130+
// Use asio::post to schedule the resume. Check stopped() again inside
131+
// the posted handler since io_context may be stopped between now and
132+
// when the handler executes.
133+
asio::post(*io_ctx, [handle, io_ctx]() {
134+
if (!io_ctx->stopped()) {
135+
handle.resume();
136+
}
137+
});
138+
});
139+
}
140+
141+
T await_resume() {
142+
#ifndef CORO_DISABLE_EXCEPTION
143+
if (state_->exception) {
144+
std::rethrow_exception(state_->exception);
145+
}
146+
#endif
147+
return std::move(state_->result.value());
148+
}
149+
};
150+
151+
/// Specialization for void
152+
template <>
153+
struct asio_awaitable_awaiter<void> {
154+
#ifndef CORO_DISABLE_EXCEPTION
155+
// Shared state to safely communicate between asio callback and coro coroutine
156+
struct shared_state {
157+
std::exception_ptr exception;
158+
};
159+
#endif
160+
161+
asio::awaitable<void> asio_awaitable_;
162+
#ifndef CORO_DISABLE_EXCEPTION
163+
std::shared_ptr<shared_state> state_;
164+
165+
explicit asio_awaitable_awaiter(asio::awaitable<void> aw) : asio_awaitable_(std::move(aw)), state_(std::make_shared<shared_state>()) {}
166+
#else
167+
explicit asio_awaitable_awaiter(asio::awaitable<void> aw) : asio_awaitable_(std::move(aw)) {}
168+
#endif
169+
170+
bool await_ready() const noexcept {
171+
return false;
172+
}
173+
174+
template <typename Promise>
175+
void await_suspend(std::coroutine_handle<Promise> handle) {
176+
auto* exec = handle.promise().executor_;
177+
assert(exec && "executor must be set");
178+
179+
auto* io_ctx = get_io_context_from_executor(exec);
180+
assert(io_ctx && "asio_executor required for await_asio");
181+
182+
#ifndef CORO_DISABLE_EXCEPTION
183+
auto state = state_; // Copy shared_ptr for the lambda
184+
185+
asio::co_spawn(*io_ctx, std::move(asio_awaitable_), [state, handle, io_ctx](std::exception_ptr ep) {
186+
// If io_context is stopped, don't resume the coroutine as it may have been
187+
// destroyed or its captured references may be invalid
188+
if (io_ctx->stopped()) {
189+
return;
190+
}
191+
state->exception = ep;
192+
// Use asio::post to schedule the resume. Check stopped() again inside
193+
// the posted handler since io_context may be stopped between now and
194+
// when the handler executes.
195+
asio::post(*io_ctx, [handle, io_ctx]() {
196+
if (!io_ctx->stopped()) {
197+
handle.resume();
198+
}
199+
});
200+
});
201+
#else
202+
asio::co_spawn(*io_ctx, std::move(asio_awaitable_), [handle, io_ctx](std::exception_ptr) {
203+
if (io_ctx->stopped()) {
204+
return;
205+
}
206+
asio::post(*io_ctx, [handle, io_ctx]() {
207+
if (!io_ctx->stopped()) {
208+
handle.resume();
209+
}
210+
});
211+
});
212+
#endif
213+
}
214+
215+
void await_resume() {
216+
#ifndef CORO_DISABLE_EXCEPTION
217+
if (state_->exception) {
218+
std::rethrow_exception(state_->exception);
219+
}
220+
#endif
221+
}
222+
};
223+
224+
} // namespace detail
225+
226+
/// await_asio: Allows coro coroutines to co_await asio::awaitable<T>
227+
/// Usage: auto result = co_await await_asio(some_asio_awaitable());
228+
template <typename T>
229+
[[nodiscard]] auto await_asio(asio::awaitable<T> aw) {
230+
return detail::asio_awaitable_awaiter<T>(std::move(aw));
231+
}
232+
233+
/// await_coro: Allows asio coroutines to co_await coro::async<T>
234+
/// Returns an asio::awaitable<T> that wraps the coro::async<T>
235+
/// Usage: auto result = co_await await_coro(some_coro_async());
236+
template <typename T>
237+
[[nodiscard]] asio::awaitable<T> await_coro(async<T> aw) {
238+
auto executor = co_await asio::this_coro::executor;
239+
auto& io_ctx = static_cast<asio::io_context&>(executor.context());
240+
241+
// Use a shared state to communicate between callbacks
242+
// Also stores asio_executor to keep it alive during coro execution
243+
struct state {
244+
asio_executor exec;
245+
std::optional<T> result;
246+
#ifndef CORO_DISABLE_EXCEPTION
247+
std::exception_ptr exception;
248+
#endif
249+
explicit state(asio::io_context& ctx) : exec(ctx) {}
250+
};
251+
auto st = std::make_shared<state>(io_ctx);
252+
253+
// Create a deferred timer that we'll use to signal completion
254+
auto signal_timer = std::make_shared<asio::steady_timer>(io_ctx);
255+
signal_timer->expires_at(asio::steady_timer::time_point::max());
256+
257+
aw.detach_with_callback(
258+
st->exec,
259+
[st, signal_timer](T value) {
260+
st->result = std::move(value);
261+
signal_timer->cancel();
262+
},
263+
[st, signal_timer]([[maybe_unused]] std::exception_ptr ep) {
264+
#ifndef CORO_DISABLE_EXCEPTION
265+
st->exception = ep;
266+
#endif
267+
signal_timer->cancel();
268+
});
269+
270+
// Wait for the signal (timer cancel)
271+
asio::error_code ec;
272+
co_await signal_timer->async_wait(asio::redirect_error(asio::use_awaitable, ec));
273+
274+
#ifndef CORO_DISABLE_EXCEPTION
275+
if (st->exception) {
276+
std::rethrow_exception(st->exception);
277+
}
278+
#endif
279+
co_return std::move(st->result.value());
280+
}
281+
282+
/// Specialization for void
283+
template <>
284+
[[nodiscard]] inline asio::awaitable<void> await_coro(async<void> aw) {
285+
auto executor = co_await asio::this_coro::executor;
286+
auto& io_ctx = static_cast<asio::io_context&>(executor.context());
287+
288+
// Create a deferred timer that we'll use to signal completion
289+
auto signal_timer = std::make_shared<asio::steady_timer>(io_ctx);
290+
signal_timer->expires_at(asio::steady_timer::time_point::max());
291+
292+
#ifndef CORO_DISABLE_EXCEPTION
293+
// Use a shared state to communicate exception between callbacks
294+
// Also stores asio_executor to keep it alive during coro execution
295+
struct state {
296+
asio_executor exec;
297+
std::exception_ptr exception;
298+
explicit state(asio::io_context& ctx) : exec(ctx) {}
299+
};
300+
auto st = std::make_shared<state>(io_ctx);
301+
302+
aw.detach_with_callback(
303+
st->exec,
304+
[signal_timer]() {
305+
signal_timer->cancel();
306+
},
307+
[st, signal_timer](std::exception_ptr ep) {
308+
st->exception = ep;
309+
signal_timer->cancel();
310+
});
311+
312+
// Wait for the signal (timer cancel)
313+
asio::error_code ec;
314+
co_await signal_timer->async_wait(asio::redirect_error(asio::use_awaitable, ec));
315+
316+
if (st->exception) {
317+
std::rethrow_exception(st->exception);
318+
}
319+
#else
320+
// Without exceptions, asio_executor can be stored in shared state
321+
struct state {
322+
asio_executor exec;
323+
explicit state(asio::io_context& ctx) : exec(ctx) {}
324+
};
325+
auto st = std::make_shared<state>(io_ctx);
326+
327+
aw.detach_with_callback(
328+
st->exec,
329+
[signal_timer]() {
330+
signal_timer->cancel();
331+
},
332+
[signal_timer](std::exception_ptr) {
333+
signal_timer->cancel();
334+
});
335+
336+
// Wait for the signal (timer cancel)
337+
asio::error_code ec;
338+
co_await signal_timer->async_wait(asio::redirect_error(asio::use_awaitable, ec));
339+
#endif
340+
}
341+
342+
} // namespace coro

0 commit comments

Comments
 (0)