Skip to content

Commit 525c945

Browse files
authored
Merge pull request ceph#60379 from cbodley/wip-librados-cancellation
librados/asio: forward asio cancellations to AioCompletion::cancel() Reviewed-by: Adam Emerson <[email protected]> Reviewed-by: Ilya Dryomov <[email protected]>
2 parents a0368dc + ca3cd0e commit 525c945

File tree

7 files changed

+385
-13
lines changed

7 files changed

+385
-13
lines changed

PendingReleaseNotes

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
(--yes-i-really-mean-it). This has been added as a precaution to tell the
3535
users that modifying "max_mds" may not help with troubleshooting or recovery
3636
effort. Instead, it might further destabilize the cluster.
37+
* RADOS: Added convenience function `librados::AioCompletion::cancel()` with
38+
the same behavior as `librados::IoCtx::aio_cancel()`.
3739

3840
* mgr/restful, mgr/zabbix: both modules, already deprecated since 2020, have been
3941
finally removed. They have not been actively maintained in recent years,

src/include/rados/librados.hpp

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,8 @@ inline namespace v14_2_0 {
202202
int set_complete_callback(void *cb_arg, callback_t cb);
203203
int set_safe_callback(void *cb_arg, callback_t cb)
204204
__attribute__ ((deprecated));
205+
/// Request immediate cancellation as if by IoCtx::aio_cancel().
206+
int cancel();
205207
int wait_for_complete();
206208
int wait_for_safe() __attribute__ ((deprecated));
207209
int wait_for_complete_and_cb();
@@ -772,17 +774,30 @@ inline namespace v14_2_0 {
772774
void tier_evict();
773775
};
774776

775-
/* IoCtx : This is a context in which we can perform I/O.
776-
* It includes a Pool,
777+
/**
778+
* @brief A handle to a RADOS pool used to perform I/O operations.
777779
*
778780
* Typical use (error checking omitted):
779-
*
781+
* @code
780782
* IoCtx p;
781783
* rados.ioctx_create("my_pool", p);
782-
* p->stat(&stats);
783-
* ... etc ...
784+
* p.stat("my_object", &size, &mtime);
785+
* @endcode
786+
*
787+
* IoCtx holds a pointer to its underlying implementation. The dup()
788+
* method performs a deep copy of this implementation, but the copy
789+
* construction and assignment operations perform shallow copies by
790+
* sharing that pointer.
791+
*
792+
* Function names starting with aio_ are asynchronous operations that
793+
* return immediately after submitting a request, and whose completions
794+
* are managed by the given AioCompletion pointer. The IoCtx's underlying
795+
* implementation is involved in the delivery of these completions, so
796+
* the caller must guarantee that its lifetime is preserved until then -
797+
* if not by preserving the IoCtx instance that submitted the request,
798+
* then by a copied/moved instance that shares the same implementation.
784799
*
785-
* NOTE: be sure to call watch_flush() prior to destroying any IoCtx
800+
* @note Be sure to call watch_flush() prior to destroying any IoCtx
786801
* that is used for watch events to ensure that racing callbacks
787802
* have completed.
788803
*/
@@ -791,9 +806,13 @@ inline namespace v14_2_0 {
791806
public:
792807
IoCtx();
793808
static void from_rados_ioctx_t(rados_ioctx_t p, IoCtx &pool);
809+
/// Construct a shallow copy of rhs, sharing its underlying implementation.
794810
IoCtx(const IoCtx& rhs);
811+
/// Assign a shallow copy of rhs, sharing its underlying implementation.
795812
IoCtx& operator=(const IoCtx& rhs);
813+
/// Move construct from rhs, transferring its underlying implementation.
796814
IoCtx(IoCtx&& rhs) noexcept;
815+
/// Move assign from rhs, transferring its underlying implementation.
797816
IoCtx& operator=(IoCtx&& rhs) noexcept;
798817

799818
~IoCtx();
@@ -1150,7 +1169,8 @@ inline namespace v14_2_0 {
11501169
int aio_stat2(const std::string& oid, AioCompletion *c, uint64_t *psize, struct timespec *pts);
11511170

11521171
/**
1153-
* Cancel aio operation
1172+
* Request immediate cancellation with error code -ECANCELED
1173+
* if the operation hasn't already completed.
11541174
*
11551175
* @param c completion handle
11561176
* @returns 0 on success, negative error code on failure

src/librados/librados_asio.h

Lines changed: 66 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414
#ifndef LIBRADOS_ASIO_H
1515
#define LIBRADOS_ASIO_H
1616

17+
#include <boost/asio/associated_cancellation_slot.hpp>
18+
#include <boost/asio/cancellation_type.hpp>
19+
1720
#include "include/rados/librados.hpp"
1821
#include "common/async/completion.h"
1922
#include "librados/AioCompletionImpl.h"
@@ -74,6 +77,7 @@ struct Invoker<void> {
7477
template <typename Result>
7578
struct AsyncOp : Invoker<Result> {
7679
unique_aio_completion_ptr aio_completion;
80+
boost::asio::cancellation_slot slot;
7781

7882
using Signature = typename Invoker<Result>::Signature;
7983
using Completion = ceph::async::Completion<Signature, AsyncOp<Result>>;
@@ -83,6 +87,7 @@ struct AsyncOp : Invoker<Result> {
8387
auto p = std::unique_ptr<Completion>{static_cast<Completion*>(arg)};
8488
// move result out of Completion memory being freed
8589
auto op = std::move(p->user_data);
90+
op.slot.clear(); // clear our cancellation handler
8691
// access AioCompletionImpl directly to avoid locking
8792
const librados::AioCompletionImpl* pc = op.aio_completion->pc;
8893
const int ret = pc->rval;
@@ -94,11 +99,46 @@ struct AsyncOp : Invoker<Result> {
9499
op.dispatch(std::move(p), ec, ver);
95100
}
96101

102+
struct op_cancellation {
103+
AioCompletion* completion = nullptr;
104+
bool is_read = false;
105+
106+
void operator()(boost::asio::cancellation_type type) {
107+
if (completion == nullptr) {
108+
return; // no AioCompletion attached
109+
} else if (type == boost::asio::cancellation_type::none) {
110+
return; // no cancellation requested
111+
} else if (is_read) {
112+
// read operations produce no side effects, so can satisfy the
113+
// requirements of 'total' cancellation. the weaker requirements
114+
// of 'partial' and 'terminal' are also satisfied
115+
completion->cancel();
116+
} else if (type == boost::asio::cancellation_type::terminal) {
117+
// write operations only support 'terminal' cancellation because we
118+
// can't guarantee that no osd has succeeded (or will succeed) in
119+
// applying the write
120+
completion->cancel();
121+
}
122+
}
123+
};
124+
97125
template <typename Executor1, typename CompletionHandler>
98-
static auto create(const Executor1& ex1, CompletionHandler&& handler) {
126+
static auto create(const Executor1& ex1, bool is_read,
127+
CompletionHandler&& handler) {
128+
op_cancellation* cancel_handler = nullptr;
129+
auto slot = boost::asio::get_associated_cancellation_slot(handler);
130+
if (slot.is_connected()) {
131+
cancel_handler = &slot.template emplace<op_cancellation>();
132+
}
133+
99134
auto p = Completion::create(ex1, std::move(handler));
100135
p->user_data.aio_completion.reset(
101136
Rados::aio_create_completion(p.get(), aio_dispatch));
137+
if (cancel_handler) {
138+
cancel_handler->completion = p->user_data.aio_completion.get();
139+
cancel_handler->is_read = is_read;
140+
p->user_data.slot = std::move(slot);
141+
}
102142
return p;
103143
}
104144
};
@@ -108,6 +148,9 @@ struct AsyncOp : Invoker<Result> {
108148

109149
/// Calls IoCtx::aio_read() and arranges for the AioCompletion to call a
110150
/// given handler with signature (error_code, version_t, bufferlist).
151+
///
152+
/// The given IoCtx reference is not required to remain valid, but some IoCtx
153+
/// instance must preserve its underlying implementation until completion.
111154
template <typename ExecutionContext, typename CompletionToken>
112155
auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
113156
size_t len, uint64_t off, CompletionToken&& token)
@@ -117,7 +160,8 @@ auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
117160
return boost::asio::async_initiate<CompletionToken, Signature>(
118161
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
119162
size_t len, uint64_t off) {
120-
auto p = Op::create(ex, std::move(handler));
163+
constexpr bool is_read = true;
164+
auto p = Op::create(ex, is_read, std::move(handler));
121165
auto& op = p->user_data;
122166

123167
int ret = io.aio_read(oid, op.aio_completion.get(), &op.result, len, off);
@@ -132,6 +176,9 @@ auto async_read(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
132176

133177
/// Calls IoCtx::aio_write() and arranges for the AioCompletion to call a
134178
/// given handler with signature (error_code, version_t).
179+
///
180+
/// The given IoCtx reference is not required to remain valid, but some IoCtx
181+
/// instance must preserve its underlying implementation until completion.
135182
template <typename ExecutionContext, typename CompletionToken>
136183
auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
137184
const bufferlist &bl, size_t len, uint64_t off,
@@ -142,7 +189,8 @@ auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
142189
return boost::asio::async_initiate<CompletionToken, Signature>(
143190
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
144191
const bufferlist &bl, size_t len, uint64_t off) {
145-
auto p = Op::create(ex, std::move(handler));
192+
constexpr bool is_read = false;
193+
auto p = Op::create(ex, is_read, std::move(handler));
146194
auto& op = p->user_data;
147195

148196
int ret = io.aio_write(oid, op.aio_completion.get(), bl, len, off);
@@ -157,6 +205,9 @@ auto async_write(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
157205

158206
/// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
159207
/// given handler with signature (error_code, version_t, bufferlist).
208+
///
209+
/// The given IoCtx reference is not required to remain valid, but some IoCtx
210+
/// instance must preserve its underlying implementation until completion.
160211
template <typename ExecutionContext, typename CompletionToken>
161212
auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
162213
ObjectReadOperation *read_op, int flags,
@@ -167,7 +218,8 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
167218
return boost::asio::async_initiate<CompletionToken, Signature>(
168219
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
169220
ObjectReadOperation *read_op, int flags) {
170-
auto p = Op::create(ex, std::move(handler));
221+
constexpr bool is_read = true;
222+
auto p = Op::create(ex, is_read, std::move(handler));
171223
auto& op = p->user_data;
172224

173225
int ret = io.aio_operate(oid, op.aio_completion.get(), read_op,
@@ -183,6 +235,9 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
183235

184236
/// Calls IoCtx::aio_operate() and arranges for the AioCompletion to call a
185237
/// given handler with signature (error_code, version_t).
238+
///
239+
/// The given IoCtx reference is not required to remain valid, but some IoCtx
240+
/// instance must preserve its underlying implementation until completion.
186241
template <typename ExecutionContext, typename CompletionToken>
187242
auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
188243
ObjectWriteOperation *write_op, int flags,
@@ -194,7 +249,8 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
194249
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
195250
ObjectWriteOperation *write_op, int flags,
196251
const jspan_context* trace_ctx) {
197-
auto p = Op::create(ex, std::move(handler));
252+
constexpr bool is_read = false;
253+
auto p = Op::create(ex, is_read, std::move(handler));
198254
auto& op = p->user_data;
199255

200256
int ret = io.aio_operate(oid, op.aio_completion.get(), write_op, flags, trace_ctx);
@@ -209,6 +265,9 @@ auto async_operate(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
209265

210266
/// Calls IoCtx::aio_notify() and arranges for the AioCompletion to call a
211267
/// given handler with signature (error_code, version_t, bufferlist).
268+
///
269+
/// The given IoCtx reference is not required to remain valid, but some IoCtx
270+
/// instance must preserve its underlying implementation until completion.
212271
template <typename ExecutionContext, typename CompletionToken>
213272
auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
214273
bufferlist& bl, uint64_t timeout_ms, CompletionToken &&token)
@@ -218,7 +277,8 @@ auto async_notify(ExecutionContext& ctx, IoCtx& io, const std::string& oid,
218277
return boost::asio::async_initiate<CompletionToken, Signature>(
219278
[] (auto handler, auto ex, IoCtx& io, const std::string& oid,
220279
bufferlist& bl, uint64_t timeout_ms) {
221-
auto p = Op::create(ex, std::move(handler));
280+
constexpr bool is_read = false;
281+
auto p = Op::create(ex, is_read, std::move(handler));
222282
auto& op = p->user_data;
223283

224284
int ret = io.aio_notify(oid, op.aio_completion.get(),

src/librados/librados_cxx.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1103,6 +1103,14 @@ void librados::AioCompletion::release()
11031103
delete this;
11041104
}
11051105

1106+
int librados::AioCompletion::cancel()
1107+
{
1108+
if (!pc->io) {
1109+
return 0; // no operation was started
1110+
}
1111+
return pc->io->aio_cancel(pc);
1112+
}
1113+
11061114
///////////////////////////// IoCtx //////////////////////////////
11071115
librados::IoCtx::IoCtx() : io_ctx_impl(NULL)
11081116
{

src/test/librados/aio.cc

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1722,3 +1722,59 @@ TEST(LibRadosAioEC, MultiWrite) {
17221722
rados_aio_release(my_completion2);
17231723
rados_aio_release(my_completion3);
17241724
}
1725+
1726+
// Cancelling a completion that was never submitted with an operation must
// report success.
TEST(LibRadosAio, CancelBeforeSubmit) {
  AioTestData fixture;
  ASSERT_EQ("", fixture.init());

  // create a completion without callbacks and never attach an operation
  rados_completion_t unsubmitted;
  ASSERT_EQ(0, rados_aio_create_completion2(nullptr, nullptr, &unsubmitted));

  ASSERT_EQ(0, rados_aio_cancel(fixture.m_ioctx, unsubmitted));
  rados_aio_release(unsubmitted);
}
1736+
1737+
// Cancelling a read right after submitting it must complete the operation
// with -ECANCELED.
TEST(LibRadosAio, CancelBeforeComplete) {
  AioTestData fixture;
  ASSERT_EQ("", fixture.init());

  // the cancellation races with the read's own completion. when the read
  // wins it returns -ENOENT, so try again a bounded number of times
  constexpr int max_attempts = 10;
  int result = 0;
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    rados_completion_t pending;
    ASSERT_EQ(0, rados_aio_create_completion2(nullptr, nullptr, &pending));

    char data[128];
    ASSERT_EQ(0, rados_aio_read(fixture.m_ioctx, "nonexistent",
                                pending, data, sizeof(data), 0));

    ASSERT_EQ(0, rados_aio_cancel(fixture.m_ioctx, pending));
    {
      TestAlarm alarm;
      ASSERT_EQ(0, rados_aio_wait_for_complete(pending));
    }
    result = rados_aio_get_return_value(pending);
    rados_aio_release(pending);

    if (result != -ENOENT) {
      break; // the cancellation was observed (or something unexpected)
    }
  }

  ASSERT_EQ(-ECANCELED, result);
}
1762+
1763+
// Cancelling an operation that has already completed must succeed and must
// not overwrite the operation's original return value.
TEST(LibRadosAio, CancelAfterComplete) {
  AioTestData fixture;
  ASSERT_EQ("", fixture.init());

  rados_completion_t finished;
  ASSERT_EQ(0, rados_aio_create_completion2(nullptr, nullptr, &finished));

  // read an object that does not exist and wait for the read to complete
  char data[128];
  ASSERT_EQ(0, rados_aio_read(fixture.m_ioctx, "nonexistent",
                              finished, data, sizeof(data), 0));
  {
    TestAlarm alarm;
    ASSERT_EQ(0, rados_aio_wait_for_complete(finished));
  }

  // a late cancel still reports success, and the read's -ENOENT is preserved
  ASSERT_EQ(0, rados_aio_cancel(fixture.m_ioctx, finished));
  ASSERT_EQ(-ENOENT, rados_aio_get_return_value(finished));
  rados_aio_release(finished);
}

0 commit comments

Comments
 (0)