22// vim: ts=8 sw=2 smarttab
33
44#include < stack>
5+ #include < queue>
56#include < fcntl.h>
67#include < algorithm>
78#include < sys/time.h>
@@ -63,6 +64,12 @@ std::string entry_path(const std::string &dir, const std::string &name) {
6364 return dir + " /" + name;
6465}
6566
67+ std::string entry_diff_path (const std::string &dir, const std::string &name) {
68+ if (dir == " ." )
69+ return name;
70+ return dir + " /" + name;
71+ }
72+
6673std::map<std::string, std::string> decode_snap_metadata (snap_metadata *snap_metadata,
6774 size_t nr_snap_metadata) {
6875 std::map<std::string, std::string> metadata;
@@ -1210,17 +1217,12 @@ void PeerReplayer::post_sync_close_handles(const FHandles &fh) {
12101217 ceph_close (fh.p_mnt , fh.p_fd );
12111218}
12121219
1213- int PeerReplayer::do_synchronize (const std::string &dir_root, const Snapshot ¤t,
1214- boost::optional<Snapshot> prev) {
1220+ int PeerReplayer::do_synchronize (const std::string &dir_root, const Snapshot ¤t) {
12151221 dout (20 ) << " : dir_root=" << dir_root << " , current=" << current << dendl;
1216- if (prev) {
1217- dout (20 ) << " : incremental sync check from prev=" << prev << dendl;
1218- }
1219-
12201222 FHandles fh;
1221- int r = pre_sync_check_and_open_handles (dir_root, current, prev , &fh);
1223+ int r = pre_sync_check_and_open_handles (dir_root, current, boost::none , &fh);
12221224 if (r < 0 ) {
1223- dout (5 ) << " : cannot proceeed with sync: " << cpp_strerror (r) << dendl;
1225+ dout (5 ) << " : cannot proceed with sync: " << cpp_strerror (r) << dendl;
12241226 return r;
12251227 }
12261228
@@ -1371,6 +1373,177 @@ int PeerReplayer::do_synchronize(const std::string &dir_root, const Snapshot &cu
13711373 return r;
13721374}
13731375
1376+ int PeerReplayer::do_synchronize (const std::string &dir_root, const Snapshot ¤t,
1377+ boost::optional<Snapshot> prev) {
1378+ if (!prev) {
1379+ derr << " : invalid previous snapshot" << dendl;
1380+ return -ENODATA;
1381+ }
1382+
1383+ dout (20 ) << " : incremental sync check from prev=" << prev << dendl;
1384+
1385+ FHandles fh;
1386+ int r = pre_sync_check_and_open_handles (dir_root, current, prev, &fh);
1387+ if (r < 0 ) {
1388+ dout (5 ) << " : cannot proceed with sync: " << cpp_strerror (r) << dendl;
1389+ return r;
1390+ }
1391+
1392+ BOOST_SCOPE_EXIT_ALL ( (this )(&fh) ) {
1393+ post_sync_close_handles (fh);
1394+ };
1395+
1396+ // record that we are going to "dirty" the data under this directory root
1397+ auto snap_id_str{stringify (current.second )};
1398+ r = ceph_setxattr (m_remote_mount, dir_root.c_str (), " ceph.mirror.dirty_snap_id" ,
1399+ snap_id_str.c_str (), snap_id_str.size (), 0 );
1400+ if (r < 0 ) {
1401+ derr << " : error setting \" ceph.mirror.dirty_snap_id\" on dir_root=" << dir_root
1402+ << " : " << cpp_strerror (r) << dendl;
1403+ return r;
1404+ }
1405+
1406+ struct ceph_statx cstx;
1407+ r = ceph_fstatx (m_local_mount, fh.c_fd , &cstx,
1408+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
1409+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
1410+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
1411+ if (r < 0 ) {
1412+ derr << " : failed to stat snap=" << current.first << " : " << cpp_strerror (r)
1413+ << dendl;
1414+ return r;
1415+ }
1416+
1417+ ceph_snapdiff_info sd_info;
1418+ ceph_snapdiff_entry_t sd_entry;
1419+
1420+ // The queue of SyncEntry items (directories) to be synchronized.
1421+ // We follow a breadth first approach here based on the snapdiff output.
1422+ std::queue<SyncEntry> sync_queue;
1423+
1424+ // start with initial/default entry
1425+ std::string epath = " ." , npath = " " , nabs_path = " " , nname = " " ;
1426+ sync_queue.emplace (SyncEntry (epath, cstx));
1427+
1428+ while (!sync_queue.empty ()) {
1429+ if (should_backoff (dir_root, &r)) {
1430+ dout (0 ) << " : backing off r=" << r << dendl;
1431+ break ;
1432+ }
1433+ r = pre_sync_check_and_open_handles (dir_root, current, prev, &fh);
1434+ if (r < 0 ) {
1435+ dout (5 ) << " : cannot proceed with sync: " << cpp_strerror (r) << dendl;
1436+ return r;
1437+ }
1438+
1439+ dout (20 ) << " : " << sync_queue.size () << " entries in queue" << dendl;
1440+ const auto &queue_entry = sync_queue.front ();
1441+ epath = queue_entry.epath ;
1442+ dout (20 ) << " : syncing entry, path=" << epath << dendl;
1443+ r = ceph_open_snapdiff (fh.p_mnt , dir_root.c_str (), epath.c_str (),
1444+ stringify ((*prev).first ).c_str (), current.first .c_str (), &sd_info);
1445+ if (r != 0 ) {
1446+ derr << " : failed to open snapdiff, r=" << r << dendl;
1447+ return r;
1448+ }
1449+ while (0 < (r = ceph_readdir_snapdiff (&sd_info, &sd_entry))) {
1450+ if (r < 0 ) {
1451+ derr << " : failed to read directory=" << epath << dendl;
1452+ ceph_close_snapdiff (&sd_info);
1453+ return r;
1454+ }
1455+
1456+ // New entry found
1457+ nname = sd_entry.dir_entry .d_name ;
1458+ if (" ." == nname || " .." == nname)
1459+ continue ;
1460+ // create path for the newly found entry
1461+ npath = entry_diff_path (epath, nname);
1462+ nabs_path = entry_diff_path (dir_root, npath);
1463+
1464+ r = ceph_statx (sd_info.cmount , nabs_path.c_str (), &cstx,
1465+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
1466+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
1467+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
1468+ if (r < 0 ) {
1469+ // can't stat, so it's a deleted entry.
1470+ if (DT_DIR == sd_entry.dir_entry .d_type ) { // is a directory
1471+ r = cleanup_remote_dir (dir_root, npath, fh);
1472+ if (r < 0 ) {
1473+ derr << " : failed to remove directory=" << nabs_path << dendl;
1474+ break ;
1475+ }
1476+ }
1477+ else { // is a file
1478+ r = ceph_unlinkat (m_remote_mount, fh.r_fd_dir_root , npath.c_str (), 0 );
1479+ if (r < 0 ) {
1480+ break ;
1481+ }
1482+ }
1483+ } else {
1484+ // stat success, update the existing entry
1485+ struct ceph_statx tstx;
1486+ int rstat_r = ceph_statx (m_remote_mount, nabs_path.c_str (), &tstx,
1487+ CEPH_STATX_MODE | CEPH_STATX_UID | CEPH_STATX_GID |
1488+ CEPH_STATX_SIZE | CEPH_STATX_ATIME | CEPH_STATX_MTIME,
1489+ AT_STATX_DONT_SYNC | AT_SYMLINK_NOFOLLOW);
1490+ if (S_ISDIR (cstx.stx_mode )) { // is a directory
1491+ // cleanup if it's a file in the remotefs
1492+ if ((0 == rstat_r) && !S_ISDIR (tstx.stx_mode )) {
1493+ r = ceph_unlinkat (m_remote_mount, fh.r_fd_dir_root , npath.c_str (), 0 );
1494+ if (r < 0 ) {
1495+ derr << " : Error in directory sync. Failed to remove file="
1496+ << nabs_path << dendl;
1497+ break ;
1498+ }
1499+ }
1500+ r = remote_mkdir (npath, cstx, fh);
1501+ if (r < 0 ) {
1502+ break ;
1503+ }
1504+ // push it to sync_queue for later processing
1505+ sync_queue.emplace (SyncEntry (npath, cstx));
1506+ } else { // is a file
1507+ bool need_data_sync = true ;
1508+ bool need_attr_sync = true ;
1509+ r = should_sync_entry (npath, cstx, fh, &need_data_sync, &need_attr_sync);
1510+ if (r < 0 ) {
1511+ break ;
1512+ }
1513+ dout (5 ) << " : entry=" << npath << " , data_sync=" << need_data_sync
1514+ << " , attr_sync=" << need_attr_sync << dendl;
1515+ if (need_data_sync || need_attr_sync) {
1516+ // cleanup if it's a directory in the remotefs
1517+ if ((0 == rstat_r) && S_ISDIR (tstx.stx_mode )) {
1518+ r = cleanup_remote_dir (dir_root, npath, fh);
1519+ if (r < 0 ) {
1520+ derr << " : Error in file sync. Failed to remove remote directory="
1521+ << nabs_path << dendl;
1522+ break ;
1523+ }
1524+ }
1525+ r = remote_file_op (dir_root, npath, cstx, fh, need_data_sync, need_attr_sync);
1526+ if (r < 0 ) {
1527+ break ;
1528+ }
1529+ }
1530+ }
1531+ }
1532+ }
1533+ if (0 == r) {
1534+ dout (10 ) << " : successfully synchronized the entry=" << epath << dendl;
1535+ }
1536+
1537+ // Close the current open directory and take the next queue_entry, if success or failure.
1538+ r = ceph_close_snapdiff (&sd_info);
1539+ if (r != 0 ) {
1540+ derr << " : failed to close directory=" << epath << dendl;
1541+ }
1542+ sync_queue.pop ();
1543+ }
1544+ return r;
1545+ }
1546+
13741547int PeerReplayer::synchronize (const std::string &dir_root, const Snapshot ¤t,
13751548 boost::optional<Snapshot> prev) {
13761549 dout (20 ) << " : dir_root=" << dir_root << " , current=" << current << dendl;
@@ -1389,7 +1562,7 @@ int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &curre
13891562 if (r < 0 ) {
13901563 dout (5 ) << " : missing \" ceph.mirror.dirty_snap_id\" xattr on remote -- using"
13911564 << " incremental sync with remote scan" << dendl;
1392- r = do_synchronize (dir_root, current, boost::none );
1565+ r = do_synchronize (dir_root, current);
13931566 } else {
13941567 size_t xlen = r;
13951568 char *val = (char *)alloca (xlen+1 );
@@ -1410,7 +1583,7 @@ int PeerReplayer::synchronize(const std::string &dir_root, const Snapshot &curre
14101583 r = do_synchronize (dir_root, current, prev);
14111584 } else {
14121585 dout (5 ) << " : mismatch -- using incremental sync with remote scan" << dendl;
1413- r = do_synchronize (dir_root, current, boost::none );
1586+ r = do_synchronize (dir_root, current);
14141587 }
14151588 }
14161589
0 commit comments