Skip to content

Commit 4732944

Browse files
committed
rgw: Use run_coro to call coroutines at use
This avoids having two entry points with different error checking preparation, etc. to get out of sync or have a fix get forgotten. Signed-off-by: Adam C. Emerson <[email protected]>
1 parent 9f17b63 commit 4732944

File tree

6 files changed

+135
-261
lines changed

6 files changed

+135
-261
lines changed

src/rgw/driver/rados/rgw_datalog.cc

Lines changed: 37 additions & 188 deletions
Original file line numberDiff line numberDiff line change
@@ -1083,7 +1083,7 @@ DataLogBackends::list(const DoutPrefixProvider *dpp, int shard,
10831083
}
10841084

10851085
asio::awaitable<std::tuple<std::vector<rgw_data_change_log_entry>,
1086-
std::string>>
1086+
std::string, bool>>
10871087
RGWDataChangesLog::list_entries(const DoutPrefixProvider* dpp, int shard,
10881088
int max_entries, std::string marker)
10891089
{
@@ -1095,71 +1095,24 @@ RGWDataChangesLog::list_entries(const DoutPrefixProvider* dpp, int shard,
10951095
}
10961096
if (max_entries <= 0) {
10971097
co_return std::make_tuple(std::vector<rgw_data_change_log_entry>{},
1098-
std::string{});
1098+
std::string{}, false);
10991099
}
11001100
std::vector<rgw_data_change_log_entry> entries(max_entries);
11011101
entries.resize(max_entries);
11021102
auto [spanentries, outmark] = co_await bes->list(dpp, shard, entries, marker);
11031103
entries.resize(spanentries.size());
1104-
co_return std::make_tuple(std::move(entries), std::move(outmark));
1105-
}
1106-
1107-
int RGWDataChangesLog::list_entries(
1108-
const DoutPrefixProvider *dpp, int shard,
1109-
int max_entries, std::vector<rgw_data_change_log_entry>& entries,
1110-
std::string_view marker, std::string* out_marker, bool* truncated,
1111-
std::string* errstr, optional_yield y)
1112-
{
1113-
std::tuple<std::span<rgw_data_change_log_entry>,
1114-
std::string> out;
1115-
if (shard >= num_shards) [[unlikely]] {
1116-
if (errstr) {
1117-
*errstr = fmt::format("{} is not a valid shard. Valid shards are integers in [0, {})",
1118-
shard, num_shards);
1119-
}
1120-
return -EINVAL;
1121-
}
1122-
if (std::ssize(entries) < max_entries) {
1123-
entries.resize(max_entries);
1124-
}
1125-
try {
1126-
if (y) {
1127-
auto& yield = y.get_yield_context();
1128-
out = asio::co_spawn(yield.get_executor(),
1129-
bes->list(dpp, shard, entries,
1130-
std::string{marker}),
1131-
yield);
1132-
} else {
1133-
maybe_warn_about_blocking(dpp);
1134-
out = asio::co_spawn(rados->get_executor(),
1135-
bes->list(dpp, shard, entries,
1136-
std::string{marker}),
1137-
async::use_blocked);
1138-
}
1139-
} catch (const std::exception&) {
1140-
return ceph::from_exception(std::current_exception());
1141-
}
1142-
auto& [outries, outmark] = out;
1143-
if (auto size = std::ssize(outries); size < std::ssize(entries)) {
1144-
entries.resize(size);
1145-
}
1146-
if (truncated) {
1147-
*truncated = !outmark.empty();
1148-
}
1149-
if (out_marker) {
1150-
*out_marker = std::move(outmark);
1151-
}
1152-
return 0;
1104+
bool truncated = !outmark.empty();
1105+
co_return std::make_tuple(std::move(entries), std::move(outmark), truncated);
11531106
}
11541107

