1717
1818#include < boost/system/system_error.hpp>
1919
20- #include " common/async/parallel_for_each.h"
2120#include " include/fs_types.h"
2221#include " include/neorados/RADOS.hpp"
2322
@@ -356,17 +355,18 @@ class RGWDataChangesFIFO final : public RGWDataChangesBE {
356355 }
357356};
358357
359- RGWDataChangesLog::RGWDataChangesLog (CephContext* cct)
360- : cct(cct),
358+ RGWDataChangesLog::RGWDataChangesLog (rgw::sal::RadosStore* driver)
359+ : cct(driver->ctx ()), rados(driver->get_neorados ()),
360+ executor(driver->get_io_context ().get_executor()),
361361 num_shards(cct->_conf->rgw_data_log_num_shards),
362362 prefix(get_prefix()),
363363 changes(cct->_conf->rgw_data_log_changes_size) {}
364364
365365RGWDataChangesLog::RGWDataChangesLog (CephContext *cct, bool log_data,
366- neorados::RADOS * rados,
366+ neorados::RADOS rados,
367367 std::optional<int > num_shards,
368368 std::optional<uint64_t > sem_max_keys)
369- : cct(cct), rados(rados), log_data(log_data),
369+ : cct(cct), rados(rados), log_data(log_data), executor(rados.get_executor()),
370370 num_shards(num_shards ? *num_shards :
371371 cct->_conf->rgw_data_log_num_shards),
372372 prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size),
@@ -438,15 +438,13 @@ void DataLogBackends::handle_empty_to(uint64_t new_tail) {
438438int RGWDataChangesLog::start (const DoutPrefixProvider *dpp,
439439 const RGWZone* zone,
440440 const RGWZoneParams& zoneparams,
441- rgw::sal::RadosStore* store,
442441 bool background_tasks)
443442{
444443 log_data = zone->log_data ;
445- rados = &store->get_neorados ();
446444 try {
447445 // Blocking in startup code, not ideal, but won't hurt anything.
448446 std::exception_ptr eptr
449- = asio::co_spawn (store-> get_io_context () ,
447+ = asio::co_spawn (executor ,
450448 start (dpp, zoneparams.log_pool ,
451449 background_tasks, background_tasks,
452450 background_tasks),
@@ -476,7 +474,6 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
476474 bool renew)
477475{
478476 down_flag = false ;
479- cancel_strand = asio::make_strand (rados->get_executor ());
480477 ran_background = (recovery || watch || renew);
481478
482479 auto defbacking = to_log_type (
@@ -512,10 +509,10 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
512509
513510 if (renew) {
514511 asio::co_spawn (
515- co_await asio::this_coro::executor ,
516- renew_run (renew_signal ),
517- asio::bind_cancellation_slot (renew_signal-> slot (),
518- asio::bind_executor (*cancel_strand ,
512+ renew_strand ,
513+ renew_run (shared_from_this () ),
514+ asio::bind_cancellation_slot (renew_signal. slot (),
515+ asio::bind_executor (renew_strand ,
519516 asio::detached)));
520517 }
521518 if (watch) {
@@ -527,20 +524,20 @@ RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
527524 " Unable to establish recovery watch!" };
528525 }
529526 asio::co_spawn (
530- co_await asio::this_coro::executor ,
531- watch_loop (watch_signal ),
532- asio::bind_cancellation_slot (watch_signal-> slot (),
533- asio::bind_executor (*cancel_strand ,
527+ watch_strand ,
528+ watch_loop (shared_from_this () ),
529+ asio::bind_cancellation_slot (watch_signal. slot (),
530+ asio::bind_executor (watch_strand ,
534531 asio::detached)));
535532 }
536533 if (recovery) {
537534 // Recovery can run concurrent with normal operation, so we don't
538535 // have to block startup while we do all that I/O.
539536 asio::co_spawn (
540- co_await asio::this_coro::executor ,
541- recover (dpp, recovery_signal ),
542- asio::bind_cancellation_slot (recovery_signal-> slot (),
543- asio::bind_executor (*cancel_strand ,
537+ recovery_strand ,
538+ recover (dpp, shared_from_this () ),
539+ asio::bind_cancellation_slot (recovery_signal. slot (),
540+ asio::bind_executor (recovery_strand ,
544541 asio::detached)));
545542 }
546543 co_return ;
@@ -667,7 +664,9 @@ RGWDataChangesLog::process_notification(const DoutPrefixProvider* dpp,
667664 }
668665}
669666
670- asio::awaitable<void > RGWDataChangesLog::watch_loop (decltype (watch_signal)) {
667+ asio::awaitable<void >
668+ RGWDataChangesLog::watch_loop (std::shared_ptr<RGWDataChangesLog>)
669+ {
671670 const DoutPrefix dp (cct, dout_subsys, " rgw data changes log: " );
672671 const auto oid = get_sem_set_oid (0 );
673672 bool need_rewatch = false ;
@@ -1363,21 +1362,18 @@ asio::awaitable<void> RGWDataChangesLog::shutdown() {
13631362 }
13641363 renew_stop ();
13651364 // Revisit this later
1366- if (renew_signal)
1367- asio::dispatch (*cancel_strand,
1368- [this ]() {
1369- renew_signal->emit (asio::cancellation_type::terminal);
1370- });
1371- if (recovery_signal)
1372- asio::dispatch (*cancel_strand,
1373- [this ]() {
1374- recovery_signal->emit (asio::cancellation_type::terminal);
1375- });
1376- if (watch_signal)
1377- asio::dispatch (*cancel_strand,
1378- [this ]() {
1379- watch_signal->emit (asio::cancellation_type::terminal);
1380- });
1365+ asio::dispatch (renew_strand,
1366+ [this ]() {
1367+ renew_signal.emit (asio::cancellation_type::terminal);
1368+ });
1369+ asio::dispatch (recovery_strand,
1370+ [this ]() {
1371+ recovery_signal.emit (asio::cancellation_type::terminal);
1372+ });
1373+ asio::dispatch (watch_strand,
1374+ [this ]() {
1375+ watch_signal.emit (asio::cancellation_type::terminal);
1376+ });
13811377 if (watchcookie && rados->check_watch (watchcookie)) {
13821378 auto wc = watchcookie;
13831379 watchcookie = 0 ;
@@ -1390,15 +1386,6 @@ asio::awaitable<void> RGWDataChangesLog::shutdown_or_timeout() {
13901386 using namespace asio ::experimental::awaitable_operators;
13911387 asio::steady_timer t (co_await asio::this_coro::executor, 3s);
13921388 co_await (shutdown () || t.async_wait (asio::use_awaitable));
1393- if (renew_signal) {
1394- renew_signal->emit (asio::cancellation_type::terminal);
1395- }
1396- if (recovery_signal) {
1397- recovery_signal->emit (asio::cancellation_type::terminal);
1398- }
1399- if (watch_signal) {
1400- watch_signal->emit (asio::cancellation_type::terminal);
1401- }
14021389}
14031390
14041391RGWDataChangesLog::~RGWDataChangesLog () {
@@ -1429,7 +1416,8 @@ void RGWDataChangesLog::blocking_shutdown() {
14291416 }
14301417}
14311418
1432- asio::awaitable<void > RGWDataChangesLog::renew_run (decltype (renew_signal)) {
1419+ asio::awaitable<void > RGWDataChangesLog::renew_run (
1420+ std::shared_ptr<RGWDataChangesLog>) {
14331421 static constexpr auto runs_per_prune = 150 ;
14341422 auto run = 0 ;
14351423 renew_timer.emplace (co_await asio::this_coro::executor);
@@ -1733,14 +1721,14 @@ RGWDataChangesLog::recover_shard(const DoutPrefixProvider* dpp, int index)
17331721 co_return ;
17341722}
17351723
1736- asio::awaitable<void > RGWDataChangesLog::recover (const DoutPrefixProvider* dpp,
1737- decltype (recovery_signal))
1724+ asio::awaitable<void > RGWDataChangesLog::recover (
1725+ const DoutPrefixProvider* dpp,
1726+ std::shared_ptr<RGWDataChangesLog>)
17381727{
1739- auto strand = asio::make_strand (co_await asio::this_coro::executor);
17401728 co_await asio::co_spawn (
1741- strand ,
1742- [this ](const DoutPrefixProvider* dpp)-> asio::awaitable<void , decltype (strand) > {
1743- auto ex = co_await boost::asio::this_coro::executor ;
1729+ recovery_strand ,
1730+ [this ](const DoutPrefixProvider* dpp)-> asio::awaitable<void , strand_t > {
1731+ auto ex = recovery_strand ;
17441732 auto group = async::spawn_group{ex, static_cast <size_t >(num_shards)};
17451733 for (auto i = 0 ; i < num_shards; ++i) {
17461734 boost::asio::co_spawn (ex, recover_shard (dpp, i), group);
0 commit comments