Skip to content

Commit df4ef78

Browse files
committed
rgw: add metric when send message with kafka and ampq
- l_rgw_pubsub_push_pending - l_rgw_pubsub_push_failed Fixes: https://tracker.ceph.com/issues/70256 Signed-off-by: Hoai-Thu Vuong <[email protected]>
1 parent af633b0 commit df4ef78

File tree

2 files changed

+33
-6
lines changed

2 files changed

+33
-6
lines changed

src/rgw/driver/rados/rgw_notify.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,7 @@ class Manager : public DoutPrefixProvider {
509509
needs_migration_vector[entry_idx - 1] = (result == EntryProcessingResult::Migrating);
510510
notifs_persistency_tracker.erase(entry.marker);
511511
is_idle = false;
512+
if (result == EntryProcessingResult::Expired && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
512513
return;
513514
}
514515
if (set_min_marker(end_marker, entry.marker) < 0) {

src/rgw/driver/rados/rgw_pubsub_push.cc

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,15 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
218218

219219
int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event, optional_yield y) override {
220220
if (ack_level == ack_level_t::None) {
221-
return amqp::publish(conn_id, topic, json_format_pubsub_event(event));
221+
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
222+
const auto rc = amqp::publish(conn_id, topic, json_format_pubsub_event(event));
223+
if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
224+
if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
225+
return rc;
222226
} else {
223227
// TODO: currently broker and routable are the same - this will require different flags but the same mechanism
224228
if (y) {
229+
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
225230
auto& yield = y.get_yield_context();
226231
ceph::async::yield_waiter<int> w;
227232
boost::asio::defer(yield.get_executor(),[&w, &event, this]() {
@@ -233,17 +238,25 @@ class RGWPubSubAMQPEndpoint : public RGWPubSubEndpoint {
233238
w.complete(boost::system::error_code{}, rc);
234239
}
235240
});
236-
return w.async_wait(yield);
241+
const auto rc = w.async_wait(yield);
242+
if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
243+
if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
244+
return rc;
237245
}
238246
ceph::async::waiter<int> w;
239247
const auto rc = amqp::publish_with_confirm(
240248
conn_id, topic, json_format_pubsub_event(event),
241249
[&w](int r) {w(r);});
242250
if (rc < 0) {
243251
// failed to publish, does not wait for reply
252+
if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
253+
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
244254
return rc;
245255
}
246-
return w.wait();
256+
const auto wait_rc = w.wait();
257+
if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
258+
if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
259+
return wait_rc;
247260
}
248261
}
249262

@@ -304,8 +317,13 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
304317
int send(const DoutPrefixProvider* dpp, const rgw_pubsub_s3_event& event,
305318
optional_yield y) override {
306319
if (ack_level == ack_level_t::None) {
307-
return kafka::publish(conn_id, topic, json_format_pubsub_event(event));
320+
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
321+
const auto rc = kafka::publish(conn_id, topic, json_format_pubsub_event(event));
322+
if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
323+
if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
324+
return rc;
308325
} else {
326+
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
309327
if (y) {
310328
auto& yield = y.get_yield_context();
311329
ceph::async::yield_waiter<int> w;
@@ -318,17 +336,25 @@ class RGWPubSubKafkaEndpoint : public RGWPubSubEndpoint {
318336
w.complete(boost::system::error_code{}, rc);
319337
}
320338
});
321-
return w.async_wait(yield);
339+
const auto rc = w.async_wait(yield);
340+
if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
341+
if (rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
342+
return rc;
322343
}
323344
ceph::async::waiter<int> w;
324345
const auto rc = kafka::publish_with_confirm(
325346
conn_id, topic, json_format_pubsub_event(event),
326347
[&w](int r) {w(r);});
327348
if (rc < 0) {
328349
// failed to publish, does not wait for reply
350+
if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
351+
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
329352
return rc;
330353
}
331-
return w.wait();
354+
const auto wait_rc = w.wait();
355+
if (perfcounter) perfcounter->dec(l_rgw_pubsub_push_pending);
356+
if (wait_rc < 0 && perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
357+
return wait_rc;
332358
}
333359
}
334360

0 commit comments

Comments
 (0)