|
15 | 15 |
|
16 | 16 | #pragma once |
17 | 17 |
|
| 18 | +#include <chrono> |
| 19 | +#include <functional> |
18 | 20 | #include <ostream> |
19 | 21 | #include <map> |
20 | 22 | #include <vector> |
@@ -74,12 +76,56 @@ class mClockScheduler : public Scheduler, md_config_obs_t { |
74 | 76 | const scheduler_id_t &id) const; |
75 | 77 | } client_registry; |
76 | 78 |
|
| 79 | + class crimson_mclock_cleaning_job_t { |
| 80 | + struct job_control_t { |
| 81 | + std::chrono::milliseconds period; |
| 82 | + std::function<void()> body; |
| 83 | + |
| 84 | + bool stopping = false; |
| 85 | + seastar::condition_variable cv; |
| 86 | + |
| 87 | + |
| 88 | + template <typename D, typename F> |
| 89 | + job_control_t(D _period, F &&_body) : |
| 90 | + period(std::chrono::duration_cast<decltype(period)>(_period)), |
| 91 | + body(std::forward<F>(_body)) { |
| 92 | + } |
| 93 | + }; |
| 94 | + seastar::lw_shared_ptr<job_control_t> control; |
| 95 | + |
| 96 | + static seastar::future<> run( |
| 97 | + seastar::lw_shared_ptr<job_control_t> control) { |
| 98 | + while (!control->stopping) { |
| 99 | + std::invoke(control->body); |
| 100 | + co_await control->cv.wait(control->period); |
| 101 | + } |
| 102 | + } |
| 103 | + public: |
| 104 | + template<typename... Args> |
| 105 | + crimson_mclock_cleaning_job_t(Args&&... args) : |
| 106 | + control(seastar::make_lw_shared<job_control_t>( |
| 107 | + std::forward<Args>(args)...)) |
| 108 | + { |
| 109 | + std::ignore = run(control); |
| 110 | + } |
| 111 | + |
| 112 | + void try_update(milliseconds _period) { |
| 113 | + control->period = _period; |
| 114 | + control->cv.signal(); |
| 115 | + } |
| 116 | + |
| 117 | + ~crimson_mclock_cleaning_job_t() { |
| 118 | + control->stopping = true; |
| 119 | + control->cv.signal(); |
| 120 | + } |
| 121 | + }; |
77 | 122 | using mclock_queue_t = crimson::dmclock::PullPriorityQueue< |
78 | 123 | scheduler_id_t, |
79 | 124 | item_t, |
80 | 125 | true, |
81 | 126 | true, |
82 | | - 2>; |
| 127 | + 2, |
| 128 | + crimson_mclock_cleaning_job_t>; |
83 | 129 | mclock_queue_t scheduler; |
84 | 130 | std::list<item_t> immediate; |
85 | 131 |
|
|
0 commit comments