2020#include " common/dout.h"
2121#include < chrono>
2222
23- #define dout_subsys ceph_subsys_rgw
23+ #define dout_subsys ceph_subsys_rgw_notification
2424
2525namespace rgw ::notify {
2626
@@ -68,6 +68,16 @@ struct event_entry_t {
6868};
6969WRITE_CLASS_ENCODER (event_entry_t )
7070
71+ static inline std::ostream& operator <<(std::ostream& out,
72+ const event_entry_t & e) {
73+ return out << " notification id: '" << e.event .configurationId
74+ << " ', topic: '" << e.arn_topic
75+ << " ', endpoint: '" << e.push_endpoint
76+ << " ', bucket_owner: '" << e.event .bucket_ownerIdentity
77+ << " ', bucket: '" << e.event .bucket_name
78+ << " ', object: '" << e.event .object_key
79+ << " ', event type: '" << e.event .eventName << " '" ;
80+ }
7181
7282struct persistency_tracker {
7383 ceph::coarse_real_time last_retry_time {ceph::coarse_real_clock::zero ()};
@@ -247,15 +257,12 @@ class Manager : public DoutPrefixProvider {
247257 if ( (topic_persistency_ttl != 0 && event_entry.creation_time != ceph::coarse_real_clock::zero () &&
248258 time_now - event_entry.creation_time > std::chrono::seconds (topic_persistency_ttl))
249259 || ( topic_persistency_max_retries != 0 && entry_persistency_tracker.retires_num > topic_persistency_max_retries) ) {
250- ldpp_dout (this , 1 ) << " Expiring entry for topic= "
251- << event_entry.arn_topic << " bucket_owner= "
252- << event_entry.event .bucket_ownerIdentity
253- << " bucket= " << event_entry.event .bucket_name
254- << " object_name= " << event_entry.event .object_key
255- << " entry retry_number="
260+ ldpp_dout (this , 1 ) << " WARNING: Expiring entry marker: " << entry.marker
261+ << " for event with " << event_entry
262+ << " entry retry_number: "
256263 << entry_persistency_tracker.retires_num
257- << " creation_time= " << event_entry.creation_time
258- << " time_now= " << time_now << dendl;
264+ << " creation_time: " << event_entry.creation_time
265+ << " time_now: " << time_now << dendl;
259266 return EntryProcessingResult::Expired;
260267 }
261268 if (time_now - entry_persistency_tracker.last_retry_time < std::chrono::seconds (topic_persistency_sleep_duration) ) {
@@ -264,7 +271,10 @@ class Manager : public DoutPrefixProvider {
264271
265272 ++entry_persistency_tracker.retires_num ;
266273 entry_persistency_tracker.last_retry_time = time_now;
267- ldpp_dout (this , 20 ) << " Processing entry retry_number=" << entry_persistency_tracker.retires_num << " time=" << dendl;
274+ ldpp_dout (this , 20 ) << " Processing event entry with " << event_entry
275+ << " retry_number: "
276+ << entry_persistency_tracker.retires_num
277+ << " current time: " << time_now << dendl;
268278 try {
269279 // TODO move endpoint creation to queue level
270280 const auto push_endpoint = RGWPubSubEndpoint::create (event_entry.push_endpoint , event_entry.arn_topic ,
@@ -274,12 +284,14 @@ class Manager : public DoutPrefixProvider {
274284 " for entry: " << entry.marker << dendl;
275285 const auto ret = push_endpoint->send_to_completion_async (cct, event_entry.event , optional_yield (io_context, yield));
276286 if (ret < 0 ) {
277- ldpp_dout (this , 5 ) << " WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
278- << " failed. error: " << ret << " (will retry)" << dendl;
287+ ldpp_dout (this , 5 ) << " WARNING: push entry marker: " << entry.marker
288+ << " failed. error: " << ret
289+ << " (will retry) for event with " << event_entry
290+ << dendl;
279291 return EntryProcessingResult::Failure;
280292 } else {
281- ldpp_dout (this , 20 ) << " INFO: push entry: " << entry. marker << " to endpoint : " << event_entry. push_endpoint
282- << " ok" << dendl;
293+ ldpp_dout (this , 5 ) << " INFO: push entry marker: " << entry. marker
294+ << " ok for event with " << event_entry << dendl;
283295 if (perfcounter) perfcounter->inc (l_rgw_pubsub_push_ok);
284296 return EntryProcessingResult::Successful;
285297 }
@@ -307,12 +319,15 @@ class Manager : public DoutPrefixProvider {
307319 auto ret = rgw_rados_operate (this , rados_store.getRados ()->get_notif_pool_ctx (), queue_name, &op, optional_yield (io_context, yield));
308320 if (ret == -ENOENT) {
309321 // queue was deleted
310- ldpp_dout (this , 5 ) << " INFO: queue: "
311- << queue_name << " . was removed. cleanup will stop" << dendl;
322+ ldpp_dout (this , 10 ) << " INFO: queue: " << queue_name
323+ << " . was removed. cleanup will stop" << dendl;
312324 return ;
313325 }
314326 if (ret == -EBUSY) {
315- ldpp_dout (this , 5 ) << " WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
327+ ldpp_dout (this , 10 )
328+ << " WARNING: queue: " << queue_name
329+ << " ownership moved to another daemon. processing will stop"
330+ << dendl;
316331 return ;
317332 }
318333 if (ret < 0 ) {
@@ -370,13 +385,16 @@ class Manager : public DoutPrefixProvider {
370385 if (ret == -ENOENT) {
371386 // queue was deleted
372387 topics_persistency_tracker.erase (queue_name);
373- ldpp_dout (this , 5 ) << " INFO: queue: "
374- << queue_name << " . was removed. processing will stop" << dendl;
388+ ldpp_dout (this , 10 ) << " INFO: queue: " << queue_name
389+ << " . was removed. processing will stop" << dendl;
375390 return ;
376391 }
377392 if (ret == -EBUSY) {
378393 topics_persistency_tracker.erase (queue_name);
379- ldpp_dout (this , 5 ) << " WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
394+ ldpp_dout (this , 10 )
395+ << " WARNING: queue: " << queue_name
396+ << " ownership moved to another daemon. processing will stop"
397+ << dendl;
380398 return ;
381399 }
382400 if (ret < 0 ) {
@@ -471,11 +489,15 @@ class Manager : public DoutPrefixProvider {
471489 auto ret = rgw_rados_operate (this , rados_ioctx, queue_name, &op, optional_yield (io_context, yield));
472490 if (ret == -ENOENT) {
473491 // queue was deleted
474- ldpp_dout (this , 5 ) << " INFO: queue: " << queue_name << " . was removed. processing will stop" << dendl;
492+ ldpp_dout (this , 10 ) << " INFO: queue: " << queue_name
493+ << " . was removed. processing will stop" << dendl;
475494 return ;
476495 }
477496 if (ret == -EBUSY) {
478- ldpp_dout (this , 5 ) << " WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
497+ ldpp_dout (this , 10 )
498+ << " WARNING: queue: " << queue_name
499+ << " ownership moved to another daemon. processing will stop"
500+ << dendl;
479501 return ;
480502 }
481503 if (ret < 0 ) {
@@ -1191,23 +1213,16 @@ int publish_commit(rgw::sal::Object* obj,
11911213 dpp->get_cct (), event_entry.event , res.yield );
11921214 if (ret < 0 ) {
11931215 ldpp_dout (dpp, 1 )
1194- << " ERROR: push to endpoint " << topic.cfg .dest .push_endpoint
1195- << " bucket: " << event_entry.event .bucket_name
1196- << " bucket_owner: " << event_entry.event .bucket_ownerIdentity
1197- << " object_name: " << event_entry.event .object_key
1198- << " failed. error: " << ret << dendl;
1216+ << " ERROR: failed to push sync notification event with error: "
1217+ << ret << " for event with " << event_entry << dendl;
11991218 if (perfcounter) perfcounter->inc (l_rgw_pubsub_push_failed);
12001219 return ret;
12011220 }
12021221 if (perfcounter) perfcounter->inc (l_rgw_pubsub_push_ok);
12031222 } catch (const RGWPubSubEndpoint::configuration_error& e) {
1204- ldpp_dout (dpp, 1 ) << " ERROR: failed to create push endpoint: "
1205- << topic.cfg .dest .push_endpoint
1206- << " bucket: " << event_entry.event .bucket_name
1207- << " bucket_owner: "
1208- << event_entry.event .bucket_ownerIdentity
1209- << " object_name: " << event_entry.event .object_key
1210- << " . error: " << e.what () << dendl;
1223+ ldpp_dout (dpp, 1 ) << " ERROR: failed to create push endpoint for sync "
1224+ " notification event with error: "
1225+ << e.what () << " event with " << event_entry << dendl;
12111226 if (perfcounter) perfcounter->inc (l_rgw_pubsub_push_failed);
12121227 return -EINVAL;
12131228 }
0 commit comments