|
22 | 22 | #include "common/homestore_config.hpp" |
23 | 23 | #include "common/homestore_assert.hpp" |
24 | 24 | #include "replication/service/raft_repl_service.h" |
| 25 | + |
| 26 | +#include <latch> |
| 27 | + |
25 | 28 | #include "replication/repl_dev/raft_repl_dev.h" |
26 | 29 |
|
27 | 30 | namespace homestore { |
@@ -624,57 +627,63 @@ void RaftReplService::trigger_snapshot_creation(group_id_t group_id, repl_lsn_t |
624 | 627 |
|
625 | 628 | ////////////////////// Reaper Thread related ////////////////////////////////// |
626 | 629 | void RaftReplService::start_repl_service_timers() { |
627 | | - // we need to explictly cancel the timers before we stop the repl_devs, but we cannot cancel a thread timer |
628 | | - // explictly(and exception will be threw out), so here we create a seperate gloable timer for each of them. |
629 | | - |
630 | 630 | // Schedule the rdev garbage collector timer |
631 | | - LOGINFOMOD(replication, "Reaper Thread: scheduling GC every {} seconds", |
632 | | - HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec)); |
633 | | - m_rdev_gc_timer_hdl = iomanager.schedule_global_timer( |
634 | | - HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec) * 1000 * 1000 * 1000, true /* recurring */, nullptr, |
635 | | - iomgr::reactor_regex::all_worker, |
636 | | - [this](void*) { |
637 | | - LOGDEBUGMOD(replication, "Reaper Thread: Doing GC"); |
638 | | - gc_repl_reqs(); |
639 | | - gc_repl_devs(); |
640 | | - }, |
641 | | - true /* wait_to_schedule */); |
642 | | - |
643 | | - // Check for queued fetches at the minimum every second |
644 | | - uint64_t interval_ns = |
645 | | - std::min(HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms) * 1000 * 1000, 1ul * 1000 * 1000 * 1000); |
646 | | - m_rdev_fetch_timer_hdl = iomanager.schedule_global_timer( |
647 | | - interval_ns, true /* recurring */, nullptr, iomgr::reactor_regex::all_worker, |
648 | | - [this](void*) { fetch_pending_data(); }, true /* wait_to_schedule */); |
649 | | - |
650 | | - // Flush durable commit lsns to superblock |
651 | | - // FIXUP: what is the best value for flush_durable_commit_interval_ms? |
652 | | - m_flush_durable_commit_timer_hdl = iomanager.schedule_global_timer( |
653 | | - HS_DYNAMIC_CONFIG(consensus.flush_durable_commit_interval_ms) * 1000 * 1000, true /* recurring */, nullptr, |
654 | | - iomgr::reactor_regex::all_worker, [this](void*) { flush_durable_commit_lsn(); }, true /* wait_to_schedule */); |
655 | | - |
656 | | - m_replace_member_sync_check_timer_hdl = iomanager.schedule_global_timer( |
657 | | - HS_DYNAMIC_CONFIG(consensus.replace_member_sync_check_interval_ms) * 1000 * 1000, true /* recurring */, nullptr, |
658 | | - iomgr::reactor_regex::all_worker, [this](void*) { monitor_replace_member_replication_status(); }, |
659 | | - true /* wait_to_schedule */); |
| 631 | + std::latch latch{1}; |
| 632 | + iomanager.create_reactor("raft_repl_svc_timer", iomgr::INTERRUPT_LOOP, 1u, [this, &latch](bool is_started) { |
| 633 | + if (is_started) { |
| 634 | + m_reaper_fiber = iomanager.iofiber_self(); |
| 635 | + LOGINFOMOD(replication, "Reaper Thread: scheduling GC every {} seconds", |
| 636 | + HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec)); |
| 637 | + m_rdev_gc_timer_hdl = iomanager.schedule_thread_timer( |
| 638 | + HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec) * 1000 * 1000 * 1000, true /* recurring */, |
| 639 | + nullptr, [this](void *) { |
| 640 | + LOGDEBUGMOD(replication, "Reaper Thread: Doing GC"); |
| 641 | + gc_repl_reqs(); |
| 642 | + gc_repl_devs(); |
| 643 | + }); |
| 644 | + |
| 645 | + // Check for queued fetches at the minimum every second |
| 646 | + uint64_t interval_ns = std::min( |
| 647 | + HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms) * 1000 * 1000, 1ul * 1000 * 1000 * 1000); |
| 648 | + m_rdev_fetch_timer_hdl = iomanager.schedule_thread_timer(interval_ns, true /* recurring */, nullptr, |
| 649 | + [this](void *) { fetch_pending_data(); }); |
| 650 | + |
| 651 | + // Flush durable commit lsns to superblock |
| 652 | + // FIXUP: what is the best value for flush_durable_commit_interval_ms? |
| 653 | + m_flush_durable_commit_timer_hdl = iomanager.schedule_thread_timer( |
| 654 | + HS_DYNAMIC_CONFIG(consensus.flush_durable_commit_interval_ms) * 1000 * 1000, true /* recurring */, |
| 655 | + nullptr, [this](void *) { flush_durable_commit_lsn(); }); |
| 656 | + |
| 657 | + m_replace_member_sync_check_timer_hdl = iomanager.schedule_thread_timer( |
| 658 | + HS_DYNAMIC_CONFIG(consensus.replace_member_sync_check_interval_ms) * 1000 * 1000, true /* recurring */, |
| 659 | + nullptr, [this](void *) { |
| 660 | + monitor_replace_member_replication_status(); |
| 661 | + }); |
| 662 | + latch.count_down(); |
| 663 | + } |
| 664 | + }); |
| 665 | + latch.wait(); |
660 | 666 | } |
661 | 667 |
|
662 | 668 | void RaftReplService::stop_repl_service_timers() { |
663 | | - iomanager.cancel_timer(m_rdev_gc_timer_hdl, true); |
664 | | - iomanager.cancel_timer(m_rdev_fetch_timer_hdl, true); |
665 | | - iomanager.cancel_timer(m_flush_durable_commit_timer_hdl, true); |
666 | | - iomanager.cancel_timer(m_replace_member_sync_check_timer_hdl, true); |
| 669 | + iomanager.run_on_wait(m_reaper_fiber, [this]() { |
| 670 | + LOGINFOMOD(replication, "Reaper Thread: Stopping timers"); |
| 671 | + iomanager.cancel_timer(m_rdev_gc_timer_hdl, true); |
| 672 | + iomanager.cancel_timer(m_rdev_fetch_timer_hdl, true); |
| 673 | + iomanager.cancel_timer(m_flush_durable_commit_timer_hdl, true); |
| 674 | + iomanager.cancel_timer(m_replace_member_sync_check_timer_hdl, true); |
| 675 | + }); |
667 | 676 | } |
668 | 677 |
|
669 | | -void RaftReplService::add_to_fetch_queue(cshared< RaftReplDev >& rdev, std::vector< repl_req_ptr_t > rreqs) { |
| 678 | +void RaftReplService::add_to_fetch_queue(cshared<RaftReplDev> &rdev, std::vector<repl_req_ptr_t> rreqs) { |
670 | 679 | std::unique_lock lg(m_pending_fetch_mtx); |
671 | 680 | m_pending_fetch_batches.push(std::make_pair(rdev, std::move(rreqs))); |
672 | 681 | } |
673 | 682 |
|
674 | 683 | void RaftReplService::fetch_pending_data() { |
675 | 684 | std::unique_lock lg(m_pending_fetch_mtx); |
676 | 685 | while (!m_pending_fetch_batches.empty()) { |
677 | | - auto const& [d, rreqs] = m_pending_fetch_batches.front(); |
| 686 | + auto const &[d, rreqs] = m_pending_fetch_batches.front(); |
678 | 687 | if (get_elapsed_time_ms(rreqs.at(0)->created_time()) < HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms)) { |
679 | 688 | break; |
680 | 689 | } |
|
0 commit comments