@@ -652,6 +652,8 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
652652 rgw::sal::RadosObject src_obj (store, key, &bucket);
653653 rgw::sal::RadosBucket dest_bucket (store, dest_bucket_info);
654654 rgw::sal::RadosObject dest_obj (store, dest_key.value_or (key), &dest_bucket);
655+
656+ std::string etag;
655657
656658 std::optional<uint64_t > bytes_transferred;
657659 int r = store->getRados ()->fetch_remote_obj (obj_ctx,
@@ -662,8 +664,8 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
662664 &src_obj,
663665 &dest_bucket, /* dest */
664666 nullptr , /* source */
665- dest_placement_rule,
666- NULL , /* real_time* src_mtime, */
667+ dest_placement_rule,
668+ nullptr , /* real_time* src_mtime, */
667669 NULL , /* real_time* mtime, */
668670 NULL , /* const real_time* mod_ptr, */
669671 NULL , /* const real_time* unmod_ptr, */
@@ -677,7 +679,7 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
677679 versioned_epoch,
678680 real_time (), /* delete_at */
679681 NULL , /* string *ptag, */
680- NULL , /* string *petag, */
682+ &etag , /* string *petag, */
681683 NULL , /* void (*progress_cb)(off_t, void *), */
682684 NULL , /* void *progress_data*); */
683685 dpp,
@@ -690,12 +692,53 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
690692 if (counters) {
691693 counters->inc (sync_counters::l_fetch_err, 1 );
692694 }
693- } else if (counters) {
694- if (bytes_transferred) {
695- counters->inc (sync_counters::l_fetch, *bytes_transferred);
696- } else {
697- counters->inc (sync_counters::l_fetch_not_modified);
698- }
695+ } else {
696+ // r >= 0
697+ if (bytes_transferred) {
698+ // send notification that object was succesfully synced
699+ std::string user_id = " rgw sync" ;
700+ std::string req_id = " 0" ;
701+
702+ RGWObjTags obj_tags;
703+ auto iter = attrs.find (RGW_ATTR_TAGS);
704+ if (iter != attrs.end ()) {
705+ try {
706+ auto it = iter->second .cbegin ();
707+ obj_tags.decode (it);
708+ } catch (buffer::error &err) {
709+ ldpp_dout (dpp, 1 ) << " ERROR: " << __func__ << " : caught buffer::error couldn't decode TagSet " << dendl;
710+ }
711+ }
712+
713+ // NOTE: we create a mutable copy of bucket.get_tenant as the get_notification function expects a std::string&, not const
714+ std::string tenant (dest_bucket.get_tenant ());
715+
716+ std::unique_ptr<rgw::sal::Notification> notify
717+ = store->get_notification (dpp, &dest_obj, nullptr , rgw::notify::ObjectSyncedCreate,
718+ &dest_bucket, user_id,
719+ tenant,
720+ req_id, null_yield);
721+
722+ auto notify_res = static_cast <rgw::sal::RadosNotification*>(notify.get ())->get_reservation ();
723+ int ret = rgw::notify::publish_reserve (dpp, rgw::notify::ObjectSyncedCreate, notify_res, &obj_tags);
724+ if (ret < 0 ) {
725+ ldpp_dout (dpp, 1 ) << " ERROR: reserving notification failed, with error: " << ret << dendl;
726+ // no need to return, the sync already happened
727+ } else {
728+ ret = rgw::notify::publish_commit (&dest_obj, dest_obj.get_obj_size (), ceph::real_clock::now (), etag, dest_obj.get_instance (), rgw::notify::ObjectSyncedCreate, notify_res, dpp);
729+ if (ret < 0 ) {
730+ ldpp_dout (dpp, 1 ) << " ERROR: publishing notification failed, with error: " << ret << dendl;
731+ }
732+ }
733+ }
734+
735+ if (counters) {
736+ if (bytes_transferred) {
737+ counters->inc (sync_counters::l_fetch, *bytes_transferred);
738+ } else {
739+ counters->inc (sync_counters::l_fetch_not_modified);
740+ }
741+ }
699742 }
700743 return r;
701744}
0 commit comments