Skip to content

Commit 21ed9fe

Browse files
committed
Adopt CheckedMutex in DefaultSocketProvider
1 parent 4475863 commit 21ed9fe

File tree

3 files changed

+50
-57
lines changed

3 files changed

+50
-57
lines changed

src/realm/sync/network/default_socket.cpp

Lines changed: 29 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -523,20 +523,21 @@ DefaultSocketProvider::~DefaultSocketProvider()
523523

524524
void DefaultSocketProvider::start()
525525
{
526-
std::unique_lock<std::mutex> lock(m_mutex);
526+
util::CheckedUniqueLock lock(m_mutex);
527527
// Has the thread already been started or is running
528528
if (m_state == State::Starting || m_state == State::Running)
529529
return; // early return
530530

531531
// If the thread has been previously run, make sure it has been joined first
532-
if (m_state == State::Stopping) {
532+
if (m_thread.joinable()) {
533533
state_wait_for(lock, State::Stopped);
534+
m_thread.join();
534535
}
535536

536537
m_logger_ptr->trace("Default event loop: start()");
537538
REALM_ASSERT(m_state == State::Stopped);
538539

539-
do_state_update(lock, State::Starting);
540+
do_state_update(State::Starting);
540541
m_thread = std::thread{&DefaultSocketProvider::event_loop, this};
541542
// Wait for the thread to start before continuing
542543
state_wait_for(lock, State::Running);
@@ -545,17 +546,17 @@ void DefaultSocketProvider::start()
545546
void DefaultSocketProvider::OnlyForTesting::run_event_loop_on_current_thread(DefaultSocketProvider* provider)
546547
{
547548
{
548-
std::unique_lock<std::mutex> lk(provider->m_mutex);
549+
util::CheckedLockGuard lk(provider->m_mutex);
549550
REALM_ASSERT(provider->m_state == State::Stopped);
550-
provider->do_state_update(lk, State::Starting);
551+
provider->do_state_update(State::Starting);
551552
}
552553

553554
provider->event_loop();
554555
}
555556

556557
void DefaultSocketProvider::OnlyForTesting::prep_event_loop_for_restart(DefaultSocketProvider* provider)
557558
{
558-
std::unique_lock<std::mutex> lk(provider->m_mutex);
559+
util::CheckedLockGuard lk(provider->m_mutex);
559560
REALM_ASSERT(provider->m_state == State::Stopped);
560561
provider->m_service.reset();
561562
}
@@ -569,21 +570,22 @@ void DefaultSocketProvider::event_loop()
569570
if (m_observer_ptr)
570571
m_observer_ptr->will_destroy_thread();
571572

572-
std::unique_lock<std::mutex> lock(m_mutex);
573-
// Did we get here due to an unhandled exception?
574-
if (m_state != State::Stopping) {
575-
m_logger_ptr->error("Default event loop: thread exited unexpectedly");
573+
{
574+
util::CheckedLockGuard lock(m_mutex);
575+
// Did we get here due to an unhandled exception?
576+
if (m_state != State::Stopping) {
577+
m_logger_ptr->error("Default event loop: thread exited unexpectedly");
578+
}
579+
m_state = State::Stopped;
576580
}
577-
m_state = State::Stopped;
578-
lock.unlock();
579581
m_state_cv.notify_all();
580582
});
581583

582584
if (m_observer_ptr)
583585
m_observer_ptr->did_create_thread();
584586

585587
{
586-
std::lock_guard<std::mutex> lock(m_mutex);
588+
util::CheckedLockGuard lock(m_mutex);
587589
REALM_ASSERT(m_state == State::Starting);
588590
}
589591

@@ -596,7 +598,7 @@ void DefaultSocketProvider::event_loop()
596598

597599
REALM_ASSERT(status.is_ok());
598600

599-
std::unique_lock<std::mutex> lock(m_mutex);
601+
util::CheckedLockGuard lock(m_mutex);
600602
// This is a callback from a previous generation
601603
if (m_event_loop_generation != my_generation) {
602604
return;
@@ -606,7 +608,7 @@ void DefaultSocketProvider::event_loop()
606608
}
607609
m_logger_ptr->trace("Default event loop: service run");
608610
REALM_ASSERT(m_state == State::Starting);
609-
do_state_update(lock, State::Running);
611+
do_state_update(State::Running);
610612
});
611613

