Skip to content

Commit aee7b30

Browse files
committed
common/intrusive_timer.h: introduce intrusive_timer
Signed-off-by: Samuel Just <[email protected]>
1 parent 75236e9 commit aee7b30

File tree

1 file changed

+222
-0
lines changed

1 file changed

+222
-0
lines changed

src/common/intrusive_timer.h

Lines changed: 222 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,222 @@
1+
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2+
// vim: ts=8 sw=2 smarttab
3+
4+
#pragma once
5+
6+
#include <mutex>
7+
#include <condition_variable>
8+
9+
#include <boost/intrusive/set.hpp>
10+
11+
#include "common/ceph_time.h"
12+
13+
namespace ceph::common {
14+
15+
/**
16+
* intrusive_timer
17+
*
18+
* SafeTimer (common/Timer.h) isn't well suited to usage in high
19+
* usage pathways for a few reasons:
20+
* - Usage generally requires allocation of a fresh context for each
21+
* scheduled operation. One could override Context::complete to avoid
22+
* destroying the instance, but actually reusing the instance is tricky
23+
* as SafeTimer doesn't guarrantee cancelation if safe_callbacks is false.
24+
* - SafeTimer only guarrantees cancelation if safe_timer is true, which
25+
* it generally won't be if the user needs to call into SafeTimer while
26+
* holding locks taken by callbacks.
27+
*
28+
* This implementation allows the user to repeatedly schedule and cancel
29+
* an object inheriting from the callback_t interface below while
30+
* guarranteeing cancelation provided that the user holds the lock
31+
* associated with a particular callback while calling into intrusive_timer.
32+
*/
33+
class intrusive_timer {
34+
using clock_t = ceph::coarse_real_clock;
35+
36+
public:
37+
/**
38+
* callback_t
39+
*
40+
* Objects inheriting from callback_t can be scheduled
41+
* via intrusive_timer.
42+
*/
43+
class callback_t : public boost::intrusive::set_base_hook<> {
44+
friend class intrusive_timer;
45+
clock_t::time_point schedule_point;
46+
unsigned incarnation = 0;
47+
48+
public:
49+
/**
50+
* add_ref, dec_ref
51+
*
52+
* callback_t must remain live and all methods must remain
53+
* safe to call as long as calls to add_ref() outnumber calls
54+
* to dec_ref().
55+
*/
56+
virtual void add_ref() = 0;
57+
virtual void dec_ref() = 0;
58+
59+
/**
60+
* lock, unlock
61+
*
62+
* For any specific callback_t, must lock/unlock a lock held while
63+
* accessing intrusive_timer public methods for that callback_t
64+
* instance.
65+
*/
66+
virtual void lock() = 0;
67+
virtual void unlock() = 0;
68+
69+
/// Invokes callback, will be called with lock held
70+
virtual void invoke() = 0;
71+
72+
/**
73+
* is_scheduled
74+
*
75+
* Return true iff callback is scheduled to be invoked.
76+
* May only be validly invoked while lock associated with
77+
* callback_t instance is held.
78+
*/
79+
bool is_scheduled() const { return incarnation % 2 == 1; }
80+
virtual ~callback_t() = default;
81+
82+
/// Order callback_t by schedule_point
83+
auto operator<=>(const callback_t &rhs) const {
84+
return std::make_pair(schedule_point, this) <=>
85+
std::make_pair(rhs.schedule_point, &rhs);
86+
}
87+
};
88+
89+
private:
90+
/// protects events, stopping
91+
std::mutex lock;
92+
93+
/// stopping, cv used to signal that t should halt
94+
std::condition_variable cv;
95+
bool stopping = false;
96+
97+
/// queued events ordered by callback_t::schedule_point
98+
boost::intrusive::set<callback_t> events;
99+
100+
/// thread responsible for calling scheduled callbacks
101+
std::thread t;
102+
103+
/// peek front of queue, null if empty
104+
callback_t *peek() {
105+
return events.empty() ? nullptr : &*(events.begin());
106+
}
107+
108+
/// entry point for t
109+
void _run() {
110+
std::unique_lock l(lock);
111+
while (true) {
112+
if (stopping) {
113+
return;
114+
}
115+
116+
auto next = peek();
117+
if (!next) {
118+
cv.wait(l);
119+
continue;
120+
}
121+
122+
if (next->schedule_point > clock_t::now()) {
123+
cv.wait_until(l, next->schedule_point);
124+
continue;
125+
}
126+
127+
// we release the reference below
128+
events.erase(*next);
129+
130+
/* cancel() and schedule_after() both hold both intrusive_timer::lock
131+
* and the callback_t lock (precondition of both) while mutating
132+
* next->incarnation, so this read is safe. We're relying on the
133+
* fact that only this method in this thread will access
134+
* next->incarnation under only one of the two. */
135+
auto incarnation = next->incarnation;
136+
l.unlock();
137+
{
138+
/* Note that intrusive_timer::cancel may observe that
139+
* callback_t::is_scheduled() returns true while
140+
* callback_t::is_linked() is false since we drop
141+
* intrusive_timer::lock between removing next from the
142+
* queue and incrementing callback_t::incarnation here
143+
* under the callback_t lock. In that case, cancel()
144+
* increments incarnation logically canceling the callback
145+
* but leaves the reference for us to drop.
146+
*/
147+
std::unique_lock m(*next);
148+
if (next->incarnation == incarnation) {
149+
/* As above, cancel() and schedule_after() hold both locks so this
150+
* mutation and read are safe. */
151+
++next->incarnation;
152+
next->invoke();
153+
}
154+
/* else, next was canceled between l.unlock() and next->lock().
155+
* Note that if incarnation does not match, we do nothing to next
156+
* other than drop our reference -- it might well have been
157+
* rescheduled already! */
158+
}
159+
next->dec_ref();
160+
l.lock();
161+
}
162+
}
163+
164+
public:
165+
intrusive_timer() : t([this] { _run(); }) {}
166+
167+
/**
168+
* schedule_after
169+
*
170+
* Schedule cb to run after the specified period.
171+
* The lock associated with cb must be held.
172+
* cb must not already be scheduled.
173+
*
174+
* @param cb [in] callback to schedule
175+
* @param after [in] period after which to schedule cb
176+
*/
177+
template <typename T>
178+
void schedule_after(callback_t &cb, T after) {
179+
ceph_assert(!cb.is_scheduled());
180+
std::unique_lock l(lock);
181+
ceph_assert(!cb.is_linked());
182+
183+
++cb.incarnation;
184+
cb.schedule_point = clock_t::now() + after;
185+
186+
cb.add_ref();
187+
events.insert(cb);
188+
189+
cv.notify_one();
190+
}
191+
192+
/**
193+
* cancel
194+
*
195+
* Cancel already scheduled cb.
196+
* The lock associated with cb must be held.
197+
*
198+
* @param cb [in] callback to cancel
199+
*/
200+
void cancel(callback_t &cb) {
201+
ceph_assert(cb.is_scheduled());
202+
std::unique_lock l(lock);
203+
++cb.incarnation;
204+
205+
if (cb.is_linked()) {
206+
events.erase(cb);
207+
cb.dec_ref();
208+
}
209+
}
210+
211+
/// Stop intrusive_timer
212+
void stop() {
213+
{
214+
std::unique_lock l(lock);
215+
stopping = true;
216+
cv.notify_one();
217+
}
218+
t.join();
219+
}
220+
};
221+
222+
}

0 commit comments

Comments
 (0)