Skip to content

Commit 3e40916

Browse files
committed
librados/asio: forward asio cancellations to AioCompletion::cancel()
if the completion handler has a cancellation slot connected, construct a cancellation handler for it that calls AioCompletion::cancel() read operations can support more cancellation types than writes. see the table of cancellation types and their associated requirements/guarantees: https://www.boost.org/doc/libs/1_85_0/doc/html/boost_asio/overview/core/cancellation.html#boost_asio.overview.core.cancellation.t0 Signed-off-by: Casey Bodley <[email protected]>
1 parent 0331c2f commit 3e40916

File tree

2 files changed

+188
-6
lines changed

2 files changed

+188
-6
lines changed

src/librados/librados_asio.h

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
#ifndef LIBRADOS_ASIO_H
1515
#define LIBRADOS_ASIO_H
1616

17+
#include <boost/asio/associated_cancellation_slot.hpp>
18+
#include <boost/asio/cancellation_type.hpp>
19+
1720
#include "include/rados/librados.hpp"
1821
#include "common/async/completion.h"
1922
#include "librados/AioCompletionImpl.h"
@@ -74,6 +77,7 @@ struct Invoker<void> {
7477
template <typename Result>
7578
struct AsyncOp : Invoker<Result> {
7679
unique_aio_completion_ptr aio_completion;
80+
boost::asio::cancellation_slot slot;
7781

7882
using Signature = typename Invoker<Result>::Signature;
7983
using Completion = ceph::async::Completion<Signature, AsyncOp<Result>>;
@@ -83,6 +87,7 @@ struct AsyncOp : Invoker<Result> {
8387
auto p = std::unique_ptr<Completion>{static_cast<Completion*>(arg)};
8488
// move result out of Completion memory being freed
8589
auto op = std::move(p->user_data);
90+
op.slot.clear(); // clear our cancellation handler
8691
// access AioCompletionImpl directly to avoid locking
8792
const librados::AioCompletionImpl* pc = op.aio_completion->pc;
8893
const int ret = pc->rval;
@@ -94,11 +99,46 @@ struct AsyncOp : Invoker<Result> {
9499
op.dispatch(std::move(p), ec, ver);
95100
}
96101

102+
struct op_cancellation {
103+
AioCompletion* completion = nullptr;
104+
bool is_read = false;
105+
106+
void operator()(boost::asio::cancellation_type type) {
107+
if (completion == nullptr) {
108+
return; // no AioCompletion attached
109+
} else if (type == boost::asio::cancellation_type::none) {
110+
return; // no cancellation requested
111+
} else if (is_read) {
112+
// read operations produce no side effects, so can satisfy the
113+
// requirements of 'total' cancellation. the weaker requirements
114+
// of 'partial' and 'terminal' are also satisfied
115+
completion->cancel();
116+
} else if (type == boost::asio::cancellation_type::terminal) {
117+
// write operations only support 'terminal' cancellation because we
118+
// can't guarantee that no osd has succeeded (or will succeed) in
119+
// applying the write
120+
completion->cancel();
121+
}
122+
}
123+
};
124+
97125
template <typename Executor1, typename CompletionHandler>
98-
static auto create(const Executor1& ex1, CompletionHandler&& handler) {
126+
static auto create(const Executor1& ex1, bool is_read,
127+
CompletionHandler&& handler) {
128+
op_cancellation* cancel_handler = nullptr;
129+
auto slot = boost::asio::get_associated_cancellation_slot(handler);
130+
if (slot.is_connected()) {
131+
cancel_handler = &slot.template emplace<op_cancellation>();
132+
}
133+
99134
auto p = Completion::create(ex1, std::move(handler));
100135
p->user_data.aio_completion.reset(
101136
Rados::aio_create_completion(p.get(), aio_dispatch));
137+
if (cancel_handler) {
138+
cancel_handler->completion = p->user_data.aio_completion.get();
139+
cancel_handler->is_read = is_read;
140+
p->user_data.slot = std::move(slot);
141+
}
102142
return p;
103143
}
104144
};
@@ -117,7 +157,8 @@ auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
117157
return boost::asio::async_initiate<CompletionToken, Signature>(
118158
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
119159
size_t len, uint64_t off) {
120-
auto p = Op::create(ex, std::move(handler));
160+
constexpr bool is_read = true;
161+
auto p = Op::create(ex, is_read, std::move(handler));
121162
auto& op = p->user_data;
122163

123164
int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off);
@@ -142,7 +183,8 @@ auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
142183
return boost::asio::async_initiate<CompletionToken, Signature>(
143184
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
144185
const bufferlist &bl, size_t len, uint64_t off) {
145-
auto p = Op::create(ex, std::move(handler));
186+
constexpr bool is_read = false;
187+
auto p = Op::create(ex, is_read, std::move(handler));
146188
auto& op = p->user_data;
147189

148190
int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off);
@@ -167,7 +209,8 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
167209
return boost::asio::async_initiate<CompletionToken, Signature>(
168210
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
169211
ObjectReadOperation *read_op, int flags) {
170-
auto p = Op::create(ex, std::move(handler));
212+
constexpr bool is_read = true;
213+
auto p = Op::create(ex, is_read, std::move(handler));
171214
auto& op = p->user_data;
172215

173216
int ret = io.aio_operate(oid, op.aio_completion.get(), read_op,
@@ -194,7 +237,8 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
194237
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
195238
ObjectWriteOperation *write_op, int flags,
196239
const jspan_context* trace_ctx) {
197-
auto p = Op::create(ex, std::move(handler));
240+
constexpr bool is_read = false;
241+
auto p = Op::create(ex, is_read, std::move(handler));
198242
auto& op = p->user_data;
199243

200244
int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx);
@@ -218,7 +262,8 @@ auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
218262
return boost::asio::async_initiate<CompletionToken, Signature>(
219263
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
220264
bufferlist& bl, uint64_t timeout_ms) {
221-
auto p = Op::create(ex, std::move(handler));
265+
constexpr bool is_read = false;
266+
auto p = Op::create(ex, is_read, std::move(handler));
222267
auto& op = p->user_data;
223268

224269
int ret = io.aio_notify(oid, op.aio_completion.get(),

src/test/librados/asio.cc

Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,14 @@
2121

2222
#include <boost/range/begin.hpp>
2323
#include <boost/range/end.hpp>
24+
#include <boost/asio/bind_cancellation_slot.hpp>
25+
#include <boost/asio/cancellation_signal.hpp>
2426
#include <boost/asio/io_context.hpp>
2527
#include <boost/asio/spawn.hpp>
2628
#include <boost/asio/use_future.hpp>
2729

30+
#include <optional>
31+
2832
#define dout_subsys ceph_subsys_rados
2933
#define dout_context g_ceph_context
3034

@@ -78,6 +82,15 @@ void rethrow(std::exception_ptr eptr) {
7882
if (eptr) std::rethrow_exception(eptr);
7983
}
8084

85+
auto capture(std::optional<error_code>& out) {
86+
return [&out] (error_code ec, ...) { out = ec; };
87+
}
88+
89+
auto capture(boost::asio::cancellation_signal& signal,
90+
std::optional<error_code>& out) {
91+
return boost::asio::bind_cancellation_slot(signal.slot(), capture(out));
92+
}
93+
8194
TEST_F(AsioRados, AsyncReadCallback)
8295
{
8396
boost::asio::io_context service;
@@ -385,6 +398,130 @@ TEST_F(AsioRados, AsyncWriteOperationYield)
385398
service.run();
386399
}
387400

401+
// FIXME: this crashes on windows with:
402+
// Thread 1 received signal SIGILL, Illegal instruction.
403+
#ifndef _WIN32
404+
405+
TEST_F(AsioRados, AsyncReadOperationCancelTerminal)
406+
{
407+
// cancellation tests are racy, so retry if completion beats the cancellation
408+
boost::system::error_code ec;
409+
int tries = 10;
410+
do {
411+
boost::asio::io_context service;
412+
boost::asio::cancellation_signal signal;
413+
std::optional<error_code> result;
414+
415+
librados::ObjectReadOperation op;
416+
op.assert_exists();
417+
librados::async_operate(service, io, "noexist", &op, 0, nullptr,
418+
capture(signal, result));
419+
420+
service.poll();
421+
EXPECT_FALSE(service.stopped());
422+
EXPECT_FALSE(result);
423+
424+
signal.emit(boost::asio::cancellation_type::terminal);
425+
426+
service.run();
427+
ASSERT_TRUE(result);
428+
ec = *result;
429+
430+
signal.emit(boost::asio::cancellation_type::all); // noop
431+
} while (ec == std::errc::no_such_file_or_directory && --tries);
432+
433+
EXPECT_EQ(ec, boost::asio::error::operation_aborted);
434+
}
435+
436+
TEST_F(AsioRados, AsyncReadOperationCancelTotal)
437+
{
438+
// cancellation tests are racy, so retry if completion beats the cancellation
439+
boost::system::error_code ec;
440+
int tries = 10;
441+
do {
442+
boost::asio::io_context service;
443+
boost::asio::cancellation_signal signal;
444+
std::optional<error_code> result;
445+
446+
librados::ObjectReadOperation op;
447+
op.assert_exists();
448+
librados::async_operate(service, io, "noexist", &op, 0, nullptr,
449+
capture(signal, result));
450+
451+
service.poll();
452+
EXPECT_FALSE(service.stopped());
453+
EXPECT_FALSE(result);
454+
455+
signal.emit(boost::asio::cancellation_type::total);
456+
457+
service.run();
458+
ASSERT_TRUE(result);
459+
ec = *result;
460+
461+
signal.emit(boost::asio::cancellation_type::all); // noop
462+
} while (ec == std::errc::no_such_file_or_directory && --tries);
463+
464+
EXPECT_EQ(ec, boost::asio::error::operation_aborted);
465+
}
466+
467+
TEST_F(AsioRados, AsyncWriteOperationCancelTerminal)
468+
{
469+
// cancellation tests are racy, so retry if completion beats the cancellation
470+
boost::system::error_code ec;
471+
int tries = 10;
472+
do {
473+
boost::asio::io_context service;
474+
boost::asio::cancellation_signal signal;
475+
std::optional<error_code> result;
476+
477+
librados::ObjectWriteOperation op;
478+
op.assert_exists();
479+
librados::async_operate(service, io, "noexist", &op, 0, nullptr,
480+
capture(signal, result));
481+
482+
service.poll();
483+
EXPECT_FALSE(service.stopped());
484+
EXPECT_FALSE(result);
485+
486+
signal.emit(boost::asio::cancellation_type::terminal);
487+
488+
service.run();
489+
ASSERT_TRUE(result);
490+
ec = *result;
491+
492+
signal.emit(boost::asio::cancellation_type::all); // noop
493+
} while (ec == std::errc::no_such_file_or_directory && --tries);
494+
495+
EXPECT_EQ(ec, boost::asio::error::operation_aborted);
496+
}
497+
498+
TEST_F(AsioRados, AsyncWriteOperationCancelTotal)
499+
{
500+
boost::asio::io_context service;
501+
boost::asio::cancellation_signal signal;
502+
std::optional<error_code> ec;
503+
504+
librados::ObjectWriteOperation op;
505+
op.assert_exists();
506+
librados::async_operate(service, io, "noexist", &op, 0, nullptr,
507+
capture(signal, ec));
508+
509+
service.poll();
510+
EXPECT_FALSE(service.stopped());
511+
EXPECT_FALSE(ec);
512+
513+
// noop, write only supports terminal
514+
signal.emit(boost::asio::cancellation_type::total);
515+
516+
service.run();
517+
ASSERT_TRUE(ec);
518+
EXPECT_EQ(ec, std::errc::no_such_file_or_directory);
519+
520+
signal.emit(boost::asio::cancellation_type::all); // noop
521+
}
522+
523+
#endif // not _WIN32
524+
388525
int main(int argc, char **argv)
389526
{
390527
auto args = argv_to_vec(argc, argv);

0 commit comments

Comments
 (0)