@@ -49,9 +49,10 @@ void LoggingErrorHandler::taskFailed(kj::Exception&& exception)
     m_loop.log() << "Uncaught exception in daemonized task.";
 }
 
-EventLoopRef::EventLoopRef(EventLoop& loop, std::unique_lock<std::mutex>* lock) : m_loop(&loop), m_lock(lock)
+EventLoopRef::EventLoopRef(EventLoop& loop, Lock* lock) : m_loop(&loop), m_lock(lock)
 {
     auto loop_lock{PtrOrValue{m_lock, m_loop->m_mutex}};
+    loop_lock->assert_locked(m_loop->m_mutex);
     m_loop->m_num_clients += 1;
 }
 
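The `Lock` type that replaces `std::unique_lock<std::mutex>` in this constructor is not defined in this diff. A minimal sketch of what such a wrapper might look like, assuming a `Mutex` capability wrapper and `MP_*` macros that expand to Clang thread-safety attributes (`MP_CAPABILITY`, `MP_SCOPED_CAPABILITY`, `MP_ACQUIRE`, `MP_RELEASE`, `MP_ASSERT_CAPABILITY`):

#include <cassert>
#include <mutex>

// Sketch only: a capability-annotated mutex and RAII lock wrapper.
class MP_CAPABILITY("mutex") Mutex
{
public:
    std::mutex m_mutex;
};

class MP_SCOPED_CAPABILITY Lock
{
public:
    explicit Lock(Mutex& m) MP_ACQUIRE(m) : m_lock(m.m_mutex) {}
    ~Lock() MP_RELEASE() {}
    // Runtime check that this Lock holds the given mutex; used above because
    // PtrOrValue may hand back a caller-provided lock the analysis cannot see.
    void assert_locked(Mutex& mutex) MP_ASSERT_CAPABILITY(mutex)
    {
        assert(m_lock.mutex() == &mutex.m_mutex);
        assert(m_lock.owns_lock());
    }
    // Public so the underlying unique_lock can be passed to condition_variable::wait.
    std::unique_lock<std::mutex> m_lock;
};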
@@ -61,9 +62,10 @@ bool EventLoopRef::reset()
     if (auto* loop{m_loop}) {
         m_loop = nullptr;
         auto loop_lock{PtrOrValue{m_lock, loop->m_mutex}};
+        loop_lock->assert_locked(loop->m_mutex);
         assert(loop->m_num_clients > 0);
         loop->m_num_clients -= 1;
-        if (loop->done(*loop_lock)) {
+        if (loop->done()) {
             done = true;
             loop->m_cv.notify_all();
             int post_fd{loop->m_post_fd};
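`PtrOrValue` lets `EventLoopRef` accept an optional caller-held lock: when `m_lock` is non-null it is used directly, otherwise a fresh `Lock` is constructed on the mutex for the duration of the scope, and the new `assert_locked` call then verifies either way that the expected mutex is actually held. A hedged sketch of the idea (the real helper is defined elsewhere in the library):

#include <optional>
#include <utility>

// Sketch only: acts as a T* if one is supplied, otherwise owns a local T.
template <typename T>
struct PtrOrValue {
    T* m_ptr;
    std::optional<T> m_value;

    template <typename... Args>
    PtrOrValue(T* ptr, Args&&... args) : m_ptr(ptr)
    {
        // Construct the fallback value in place only when no pointer was given.
        if (!m_ptr) m_ptr = &m_value.emplace(std::forward<Args>(args)...);
    }
    T& operator*() { return *m_ptr; }
    T* operator->() { return m_ptr; }
};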
@@ -134,17 +136,17 @@ Connection::~Connection()
         m_sync_cleanup_fns.pop_front();
     }
     while (!m_async_cleanup_fns.empty()) {
-        const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+        const Lock lock(m_loop->m_mutex);
         m_loop->m_async_fns.emplace_back(std::move(m_async_cleanup_fns.front()));
         m_async_cleanup_fns.pop_front();
     }
-    std::unique_lock<std::mutex> lock(m_loop->m_mutex);
-    m_loop->startAsyncThread(lock);
+    Lock lock(m_loop->m_mutex);
+    m_loop->startAsyncThread();
 }
 
 CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    const Lock lock(m_loop->m_mutex);
     // Add cleanup callbacks to the front of list, so sync cleanup functions run
     // in LIFO order. This is a good approach because sync cleanup functions are
     // added as client objects are created, and it is natural to clean up
@@ -158,13 +160,13 @@ CleanupIt Connection::addSyncCleanup(std::function<void()> fn)
 
 void Connection::removeSyncCleanup(CleanupIt it)
 {
-    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    const Lock lock(m_loop->m_mutex);
     m_sync_cleanup_fns.erase(it);
 }
 
 void Connection::addAsyncCleanup(std::function<void()> fn)
 {
-    const std::unique_lock<std::mutex> lock(m_loop->m_mutex);
+    const Lock lock(m_loop->m_mutex);
     // Add async cleanup callbacks to the back of the list. Unlike the sync
     // cleanup list, this list order is more significant because it determines
     // the order server objects are destroyed when there is a sudden disconnect,
@@ -200,7 +202,7 @@ EventLoop::EventLoop(const char* exe_name, LogFn log_fn, void* context)
 EventLoop::~EventLoop()
 {
     if (m_async_thread.joinable()) m_async_thread.join();
-    const std::lock_guard<std::mutex> lock(m_mutex);
+    const Lock lock(m_mutex);
     KJ_ASSERT(m_post_fn == nullptr);
     KJ_ASSERT(m_async_fns.empty());
     KJ_ASSERT(m_wait_fd == -1);
@@ -225,12 +227,12 @@ void EventLoop::loop()
     for (;;) {
         const size_t read_bytes = wait_stream->read(&buffer, 0, 1).wait(m_io_context.waitScope);
         if (read_bytes != 1) throw std::logic_error("EventLoop wait_stream closed unexpectedly");
-        std::unique_lock<std::mutex> lock(m_mutex);
+        Lock lock(m_mutex);
         if (m_post_fn) {
             Unlock(lock, *m_post_fn);
             m_post_fn = nullptr;
             m_cv.notify_all();
-        } else if (done(lock)) {
+        } else if (done()) {
             // Intentionally do not break if m_post_fn was set, even if done()
             // would return true, to ensure that the EventLoopRef write(post_fd)
             // call always succeeds and the loop does not exit between the time
@@ -243,7 +245,7 @@ void EventLoop::loop()
     log() << "EventLoop::loop bye.";
     wait_stream = nullptr;
     KJ_SYSCALL(::close(post_fd));
-    const std::unique_lock<std::mutex> lock(m_mutex);
+    const Lock lock(m_mutex);
     m_wait_fd = -1;
     m_post_fd = -1;
 }
@@ -254,27 +256,27 @@ void EventLoop::post(kj::Function<void()> fn)
         fn();
         return;
     }
-    std::unique_lock<std::mutex> lock(m_mutex);
+    Lock lock(m_mutex);
     EventLoopRef ref(*this, &lock);
-    m_cv.wait(lock, [this] { return m_post_fn == nullptr; });
+    m_cv.wait(lock.m_lock, [this]() MP_REQUIRES(m_mutex) { return m_post_fn == nullptr; });
     m_post_fn = &fn;
     int post_fd{m_post_fd};
     Unlock(lock, [&] {
         char buffer = 0;
         KJ_SYSCALL(write(post_fd, &buffer, 1));
     });
-    m_cv.wait(lock, [this, &fn] { return m_post_fn != &fn; });
+    m_cv.wait(lock.m_lock, [this, &fn]() MP_REQUIRES(m_mutex) { return m_post_fn != &fn; });
 }
 
-void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
+void EventLoop::startAsyncThread()
 {
     if (m_async_thread.joinable()) {
         // Notify to wake up the async thread if it is already running.
         m_cv.notify_all();
     } else if (!m_async_fns.empty()) {
         m_async_thread = std::thread([this] {
-            std::unique_lock<std::mutex> lock(m_mutex);
-            while (!done(lock)) {
+            Lock lock(m_mutex);
+            while (!done()) {
                 if (!m_async_fns.empty()) {
                     EventLoopRef ref{*this, &lock};
                     const std::function<void()> fn = std::move(m_async_fns.front());
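`std::condition_variable::wait` only accepts a genuine `std::unique_lock<std::mutex>`, so the wrapper's public `m_lock` member is passed through. That hand-off is opaque to thread-safety analysis, which is presumably why the wait predicates gain an `MP_REQUIRES(m_mutex)` annotation: it tells the analyzer the lambda body runs with the mutex held. The same pattern in isolation, with hypothetical names (`g_queue_mutex`, `g_queue_cv`, `g_queue`) and an assumed `MP_GUARDED_BY` macro for guarded members:

#include <condition_variable>
#include <deque>

// Sketch only: waiting on a condition variable through the Lock wrapper.
Mutex g_queue_mutex;
std::condition_variable g_queue_cv;
std::deque<int> g_queue MP_GUARDED_BY(g_queue_mutex);

int pop_front()
{
    Lock lock(g_queue_mutex);
    // wait() receives the inner std::unique_lock; MP_REQUIRES on the lambda
    // lets the predicate read g_queue without an analysis warning.
    g_queue_cv.wait(lock.m_lock, [&]() MP_REQUIRES(g_queue_mutex) { return !g_queue.empty(); });
    int front = g_queue.front();
    g_queue.pop_front();
    return front;
}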
@@ -289,17 +291,15 @@ void EventLoop::startAsyncThread(std::unique_lock<std::mutex>& lock)
                     // Continue without waiting in case there are more async_fns
                     continue;
                 }
-                m_cv.wait(lock);
+                m_cv.wait(lock.m_lock);
             }
         });
     }
 }
 
-bool EventLoop::done(std::unique_lock<std::mutex>& lock) const
+bool EventLoop::done() const
 {
     assert(m_num_clients >= 0);
-    assert(lock.owns_lock());
-    assert(lock.mutex() == &m_mutex);
     return m_num_clients == 0 && m_async_fns.empty();
 }
 
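With the lock parameters gone, `done()` and `startAsyncThread()` can no longer check at runtime that the caller holds `m_mutex`; the removed `assert(lock.owns_lock())` and `assert(lock.mutex() == &m_mutex)` checks are presumably replaced by compile-time `MP_REQUIRES(m_mutex)` annotations on the declarations, so the analysis rejects any call site that does not provably hold the mutex. A sketch of what the header side might declare (assumed, not shown in this diff; builds on the `Mutex` sketch above):

#include <functional>
#include <list>

// Sketch only: the relevant EventLoop members, with assumed annotations.
class EventLoop
{
public:
    // Callers must hold m_mutex; enforced at compile time instead of by the
    // old runtime asserts that took a std::unique_lock argument.
    bool done() const MP_REQUIRES(m_mutex);
    void startAsyncThread() MP_REQUIRES(m_mutex);

    Mutex m_mutex;
    int m_num_clients MP_GUARDED_BY(m_mutex) = 0;
    std::list<std::function<void()>> m_async_fns MP_GUARDED_BY(m_mutex);
};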