Skip to content

Commit 202258f

Browse files
authored
Merge pull request ceph#58765 from yuvalif/wip-yuval-64184
rgw/async/notifications: use common async waiter in pubsub push Reviewed-by: Casey Bodley <[email protected]>
2 parents a2b779f + 2872c75 commit 202258f

File tree

2 files changed

+65
-62
lines changed

2 files changed

+65
-62
lines changed

src/rgw/driver/rados/rgw_pubsub_push.cc

Lines changed: 39 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
#include "common/Formatter.h"
1010
#include "common/iso_8601.h"
1111
#include "common/async/completion.h"
12+
#include "common/async/yield_waiter.h"
13+
#include "common/async/waiter.h"
1214
#include "rgw_asio_thread.h"
1315
#include "rgw_common.h"
1416
#include "rgw_data_sync.h"
@@ -131,57 +133,6 @@ class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
131133
}
132134
};
133135

134-
namespace {
135-
// this allows waiting untill "finish()" is called from a different thread
136-
// waiting could be blocking the waiting thread or yielding, depending
137-
// with compilation flag support and whether the optional_yield is set
138-
class Waiter {
139-
using Signature = void(boost::system::error_code);
140-
using Completion = ceph::async::Completion<Signature>;
141-
std::unique_ptr<Completion> completion = nullptr;
142-
int ret;
143-
144-
bool done = false;
145-
mutable std::mutex lock;
146-
mutable std::condition_variable cond;
147-
148-
public:
149-
int wait(const DoutPrefixProvider* dpp, optional_yield y) {
150-
std::unique_lock l{lock};
151-
if (done) {
152-
return ret;
153-
}
154-
if (y) {
155-
boost::system::error_code ec;
156-
auto yield = y.get_yield_context();
157-
auto&& token = yield[ec];
158-
boost::asio::async_initiate<boost::asio::yield_context, Signature>(
159-
[this, &l] (auto handler, auto ex) {
160-
completion = Completion::create(ex, std::move(handler));
161-
l.unlock(); // unlock before suspend
162-
}, token, yield.get_executor());
163-
return -ec.value();
164-
}
165-
maybe_warn_about_blocking(dpp);
166-
167-
cond.wait(l, [this]{return (done==true);});
168-
return ret;
169-
}
170-
171-
void finish(int r) {
172-
std::unique_lock l{lock};
173-
ret = r;
174-
done = true;
175-
if (completion) {
176-
boost::system::error_code ec(-ret, boost::system::system_category());
177-
Completion::post(std::move(completion), ec);
178-
} else {
179-
cond.notify_all();
180-
}
181-
}
182-
};
183-
} // namespace
184-
185136
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
186137
class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
187138
private:
@@ -256,17 +207,29 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
256207
return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
257208
} else {
258209
// TODO: currently broker and routable are the same - this will require different flags but the same mechanism
259-
auto w = std::make_unique<Waiter>();
260-
const auto rc = amqp::publish_with_confirm(conn_id,
261-
topic,
262-
json_format_pubsub_event(event),
263-
[wp = w.get()](int r) { wp->finish(r);}
264-
);
210+
if (y) {
211+
auto& yield = y.get_yield_context();
212+
ceph::async::yield_waiter<int> w;
213+
boost::asio::defer(yield.get_executor(),[&w, &event, this]() {
214+
const auto rc = amqp::publish_with_confirm(
215+
conn_id, topic, json_format_pubsub_event(event),
216+
[&w](int r) {w.complete(boost::system::error_code{}, r);});
217+
if (rc < 0) {
218+
// failed to publish, does not wait for reply
219+
w.complete(boost::system::error_code{}, rc);
220+
}
221+
});
222+
return w.async_wait(yield);
223+
}
224+
ceph::async::waiter<int> w;
225+
const auto rc = amqp::publish_with_confirm(
226+
conn_id, topic, json_format_pubsub_event(event),
227+
[&w](int r) {w(r);});
265228
if (rc < 0) {
266229
// failed to publish, does not wait for reply
267230
return rc;
268231
}
269-
return w->wait(dpp, y);
232+
return w.wait();
270233
}
271234
}
272235

@@ -329,15 +292,29 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
329292
if (ack_level == ack_level_t::None) {
330293
return kafka::publish(conn_id, topic, json_format_pubsub_event(event));
331294
} else {
332-
auto w = std::make_unique<Waiter>();
295+
if (y) {
296+
auto& yield = y.get_yield_context();
297+
ceph::async::yield_waiter<int> w;
298+
boost::asio::defer(yield.get_executor(),[&w, &event, this]() {
299+
const auto rc = kafka::publish_with_confirm(
300+
conn_id, topic, json_format_pubsub_event(event),
301+
[&w](int r) {w.complete(boost::system::error_code{}, r);});
302+
if (rc < 0) {
303+
// failed to publish, does not wait for reply
304+
w.complete(boost::system::error_code{}, rc);
305+
}
306+
});
307+
return w.async_wait(yield);
308+
}
309+
ceph::async::waiter<int> w;
333310
const auto rc = kafka::publish_with_confirm(
334-
conn_id, topic, json_format_pubsub_event(event),
335-
[wp = w.get()](int r) { wp->finish(r); });
311+
conn_id, topic, json_format_pubsub_event(event),
312+
[&w](int r) {w(r);});
336313
if (rc < 0) {
337314
// failed to publish, does not wait for reply
338315
return rc;
339316
}
340-
return w->wait(dpp, y);
317+
return w.wait();
341318
}
342319
}
343320

src/test/common/test_async_yield_waiter.cc

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <exception>
1818
#include <memory>
1919
#include <optional>
20+
#include <thread>
2021
#include <boost/asio/io_context.hpp>
2122
#include <boost/asio/spawn.hpp>
2223
#include <gtest/gtest.h>
@@ -240,4 +241,29 @@ TEST(YieldWaiterPtr, wait_error)
240241
}
241242
}
242243

244+
void invoke_callback(int expected_reply, std::function<void(int)> cb) {
245+
auto t = std::thread([cb, expected_reply] {
246+
cb(expected_reply);
247+
});
248+
t.detach();
249+
}
250+
251+
TEST(YieldWaiterInt, mt_wait_complete)
252+
{
253+
boost::asio::io_context io_context;
254+
int reply;
255+
const int expected_reply = 42;
256+
boost::asio::spawn(io_context,
257+
[&reply](boost::asio::yield_context yield) {
258+
yield_waiter<int> waiter;
259+
boost::asio::defer(yield.get_executor(),[&waiter] {
260+
invoke_callback(expected_reply, [&waiter](int r) {waiter.complete(boost::system::error_code{}, r);});
261+
});
262+
reply = waiter.async_wait(yield);
263+
}, rethrow);
264+
io_context.run();
265+
EXPECT_EQ(reply, expected_reply);
266+
}
267+
243268
} // namespace ceph::async
269+

0 commit comments

Comments
 (0)