66#include " common/WorkQueue.h"
77#include " common/Throttle.h"
88#include " common/errno.h"
9+ #include " common/perf_counters_key.h"
910
1011#include " rgw_common.h"
1112#include " rgw_zone.h"
@@ -300,12 +301,14 @@ struct read_remote_data_log_response {
300301 string marker;
301302 bool truncated;
302303 vector<rgw_data_change_log_entry> entries;
304+ real_time last_update;
303305
304306 read_remote_data_log_response () : truncated(false ) {}
305307
306308 void decode_json (JSONObj *obj) {
307309 JSONDecoder::decode_json (" marker" , marker, obj);
308310 JSONDecoder::decode_json (" truncated" , truncated, obj);
311+ JSONDecoder::decode_json (" last_update" , last_update, obj);
309312 JSONDecoder::decode_json (" entries" , entries, obj);
310313 };
311314};
@@ -321,6 +324,7 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
321324 string *pnext_marker;
322325 vector<rgw_data_change_log_entry> *entries;
323326 bool *truncated;
327+ real_time *last_update;
324328
325329 read_remote_data_log_response response;
326330 std::optional<TOPNSPC::common::PerfGuard> timer;
@@ -332,10 +336,10 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
332336 RGWReadRemoteDataLogShardCR (RGWDataSyncCtx *_sc, int _shard_id,
333337 const std::string& marker, string *pnext_marker,
334338 vector<rgw_data_change_log_entry> *_entries,
335- bool *_truncated)
339+ bool *_truncated, real_time *_last_update )
336340 : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
337341 shard_id (_shard_id), marker(marker), pnext_marker(pnext_marker),
338- entries(_entries), truncated(_truncated) {
342+ entries(_entries), truncated(_truncated), last_update(_last_update) {
339343 }
340344
341345 int operate (const DoutPrefixProvider *dpp) override {
@@ -397,6 +401,7 @@ class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
397401 entries->swap (response.entries );
398402 *pnext_marker = response.marker ;
399403 *truncated = response.truncated ;
404+ *last_update = response.last_update ;
400405 return set_cr_done ();
401406 }
402407 }
@@ -1109,22 +1114,54 @@ class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, strin
11091114 rgw_data_sync_marker sync_marker;
11101115 RGWSyncTraceNodeRef tn;
11111116 RGWObjVersionTracker& objv;
1117+ sync_deltas::SyncDeltaCountersManager sync_delta_counters_manager;
1118+
1119+ // timestamp of remote's most recent log entry. initialized only for data sync
1120+ ceph::real_time last_updated;
11121121
11131122public:
11141123 RGWDataSyncShardMarkerTrack (RGWDataSyncCtx *_sc,
11151124 const string& _marker_oid,
11161125 const rgw_data_sync_marker& _marker,
1117- RGWSyncTraceNodeRef& _tn, RGWObjVersionTracker& objv) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
1126+ RGWSyncTraceNodeRef& _tn,
1127+ RGWObjVersionTracker& objv,
1128+ const uint32_t shard_id) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
11181129 sc (_sc), sync_env(_sc->env),
11191130 marker_oid(_marker_oid),
11201131 sync_marker(_marker),
1121- tn(_tn), objv(objv) {}
1132+ tn(_tn), objv(objv),
1133+ sync_delta_counters_manager(init_keys(shard_id), _sc->env->cct) {}
1134+
1135+ std::string init_keys (const uint32_t shard_id) {
1136+ std::string sz_id = sc->source_zone .id ;
1137+ std::string lz_id = sc->env ->svc ->zone ->get_zone_params ().get_id ();
1138+ return ceph::perf_counters::key_create (rgw_sync_delta_counters_key,
1139+ {{" local_zone_id" , lz_id},
1140+ {" source_zone_id" , sz_id},
1141+ {" shard_id" , std::to_string (shard_id)}});
1142+ }
1143+
1144+ bool start (const std::string& pos, int index_pos, const real_time& timestamp, const real_time& last_update = {}) {
1145+ if (last_updated < last_update) {
1146+ last_updated = last_update;
1147+ }
1148+ return RGWSyncShardMarkerTrack::start (pos, index_pos, timestamp);
1149+ }
11221150
11231151 RGWCoroutine* store_marker (const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
11241152 sync_marker.marker = new_marker;
11251153 sync_marker.pos = index_pos;
11261154 sync_marker.timestamp = timestamp;
11271155
1156+ // Since store_marker() is called by full and incremental sync but
1157+ // last_update is only modified during incremental sync we only want to
1158+ // report deltas for incremental sync
1159+ real_time zero_time;
1160+ if (last_updated != zero_time) {
1161+ auto delta = last_updated - timestamp;
1162+ sync_delta_counters_manager.tset (sync_deltas::l_rgw_datalog_sync_delta, delta);
1163+ }
1164+
11281165 tn->log (20 , SSTR (" updating marker marker_oid=" << marker_oid << " marker=" << new_marker));
11291166
11301167 return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->dpp , sync_env->driver ,
@@ -1814,7 +1851,7 @@ class RGWDataFullSyncShardCR : public RGWDataBaseSyncShardCR {
18141851 reenter (this ) {
18151852 tn->log (10 , " start full sync" );
18161853 oid = full_data_sync_index_shard_oid (sc->source_zone , shard_id);
1817- marker_tracker.emplace (sc, status_oid, sync_marker, tn, objv);
1854+ marker_tracker.emplace (sc, status_oid, sync_marker, tn, objv, shard_id );
18181855 total_entries = sync_marker.pos ;
18191856 entry_timestamp = sync_marker.timestamp ; // time when full sync started
18201857 do {
@@ -1927,6 +1964,7 @@ class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
19271964
19281965 string next_marker;
19291966 vector<rgw_data_change_log_entry> log_entries;
1967+ real_time last_update;
19301968 decltype (log_entries)::iterator log_iter;
19311969 bool truncated = false ;
19321970 int cbret = 0 ;
@@ -1968,7 +2006,7 @@ class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
19682006 int operate (const DoutPrefixProvider *dpp) override {
19692007 reenter (this ) {
19702008 tn->log (10 , " start incremental sync" );
1971- marker_tracker.emplace (sc, status_oid, sync_marker, tn, objv);
2009+ marker_tracker.emplace (sc, status_oid, sync_marker, tn, objv, shard_id );
19722010 do {
19732011 if (!lease_cr->is_locked ()) {
19742012 lost_lock = true ;
@@ -2073,7 +2111,7 @@ class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
20732111 yield call (new RGWReadRemoteDataLogShardCR (sc, shard_id,
20742112 sync_marker.marker ,
20752113 &next_marker, &log_entries,
2076- &truncated));
2114+ &truncated, &last_update ));
20772115 if (retcode < 0 && retcode != -ENOENT) {
20782116 tn->log (0 , SSTR (" ERROR: failed to read remote data log info: ret="
20792117 << retcode));
@@ -2104,7 +2142,7 @@ class RGWDataIncSyncShardCR : public RGWDataBaseSyncShardCR {
21042142 continue ;
21052143 }
21062144 if (!marker_tracker->start (log_iter->log_id , 0 ,
2107- log_iter->log_timestamp )) {
2145+ log_iter->log_timestamp , last_update )) {
21082146 tn->log (0 , SSTR (" ERROR: cannot start syncing " << log_iter->log_id
21092147 << " . Duplicate entry?" ));
21102148 } else {
@@ -3922,6 +3960,7 @@ class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
39223960 std::string next_marker;
39233961 vector<rgw_data_change_log_entry> log_entries;
39243962 bool truncated;
3963+ real_time last_update;
39253964
39263965public:
39273966 RGWReadPendingBucketShardsCoroutine (RGWDataSyncCtx *_sc, const int _shard_id,
@@ -3956,7 +3995,7 @@ int RGWReadPendingBucketShardsCoroutine::operate(const DoutPrefixProvider *dpp)
39563995 count = 0 ;
39573996 do {
39583997 yield call (new RGWReadRemoteDataLogShardCR (sc, shard_id, marker,
3959- &next_marker, &log_entries, &truncated));
3998+ &next_marker, &log_entries, &truncated, &last_update ));
39603999
39614000 if (retcode == -ENOENT) {
39624001 break ;
0 commit comments