 #include "common/dout.h"
 #include <chrono>
 
-#define dout_subsys ceph_subsys_rgw
+#define dout_subsys ceph_subsys_rgw_notification
 
 namespace rgw::notify {
 
@@ -66,6 +66,16 @@ struct event_entry_t {
 };
 WRITE_CLASS_ENCODER(event_entry_t)
 
+static inline std::ostream& operator<<(std::ostream& out,
+                                       const event_entry_t& e) {
+  return out << "notification id: '" << e.event.configurationId
+             << "', topic: '" << e.arn_topic
+             << "', endpoint: '" << e.push_endpoint
+             << "', bucket_owner: '" << e.event.bucket_ownerIdentity
+             << "', bucket: '" << e.event.bucket_name
+             << "', object: '" << e.event.object_key
+             << "', event type: '" << e.event.eventName << "'";
+}
 
 struct persistency_tracker {
   ceph::coarse_real_time last_retry_time{ceph::coarse_real_clock::zero()};
@@ -244,15 +254,12 @@ class Manager : public DoutPrefixProvider {
     if ( (topic_persistency_ttl != 0 && event_entry.creation_time != ceph::coarse_real_clock::zero() &&
          time_now - event_entry.creation_time > std::chrono::seconds(topic_persistency_ttl))
         || ( topic_persistency_max_retries != 0 && entry_persistency_tracker.retires_num > topic_persistency_max_retries) ) {
-      ldpp_dout(this, 1) << "Expiring entry for topic= "
-                         << event_entry.arn_topic << " bucket_owner= "
-                         << event_entry.event.bucket_ownerIdentity
-                         << " bucket= " << event_entry.event.bucket_name
-                         << " object_name= " << event_entry.event.object_key
-                         << " entry retry_number="
+      ldpp_dout(this, 1) << "WARNING: Expiring entry marker: " << entry.marker
+                         << " for event with " << event_entry
+                         << " entry retry_number: "
                          << entry_persistency_tracker.retires_num
-                         << " creation_time= " << event_entry.creation_time
-                         << " time_now= " << time_now << dendl;
+                         << " creation_time: " << event_entry.creation_time
+                         << " time_now: " << time_now << dendl;
       return EntryProcessingResult::Expired;
     }
     if (time_now - entry_persistency_tracker.last_retry_time < std::chrono::seconds(topic_persistency_sleep_duration) ) {
@@ -261,7 +268,10 @@ class Manager : public DoutPrefixProvider {
 
     ++entry_persistency_tracker.retires_num;
     entry_persistency_tracker.last_retry_time = time_now;
-    ldpp_dout(this, 20) << "Processing entry retry_number=" << entry_persistency_tracker.retires_num << " time=" << dendl;
+    ldpp_dout(this, 20) << "Processing event entry with " << event_entry
+                        << " retry_number: "
+                        << entry_persistency_tracker.retires_num
+                        << " current time: " << time_now << dendl;
     try {
       // TODO move endpoint creation to queue level
       const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
@@ -271,12 +281,14 @@ class Manager : public DoutPrefixProvider {
         " for entry: " << entry.marker << dendl;
       const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
       if (ret < 0) {
-        ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
-          << " failed. error: " << ret << " (will retry)" << dendl;
+        ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker
+                           << " failed. error: " << ret
+                           << " (will retry) for event with " << event_entry
+                           << dendl;
         return EntryProcessingResult::Failure;
       } else {
-        ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
-          << " ok" << dendl;
+        ldpp_dout(this, 5) << "INFO: push entry marker: " << entry.marker
+                           << " ok for event with " << event_entry << dendl;
         if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
         return EntryProcessingResult::Successful;
       }
@@ -304,12 +316,15 @@ class Manager : public DoutPrefixProvider {
     auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
     if (ret == -ENOENT) {
       // queue was deleted
-      ldpp_dout(this, 5) << "INFO: queue: "
-        << queue_name << ". was removed. cleanup will stop" << dendl;
+      ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+                          << ". was removed. cleanup will stop" << dendl;
       return;
     }
     if (ret == -EBUSY) {
-      ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+      ldpp_dout(this, 10)
+          << "WARNING: queue: " << queue_name
+          << " ownership moved to another daemon. processing will stop"
+          << dendl;
       return;
     }
     if (ret < 0) {
@@ -367,13 +382,16 @@ class Manager : public DoutPrefixProvider {
       if (ret == -ENOENT) {
         // queue was deleted
         topics_persistency_tracker.erase(queue_name);
-        ldpp_dout(this, 5) << "INFO: queue: "
-          << queue_name << ". was removed. processing will stop" << dendl;
+        ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+                            << ". was removed. processing will stop" << dendl;
         return;
       }
       if (ret == -EBUSY) {
         topics_persistency_tracker.erase(queue_name);
-        ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+        ldpp_dout(this, 10)
+            << "WARNING: queue: " << queue_name
+            << " ownership moved to another daemon. processing will stop"
+            << dendl;
         return;
       }
       if (ret < 0) {
@@ -468,11 +486,15 @@ class Manager : public DoutPrefixProvider {
     auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
     if (ret == -ENOENT) {
       // queue was deleted
-      ldpp_dout(this, 5) << "INFO: queue: " << queue_name << ". was removed. processing will stop" << dendl;
+      ldpp_dout(this, 10) << "INFO: queue: " << queue_name
+                          << ". was removed. processing will stop" << dendl;
       return;
     }
     if (ret == -EBUSY) {
-      ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
+      ldpp_dout(this, 10)
+          << "WARNING: queue: " << queue_name
+          << " ownership moved to another daemon. processing will stop"
+          << dendl;
       return;
     }
     if (ret < 0) {
@@ -1133,23 +1155,16 @@ int publish_commit(rgw::sal::Object* obj,
             dpp->get_cct(), event_entry.event, res.yield);
         if (ret < 0) {
           ldpp_dout(dpp, 1)
-              << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint
-              << " bucket: " << event_entry.event.bucket_name
-              << " bucket_owner: " << event_entry.event.bucket_ownerIdentity
-              << " object_name: " << event_entry.event.object_key
-              << " failed. error: " << ret << dendl;
+              << "ERROR: failed to push sync notification event with error: "
+              << ret << " for event with " << event_entry << dendl;
           if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
           return ret;
         }
         if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
       } catch (const RGWPubSubEndpoint::configuration_error& e) {
-        ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
-                          << topic.cfg.dest.push_endpoint
-                          << " bucket: " << event_entry.event.bucket_name
-                          << " bucket_owner: "
-                          << event_entry.event.bucket_ownerIdentity
-                          << " object_name: " << event_entry.event.object_key
-                          << ". error: " << e.what() << dendl;
+        ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint for sync "
+                             "notification event with error: "
+                          << e.what() << " event with " << event_entry << dendl;
         if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
         return -EINVAL;
       }
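
For reference, below is a minimal, self-contained sketch of the logging pattern the diff introduces: a free operator<< that renders an event entry as one labelled, quoted line, so every log site prints the same fields consistently. The types and sample values here (sample_event, sample_entry, the endpoint, topic, and marker strings) are hypothetical stand-ins for rgw_pubsub_s3_event / event_entry_t, not the actual Ceph definitions.

// Sketch only: sample_event/sample_entry are hypothetical stand-ins for the
// real rgw_pubsub_s3_event / event_entry_t types used in the patch above.
#include <iostream>
#include <ostream>
#include <string>

struct sample_event {
  std::string configurationId;       // notification id
  std::string bucket_ownerIdentity;  // bucket owner
  std::string bucket_name;
  std::string object_key;
  std::string eventName;             // e.g. "ObjectCreated:Put"
};

struct sample_entry {
  sample_event event;
  std::string push_endpoint;
  std::string arn_topic;
};

// Same formatting pattern as the operator<< added for event_entry_t above.
static inline std::ostream& operator<<(std::ostream& out, const sample_entry& e) {
  return out << "notification id: '" << e.event.configurationId
             << "', topic: '" << e.arn_topic
             << "', endpoint: '" << e.push_endpoint
             << "', bucket_owner: '" << e.event.bucket_ownerIdentity
             << "', bucket: '" << e.event.bucket_name
             << "', object: '" << e.event.object_key
             << "', event type: '" << e.event.eventName << "'";
}

int main() {
  const sample_entry entry{
      {"notif1", "testuser", "mybucket", "hello.txt", "ObjectCreated:Put"},
      "http://localhost:10900",
      "arn:aws:sns:default::mytopic"};
  // Mirrors the shape of the patched log lines (sample marker value shown).
  std::cout << "INFO: push entry marker: 1/00000001 ok for event with "
            << entry << std::endl;
  return 0;
}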