Skip to content

Commit ae9fe18

Browse files
authored
Merge pull request ceph#54697 from yuvalif/wip-yuval-63314
rgw/kafka/amqp: fix race conditionn in async completion handlers reviewed-by: cbodley
2 parents 9bc2c79 + 3e6d527 commit ae9fe18

File tree

1 file changed

+55
-114
lines changed

1 file changed

+55
-114
lines changed

src/rgw/driver/rados/rgw_pubsub_push.cc

Lines changed: 55 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,55 @@ class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
115115
}
116116
};
117117

118+
namespace {
119+
// this allows waiting untill "finish()" is called from a different thread
120+
// waiting could be blocking the waiting thread or yielding, depending
121+
// with compilation flag support and whether the optional_yield is set
122+
class Waiter {
123+
using Signature = void(boost::system::error_code);
124+
using Completion = ceph::async::Completion<Signature>;
125+
using CompletionInit = boost::asio::async_completion<yield_context, Signature>;
126+
std::unique_ptr<Completion> completion = nullptr;
127+
int ret;
128+
129+
bool done = false;
130+
mutable std::mutex lock;
131+
mutable std::condition_variable cond;
132+
133+
public:
134+
int wait(optional_yield y) {
135+
std::unique_lock l{lock};
136+
if (done) {
137+
return ret;
138+
}
139+
if (y) {
140+
boost::system::error_code ec;
141+
auto&& token = y.get_yield_context()[ec];
142+
CompletionInit init(token);
143+
completion = Completion::create(y.get_io_context().get_executor(),
144+
std::move(init.completion_handler));
145+
l.unlock();
146+
init.result.get();
147+
return -ec.value();
148+
}
149+
cond.wait(l, [this]{return (done==true);});
150+
return ret;
151+
}
152+
153+
void finish(int r) {
154+
std::unique_lock l{lock};
155+
ret = r;
156+
done = true;
157+
if (completion) {
158+
boost::system::error_code ec(-ret, boost::system::system_category());
159+
Completion::post(std::move(completion), ec);
160+
} else {
161+
cond.notify_all();
162+
}
163+
}
164+
};
165+
} // namespace
166+
118167
#ifdef WITH_RADOSGW_AMQP_ENDPOINT
119168
class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
120169
private:
@@ -187,71 +236,17 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
187236
}
188237
}
189238

190-
// this allows waiting untill "finish()" is called from a different thread
191-
// waiting could be blocking the waiting thread or yielding, depending
192-
// with compilation flag support and whether the optional_yield is set
193-
class Waiter {
194-
using Signature = void(boost::system::error_code);
195-
using Completion = ceph::async::Completion<Signature>;
196-
std::unique_ptr<Completion> completion = nullptr;
197-
int ret;
198-
199-
mutable std::atomic<bool> done = false;
200-
mutable std::mutex lock;
201-
mutable std::condition_variable cond;
202-
203-
template <typename ExecutionContext, typename CompletionToken>
204-
auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
205-
boost::asio::async_completion<CompletionToken, Signature> init(token);
206-
auto& handler = init.completion_handler;
207-
{
208-
std::unique_lock l{lock};
209-
completion = Completion::create(ctx.get_executor(), std::move(handler));
210-
}
211-
return init.result.get();
212-
}
213-
214-
public:
215-
int wait(optional_yield y) {
216-
if (done) {
217-
return ret;
218-
}
219-
if (y) {
220-
auto& io_ctx = y.get_io_context();
221-
auto& yield_ctx = y.get_yield_context();
222-
boost::system::error_code ec;
223-
async_wait(io_ctx, yield_ctx[ec]);
224-
return -ec.value();
225-
}
226-
std::unique_lock l(lock);
227-
cond.wait(l, [this]{return (done==true);});
228-
return ret;
229-
}
230-
231-
void finish(int r) {
232-
std::unique_lock l{lock};
233-
ret = r;
234-
done = true;
235-
if (completion) {
236-
boost::system::error_code ec(-ret, boost::system::system_category());
237-
Completion::post(std::move(completion), ec);
238-
} else {
239-
cond.notify_all();
240-
}
241-
}
242-
};
243-
244239
int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
245240
if (ack_level == ack_level_t::None) {
246241
return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
247242
} else {
248243
// TODO: currently broker and routable are the same - this will require different flags but the same mechanism
249-
// note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
250-
auto w = std::unique_ptr<Waiter>(new Waiter);
244+
auto w = std::make_unique<Waiter>();
251245
const auto rc = amqp::publish_with_confirm(conn_id,
252246
topic,
253247
json_format_pubsub_event(event),
254-
std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
248+
[wp = w.get()](int r) { wp->finish(r);}
249+
);
255250
if (rc < 0) {
256251
// failed to publish, does not wait for reply
257252
return rc;
@@ -314,70 +309,16 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
314309
}
315310
}
316311

317-
// this allows waiting untill "finish()" is called from a different thread
318-
// waiting could be blocking the waiting thread or yielding, depending
319-
// with compilation flag support and whether the optional_yield is set
320-
class Waiter {
321-
using Signature = void(boost::system::error_code);
322-
using Completion = ceph::async::Completion<Signature>;
323-
std::unique_ptr<Completion> completion = nullptr;
324-
int ret;
325-
326-
mutable std::atomic<bool> done = false;
327-
mutable std::mutex lock;
328-
mutable std::condition_variable cond;
329-
330-
template <typename ExecutionContext, typename CompletionToken>
331-
auto async_wait(ExecutionContext& ctx, CompletionToken&& token) {
332-
boost::asio::async_completion<CompletionToken, Signature> init(token);
333-
auto& handler = init.completion_handler;
334-
{
335-
std::unique_lock l{lock};
336-
completion = Completion::create(ctx.get_executor(), std::move(handler));
337-
}
338-
return init.result.get();
339-
}
340-
341-
public:
342-
int wait(optional_yield y) {
343-
if (done) {
344-
return ret;
345-
}
346-
if (y) {
347-
auto& io_ctx = y.get_io_context();
348-
auto& yield_ctx = y.get_yield_context();
349-
boost::system::error_code ec;
350-
async_wait(io_ctx, yield_ctx[ec]);
351-
return -ec.value();
352-
}
353-
std::unique_lock l(lock);
354-
cond.wait(l, [this]{return (done==true);});
355-
return ret;
356-
}
357-
358-
void finish(int r) {
359-
std::unique_lock l{lock};
360-
ret = r;
361-
done = true;
362-
if (completion) {
363-
boost::system::error_code ec(-ret, boost::system::system_category());
364-
Completion::post(std::move(completion), ec);
365-
} else {
366-
cond.notify_all();
367-
}
368-
}
369-
};
370-
371312
int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
372313
if (ack_level == ack_level_t::None) {
373314
return kafka::publish(conn_name, topic, json_format_pubsub_event(event));
374315
} else {
375-
// note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
376-
auto w = std::unique_ptr<Waiter>(new Waiter);
316+
auto w = std::make_unique<Waiter>();
377317
const auto rc = kafka::publish_with_confirm(conn_name,
378318
topic,
379319
json_format_pubsub_event(event),
380-
std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
320+
[wp = w.get()](int r) { wp->finish(r); }
321+
);
381322
if (rc < 0) {
382323
// failed to publish, does not wait for reply
383324
return rc;

0 commit comments

Comments
 (0)