@@ -218,10 +218,15 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
218218
219219 int send (const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event, optional_yield y) override {
220220 if (ack_level == ack_level_t ::None) {
221- return amqp::publish (conn_id, topic, json_format_pubsub_event (event));
221+ if (perfcounter) perfcounter->inc (l_rgw_pubsub_push_pending);
222+ const auto rc = amqp::publish (conn_id, topic, json_format_pubsub_event (event));
223+ if (perfcounter) perfcounter->dec (l_rgw_pubsub_push_pending);
224+ if (rc < 0 && perfcounter) perfcounter->inc (l_rgw_pubsub_push_failed);
225+ return rc;
222226 } else {
223227 // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
224228 if (y) {
229+ if (perfcounter) perfcounter->inc (l_rgw_pubsub_push_pending);
225230 auto & yield = y.get_yield_context ();
226231 ceph::async::yield_waiter<int > w;
227232 boost::asio::defer (yield.get_executor (),[&w, &event, this ]() {
@@ -233,17 +238,25 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
233238 w.complete (boost::system::error_code{}, rc);
234239 }
235240 });
236- return w.async_wait (yield);
241+ const auto rc = w.async_wait (yield);
242+ if (perfcounter) perfcounter->dec (l_rgw_pubsub_push_pending);
243+ if (rc < 0 && perfcounter) perfcounter->inc (l_rgw_pubsub_push_failed);
244+ return rc;
237245 }
238246 ceph::async::waiter<int > w;
239247 const auto rc = amqp::publish_with_confirm (
240248 conn_id, topic, json_format_pubsub_event (event),
241249 [&w](int r) {w (r);});
242250 if (rc < 0 ) {
243251 // failed to publish, does not wait for reply
252+ if (perfcounter) perfcounter->dec (l_rgw_pubsub_push_pending);
253+ if (perfcounter) perfcounter->inc (l_rgw_pubsub_push_failed);
244254 return rc;
245255 }
246- return w.wait ();
256+ const auto wait_rc = w.wait ();
257+ if (perfcounter) perfcounter->dec (l_rgw_pubsub_push_pending);
258+ if (wait_rc < 0 && perfcounter) perfcounter->inc (l_rgw_pubsub_push_failed);
259+ return wait_rc;
247260 }
248261 }
249262
@@ -304,8 +317,13 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
304317 int send (const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event,
305318 optional_yield y) override {
306319 if (ack_level == ack_level_t ::None) {
307- return kafka::publish (conn_id, topic, json_format_pubsub_event (event));
320+ if (perfcounter) perfcounter->inc (l_rgw_pubsub_push_pending);
321+ const auto rc = kafka::publish (conn_id, topic, json_format_pubsub_event (event));
322+ if (perfcounter) perfcounter->dec (l_rgw_pubsub_push_pending);
323+ if (rc < 0 && perfcounter) perfcounter->inc (l_rgw_pubsub_push_failed);
324+ return rc;
308325 } else {
326+ if (perfcounter) perfcounter->inc (l_rgw_pubsub_push_pending);
309327 if (y) {
310328 auto & yield = y.get_yield_context ();
311329 ceph::async::yield_waiter<int > w;
@@ -318,17 +336,25 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
318336 w.complete (boost::system::error_code{}, rc);
319337 }
320338 });
321- return w.async_wait (yield);
339+ const auto rc = w.async_wait (yield);
340+ if (perfcounter) perfcounter->dec (l_rgw_pubsub_push_pending);
341+ if (rc < 0 && perfcounter) perfcounter->inc (l_rgw_pubsub_push_failed);
342+ return rc;
322343 }
323344 ceph::async::waiter<int > w;
324345 const auto rc = kafka::publish_with_confirm (
325346 conn_id, topic, json_format_pubsub_event (event),
326347 [&w](int r) {w (r);});
327348 if (rc < 0 ) {
328349 // failed to publish, does not wait for reply
350+ if (perfcounter) perfcounter->dec (l_rgw_pubsub_push_pending);
351+ if (perfcounter) perfcounter->inc (l_rgw_pubsub_push_failed);
329352 return rc;
330353 }
331- return w.wait ();
354+ const auto wait_rc = w.wait ();
355+ if (perfcounter) perfcounter->dec (l_rgw_pubsub_push_pending);
356+ if (wait_rc < 0 && perfcounter) perfcounter->inc (l_rgw_pubsub_push_failed);
357+ return wait_rc;
332358 }
333359 }
334360
0 commit comments