Skip to content

Commit 2f0e4f7

Browse files
committed
rgw/multisite/datalog: Make add_entry a stackful coroutine
Since, outside of testing, it's only called from stackful coroutines, for now. Signed-off-by: Adam Emerson <[email protected]>
1 parent 5c9ff9d commit 2f0e4f7

File tree

3 files changed

+93
-54
lines changed

3 files changed

+93
-54
lines changed

src/rgw/driver/rados/rgw_datalog.cc

Lines changed: 50 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -163,14 +163,13 @@ class RGWDataChangesOmap final : public RGWDataChangesBE {
163163
asio::use_awaitable);
164164
co_return;
165165
}
166-
asio::awaitable<void> push(const DoutPrefixProvider *dpp, int index,
167-
ceph::real_time now, const std::string& key,
168-
buffer::list&& bl) override {
169-
co_await r.execute(
170-
oids[index], loc,
171-
neorados::WriteOp{}.exec(nlog::add(now, {}, key, std::move(bl))),
172-
asio::use_awaitable);
173-
co_return;
166+
void push(const DoutPrefixProvider *dpp, int index,
167+
ceph::real_time now, const std::string& key,
168+
buffer::list&& bl, asio::yield_context y) override {
169+
r.execute(oids[index], loc,
170+
neorados::WriteOp{}.exec(nlog::add(now, {}, key, std::move(bl))),
171+
y);
172+
return;
174173
}
175174

176175
asio::awaitable<std::tuple<std::span<rgw_data_change_log_entry>,
@@ -270,7 +269,7 @@ class RGWDataChangesOmap final : public RGWDataChangesBE {
270269
};
271270

272271
class RGWDataChangesFIFO final : public RGWDataChangesBE {
273-
using centries = std::vector<buffer::list>;
272+
using centries = std::deque<buffer::list>;
274273
tiny_vector<LazyFIFO> fifos;
275274

276275
public:
@@ -296,10 +295,10 @@ class RGWDataChangesFIFO final : public RGWDataChangesBE {
296295
entries&& items) override {
297296
co_return co_await fifos[index].push(dpp, std::get<centries>(items));
298297
}
299-
asio::awaitable<void> push(const DoutPrefixProvider* dpp, int index,
300-
ceph::real_time, const std::string&,
301-
buffer::list&& bl) override {
302-
co_return co_await fifos[index].push(dpp, std::move(bl));
298+
void push(const DoutPrefixProvider* dpp, int index,
299+
ceph::real_time, const std::string&,
300+
buffer::list&& bl, asio::yield_context y) override {
301+
fifos[index].push(dpp, std::move(bl), y);
303302
}
304303
asio::awaitable<std::tuple<std::span<rgw_data_change_log_entry>,
305304
std::string>>
@@ -854,20 +853,15 @@ int RGWDataChangesLog::get_log_shard_id(rgw_bucket& bucket, int shard_id) {
854853
return choose_oid(bs);
855854
}
856855

857-
asio::awaitable<bool>
858-
RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp,
859-
const rgw_bucket& bucket) const
856+
bool RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp,
857+
const rgw_bucket& bucket,
858+
asio::yield_context y) const
860859
{
861860
if (!bucket_filter) {
862-
co_return true;
861+
return true;
863862
}
864863

865-
co_return co_await asio::spawn(
866-
co_await asio::this_coro::executor,
867-
[this, dpp, &bucket](asio::yield_context yc) {
868-
optional_yield y(yc);
869-
return bucket_filter(bucket, y, dpp);
870-
}, asio::use_awaitable);
864+
return bucket_filter(bucket, y, dpp);
871865
}
872866

873867
std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
@@ -885,15 +879,29 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
885879
const RGWBucketInfo& bucket_info,
886880
const rgw::bucket_log_layout_generation& gen,
887881
int shard_id)
882+
{
883+
co_await asio::spawn(
884+
co_await asio::this_coro::executor,
885+
[this, dpp, &bucket_info, &gen, shard_id](asio::yield_context y) {
886+
return add_entry(dpp, bucket_info, gen, shard_id, y);
887+
}, asio::use_awaitable);
888+
co_return;
889+
}
890+
891+
892+
void RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
893+
const RGWBucketInfo& bucket_info,
894+
const rgw::bucket_log_layout_generation& gen,
895+
int shard_id, asio::yield_context y)
888896
{
889897
if (!log_data) {
890-
co_return;
898+
return;
891899
}
892900

893901
auto& bucket = bucket_info.bucket;
894902

895-
if (!co_await filter_bucket(dpp, bucket)) {
896-
co_return;
903+
if (!filter_bucket(dpp, bucket, y)) {
904+
return;
897905
}
898906

899907
if (observer) {
@@ -921,8 +929,8 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
921929

922930
auto be = bes->head();
923931
// Failure on push is fatal if we're bypassing semaphores.
924-
co_await be->push(dpp, index, now, change.key, std::move(bl));
925-
co_return;
932+
be->push(dpp, index, now, change.key, std::move(bl), y);
933+
return;
926934
}
927935

928936
mark_modified(index, bs, gen.gen);
@@ -950,17 +958,16 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
950958
if (need_sem_set) {
951959
using neorados::WriteOp;
952960
using neorados::cls::sem_set::increment;
953-
co_await rados->execute(get_sem_set_oid(index), loc,
954-
WriteOp{}.exec(increment(std::move(key))),
955-
asio::use_awaitable);
961+
rados->execute(get_sem_set_oid(index), loc,
962+
WriteOp{}.exec(increment(std::move(key))), y);
956963
}
957-
co_return;
964+
return;
958965
}
959966

960967
if (status->pending) {
961-
co_await status->cond.async_wait(sl, asio::use_awaitable);
968+
status->cond.async_wait(sl, y);
962969
sl.unlock();
963-
co_return;
970+
return;
964971
}
965972

966973
status->cond.notify(sl);
@@ -987,7 +994,7 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
987994
auto be = bes->head();
988995
// Failure on push isn't fatal.
989996
try {
990-
co_await be->push(dpp, index, now, change.key, std::move(bl));
997+
be->push(dpp, index, now, change.key, std::move(bl), y);
991998
} catch (const std::exception& e) {
992999
ldpp_dout(dpp, 5) << "RGWDataChangesLog::add_entry(): Backend push failed "
9931000
<< "with exception: " << e.what() << dendl;
@@ -1005,7 +1012,7 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
10051012
status->cond.notify(sl);
10061013
sl.unlock();
10071014

1008-
co_return;
1015+
return;
10091016
}
10101017

10111018
int RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
@@ -1015,19 +1022,19 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
10151022
{
10161023
std::exception_ptr eptr;
10171024
if (y) {
1018-
auto& yield = y.get_yield_context();
10191025
try {
1020-
asio::co_spawn(yield.get_executor(),
1021-
add_entry(dpp, bucket_info, gen, shard_id),
1022-
yield);
1026+
add_entry(dpp, bucket_info, gen, shard_id, y.get_yield_context());
10231027
} catch (const std::exception&) {
10241028
eptr = std::current_exception();
10251029
}
10261030
} else {
10271031
maybe_warn_about_blocking(dpp);
1028-
eptr = asio::co_spawn(rados->get_executor(),
1029-
add_entry(dpp, bucket_info, gen, shard_id),
1030-
async::use_blocked);
1032+
eptr = asio::spawn(rados->get_executor(),
1033+
[this, dpp, &bucket_info, &gen,
1034+
&shard_id](asio::yield_context y) {
1035+
add_entry(dpp, bucket_info, gen, shard_id, y);
1036+
},
1037+
async::use_blocked);
10311038
}
10321039
return ceph::from_exception(eptr);
10331040
}

src/rgw/driver/rados/rgw_datalog.h

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -415,8 +415,9 @@ class RGWDataChangesLog {
415415
std::function<bool(const rgw_bucket& bucket, optional_yield y,
416416
const DoutPrefixProvider *dpp)> bucket_filter;
417417
bool going_down() const;
418-
asio::awaitable<bool> filter_bucket(const DoutPrefixProvider* dpp,
419-
const rgw_bucket& bucket) const;
418+
bool filter_bucket(const DoutPrefixProvider* dpp,
419+
const rgw_bucket& bucket,
420+
asio::yield_context y) const;
420421
asio::awaitable<void> renew_entries(const DoutPrefixProvider *dpp);
421422

422423
uint64_t watchcookie = 0;
@@ -451,6 +452,10 @@ class RGWDataChangesLog {
451452
const RGWBucketInfo& bucket_info,
452453
const rgw::bucket_log_layout_generation& gen,
453454
int shard_id);
455+
void add_entry(const DoutPrefixProvider *dpp,
456+
const RGWBucketInfo& bucket_info,
457+
const rgw::bucket_log_layout_generation& gen,
458+
int shard_id, asio::yield_context y);
454459
int add_entry(const DoutPrefixProvider *dpp,
455460
const RGWBucketInfo& bucket_info,
456461
const rgw::bucket_log_layout_generation& gen,
@@ -540,7 +545,7 @@ class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {
540545
}
541546
public:
542547
using entries = std::variant<std::vector<cls::log::entry>,
543-
std::vector<ceph::buffer::list>>;
548+
std::deque<ceph::buffer::list>>;
544549

545550
const uint64_t gen_id;
546551

@@ -555,10 +560,11 @@ class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {
555560
ceph::buffer::list&& entry, entries& out) = 0;
556561
virtual asio::awaitable<void> push(const DoutPrefixProvider *dpp, int index,
557562
entries&& items) = 0;
558-
virtual asio::awaitable<void> push(const DoutPrefixProvider *dpp, int index,
559-
ceph::real_time now,
560-
const std::string& key,
561-
ceph::buffer::list&& bl) = 0;
563+
virtual void push(const DoutPrefixProvider *dpp, int index,
564+
ceph::real_time now,
565+
const std::string& key,
566+
ceph::buffer::list&& bl,
567+
asio::yield_context y) = 0;
562568
virtual asio::awaitable<std::tuple<std::span<rgw_data_change_log_entry>,
563569
std::string>>
564570
list(const DoutPrefixProvider* dpp, int shard,

src/rgw/driver/rados/rgw_log_backing.h

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <string_view>
1010

1111
#include <boost/asio/awaitable.hpp>
12+
#include <boost/asio/spawn.hpp>
1213
#include <boost/asio/strand.hpp>
1314
#include <boost/asio/use_awaitable.hpp>
1415

@@ -277,16 +278,41 @@ class LazyFIFO {
277278
co_return;
278279
}
279280

281+
void lazy_init(const DoutPrefixProvider *dpp, asio::yield_context y) {
282+
std::unique_lock l(m);
283+
if (fifo) {
284+
return;
285+
} else {
286+
l.unlock();
287+
// FIFO supports multiple clients by design, so it's safe to
288+
// race to create them.
289+
auto fifo_tmp = fifo::FIFO::create(dpp, r, oid, loc, y);
290+
l.lock();
291+
if (!fifo) {
292+
// We won the race
293+
fifo = std::move(fifo_tmp);
294+
}
295+
}
296+
l.unlock();
297+
return;
298+
}
299+
280300
public:
281301

282302
LazyFIFO(neorados::RADOS& r, std::string oid, neorados::IOContext loc)
283303
: r(r), oid(std::move(oid)), loc(std::move(loc)) {}
284304

285-
template <typename... Args>
286-
asio::awaitable<void> push(const DoutPrefixProvider *dpp, Args&& ...args) {
305+
asio::awaitable<void> push(const DoutPrefixProvider *dpp,
306+
std::deque<ceph::buffer::list> entries) {
287307
co_await lazy_init(dpp);
288-
co_return co_await fifo->push(dpp, std::forward<Args>(args)...,
289-
asio::use_awaitable);
308+
co_return co_await fifo->push(dpp, std::move(entries), asio::use_awaitable);
309+
}
310+
311+
void push(const DoutPrefixProvider *dpp,
312+
ceph::buffer::list entry,
313+
asio::yield_context y) {
314+
lazy_init(dpp, y);
315+
fifo->push(dpp, std::move(entry), y);
290316
}
291317

292318
asio::awaitable<std::tuple<std::span<fifo::entry>, std::optional<std::string>>>

0 commit comments

Comments
 (0)