11551108
asio::awaitable<std::tuple<std::vector<rgw_data_change_log_entry>,
1156-
RGWDataChangesLogMarker>>
1109+
RGWDataChangesLogMarker, bool>>
11571110
RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,
11581111
int max_entries, RGWDataChangesLogMarker marker)
11591112
{
11601113
if (max_entries <= 0) {
11611114
co_return std::make_tuple(std::vector<rgw_data_change_log_entry>{},
1162-
RGWDataChangesLogMarker{});
1115+
RGWDataChangesLogMarker{}, false);
11631116
}
11641117

11651118
std::vector<rgw_data_change_log_entry> entries(max_entries);
@@ -1183,78 +1136,25 @@ RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,
11831136
if (!remaining.empty()) {
11841137
entries.resize(entries.size() - remaining.size());
11851138
}
1186-
co_return std::make_tuple(std::move(entries), std::move(marker));
1139+
bool truncated = marker;
1140+
co_return std::make_tuple(std::move(entries), std::move(marker), truncated);
11871141
}
11881142

1189-
int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,int max_entries,
1190-
std::vector<rgw_data_change_log_entry>& entries,
1191-
RGWDataChangesLogMarker& marker, bool *ptruncated,
1192-
optional_yield y)
1193-
{
1194-
std::tuple<std::vector<rgw_data_change_log_entry>,
1195-
RGWDataChangesLogMarker> out;
1196-
if (std::ssize(entries) < max_entries) {
1197-
entries.resize(max_entries);
1198-
}
1199-
try {
1200-
if (y) {
1201-
auto& yield = y.get_yield_context();
1202-
out = asio::co_spawn(yield.get_executor(),
1203-
list_entries(dpp, max_entries,
1204-
RGWDataChangesLogMarker{marker}),
1205-
yield);
1206-
} else {
1207-
maybe_warn_about_blocking(dpp);
1208-
out = asio::co_spawn(rados->get_executor(),
1209-
list_entries(dpp, max_entries,
1210-
RGWDataChangesLogMarker{marker}),
1211-
async::use_blocked);
1212-
}
1213-
} catch (const std::exception&) {
1214-
return ceph::from_exception(std::current_exception());
1215-
}
1216-
auto& [outries, outmark] = out;
1217-
if (auto size = std::ssize(outries); size < std::ssize(entries)) {
1218-
entries.resize(size);
1219-
}
1220-
if (ptruncated) {
1221-
*ptruncated = (outmark.shard > 0 || !outmark.marker.empty());
1222-
}
1223-
marker = std::move(outmark);
1224-
return 0;
1225-
}
1226-
1227-
int RGWDataChangesLog::get_info(const DoutPrefixProvider* dpp, int shard_id,
1228-
RGWDataChangesLogInfo* info,
1229-
std::string* errstr, optional_yield y)
1143+
asio::awaitable<RGWDataChangesLogInfo>
1144+
RGWDataChangesLog::get_info(const DoutPrefixProvider* dpp, int shard_id)
12301145
{
12311146
if (shard_id >= num_shards) [[unlikely]] {
1232-
if (errstr) {
1233-
*errstr = fmt::format(
1147+
throw sys::system_error{-EINVAL, sys::generic_category(),
1148+
fmt::format(
12341149
"{} is not a valid shard. Valid shards are integers in [0, {})",
1235-
shard_id, num_shards);
1236-
}
1150+
shard_id, num_shards)};
12371151
}
12381152
auto be = bes->head();
1239-
try {
1240-
if (y) {
1241-
auto& yield = y.get_yield_context();
1242-
*info = asio::co_spawn(yield.get_executor(),
1243-
be->get_info(dpp, shard_id),
1244-
yield);
1245-
} else {
1246-
maybe_warn_about_blocking(dpp);
1247-
*info = asio::co_spawn(rados->get_executor(),
1248-
be->get_info(dpp, shard_id),
1249-
async::use_blocked);
1250-
}
1251-
} catch (const std::exception&) {
1252-
return ceph::from_exception(std::current_exception());
1253-
}
1254-
if (!info->marker.empty()) {
1255-
info->marker = gencursor(be->gen_id, info->marker);
1153+
auto info = co_await be->get_info(dpp, shard_id);
1154+
if (!info.marker.empty()) {
1155+
info.marker = gencursor(be->gen_id, info.marker);
12561156
}
1257-
return 0;
1157+
co_return info;
12581158
}
12591159

12601160
asio::awaitable<void> DataLogBackends::trim_entries(
@@ -1284,46 +1184,27 @@ asio::awaitable<void> DataLogBackends::trim_entries(
12841184
co_return;
12851185
}
12861186

1287-
int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
1288-
std::string_view marker, std::string* errstr,
1289-
optional_yield y)
1187+
asio::awaitable<void>
1188+
RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
1189+
std::string_view marker)
12901190
{
12911191
if (shard_id >= num_shards) [[unlikely]] {
1292-
if (errstr) {
1293-
*errstr = fmt::format(
1192+
throw sys::system_error{-EINVAL, sys::generic_category(),
1193+
fmt::format(
12941194
"{} is not a valid shard. Valid shards are integers in [0, {})",
1295-
shard_id, num_shards);
1296-
}
1297-
}
1298-
try {
1299-
if (y) {
1300-
auto& yield = y.get_yield_context();
1301-
asio::co_spawn(yield.get_executor(),
1302-
bes->trim_entries(dpp, shard_id, marker),
1303-
yield);
1304-
} else {
1305-
maybe_warn_about_blocking(dpp);
1306-
asio::co_spawn(rados->get_executor(),
1307-
bes->trim_entries(dpp, shard_id, marker),
1308-
async::use_blocked);
1309-
}
1310-
} catch (const std::exception& e) {
1311-
return ceph::from_exception(std::current_exception());
1195+
shard_id, num_shards)};
13121196
}
1313-
return 0;
1197+
auto be = bes->head();
1198+
co_return co_await bes->trim_entries(dpp, shard_id, marker);
13141199
}
13151200

