Skip to content

Commit ae89a56

Browse files
committed
rgw/datalog: Manage and shutdown tasks properly
This is slightly ugly but good enough for now. Make sure we can block when shutting down background tasks. Remove a few `driver` parameters that are unused. This lets us simplify the IAM Policy and Lua tests and not construct stores we never use, which is good since we aren't running them under a cluster. Signed-off-by: Adam C. Emerson <[email protected]>
1 parent a2d2664 commit ae89a56

23 files changed

+343
-275
lines changed

src/rgw/driver/rados/rgw_datalog.cc

Lines changed: 84 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ class RGWDataChangesOmap final : public RGWDataChangesBE {
131131
std::vector<std::string> oids;
132132

133133
public:
134-
RGWDataChangesOmap(neorados::RADOS& r,
134+
RGWDataChangesOmap(neorados::RADOS r,
135135
neorados::IOContext loc,
136136
RGWDataChangesLog& datalog,
137137
uint64_t gen_id,
@@ -272,7 +272,7 @@ class RGWDataChangesFIFO final : public RGWDataChangesBE {
272272
tiny_vector<LazyFIFO> fifos;
273273

274274
public:
275-
RGWDataChangesFIFO(neorados::RADOS& r,
275+
RGWDataChangesFIFO(neorados::RADOS r,
276276
neorados::IOContext loc,
277277
RGWDataChangesLog& datalog,
278278
uint64_t gen_id,
@@ -508,12 +508,12 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
508508
}
509509

510510
if (renew) {
511-
asio::co_spawn(
511+
renew_future = asio::co_spawn(
512512
renew_strand,
513-
renew_run(shared_from_this()),
513+
renew_run(),
514514
asio::bind_cancellation_slot(renew_signal.slot(),
515515
asio::bind_executor(renew_strand,
516-
asio::detached)));
516+
asio::use_future)));
517517
}
518518
if (watch) {
519519
// Establish watch here so we won't be 'started up' until we're watching.
@@ -523,22 +523,22 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
523523
throw sys::system_error{ENOTCONN, sys::generic_category(),
524524
"Unable to establish recovery watch!"};
525525
}
526-
asio::co_spawn(
526+
watch_future = asio::co_spawn(
527527
watch_strand,
528-
watch_loop(shared_from_this()),
528+
watch_loop(),
529529
asio::bind_cancellation_slot(watch_signal.slot(),
530530
asio::bind_executor(watch_strand,
531-
asio::detached)));
531+
asio::use_future)));
532532
}
533533
if (recovery) {
534534
// Recovery can run concurrent with normal operation, so we don't
535535
// have to block startup while we do all that I/O.
536-
asio::co_spawn(
536+
recovery_future = asio::co_spawn(
537537
recovery_strand,
538-
recover(dpp, shared_from_this()),
538+
recover(dpp),
539539
asio::bind_cancellation_slot(recovery_signal.slot(),
540540
asio::bind_executor(recovery_strand,
541-
asio::detached)));
541+
asio::use_future)));
542542
}
543543
co_return;
544544
}
@@ -665,7 +665,7 @@ RGWDataChangesLog::process_notification(const DoutPrefixProvider* dpp,
665665
}
666666

667667
asio::awaitable<void>
668-
RGWDataChangesLog::watch_loop(std::shared_ptr<RGWDataChangesLog>)
668+
RGWDataChangesLog::watch_loop()
669669
{
670670
const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: ");
671671
const auto oid = get_sem_set_oid(0);
@@ -1351,7 +1351,9 @@ bool RGWDataChangesLog::going_down() const
13511351
return down_flag;
13521352
}
13531353