612614
// If there is no event loop observer or handle_error function registered, then just
@@ -619,11 +621,12 @@ void DefaultSocketProvider::event_loop()
619621
m_service.run_until_stopped(); // Throws
620622
}
621623
catch (const std::exception& e) {
622-
REALM_ASSERT(m_observer_ptr); // should not change while event loop is running
623-
std::unique_lock<std::mutex> lock(m_mutex);
624-
// Service is no longer running, event loop thread is stopping
625-
do_state_update(lock, State::Stopping);
626-
lock.unlock();
624+
{
625+
util::CheckedLockGuard lock(m_mutex);
626+
// Service is no longer running, event loop thread is stopping
627+
m_state = State::Stopping;
628+
}
629+
m_state_cv.notify_all();
627630
m_logger_ptr->error("Default event loop exception: ", e.what());
628631
// If the error was not handled by the thread loop observer, then rethrow
629632
if (!m_observer_ptr->handle_error(e))
@@ -634,12 +637,12 @@ void DefaultSocketProvider::event_loop()
634637

635638
void DefaultSocketProvider::stop(bool wait_for_stop)
636639
{
637-
std::unique_lock<std::mutex> lock(m_mutex);
640+
util::CheckedUniqueLock lock(m_mutex);
638641

639642
// Do nothing if the thread is not started or running or stop has already been called
640643
if (m_state == State::Starting || m_state == State::Running) {
641644
m_logger_ptr->trace("Default event loop: stop()");
642-
do_state_update(lock, State::Stopping);
645+
do_state_update(State::Stopping);
643646
// Updating state to Stopping will free a start() if it is waiting for the thread to
644647
// start and may cause the thread to exit early before calling service.run()
645648
m_service.stop(); // Unblocks m_service.run()
@@ -661,24 +664,16 @@ void DefaultSocketProvider::stop(bool wait_for_stop)
661664
// | | ^
662665
// +----------------------+
663666

664-
void DefaultSocketProvider::do_state_update(std::unique_lock<std::mutex>&, State new_state)
667+
void DefaultSocketProvider::do_state_update(State new_state)
665668
{
666-
// m_state_mutex should already be locked...
667669
m_state = new_state;
668670
m_state_cv.notify_all(); // Let any waiters check the state
669671
}
670672

671-
void DefaultSocketProvider::state_wait_for(std::unique_lock<std::mutex>& lock, State expected_state)
673+
void DefaultSocketProvider::state_wait_for(util::CheckedUniqueLock& lock, State expected_state)
672674
{
673-
// Check for condition already met or superseded
674-
if (m_state >= expected_state)
675-
return;
676-
677-
m_state_cv.wait(lock, [this, expected_state]() {
678-
// are we there yet?
679-
if (m_state < expected_state)
680-
return false;
681-
return true;
675+
m_state_cv.wait(lock.native_handle(), [this, expected_state]() REQUIRES(m_mutex) {
676+
return m_state >= expected_state;
682677
});
683678
}
684679

src/realm/sync/network/default_socket.hpp

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <realm/sync/socket_provider.hpp>
66
#include <realm/sync/network/http.hpp>
77
#include <realm/sync/network/network.hpp>
8+
#include <realm/util/checked_mutex.hpp>
89
#include <realm/util/future.hpp>
910
#include <realm/util/tagged_bool.hpp>
1011

@@ -22,7 +23,7 @@ using port_type = sync::port_type;
2223

2324
class DefaultSocketProvider : public SyncSocketProvider {
2425
public:
25-
class Timer : public SyncSocketProvider::Timer {
26+
class Timer final : public SyncSocketProvider::Timer {
2627
public:
2728
friend class DefaultSocketProvider;
2829

@@ -35,33 +36,30 @@ class DefaultSocketProvider : public SyncSocketProvider {
3536
m_timer.cancel();
3637
}
3738

38-
protected:
39+
private:
40+
network::DeadlineTimer m_timer;
41+
3942
Timer(network::Service& service, std::chrono::milliseconds delay, FunctionHandler&& handler)
4043
: m_timer{service}
4144
{
4245
m_timer.async_wait(delay, std::move(handler));
4346
}
44-
45-
private:
46-
network::DeadlineTimer m_timer;
4747
};
4848

49-
struct AutoStartTag {};
50-
51-
using AutoStart = util::TaggedBool<AutoStartTag>;
49+
using AutoStart = util::TaggedBool<struct AutoStartTag>;
5250
DefaultSocketProvider(const std::shared_ptr<util::Logger>& logger, const std::string& user_agent,
5351
const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr = nullptr,
5452
AutoStart auto_start = AutoStart{true});
5553

5654
~DefaultSocketProvider();
5755

5856
// Start the event loop if it is not started already. Otherwise, do nothing.
59-
void start();
57+
void start() REQUIRES(!m_mutex);
6058

6159
/// Temporary workaround until client shutdown has been updated in a separate PR - these functions
6260
/// will be handled internally when this happens.
6361
/// Stops the internal event loop (provided by network::Service)
64-
void stop(bool wait_for_stop = false) override;
62+
void stop(bool wait_for_stop = false) override REQUIRES(!m_mutex);
6563

6664
std::unique_ptr<WebSocketInterface> connect(std::unique_ptr<WebSocketObserver>, WebSocketEndpoint&&) override;
6765

@@ -89,23 +87,23 @@ class DefaultSocketProvider : public SyncSocketProvider {
8987
private:
9088
enum class State { Starting, Running, Stopping, Stopped };
9189

92-
/// Block until the state reaches the expected or later state - return true if state matches expected state
93-
void state_wait_for(std::unique_lock<std::mutex>& lock, State expected_state);
90+
/// Block until the state reaches the expected or later state
91+
void state_wait_for(util::CheckedUniqueLock& lock, State expected_state) REQUIRES(m_mutex);
9492
/// Internal function for updating the state and signaling the wait_for_state condvar
95-
void do_state_update(std::unique_lock<std::mutex>&, State new_state);
93+
void do_state_update(State new_state) REQUIRES(m_mutex);
9694
/// The execution code for the event loop thread
97-
void event_loop();
95+
void event_loop() REQUIRES(!m_mutex);
9896

9997
std::shared_ptr<util::Logger> m_logger_ptr;
100-
std::shared_ptr<BindingCallbackThreadObserver> m_observer_ptr;
98+
const std::shared_ptr<BindingCallbackThreadObserver> m_observer_ptr;
10199
network::Service m_service;
102100
std::mt19937_64 m_random;
103101
const std::string m_user_agent;
104-
std::mutex m_mutex;
102+
util::CheckedMutex m_mutex;
105103
uint64_t m_event_loop_generation = 0;
106-
State m_state; // protected by m_mutex
107-
std::condition_variable m_state_cv; // uses m_mutex
108-
std::thread m_thread; // protected by m_mutex
104+
State m_state GUARDED_BY(m_mutex);
105+
std::condition_variable m_state_cv;
106+
std::thread m_thread GUARDED_BY(m_mutex);
109107
};
110108

111109
/// Class for the Default Socket Provider websockets that allows a simulated
@@ -115,8 +113,6 @@ class DefaultWebSocket : public WebSocketInterface {
115113
virtual ~DefaultWebSocket() = default;
116114

117115
virtual void force_handshake_response_for_testing(int status_code, std::string body = "") = 0;
118-
119-
protected:
120116
};
121117

122118
} // namespace realm::sync::websocket

src/realm/sync/noinst/client_impl_base.hpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,7 +219,8 @@ class ClientImpl {
219219
// SyncSocketProvider
220220
void post(SyncSocketProvider::FunctionHandler&& handler) REQUIRES(!m_drain_mutex);
221221
SyncSocketProvider::SyncTimer create_timer(std::chrono::milliseconds delay,
222-
SyncSocketProvider::FunctionHandler&& handler) REQUIRES(!m_drain_mutex);
222+
SyncSocketProvider::FunctionHandler&& handler)
223+
REQUIRES(!m_drain_mutex);
223224
using SyncTrigger = std::unique_ptr<Trigger<ClientImpl>>;
224225
SyncTrigger create_trigger(SyncSocketProvider::FunctionHandler&& handler);
225226

@@ -336,7 +337,8 @@ class ClientImpl {
336337
bool verify_servers_ssl_certificate,
337338
util::Optional<std::string> ssl_trust_certificate_path,
338339
std::function<SyncConfig::SSLVerifyCallback>,
339-
util::Optional<SyncConfig::ProxyConfig>, bool& was_created) REQUIRES(!m_drain_mutex);
340+
util::Optional<SyncConfig::ProxyConfig>, bool& was_created)
341+
REQUIRES(!m_drain_mutex);
340342

341343
// Destroys the specified connection.
342344
void remove_connection(ClientImpl::Connection&) noexcept REQUIRES(!m_drain_mutex);

0 commit comments

Comments
 (0)