@@ -46,8 +46,9 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk
4646 });
4747 m_next_dsn = m_rd_sb->last_applied_dsn + 1 ;
4848 m_commit_upto_lsn = m_rd_sb->durable_commit_lsn ;
49- m_last_flushed_commit_lsn = m_commit_upto_lsn ;
49+ m_last_flushed_cp_lsn = m_rd_sb-> checkpoint_lsn ;
5050 m_compact_lsn = m_rd_sb->compact_lsn ;
51+ m_last_flushed_compact_lsn = m_compact_lsn;
5152
5253 m_rdev_name = m_rd_sb->rdev_name ;
5354 // Its ok not to do compare exchange, because loading is always single threaded as of now
@@ -818,6 +819,54 @@ void RaftReplDev::on_create_snapshot(nuraft::snapshot& s, nuraft::async_result<
818819 if (when_done) { when_done (ret_val, null_except); }
819820}
820821
822+ void RaftReplDev::trigger_snapshot_creation (repl_lsn_t compact_lsn, bool wait_for_commit) {
823+ // Step 1. Update truncation boundary if compact_lsn is specified
824+ if (compact_lsn >= 0 ) {
825+ // Step 1.1 Wait for commit upto compact_lsn if needed
826+ if (wait_for_commit) {
827+ RD_LOGI (NO_TRACE_ID, " Waiting for commit upto compact_lsn={}, current_commit_lsn={}" , compact_lsn,
828+ m_commit_upto_lsn.load ());
829+ while (compact_lsn > m_commit_upto_lsn.load ()) {
830+ std::this_thread::sleep_for (std::chrono::milliseconds (100 ));
831+ }
832+ }
833+ // Step 1.2 trigger cp_flush to make sure all changes are flushed to disk before updating truncation boundary
834+ hs ()->cp_mgr ().trigger_cp_flush (true /* force*/ ).get ();
835+ RD_LOGI (NO_TRACE_ID, " cp_flush completed before updating truncation boundary to lsn={}" , compact_lsn);
836+ // Step 1.3 Update truncation boundary
837+ RD_LOGI (NO_TRACE_ID, " Updating truncation boundary to lsn={}, current_truncation_boundary={}" , compact_lsn,
838+ m_truncation_upper_limit.load ());
839+ update_truncation_boundary (compact_lsn);
840+ }
841+
842+ // Step 2. Create snapshot manually
843+ auto result = raft_server ()->create_snapshot ();
844+ if (result == 0 ) {
845+ RD_LOGE (NO_TRACE_ID, " Failed to create snapshot - another snapshot may be in progress" );
846+ return ;
847+ }
848+ RD_LOGI (NO_TRACE_ID, " Successfully created snapshot, result log_idx={}" , result);
849+
850+ // Step 3. Make sure logs are compacted upto compact_lsn after snapshot created
851+ // Since we want to compact logs upto compact_lsn after the snapshot created, here we check the compact_lsn and
852+ // trigger a compact manually if needed (since logs will only to truncated upto min(m_truncation_upper_limit,
853+ // snp_idx-snp_distance), there is a that real_compact_lsn < compact_lsn after snapshot created).
854+ // Note that manual compact may cause concurrency issue with the automatic log compaction in raft server. Currently,
855+ // it doesn't matter because this API is used in op scenario when members syncup and no new write are handled. If we
856+ // want to use it in more phase, we need to add lock to avoid concurrency issue.
857+ if (compact_lsn > 0 && m_compact_lsn.load () < compact_lsn) {
858+ RD_LOGI (NO_TRACE_ID, " Manually compacting logs upto lsn={} after snapshot, current_compact_lsn={}" , compact_lsn,
859+ m_compact_lsn.load ());
860+ m_data_journal->compact (compact_lsn);
861+ RD_LOGI (NO_TRACE_ID, " Compacted logs upto lsn={} after snapshot" , compact_lsn);
862+ }
863+
864+ // Step 4. trigger cp_flush to make sure all changes are flushed to disk after snapshot creation and log compaction
865+ hs ()->cp_mgr ().trigger_cp_flush (true /* force*/ ).get ();
866+ RD_LOGI (NO_TRACE_ID, " cp_flush completed after snapshot creation and log compaction" );
867+ RD_LOGI (NO_TRACE_ID, " snapshot creation and compaction completed" );
868+ }
869+
821870void RaftReplDev::propose_truncate_boundary () {
822871 init_req_counter counter (pending_request_num);
823872 auto repl_status = get_replication_status ();
@@ -1591,7 +1640,7 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
15911640 complete_replace_member (rreq);
15921641 break ;
15931642 case journal_type_t ::HS_CTRL_UPDATE_TRUNCATION_BOUNDARY:
1594- update_truncation_boundary (rreq);
1643+ update_truncation_boundary (r_cast< const truncate_ctx* >( rreq-> header (). cbytes ())-> truncation_upper_limit );
15951644 break ;
15961645 case journal_type_t ::HS_CTRL_REMOVE_MEMBER:
15971646 remove_member (rreq);
@@ -1755,7 +1804,7 @@ void RaftReplDev::clean_replace_member_task(repl_req_ptr_t rreq) {
17551804 RD_LOGI (rreq->traceID (), " Raft repl replace_member_task has been cleared, task_id={}" , task_id);
17561805}
17571806
1758- void RaftReplDev::update_truncation_boundary (repl_req_ptr_t rreq ) {
1807+ void RaftReplDev::update_truncation_boundary (repl_lsn_t truncation_upper_limit ) {
17591808 repl_lsn_t cur_checkpoint_lsn = 0 ;
17601809 {
17611810 std::unique_lock lg{m_sb_mtx};
@@ -1764,8 +1813,7 @@ void RaftReplDev::update_truncation_boundary(repl_req_ptr_t rreq) {
17641813 // expected truncation_upper_limit should not larger than the current checkpoint_lsn, this is to ensure that
17651814 // when a crash happens before index flushed to disk, all the logs larger than checkpoint_lsn are still available
17661815 // to replay.
1767- auto ctx = r_cast< const truncate_ctx* >(rreq->header ().cbytes ());
1768- auto exp_truncation_upper_limit = std::min (ctx->truncation_upper_limit , cur_checkpoint_lsn);
1816+ auto exp_truncation_upper_limit = std::min (truncation_upper_limit, cur_checkpoint_lsn);
17691817 auto cur_truncation_upper_limit = m_truncation_upper_limit.load ();
17701818 // exp_truncation_upper_limit might be less or equal to cur_truncation_upper_limit after Baseline Re-sync,
17711819 // we should skip update to ensure the truncation_upper_limit is always increasing.
@@ -2389,8 +2437,9 @@ void RaftReplDev::cp_flush(CP* cp, cshared< ReplDevCPContext > ctx) {
23892437 auto const clsn = ctx->compacted_to_lsn ;
23902438 auto const dsn = ctx->last_applied_dsn ;
23912439
2392- if (lsn == m_last_flushed_commit_lsn) {
2393- // Not dirtied since last flush ignore
2440+ // compact can be triggered manually while no new logs are committed, so both lsn and clsn need to be checked
2441+ if (lsn == m_last_flushed_cp_lsn && clsn == m_last_flushed_compact_lsn) {
2442+ // Not dirtied since last cp flush ignore
23942443 return ;
23952444 }
23962445
@@ -2402,7 +2451,8 @@ void RaftReplDev::cp_flush(CP* cp, cshared< ReplDevCPContext > ctx) {
24022451 m_rd_sb->checkpoint_lsn = lsn;
24032452 m_rd_sb->last_applied_dsn = dsn;
24042453 m_rd_sb.write ();
2405- m_last_flushed_commit_lsn = lsn;
2454+ m_last_flushed_cp_lsn = lsn;
2455+ m_last_flushed_compact_lsn = clsn;
24062456 RD_LOGD (NO_TRACE_ID, " cp flush in raft repl dev, lsn={}, clsn={}, next_dsn={}, cp string:{}" , lsn, clsn,
24072457 m_next_dsn.load (), cp->to_string ());
24082458}
0 commit comments