1316-
int RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id,
1317-
std::string_view marker,
1318-
librados::AioCompletion* c)
1201+
void RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id,
1202+
std::string_view marker,
1203+
librados::AioCompletion* c)
13191204
{
1320-
if (shard_id >= num_shards) [[unlikely]] {
1321-
return -EINVAL;
1322-
}
13231205
asio::co_spawn(rados->get_executor(),
1324-
bes->trim_entries(dpp, shard_id, marker),
1206+
trim_entries(dpp, shard_id, marker),
13251207
c);
1326-
return 0;
13271208
}
13281209

13291210

@@ -1493,7 +1374,7 @@ asio::awaitable<void> RGWDataChangesLog::renew_run() {
14931374
std::optional<uint64_t> through;
14941375
ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl;
14951376
operation = "trim_generations"sv;
1496-
co_await bes->trim_generations(&dp, through);
1377+
co_await trim_generations(&dp, through);
14971378
operation = {};
14981379
if (through) {
14991380
ldpp_dout(&dp, 2)
@@ -1560,54 +1441,22 @@ std::string RGWDataChangesLog::max_marker() const {
15601441
"~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
15611442
}
15621443

1563-
int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp,
1564-
log_type type,optional_yield y)
1444+
asio::awaitable<void>
1445+
RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type)
15651446
{
1566-
try {
1567-
if (y) {
1568-
auto& yield = y.get_yield_context();
1569-
asio::co_spawn(yield.get_executor(),
1570-
bes->new_backing(dpp, type),
1571-
yield);
1572-
} else {
1573-
maybe_warn_about_blocking(dpp);
1574-
asio::co_spawn(rados->get_executor(),
1575-
bes->new_backing(dpp, type),
1576-
async::use_blocked);
1577-
}
1578-
} catch (const std::exception&) {
1579-
return ceph::from_exception(std::current_exception());
1580-
}
1581-
return 0;;
1447+
co_return co_await bes->new_backing(dpp, type);
15821448
}
15831449