1354-
asio::awaitable<void> RGWDataChangesLog::shutdown() {
1354+
// Now, if we had an awaitable future…
1355+
asio::awaitable<void> RGWDataChangesLog::async_shutdown()
1356+
{
13551357
DoutPrefix dp{cct, ceph_subsys_rgw, "Datalog Shutdown"};
13561358
if (down_flag) {
13571359
co_return;
@@ -1377,15 +1379,77 @@ asio::awaitable<void> RGWDataChangesLog::shutdown() {
13771379
if (watchcookie && rados->check_watch(watchcookie)) {
13781380
auto wc = watchcookie;
13791381
watchcookie = 0;
1380-
co_await rados->unwatch(wc, loc, asio::use_awaitable);
1382+
try {
1383+
co_await rados->unwatch(wc, loc, asio::use_awaitable);
1384+
} catch (const std::exception& e) {
1385+
ldpp_dout(&dp, 2)
1386+
<< "RGWDataChangesLog::async_shutdown: unwatch failed: " << e.what()
1387+
<< dendl;
1388+
}
13811389
}
13821390
co_return;
13831391
}
13841392

1385-
asio::awaitable<void> RGWDataChangesLog::shutdown_or_timeout() {
1386-
using namespace asio::experimental::awaitable_operators;
1387-
asio::steady_timer t(co_await asio::this_coro::executor, 3s);
1388-
co_await (shutdown() || t.async_wait(asio::use_awaitable));
1393+
void RGWDataChangesLog::blocking_shutdown()
1394+
{
1395+
DoutPrefix dp{cct, ceph_subsys_rgw, "Datalog Shutdown"};
1396+
if (down_flag) {
1397+
return;
1398+
}
1399+
down_flag = true;
1400+
if (ran_background) {
1401+
renew_stop();
1402+
// Revisit this later
1403+
asio::dispatch(renew_strand,
1404+
[this]() {
1405+
renew_signal.emit(asio::cancellation_type::terminal);
1406+
});
1407+
try {
1408+
renew_future.wait();
1409+
} catch (const std::future_error& e) {
1410+
if (e.code() != std::future_errc::no_state) {
1411+
throw;
1412+
}
1413+
}
1414+
asio::dispatch(recovery_strand,
1415+
[this]() {
1416+
recovery_signal.emit(asio::cancellation_type::terminal);
1417+
});
1418+
try {
1419+
recovery_future.wait();
1420+
} catch (const std::future_error& e) {
1421+
if (e.code() != std::future_errc::no_state) {
1422+
throw;
1423+
}
1424+
}
1425+
asio::dispatch(watch_strand,
1426+
[this]() {
1427+
watch_signal.emit(asio::cancellation_type::terminal);
1428+
});
1429+
try {
1430+
watch_future.wait();
1431+
} catch (const std::future_error& e) {
1432+
if (e.code() != std::future_errc::no_state) {
1433+
throw;
1434+
}
1435+
}
1436+
if (watchcookie && rados->check_watch(watchcookie)) {
1437+
auto wc = watchcookie;
1438+
watchcookie = 0;
1439+
try {
1440+
rados->unwatch(wc, loc, async::use_blocked);
1441+
} catch (const std::exception& e) {
1442+
ldpp_dout(&dp, 2)
1443+
<< "RGWDataChangesLog::blocking_shutdown: unwatch failed: " << e.what()
1444+
<< dendl;
1445+
}
1446+
}
1447+
}
1448+
if (bes) {
1449+
bes->shutdown();
1450+
bes.reset();
1451+
}
1452+
return;
13891453
}
13901454

13911455
RGWDataChangesLog::~RGWDataChangesLog() {
@@ -1395,29 +1459,7 @@ RGWDataChangesLog::~RGWDataChangesLog() {
13951459
}
13961460
}
13971461

1398-
void RGWDataChangesLog::blocking_shutdown() {
1399-
if (!down_flag) {
1400-
try {
1401-
auto eptr = asio::co_spawn(rados->get_io_context(),
1402-
shutdown_or_timeout(),
1403-
async::use_blocked);
1404-
if (eptr) {
1405-
std::rethrow_exception(eptr);
1406-
}
1407-
} catch (const sys::system_error& e) {
1408-
lderr(cct) << __PRETTY_FUNCTION__
1409-
<< ": Failed to shutting down: " << e.what()
1410-
<< dendl;
1411-
} catch (const std::exception& e) {
1412-
lderr(cct) << __PRETTY_FUNCTION__
1413-
<< ": Failed to shutting down: " << e.what()
1414-
<< dendl;
1415-
}
1416-
}
1417-
}
1418-
1419-
asio::awaitable<void> RGWDataChangesLog::renew_run(
1420-
std::shared_ptr<RGWDataChangesLog>) {
1462+
asio::awaitable<void> RGWDataChangesLog::renew_run() {
14211463
static constexpr auto runs_per_prune = 150;
14221464
auto run = 0;
14231465
renew_timer.emplace(co_await asio::this_coro::executor);
@@ -1722,8 +1764,7 @@ RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index)
17221764
}
17231765

17241766
asio::awaitable<void> RGWDataChangesLog::recover(
1725-
const DoutPrefixProvider* dpp,
1726-
std::shared_ptr<RGWDataChangesLog>)
1767+
const DoutPrefixProvider* dpp)
17271768
{
17281769
co_await asio::co_spawn(
17291770
recovery_strand,

src/rgw/driver/rados/rgw_datalog.h

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#pragma once
55

66
#include <cstdint>
7+
#include <future>
78
#include <list>
89
#include <memory>
910
#include <mutex>
@@ -14,6 +15,7 @@
1415
#include <variant>
1516
#include <vector>
1617

18+
#include <boost/asio/use_future.hpp>
1719
#include <boost/asio/steady_timer.hpp>
1820

1921
#include <boost/container/flat_map.hpp>
@@ -206,7 +208,7 @@ class DataLogBackends final
206208
std::mutex m;
207209
RGWDataChangesLog& datalog;
208210

209-
DataLogBackends(neorados::RADOS& rados,
211+
DataLogBackends(neorados::RADOS rados,
210212
const neorados::Object oid,
211213
const neorados::IOContext& loc,
212214
fu2::unique_function<std::string(
@@ -350,8 +352,7 @@ struct hash<BucketGen> {
350352
};
351353
}
352354

353-
class RGWDataChangesLog
354-
: public std::enable_shared_from_this<RGWDataChangesLog> {
355+
class RGWDataChangesLog {
355356
friend class DataLogTestBase;
356357
friend DataLogBackends;
357358
CephContext *cct;
@@ -366,10 +367,13 @@ class RGWDataChangesLog
366367
using strand_t = asio::strand<executor_t>;
367368
strand_t renew_strand{executor};
368369
asio::cancellation_signal renew_signal = asio::cancellation_signal();
370+
std::future<void> renew_future;
369371
strand_t watch_strand{executor};
370372
asio::cancellation_signal watch_signal = asio::cancellation_signal();
373+
std::future<void> watch_future;
371374
strand_t recovery_strand{executor};
372375
asio::cancellation_signal recovery_signal = asio::cancellation_signal();
376+
std::future<void> recovery_future;
373377

374378
ceph::mono_time last_recovery = ceph::mono_clock::zero();
375379

@@ -414,8 +418,7 @@ class RGWDataChangesLog
414418
ceph::real_time expiration);
415419

416420
std::optional<asio::steady_timer> renew_timer;
417-
asio::awaitable<void> renew_run(
418-
std::shared_ptr<RGWDataChangesLog> renew_signal);
421+
asio::awaitable<void> renew_run();
419422
void renew_stop();
420423

421424
std::function<bool(const rgw_bucket& bucket, optional_yield y,
@@ -451,7 +454,7 @@ class RGWDataChangesLog
451454
std::string_view oid);
452455
asio::awaitable<void> process_notification(const DoutPrefixProvider* dpp,
453456
std::string_view oid);
454-
asio::awaitable<void> watch_loop(std::shared_ptr<RGWDataChangesLog>);
457+
asio::awaitable<void> watch_loop();
455458
int choose_oid(const rgw_bucket_shard& bs);
456459
asio::awaitable<void> add_entry(const DoutPrefixProvider *dpp,
457460
const RGWBucketInfo& bucket_info,
@@ -532,10 +535,8 @@ class RGWDataChangesLog
532535
ceph::mono_time fetch_time,
533536
bc::flat_map<std::string, uint64_t>&& semcount);
534537
asio::awaitable<void> recover_shard(const DoutPrefixProvider* dpp, int index);
535-
asio::awaitable<void> recover(const DoutPrefixProvider* dpp,
536-
std::shared_ptr<RGWDataChangesLog>);
537-
asio::awaitable<void> shutdown();
538-
asio::awaitable<void> shutdown_or_timeout();
538+
asio::awaitable<void> recover(const DoutPrefixProvider* dpp);
539+
asio::awaitable<void> async_shutdown();
539540
void blocking_shutdown();
540541

541542
asio::awaitable<void> admin_sem_list(std::optional<int> req_shard,
@@ -549,7 +550,7 @@ class RGWDataChangesLog
549550

550551
class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {
551552
protected:
552-
neorados::RADOS& r;
553+
neorados::RADOS r;
553554
neorados::IOContext loc;
554555
RGWDataChangesLog& datalog;
555556

@@ -564,7 +565,7 @@ class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {
564565

565566
const uint64_t gen_id;
566567

567-
RGWDataChangesBE(neorados::RADOS& r,
568+
RGWDataChangesBE(neorados::RADOS r,
568569
neorados::IOContext loc,
569570
RGWDataChangesLog& datalog,
570571
uint64_t gen_id)

src/rgw/driver/rados/rgw_log_backing.cc

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ inline std::ostream& operator <<(std::ostream& m, const shard_check& t) {
5252
namespace {
5353
/// Return the shard type, and a bool to see whether it has entries.
5454
asio::awaitable<shard_check>
55-
probe_shard(const DoutPrefixProvider* dpp, neorados::RADOS& rados,
55+
probe_shard(const DoutPrefixProvider* dpp, neorados::RADOS rados,
5656
const neorados::Object& obj, const neorados::IOContext& loc,
5757
bool& fifo_unsupported)
5858
{
@@ -98,7 +98,7 @@ probe_shard(const DoutPrefixProvider* dpp, neorados::RADOS& rados,
9898
}
9999

100100
asio::awaitable<log_type> handle_dne(const DoutPrefixProvider *dpp,
101-
neorados::RADOS& rados,
101+
neorados::RADOS rados,
102102
const neorados::Object& obj,
103103
const neorados::IOContext& loc,
104104
log_type def,
@@ -127,7 +127,7 @@ asio::awaitable<log_type> handle_dne(const DoutPrefixProvider *dpp,
127127

128128
asio::awaitable<log_type>
129129
log_backing_type(const DoutPrefixProvider* dpp,
130-
neorados::RADOS& rados,
130+
neorados::RADOS rados,
131131
const neorados::IOContext& loc,
132132
log_type def,
133133
int shards,
@@ -167,7 +167,7 @@ log_backing_type(const DoutPrefixProvider* dpp,
167167

168168
asio::awaitable<void> log_remove(
169169
const DoutPrefixProvider *dpp,
170-
neorados::RADOS& rados,
170+
neorados::RADOS rados,
171171
const neorados::IOContext& loc,
172172
int shards,
173173
const fu2::unique_function<std::string(int) const>& get_oid,
@@ -224,16 +224,26 @@ asio::awaitable<void> log_remove(
224224
co_return;
225225
}
226226

227-
logback_generations::~logback_generations() {
227+
void logback_generations::shutdown() {
228228
if (watchcookie > 0) {
229229
auto cct = rados.cct();
230230
sys::error_code ec;
231-
rados.unwatch(watchcookie, loc, asio::detached);
231+
rados.unwatch(watchcookie, loc, async::use_blocked[ec]);
232232
if (ec) {
233233
lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
234234
<< ": failed unwatching oid=" << oid
235235
<< ", " << ec.message() << dendl;
236236
}
237+
watchcookie = 0;
238+
}
239+
}
240+
241+
242+
logback_generations::~logback_generations() {
243+
auto cct = rados.cct();
244+
if (watchcookie > 0) {
245+
lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
246+
<< ": logback_generations destroyed without shutdown." << dendl;
237247
}
238248
}
239249

0 commit comments

Comments
 (0)