@@ -47,7 +47,6 @@ static constexpr auto dout_subsys = ceph_subsys_rgw;
4747using namespace std ::literals;
4848
4949namespace ranges = std::ranges;
50- namespace views = ranges::views;
5150
5251namespace sys = boost::system;
5352
@@ -572,37 +571,37 @@ RGWDataChangesLog::establish_watch(const DoutPrefixProvider* dpp,
572571}
573572
574573struct recovery_check {
575- uint64_t shard = 0 ;
574+ int64_t shard = 0 ;
575+ std::vector<std::string> keys;
576576
577577 recovery_check () = default ;
578578
579- recovery_check (uint64_t shard) : shard(shard) {}
579+ recovery_check (uint64_t shard, std::vector<std::string> keys)
580+ : shard(shard), keys(std::move(keys)) {}
580581
581582 void encode (buffer::list& bl) const {
582583 ENCODE_START (1 , 1 , bl);
583584 encode (shard, bl);
585+ encode (keys, bl);
584586 ENCODE_FINISH (bl);
585587 }
586588
587589 void decode (buffer::list::const_iterator& bl) {
588590 DECODE_START (1 , bl);
589591 decode (shard, bl);
592+ decode (keys, bl);
590593 DECODE_FINISH (bl);
591594 }
592-
593- operator uint64_t () {
594- return shard;
595- }
596595};
597596WRITE_CLASS_ENCODER (recovery_check);
598597
599598
600599struct recovery_reply {
601- std::unordered_set<std::string > reply_set;
600+ std::vector< unsigned > reply_set;
602601
603602 recovery_reply () = default ;
604603
605- recovery_reply (std::unordered_set<std::string > reply_set)
604+ recovery_reply (std::vector< unsigned > reply_set)
606605 : reply_set(std::move(reply_set)) {}
607606
608607 void encode (buffer::list& bl) const {
@@ -616,44 +615,44 @@ struct recovery_reply {
616615 decode (reply_set, bl);
617616 DECODE_FINISH (bl);
618617 }
619-
620- operator std::unordered_set<std::string>&() {
621- return reply_set;
622- }
623618};
624619WRITE_CLASS_ENCODER (recovery_reply);
625620
626-
627621asio::awaitable<void >
628622RGWDataChangesLog::process_notification (const DoutPrefixProvider* dpp,
629623 std::string_view oid) {
630624 auto notification = co_await rados->next_notification (watchcookie,
631625 asio::use_awaitable);
632- int shard = 0 ;
626+ recovery_check rc ;
633627 // Don't send a reply if we get a bogus notification, we don't
634628 // want recovery to delete semaphores improperly.
635629 try {
636- recovery_check rc;
637630 decode (rc, notification.bl );
638- shard = rc;
639631 } catch (const std::exception& e) {
640- ldpp_dout (dpp, 2 ) << " Got malformed notification! " << dendl;
632+ ldpp_dout (dpp, 2 ) << " Got malformed notification: " << e. what () << dendl;
641633 co_return ;
642634 }
643- if (shard >= num_shards) {
644- ldpp_dout (dpp, 2 ) << " Got unknown shard " << shard << dendl;
635+ if (rc. shard >= num_shards) {
636+ ldpp_dout (dpp, 2 ) << " Got unknown shard " << rc. shard << dendl;
645637 co_return ;
646638 }
647639 recovery_reply reply;
640+ reply.reply_set .resize (rc.keys .size (), 0 );
648641 std::unique_lock l (lock);
649- for (const auto & bg : cur_cycle) {
650- if (choose_oid (bg.shard ) == shard) {
651- reply.reply_set .insert (bg.get_key ());
642+ for (auto i = 0u ; i < rc.keys .size (); ++i) {
643+ const auto & key = rc.keys [i];
644+ try {
645+ if (cur_cycle.contains (BucketGen{key})) {
646+ ++reply.reply_set [i];
647+ }
648+ } catch (const std::exception&) {
649+ ldpp_dout (dpp, 2 ) << " Got invalid BucketGen key: " << key << dendl;
650+ co_return ;
651+ }
652+ if (semaphores[rc.shard ].contains (key)) {
653+ ++reply.reply_set [i];
652654 }
653655 }
654- std::copy (semaphores[shard].begin (),
655- semaphores[shard].end (),
656- std::inserter (reply.reply_set , reply.reply_set .end ()));
657656 l.unlock ();
658657 buffer::list replybl;
659658 encode (reply, replybl);
@@ -1561,28 +1560,23 @@ int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
15611560 return ceph::from_exception (eptr);
15621561}
15631562
1564- asio::awaitable<void >
1565- RGWDataChangesLog::read_all_sems (int index,
1566- bc::flat_map<std::string, uint64_t >* out)
1567- {
1563+ asio::awaitable<std::pair<bc::flat_map<std::string, uint64_t >,
1564+ std::string>>
1565+ RGWDataChangesLog::read_sems (int index, std::string cursor) {
15681566 namespace sem_set = neorados::cls::sem_set;
1569- std::string cursor;
1570- do {
1571- try {
1572- co_await rados->execute (
1573- get_sem_set_oid (index), loc,
1574- neorados::ReadOp{}.exec (sem_set::list (sem_max_keys, cursor, out,
1575- &cursor)),
1576- nullptr , asio::use_awaitable);
1577- } catch (const sys::system_error& e) {
1578- if (e.code () == sys::errc::no_such_file_or_directory) {
1579- break ;
1580- } else {
1581- throw ;
1582- }
1567+ bc::flat_map<std::string, uint64_t > out;
1568+ try {
1569+ co_await rados->execute (
1570+ get_sem_set_oid (index), loc,
1571+ neorados::ReadOp{}.exec (sem_set::list (sem_max_keys, std::move (cursor),
1572+ &out, &cursor)),
1573+ nullptr , asio::use_awaitable);
1574+ } catch (const sys::system_error& e) {
1575+ if (e.code () != sys::errc::no_such_file_or_directory) {
1576+ throw ;
15831577 }
1584- } while (!cursor. empty ());
1585- co_return ;
1578+ }
1579+ co_return std::make_pair ( std::move (out), std::move (cursor)) ;
15861580}
15871581
15881582asio::awaitable<bool >
@@ -1627,11 +1621,16 @@ RGWDataChangesLog::synthesize_entries(
16271621asio::awaitable<bool >
16281622RGWDataChangesLog::gather_working_sets (
16291623 const DoutPrefixProvider* dpp,
1630- int index ,
1624+ int shard ,
16311625 bc::flat_map<std::string, uint64_t >& semcount)
16321626{
16331627 buffer::list bl;
1634- recovery_check rc = index;
1628+ recovery_check rc;
1629+ rc.shard = shard;
1630+ rc.keys .reserve (semcount.size ());
1631+ for (const auto & [key, count] : semcount) {
1632+ rc.keys .emplace_back (key);
1633+ }
16351634 encode (rc, bl);
16361635 auto [reply_map, missed_set] = co_await rados->notify (
16371636 get_sem_set_oid (0 ), loc, bl, 60s, asio::use_awaitable);
@@ -1642,24 +1641,32 @@ RGWDataChangesLog::gather_working_sets(
16421641 co_return false ;
16431642 }
16441643 for (const auto & [source, reply] : reply_map) {
1645- recovery_reply keys ;
1644+ recovery_reply counts ;
16461645 try {
1647- decode (keys , reply);
1646+ decode (counts , reply);
16481647 } catch (const std::exception& e) {
16491648 ldpp_dout (dpp, -1 )
16501649 << " RGWDataChangesLog::gather_working_sets(): Failed decoding reply from: "
16511650 << source << dendl;
16521651 co_return false ;
16531652 }
1654- for (const auto & key : keys.reply_set ) {
1653+ if (rc.keys .size () != counts.reply_set .size ()) {
1654+ ldpp_dout (dpp, -1 )
1655+ << " RGWDataChangesLog::gather_working_sets(): reply set does not match: "
1656+ << source << dendl;
1657+ co_return false ;
1658+ }
1659+ for (auto i = 0u ; i < rc.keys .size (); ++i) {
1660+ const auto & key = rc.keys [i];
1661+ const auto & count = counts.reply_set [i];
16551662 auto iter = semcount.find (key);
16561663 if (iter == semcount.end ()) {
16571664 continue ;
16581665 }
1659- if (iter->second == 1 ) {
1666+ if (iter->second <= count ) {
16601667 semcount.erase (iter);
16611668 } else {
1662- -- (iter->second );
1669+ (iter->second ) -= count ;
16631670 }
16641671 }
16651672 }
@@ -1689,35 +1696,39 @@ RGWDataChangesLog::decrement_sems(
16891696asio::awaitable<void >
16901697RGWDataChangesLog::recover_shard (const DoutPrefixProvider* dpp, int index)
16911698{
1692- bc::flat_map<std::string, uint64_t > semcount;
1699+ std::string cursor;
1700+ do {
1701+ bc::flat_map<std::string, uint64_t > semcount;
16931702
1694- // Gather entries in the shard
1695- co_await read_all_sems (index, &semcount);
1696- // If we have none, no point doing the rest
1697- if (semcount.empty ()) {
1698- co_return ;
1699- }
1700- // Synthesize entries to push
1701- auto pushed = co_await synthesize_entries (dpp, index, semcount);
1702- if (!pushed) {
1703- // If pushing failed, don't decrement any semaphores
1704- ldpp_dout (dpp, 5 ) << " RGWDataChangesLog::recover(): Pushing shard "
1705- << index << " failed, skipping decrement" << dendl;
1706- co_return ;
1707- }
1703+ // Gather entries in the shard
1704+ std::tie (semcount, cursor) = co_await read_sems (index, std::move (cursor));
1705+ // If we have none, no point doing the rest
1706+ if (semcount.empty ()) {
1707+ break ;
1708+ }
17081709
1709- // Check with other running RGWs, make sure not to decrement
1710- // anything they have in flight. This doesn't cause an issue for
1711- // partial upgrades, since older versions won't be using the
1712- // semaphores at all.
1713- auto notified = co_await gather_working_sets (dpp, index, semcount);
1714- if (!notified) {
1715- ldpp_dout (dpp, 5 ) << " RGWDataChangesLog::recover(): Gathering "
1716- << " working sets for shard " << index
1717- << " failed, skipping decrement" << dendl;
1718- co_return ;
1719- }
1720- co_await decrement_sems (index, std::move (semcount));
1710+ // Synthesize entries to push
1711+ auto pushed = co_await synthesize_entries (dpp, index, semcount);
1712+ if (!pushed) {
1713+ // If pushing failed, don't decrement any semaphores
1714+ ldpp_dout (dpp, 5 ) << " RGWDataChangesLog::recover_shard(): Pushing shard "
1715+ << index << " failed, skipping decrement" << dendl;
1716+ continue ;
1717+ }
1718+
1719+ // Check with other running RGWs, make sure not to decrement
1720+ // anything they have in flight. This doesn't cause an issue for
1721+ // partial upgrades, since older versions won't be using the
1722+ // semaphores at all.
1723+ auto notified = co_await gather_working_sets (dpp, index, semcount);
1724+ if (!notified) {
1725+ ldpp_dout (dpp, 5 ) << " RGWDataChangesLog::recover_shard(): Gathering "
1726+ << " working sets for shard " << index
1727+ << " failed, skipping decrement" << dendl;
1728+ continue ;
1729+ }
1730+ co_await decrement_sems (index, std::move (semcount));
1731+ } while (!cursor.empty ());
17211732 co_return ;
17221733}
17231734
0 commit comments