1584-
int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
1585-
std::optional<uint64_t>& through,
1586-
optional_yield y)
1450+
asio::awaitable<void>
1451+
RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
1452+
std::optional<uint64_t>& through)
15871453
{
1588-
try {
1589-
if (y) {
1590-
auto& yield = y.get_yield_context();
1591-
asio::co_spawn(yield.get_executor(),
1592-
bes->trim_generations(dpp, through),
1593-
yield);
1594-
} else {
1595-
maybe_warn_about_blocking(dpp);
1596-
asio::co_spawn(rados->get_executor(),
1597-
bes->trim_generations(dpp, through),
1598-
async::use_blocked);
1599-
}
1600-
} catch (const std::exception& e) {
1601-
return ceph::from_exception(std::current_exception());
1602-
}
1603-
1604-
return 0;
1454+
co_return co_await bes->trim_generations(dpp, through);
16051455
}
16061456

16071457
asio::awaitable<std::pair<bc::flat_map<std::string, uint64_t>,
16081458
std::string>>
16091459
RGWDataChangesLog::read_sems(int index, std::string cursor) {
1610-
namespace sem_set = neorados::cls::sem_set;
16111460
bc::flat_map<std::string, uint64_t> out;
16121461
try {
16131462
co_await rados->execute(

src/rgw/driver/rados/rgw_datalog.h

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -470,30 +470,25 @@ class RGWDataChangesLog {
470470
int shard_id, optional_yield y);
471471
int get_log_shard_id(rgw_bucket& bucket, int shard_id);
472472
asio::awaitable<std::tuple<std::vector<rgw_data_change_log_entry>,
473-
std::string>>
473+
std::string, bool>>
474474
list_entries(const DoutPrefixProvider* dpp, int shard, int max_entries,
475475
std::string marker);
476-
int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
477-
std::vector<rgw_data_change_log_entry>& entries,
478-
std::string_view marker, std::string* out_marker,
479-
bool* truncated, std::string* errstr, optional_yield y);
480476
asio::awaitable<std::tuple<std::vector<rgw_data_change_log_entry>,
481-
RGWDataChangesLogMarker>>
477+
RGWDataChangesLogMarker, bool>>
482478
list_entries(const DoutPrefixProvider *dpp, int max_entries,
483479
RGWDataChangesLogMarker marker);
484-
int list_entries(const DoutPrefixProvider *dpp, int max_entries,
485-
std::vector<rgw_data_change_log_entry>& entries,
486-
RGWDataChangesLogMarker& marker, bool* ptruncated,
487-
optional_yield y);
488-
489-
int trim_entries(const DoutPrefixProvider *dpp, int shard_id,
490-
std::string_view marker, std::string* errstr, optional_yield y);
491-
int trim_entries(const DoutPrefixProvider *dpp, int shard_id,
492-
std::string_view marker, librados::AioCompletion* c);
493-
int get_info(const DoutPrefixProvider *dpp, int shard_id,
494-
RGWDataChangesLogInfo *info, std::string* errstr,
495-
optional_yield y);
496-
480+
asio::awaitable<RGWDataChangesLogInfo>
481+
get_info(const DoutPrefixProvider* dpp, int shard_id);
482+
asio::awaitable<void>
483+
trim_entries(const DoutPrefixProvider *dpp, int shard_id,
484+
std::string_view marker);
485+
void trim_entries(const DoutPrefixProvider *dpp, int shard_id,
486+
std::string_view marker, librados::AioCompletion* c);
487+
asio::awaitable<void>
488+
trim_generations(const DoutPrefixProvider *dpp,
489+
std::optional<uint64_t>& through);
490+
asio::awaitable<void>
491+
change_format(const DoutPrefixProvider *dpp, log_type type);
497492
void mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen);
498493
auto read_clear_modified() {
499494
std::unique_lock wl{modified_lock};
@@ -516,11 +511,6 @@ class RGWDataChangesLog {
516511
std::string get_sem_set_oid(int shard_id) const;
517512

518513

519-
int change_format(const DoutPrefixProvider *dpp, log_type type,
520-
optional_yield y);
521-
int trim_generations(const DoutPrefixProvider *dpp,
522-
std::optional<uint64_t>& through,
523-
optional_yield y);
524514
asio::awaitable<std::pair<bc::flat_map<std::string, uint64_t>,
525515
std::string>>
526516
read_sems(int index, std::string cursor);

0 commit comments

Comments
 (0)