99#include " common/Formatter.h"
1010#include " common/iso_8601.h"
1111#include " common/async/completion.h"
12+ #include " rgw_asio_thread.h"
1213#include " rgw_common.h"
1314#include " rgw_data_sync.h"
1415#include " rgw_pubsub.h"
@@ -88,7 +89,8 @@ class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
8889 }
8990 }
9091
91- int send (const rgw_pubsub_s3_event& event, optional_yield y) override {
92+ int send (const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event,
93+ optional_yield y) override {
9294 std::shared_lock lock (s_http_manager_mutex);
9395 if (!s_http_manager) {
9496 ldout (cct, 1 ) << " ERROR: send failed. http endpoint manager not running" << dendl;
@@ -114,7 +116,7 @@ class RGWPubSubHTTPEndpoint : public RGWPubSubEndpoint {
114116 if (perfcounter) perfcounter->inc (l_rgw_pubsub_push_pending);
115117 auto rc = s_http_manager->add_request (&request);
116118 if (rc == 0 ) {
117- rc = request.wait (y);
119+ rc = request.wait (dpp, y);
118120 }
119121 if (perfcounter) perfcounter->dec (l_rgw_pubsub_push_pending);
120122 // TODO: use read_bl to process return code and handle according to ack level
@@ -144,7 +146,7 @@ class Waiter {
144146 mutable std::condition_variable cond;
145147
146148public:
147- int wait (optional_yield y) {
149+ int wait (const DoutPrefixProvider* dpp, optional_yield y) {
148150 std::unique_lock l{lock};
149151 if (done) {
150152 return ret;
@@ -160,6 +162,8 @@ class Waiter {
160162 }, token, yield.get_executor ());
161163 return -ec.value ();
162164 }
165+ maybe_warn_about_blocking (dpp);
166+
163167 cond.wait (l, [this ]{return (done==true );});
164168 return ret;
165169 }
@@ -247,7 +251,7 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
247251 }
248252 }
249253
250- int send (const rgw_pubsub_s3_event& event, optional_yield y) override {
254+ int send (const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event, optional_yield y) override {
251255 if (ack_level == ack_level_t ::None) {
252256 return amqp::publish (conn_id, topic, json_format_pubsub_event (event));
253257 } else {
@@ -262,7 +266,7 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
262266 // failed to publish, does not wait for reply
263267 return rc;
264268 }
265- return w->wait (y);
269+ return w->wait (dpp, y);
266270 }
267271 }
268272
@@ -320,7 +324,8 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
320324 }
321325 }
322326
323- int send (const rgw_pubsub_s3_event& event, optional_yield y) override {
327+ int send (const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event,
328+ optional_yield y) override {
324329 if (ack_level == ack_level_t ::None) {
325330 return kafka::publish (conn_id, topic, json_format_pubsub_event (event));
326331 } else {
@@ -332,7 +337,7 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
332337 // failed to publish, does not wait for reply
333338 return rc;
334339 }
335- return w->wait (y);
340+ return w->wait (dpp, y);
336341 }
337342 }
338343
0 commit comments