@@ -52,6 +52,7 @@ namespace sys = boost::system;
5252
5353namespace nlog = ::neorados::cls::log;
5454namespace fifo = ::neorados::cls::fifo;
55+ namespace ss = neorados::cls::sem_set;
5556
5657namespace async = ceph::async;
5758namespace buffer = ceph::buffer;
@@ -369,8 +370,7 @@ RGWDataChangesLog::RGWDataChangesLog(CephContext *cct, bool log_data,
369370 num_shards(num_shards ? *num_shards :
370371 cct->_conf->rgw_data_log_num_shards),
371372 prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size),
372- sem_max_keys(sem_max_keys ? *sem_max_keys :
373- neorados::cls::sem_set::max_keys) {}
373+ sem_max_keys(sem_max_keys ? *sem_max_keys : ss::max_keys) {}
374374
375375
376376void DataLogBackends::handle_init (entries_t e) {
@@ -786,7 +786,6 @@ RGWDataChangesLog::renew_entries(const DoutPrefixProvider* dpp)
786786 co_return ;
787787 }
788788
789- namespace sem_set = neorados::cls::sem_set;
790789 // If we didn't error in pushing, we can now decrement the semaphores
791790 l.lock ();
792791 for (auto index = 0u ; index < unsigned (num_shards); ++index) {
@@ -800,7 +799,7 @@ RGWDataChangesLog::renew_entries(const DoutPrefixProvider* dpp)
800799 auto to_copy = std::min (sem_max_keys, keys.size ());
801800 std::copy_n (keys.begin (), to_copy,
802801 std::inserter (batch, batch.end ()));
803- auto op = WriteOp{}.exec (sem_set ::decrement (std::move (batch)));
802+ auto op = WriteOp{}.exec (ss ::decrement (std::move (batch)));
804803 l.unlock ();
805804 co_await rados->execute (get_sem_set_oid (index), loc, std::move (op),
806805 asio::use_awaitable);
@@ -956,9 +955,8 @@ void RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
956955 auto need_sem_set = register_renew (std::move (bg));
957956 if (need_sem_set) {
958957 using neorados::WriteOp;
959- using neorados::cls::sem_set::increment;
960958 rados->execute (get_sem_set_oid (index), loc,
961- WriteOp{}.exec (increment (std::move (key))), y);
959+ WriteOp{}.exec (ss:: increment (std::move (key))), y);
962960 }
963961 return ;
964962 }
@@ -1573,8 +1571,8 @@ RGWDataChangesLog::read_sems(int index, std::string cursor) {
15731571 try {
15741572 co_await rados->execute (
15751573 get_sem_set_oid (index), loc,
1576- neorados::ReadOp{}.exec (sem_set ::list (sem_max_keys, std::move (cursor),
1577- &out, &cursor)),
1574+ neorados::ReadOp{}.exec (ss ::list (sem_max_keys, std::move (cursor),
1575+ &out, &cursor)),
15781576 nullptr , asio::use_awaitable);
15791577 } catch (const sys::system_error& e) {
15801578 if (e.code () != sys::errc::no_such_file_or_directory) {
@@ -1606,7 +1604,7 @@ RGWDataChangesLog::synthesize_entries(
16061604 change.gen = bg.gen ;
16071605 encode (change, bl);
16081606 be->prepare (timestamp, change.key , std::move (bl), batch);
1609- } catch (const sys::error_code & e) {
1607+ } catch (const sys::system_error & e) {
16101608 push_failed = true ;
16111609 ldpp_dout (dpp, -1 ) << " RGWDataChangesLog::synthesize_entries(): Unable to "
16121610 << " parse Bucketgen key: " << key << " Got exception: "
@@ -1695,7 +1693,7 @@ RGWDataChangesLog::decrement_sems(
16951693 auto grace = ((ceph::mono_clock::now () - fetch_time) * 4 ) / 3 ;
16961694 co_await rados->execute (
16971695 get_sem_set_oid (index), loc, neorados::WriteOp{}.exec (
1698- sem_set ::decrement (std::move (batch), grace)),
1696+ ss ::decrement (std::move (batch), grace)),
16991697 asio::use_awaitable);
17001698 }
17011699}
@@ -1761,6 +1759,108 @@ asio::awaitable<void> RGWDataChangesLog::recover(const DoutPrefixProvider* dpp,
17611759 l.unlock ();
17621760}
17631761
1762+ asio::awaitable<void >
1763+ RGWDataChangesLog::admin_sem_list (std::optional<int > req_shard,
1764+ std::uint64_t max_entries,
1765+ std::string marker,
1766+ std::ostream& m,
1767+ ceph::Formatter& formatter)
1768+ {
1769+ int shard = req_shard.value_or (0 );
1770+ std::string keptmark;
1771+
1772+ if (!marker.empty ()) {
1773+ // Signal caught by radosgw-admin
1774+ BucketGen bg{marker};
1775+ auto index = choose_oid (bg.shard );
1776+ if (req_shard && *req_shard != index) {
1777+ throw sys::system_error{
1778+ EINVAL, sys::generic_category (),
1779+ fmt::format (" Requested shard {} but marker is for shard {}" ,
1780+ shard, index)};
1781+ }
1782+ }
1783+ bc::flat_map<std::string, std::uint64_t > entries;
1784+ std::uint64_t count = 0 ;
1785+ bool begin_next = false ;
1786+ // So the marker traverses between shards if the last entry in the
1787+ // shard is the last needed for max_entries
1788+ std::string mkeep;
1789+ entries.reserve (sem_max_keys);
1790+ formatter.open_object_section (" semaphores" );
1791+ formatter.open_array_section (" entries" );
1792+ while ((max_entries == 0 || (count < max_entries)) && shard < num_shards) {
1793+ entries.clear ();
1794+ try {
1795+ if (begin_next) {
1796+ marker.clear ();
1797+ begin_next = false ;
1798+ }
1799+ co_await rados->execute (get_sem_set_oid (shard), loc,
1800+ neorados::ReadOp{}.
1801+ exec (ss::list (std::min (max_entries - count,
1802+ sem_max_keys),
1803+ marker,
1804+ &entries, &marker)),
1805+ nullptr , asio::use_awaitable);
1806+ if (!marker.empty ()) {
1807+ mkeep = marker;
1808+ }
1809+ } catch (const sys::system_error& e) {
1810+ if (e.code () == sys::errc::no_such_file_or_directory) {
1811+ if (!req_shard) {
1812+ begin_next = true ;
1813+ ++shard;
1814+ continue ;
1815+ } else {
1816+ break ;
1817+ }
1818+ } else {
1819+ throw ;
1820+ }
1821+ }
1822+ for (auto i = entries.cbegin (); i != entries.cend (); ++i) {
1823+ const auto & [k, v] = *i;
1824+ formatter.open_object_section (" semaphore" );
1825+ formatter.dump_string (" key" , k);
1826+ formatter.dump_unsigned (" count" , v);
1827+ formatter.close_section ();
1828+ ++count;
1829+ }
1830+ formatter.flush (m);
1831+ if (marker.empty ()) {
1832+ if (!entries.empty ()) {
1833+ mkeep = (entries.cend () - 1 )->first ;
1834+ }
1835+ if (!req_shard) {
1836+ ++shard;
1837+ } else {
1838+ break ;
1839+ }
1840+ }
1841+ }
1842+ if (shard < num_shards && !req_shard && count == max_entries) {
1843+ marker = std::move (mkeep);
1844+ }
1845+ formatter.close_section ();
1846+ formatter.dump_string (" marker" , marker);
1847+ formatter.close_section ();
1848+ formatter.flush (m);
1849+ co_return ;
1850+ }
1851+
1852+ asio::awaitable<void >
1853+ RGWDataChangesLog::admin_sem_reset (std::string_view marker,
1854+ std::uint64_t count)
1855+ {
1856+ // Exceptions here are caught by radosgw-admin
1857+ BucketGen bg{marker};
1858+ unsigned index = choose_oid (bg.shard );
1859+ auto wop = neorados::WriteOp{}.exec (ss::reset (std::string (marker), count));
1860+ co_await rados->execute (get_sem_set_oid (index), loc,
1861+ std::move (wop), asio::use_awaitable);
1862+ }
1863+
17641864void RGWDataChangesLogInfo::dump (Formatter *f) const
17651865{
17661866 encode_json (" marker" , marker, f);
0 commit comments