66#include " common/WorkQueue.h"
77#include " common/Throttle.h"
88#include " common/errno.h"
9+ #include " common/perf_counters_key.h"
910
1011#include " rgw_common.h"
1112#include " rgw_zone.h"
@@ -300,12 +301,14 @@ struct read_remote_data_log_response {
300301 string marker;
301302 bool truncated;
302303 vector<rgw_data_change_log_entry> entries;
304+ real_time last_update;
303305
304306 read_remote_data_log_response () : truncated(false ) {}
305307
306308 void decode_json (JSONObj *obj) {
307309 JSONDecoder::decode_json (" marker" , marker, obj);
308310 JSONDecoder::decode_json (" truncated" , truncated, obj);
311+ JSONDecoder::decode_json (" last_update" , last_update, obj);
309312 JSONDecoder::decode_json (" entries" , entries, obj);
310313 };
311314};
@@ -321,6 +324,7 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
321324 string *pnext_marker;
322325 vector<rgw_data_change_log_entry> *entries;
323326 bool *truncated;
327+ real_time *last_update;
324328
325329 read_remote_data_log_response response;
326330 std::optional<TOPNSPC::common::PerfGuard> timer;
@@ -332,10 +336,10 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
332336 RGWReadRemoteDataLogShardCR (RGWDataSyncCtx *_sc, int _shard_id,
333337 const std::string& marker, string *pnext_marker,
334338 vector<rgw_data_change_log_entry> *_entries,
335- bool *_truncated)
339+ bool *_truncated, real_time *_last_update )
336340 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
337341 shard_id (_shard_id), marker(marker), pnext_marker(pnext_marker),
338- entries(_entries), truncated(_truncated) {
342+ entries(_entries), truncated(_truncated), last_update(_last_update) {
339343 }
340344
341345 int operate (const DoutPrefixProvider *dpp) override {
@@ -397,6 +401,7 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
397401 entries->swap (response.entries );
398402 *pnext_marker = response.marker ;
399403 *truncated = response.truncated ;
404+ *last_update = response.last_update ;
400405 return set_cr_done ();
401406 }
402407 }
@@ -1109,22 +1114,44 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, strin
11091114 rgw_data_sync_marker sync_marker;
11101115 RGWSyncTraceNodeRef tn;
11111116 RGWObjVersionTracker& objv;
1117+ sync_deltas::SyncDeltaCountersManager sync_delta_counters_manager;
11121118
11131119public:
11141120 RGWDataSyncShardMarkerTrack (RGWDataSyncCtx *_sc,
11151121 const string& _marker_oid,
11161122 const rgw_data_sync_marker& _marker,
1117- RGWSyncTraceNodeRef& _tn, RGWObjVersionTracker& objv) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
1123+ RGWSyncTraceNodeRef& _tn,
1124+ RGWObjVersionTracker& objv,
1125+ const uint32_t shard_id) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
11181126 sc (_sc), sync_env(_sc->env),
11191127 marker_oid(_marker_oid),
11201128 sync_marker(_marker),
1121- tn(_tn), objv(objv) {}
1129+ tn(_tn), objv(objv),
1130+ sync_delta_counters_manager(init_keys(shard_id), _sc->env->cct) {}
11221131
1123- RGWCoroutine* store_marker (const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
1132+ std::string init_keys (const uint32_t shard_id) {
1133+ std::string sz_id = sc->source_zone .id ;
1134+ std::string lz_id = sc->env ->svc ->zone ->get_zone_params ().get_id ();
1135+ return ceph::perf_counters::key_create (rgw_sync_delta_counters_key,
1136+ {{" local-zone-id" , lz_id},
1137+ {" source-zone-id" , sz_id},
1138+ {" shard-id" , std::to_string (shard_id)}});
1139+ }
1140+
1141+ RGWCoroutine* store_marker (const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update) override {
11241142 sync_marker.marker = new_marker;
11251143 sync_marker.pos = index_pos;
11261144 sync_marker.timestamp = timestamp;
11271145
1146+ // Since store_marker() is called by full and incremental sync but
1147+ // last_update is only modified during incremental sync we only want to
1148+ // report deltas for incremental sync
1149+ real_time zero_time;
1150+ if (last_update != zero_time) {
1151+ auto delta = last_update - timestamp;
1152+ sync_delta_counters_manager.tset (sync_deltas::l_rgw_datalog_sync_delta, delta);
1153+ }
1154+
11281155 tn->log (20 , SSTR (" updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
11291156
11301157 return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp , sync_env->driver ,
@@ -1814,7 +1841,7 @@ class RGWDataFullSyncShardCR : public RGWDataBaseSyncShardCR {
18141841 reenter (this ) {
18151842 tn->log (10 , " start full sync" );
18161843 oid = full_data_sync_index_shard_oid (sc->source_zone , shard_id);
1817- marker_tracker.emplace (sc, status_oid, sync_marker, tn, objv);
1844+ marker_tracker.emplace (sc, status_oid, sync_marker, tn, objv, shard_id );
18181845 total_entries = sync_marker.pos ;
18191846 entry_timestamp = sync_marker.timestamp ; // time when full sync started
18201847 do {
@@ -1927,6 +1954,7 @@ class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
19271954
19281955 string next_marker;
19291956 vector<rgw_data_change_log_entry> log_entries;
1957+ real_time last_update;
19301958 decltype (log_entries)::iterator log_iter;
19311959 bool truncated = false ;
19321960 int cbret = 0 ;
@@ -1968,7 +1996,7 @@ class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
19681996 int operate (const DoutPrefixProvider *dpp) override {
19691997 reenter (this ) {
19701998 tn->log (10 , " start incremental sync" );
1971- marker_tracker.emplace (sc, status_oid, sync_marker, tn, objv);
1999+ marker_tracker.emplace (sc, status_oid, sync_marker, tn, objv, shard_id );
19722000 do {
19732001 if (!lease_cr->is_locked ()) {
19742002 lost_lock = true ;
@@ -2073,7 +2101,7 @@ class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
20732101 yield call (new RGWReadRemoteDataLogShardCR (sc, shard_id,
20742102 sync_marker.marker ,
20752103 &next_marker, &log_entries,
2076- &truncated));
2104+ &truncated, &last_update ));
20772105 if (retcode < 0 && retcode != -ENOENT) {
20782106 tn->log (0 , SSTR (" ERROR: failed to read remote data log info: ret="
20792107 << retcode));
@@ -2100,11 +2128,11 @@ class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
21002128 tn->log (1 , SSTR (" failed to parse bucket shard: "
21012129 << log_iter->entry .key ));
21022130 marker_tracker->try_update_high_marker (log_iter->log_id , 0 ,
2103- log_iter->log_timestamp );
2131+ log_iter->log_timestamp , last_update );
21042132 continue ;
21052133 }
21062134 if (!marker_tracker->start (log_iter->log_id , 0 ,
2107- log_iter->log_timestamp )) {
2135+ log_iter->log_timestamp , last_update )) {
21082136 tn->log (0 , SSTR (" ERROR: cannot start syncing " << log_iter->log_id
21092137 << " . Duplicate entry?" ));
21102138 } else {
@@ -3922,6 +3950,7 @@ class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
39223950 std::string next_marker;
39233951 vector<rgw_data_change_log_entry> log_entries;
39243952 bool truncated;
3953+ real_time last_update;
39253954
39263955public:
39273956 RGWReadPendingBucketShardsCoroutine (RGWDataSyncCtx *_sc, const int _shard_id,
@@ -3956,7 +3985,7 @@ int RGWReadPendingBucketShardsCoroutine::operate(const DoutPrefixProvider *dpp)
39563985 count = 0 ;
39573986 do {
39583987 yield call (new RGWReadRemoteDataLogShardCR (sc, shard_id, marker,
3959- &next_marker, &log_entries, &truncated));
3988+ &next_marker, &log_entries, &truncated, &last_update ));
39603989
39613990 if (retcode == -ENOENT) {
39623991 break ;
@@ -4223,7 +4252,7 @@ class RGWBucketFullSyncMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key,
42234252 {}
42244253
42254254
4226- RGWCoroutine *store_marker (const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
4255+ RGWCoroutine *store_marker (const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update ) override {
42274256 sync_status.full .position = new_marker;
42284257 sync_status.full .count = index_pos;
42294258
@@ -4324,7 +4353,7 @@ class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string,
43244353
43254354 const rgw_raw_obj& get_obj () const { return obj; }
43264355
4327- RGWCoroutine* store_marker (const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
4356+ RGWCoroutine* store_marker (const string& new_marker, uint64_t index_pos, const real_time& timestamp, const real_time& last_update ) override {
43284357 sync_marker.position = new_marker;
43294358 sync_marker.timestamp = timestamp;
43304359
0 commit comments