Skip to content

Commit 97bd8ab

Browse files
chkuang-ga-maurice
authored andcommitted
Fix deadlock between scheduler thread and websocket thread when server revokes auth token.
Also change assert() to FIREBASE_DEV_ASSERT() to compile out in release build. PiperOrigin-RevId: 292447554
1 parent e1c3d0b commit 97bd8ab

File tree

6 files changed

+140
-105
lines changed

6 files changed

+140
-105
lines changed

database/src/desktop/connection/connection.cc

Lines changed: 41 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414

1515
#include "database/src/desktop/connection/connection.h"
1616

17-
#include <cassert>
1817
#include <cstdlib>
1918
#include <cstring>
2019

20+
#include "app/src/assert.h"
2121
#include "app/src/log.h"
2222
#include "app/src/variant_util.h"
2323
#include "database/src/desktop/connection/util_connection.h"
@@ -62,17 +62,17 @@ Connection::Connection(scheduler::Scheduler* scheduler, const HostInfo& info,
6262
client_(nullptr),
6363
expected_incoming_frames_(0),
6464
logger_(logger) {
65-
assert(scheduler);
66-
assert(event_handler);
65+
FIREBASE_DEV_ASSERT(scheduler);
66+
FIREBASE_DEV_ASSERT(event_handler);
6767

6868
// Create log id like "[conn_0]" for debugging
6969
std::stringstream log_id_stream;
7070
log_id_stream << "[conn_" << next_log_id_.fetch_add(1) << "]";
7171
log_id_ = log_id_stream.str();
7272

7373
// Create web socket client regardless of its implementation
74-
client_ =
75-
CreateWebSocketClient(host_info_, this, opt_last_session_id, logger);
74+
client_ = CreateWebSocketClient(host_info_, this, opt_last_session_id, logger,
75+
scheduler);
7676
}
7777

7878
Connection::~Connection() {
@@ -88,13 +88,13 @@ Connection::~Connection() {
8888
scheduler_->Schedule(new callback::CallbackValue1<scheduler::RequestHandle>(
8989
keep_alive_handler_, [](scheduler::RequestHandle handler) {
9090
if (handler.IsValid() && !handler.IsCancelled()) {
91-
assert(handler.Cancel());
91+
FIREBASE_DEV_ASSERT(handler.Cancel());
9292
}
9393
}));
9494
}
9595

9696
void Connection::Open() {
97-
assert(client_);
97+
FIREBASE_DEV_ASSERT(client_);
9898
if (state_ != kStateNone) {
9999
logger_->LogError("%s Cannot open. Connection has be opened before",
100100
log_id_.c_str());
@@ -107,7 +107,7 @@ void Connection::Open() {
107107
}
108108

109109
void Connection::Close(DisconnectReason reason /* = kReasonOther */) {
110-
assert(client_);
110+
FIREBASE_DEV_ASSERT(client_);
111111

112112
if (state_ == kStateDisconnected) {
113113
logger_->LogError("%s Cannot close. Connection has been closed.",
@@ -131,8 +131,8 @@ void Connection::Close(DisconnectReason reason /* = kReasonOther */) {
131131
}
132132

133133
void Connection::Send(const Variant& message, bool is_sensitive) {
134-
assert(client_);
135-
assert(!message.is_null());
134+
FIREBASE_DEV_ASSERT(client_);
135+
FIREBASE_DEV_ASSERT(!message.is_null());
136136

137137
if (state_ != kStateReady) {
138138
logger_->LogError("%s Tried to send on an unconnected connection",
@@ -174,67 +174,48 @@ void Connection::OnOpen() {
174174

175175
logger_->LogDebug("%s websocket opened", log_id_.c_str());
176176

177-
scheduler_->Schedule(new callback::CallbackValue1<ConnectionRef>(
178-
safe_this_, [](ConnectionRef conn_ref) {
179-
ConnectionRefLock lock(&conn_ref);
180-
auto connection = lock.GetReference();
181-
if (connection != nullptr) {
182-
assert(connection->state_ == kStateConnecting);
183-
184-
connection->ws_connected_ = true;
185-
186-
// Start periodic callback to keep the connection alive, by sending
187-
// "0" to server
188-
connection->keep_alive_handler_ = connection->scheduler_->Schedule(
189-
new callback::CallbackValue1<ConnectionRef>(
190-
connection->safe_this_,
191-
[](ConnectionRef conn_ref) {
192-
ConnectionRefLock lock(&conn_ref);
193-
auto connection = lock.GetReference();
194-
if (connection != nullptr && connection->client_ &&
195-
connection->state_ == kStateReady) {
196-
connection->client_->Send("0");
197-
}
198-
}),
199-
kKeepAliveTimeoutMs, kKeepAliveTimeoutMs);
200-
}
201-
}));
177+
FIREBASE_DEV_ASSERT(state_ == kStateConnecting);
178+
179+
ws_connected_ = true;
180+
181+
// Start periodic callback to keep the connection alive, by sending
182+
// "0" to server
183+
keep_alive_handler_ = scheduler_->Schedule(
184+
new callback::CallbackValue1<ConnectionRef>(
185+
safe_this_,
186+
[](ConnectionRef conn_ref) {
187+
ConnectionRefLock lock(&conn_ref);
188+
auto connection = lock.GetReference();
189+
if (connection != nullptr && connection->client_ &&
190+
connection->state_ == kStateReady) {
191+
connection->client_->Send("0");
192+
}
193+
}),
194+
kKeepAliveTimeoutMs, kKeepAliveTimeoutMs);
202195
}
203196

204197
void Connection::OnMessage(const char* msg) {
205198
SAFE_REFERENCE_RETURN_VOID_IF_INVALID(ConnectionRefLock, lock, safe_this_);
206199

207200
logger_->LogDebug("%s websocket message received", log_id_.c_str());
208-
scheduler_->Schedule(new callback::CallbackValue1String1<ConnectionRef>(
209-
safe_this_, msg, [](ConnectionRef conn_ref, const char* msg) {
210-
ConnectionRefLock lock(&conn_ref);
211-
auto connection = lock.GetReference();
212-
if (connection != nullptr) {
213-
connection->HandleIncomingFrame(msg);
214-
}
215-
}));
201+
202+
HandleIncomingFrame(msg);
216203
}
217204

218205
void Connection::OnClose() {
219206
SAFE_REFERENCE_RETURN_VOID_IF_INVALID(ConnectionRefLock, lock, safe_this_);
220207

221208
logger_->LogDebug("%s websocket closed", log_id_.c_str());
222209

223-
scheduler_->Schedule(new callback::CallbackValue1<ConnectionRef>(
224-
safe_this_, [](ConnectionRef conn_ref) {
225-
ConnectionRefLock lock(&conn_ref);
226-
auto connection = lock.GetReference();
227-
if (connection != nullptr && connection->state_ != kStateDisconnected) {
228-
// No need to do anything if Close() has been called already.
229-
// Otherwise, the cause could be either connection failure or
230-
// connection lost, depending on whether the web socket has already
231-
// been connected or not.
232-
DisconnectReason reason = connection->ws_connected_
233-
? kDisconnectReasonConnectionLost
234-
: kDisconnectReasonConnectionFailed;
235-
connection->Close(reason);
236-
}
237-
}));
210+
if (state_ != kStateDisconnected) {
211+
// No need to do anything if Close() has been called already.
212+
// Otherwise, the cause could be either connection failure or
213+
// connection lost, depending on whether the web socket has already
214+
// been connected or not.
215+
DisconnectReason reason = ws_connected_ ? kDisconnectReasonConnectionLost
216+
: kDisconnectReasonConnectionFailed;
217+
Close(reason);
218+
}
238219
}
239220

240221
void Connection::OnError(const WebSocketClientErrorData& error_data) {
@@ -311,7 +292,7 @@ void Connection::ProcessMessage(const char* message) {
311292
logger_->LogDebug("%s ProcessMessage (length: %d)", log_id_.c_str(),
312293
strlen(message));
313294

314-
assert(!message_data.is_null());
295+
FIREBASE_DEV_ASSERT(!message_data.is_null());
315296

316297
const auto& messageMap = message_data.map();
317298
auto itType = messageMap.find(kServerEnvelopeType);
@@ -357,7 +338,7 @@ void Connection::OnControlMessage(const Variant& data) {
357338
logger_->LogDebug("%s received control message: %s", log_id_.c_str(),
358339
util::VariantToJson(data).c_str());
359340

360-
assert(!data.is_null());
341+
FIREBASE_DEV_ASSERT(!data.is_null());
361342

362343
const auto& data_map = data.map();
363344
auto itType = data_map.find(kServerControlMessageType);

database/src/desktop/connection/persistent_connection.cc

Lines changed: 44 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
#include "database/src/desktop/connection/persistent_connection.h"
1616

1717
#include <algorithm>
18-
#include <cassert>
1918
#include <sstream>
2019

2120
#include "app/src/app_common.h"
@@ -118,9 +117,9 @@ PersistentConnection::PersistentConnection(
118117
next_listen_id_(0),
119118
next_write_id_(0),
120119
logger_(logger) {
121-
assert(app);
122-
assert(scheduler);
123-
assert(event_handler_);
120+
FIREBASE_DEV_ASSERT(app);
121+
FIREBASE_DEV_ASSERT(scheduler);
122+
FIREBASE_DEV_ASSERT(event_handler_);
124123

125124
// Create log id like "[pc_0]" for debugging
126125
std::stringstream log_id_stream;
@@ -208,7 +207,7 @@ void PersistentConnection::OnReady(int64_t timestamp,
208207

209208
// Restore Auth
210209
logger_->LogDebug("%s calling restore state", log_id_.c_str());
211-
assert(connection_state_ == kConnecting);
210+
FIREBASE_DEV_ASSERT(connection_state_ == kConnecting);
212211

213212
// Try to retrieve auth token synchronously when connection is ready.
214213
GetAuthToken(&auth_token_);
@@ -247,26 +246,26 @@ void PersistentConnection::HandleConnectStatsResponse(
247246
}
248247

249248
void PersistentConnection::OnDataMessage(const Variant& message) {
250-
assert(message.is_map());
249+
FIREBASE_DEV_ASSERT(message.is_map());
251250

252251
SAFE_REFERENCE_RETURN_VOID_IF_INVALID(ThisRefLock, lock, safe_this_);
253252

254253
if (HasKey(message, kRequestNumber)) {
255254
auto it_request_number = message.map().find(kRequestNumber);
256-
assert(it_request_number->second.is_numeric());
255+
FIREBASE_DEV_ASSERT(it_request_number->second.is_numeric());
257256
uint64_t rn = it_request_number->second.int64_value();
258257

259258
RequestDataPtr request_ptr;
260259
auto it_request = request_map_.find(rn);
261-
assert(it_request != request_map_.end());
260+
FIREBASE_DEV_ASSERT(it_request != request_map_.end());
262261
if (it_request != request_map_.end()) {
263262
request_ptr = Move(it_request->second);
264263
request_map_.erase(it_request);
265264
}
266-
assert(request_ptr);
265+
FIREBASE_DEV_ASSERT(request_ptr);
267266
if (request_ptr) {
268267
auto it_response_message = message.map().find(kResponseForRequest);
269-
assert(it_response_message != message.map().end());
268+
FIREBASE_DEV_ASSERT(it_response_message != message.map().end());
270269
if (it_response_message != message.map().end()) {
271270
logger_->LogDebug("%s Trigger handler for request %llu",
272271
log_id_.c_str(), rn);
@@ -510,33 +509,41 @@ void PersistentConnection::TryScheduleReconnect() {
510509
return;
511510
}
512511

513-
assert(connection_state_ == kDisconnected);
512+
FIREBASE_DEV_ASSERT(connection_state_ == kDisconnected);
514513
bool force_refresh = force_auth_refresh_;
515514
force_auth_refresh_ = false;
516515
logger_->LogDebug("%s Scheduling connection attempt", log_id_.c_str());
517516

518-
// TODO(chkuang): Implement Exponential Backoff Retry
519-
connection_state_ = kGettingToken;
520-
logger_->LogDebug("%s Trying to fetch auth token", log_id_.c_str());
521-
522-
// Get Token Asynchronously to make sure the token is not expired.
523-
Future<std::string> future;
524-
bool succeeded = app_->function_registry()->CallFunction(
525-
::firebase::internal::FnAuthGetTokenAsync, app_, &force_refresh, &future);
526-
if (succeeded && future.status() != kFutureStatusInvalid) {
527-
// Set pending future
528-
MutexLock future_lock(pending_token_future_mutex_);
529-
pending_token_future_ = future;
530-
future.OnCompletion(OnTokenFutureComplete, this);
531-
} else {
532-
// Auth is not available now. Start the connection anyway.
533-
OpenNetworkConnection();
534-
}
517+
scheduler_->Schedule(new callback::CallbackValue2<ThisRef, bool>(
518+
safe_this_, force_refresh, [](ThisRef ref, bool force_refresh) {
519+
ThisRefLock lock(&ref);
520+
auto* connection = lock.GetReference();
521+
if (!connection) return;
522+
// TODO(chkuang): Implement Exponential Backoff Retry
523+
connection->connection_state_ = kGettingToken;
524+
connection->logger_->LogDebug("%s Trying to fetch auth token",
525+
connection->log_id_.c_str());
526+
527+
// Get Token Asynchronously to make sure the token is not expired.
528+
Future<std::string> future;
529+
bool succeeded = connection->app_->function_registry()->CallFunction(
530+
::firebase::internal::FnAuthGetTokenAsync, connection->app_,
531+
&force_refresh, &future);
532+
if (succeeded && future.status() != kFutureStatusInvalid) {
533+
// Set pending future
534+
MutexLock future_lock(connection->pending_token_future_mutex_);
535+
connection->pending_token_future_ = future;
536+
future.OnCompletion(OnTokenFutureComplete, connection);
537+
} else {
538+
// Auth is not available now. Start the connection anyway.
539+
connection->OpenNetworkConnection();
540+
}
541+
}));
535542
}
536543

537544
void PersistentConnection::OnTokenFutureComplete(
538545
const Future<std::string>& result_data, void* user_data) {
539-
assert(user_data);
546+
FIREBASE_DEV_ASSERT(user_data);
540547
PersistentConnection* connection =
541548
static_cast<PersistentConnection*>(user_data);
542549
ThisRefLock lock(&connection->safe_this_);
@@ -568,7 +575,7 @@ void PersistentConnection::HandleTokenFuture(Future<std::string> future) {
568575
auth_token_ = *future.result();
569576
OpenNetworkConnection();
570577
} else {
571-
assert(connection_state_ == kDisconnected);
578+
FIREBASE_DEV_ASSERT(connection_state_ == kDisconnected);
572579
logger_->LogDebug(
573580
"%s Not opening connection after token refresh, because "
574581
"connection was set to disconnected",
@@ -583,7 +590,7 @@ void PersistentConnection::HandleTokenFuture(Future<std::string> future) {
583590
}
584591

585592
void PersistentConnection::OpenNetworkConnection() {
586-
assert(connection_state_ == kGettingToken);
593+
FIREBASE_DEV_ASSERT(connection_state_ == kGettingToken);
587594

588595
// User might have logged out. Positive auth status is handled after
589596
// authenticating with the server
@@ -823,10 +830,10 @@ void PersistentConnection::PutInternal(const char* action, const Path& path,
823830
}
824831

825832
void PersistentConnection::SendPut(uint64_t write_id) {
826-
assert(CanSendWrites());
833+
FIREBASE_DEV_ASSERT(CanSendWrites());
827834

828835
auto it_put = outstanding_puts_.find(write_id);
829-
assert(it_put != outstanding_puts_.end());
836+
FIREBASE_DEV_ASSERT(it_put != outstanding_puts_.end());
830837

831838
it_put->second->MarkSent();
832839
SendSensitive(it_put->second->action.c_str(), false, it_put->second->data,
@@ -908,7 +915,7 @@ void PersistentConnection::SendSensitive(const char* action, bool sensitive,
908915
ResponsePtr response,
909916
ConnectionResponseHandler callback,
910917
uint64_t outstanding_id) {
911-
assert(realtime_);
918+
FIREBASE_DEV_ASSERT(realtime_);
912919

913920
// Varient only accept int64_t
914921
int64_t rn = ++next_request_id_;
@@ -924,7 +931,7 @@ void PersistentConnection::SendSensitive(const char* action, bool sensitive,
924931
}
925932

926933
void PersistentConnection::RestoreOutstandingRequests() {
927-
assert(connection_state_ == kConnected);
934+
FIREBASE_DEV_ASSERT(connection_state_ == kConnected);
928935

929936
// Restore listens
930937
logger_->LogDebug("%s Restoring outstanding listens", log_id_.c_str());
@@ -950,7 +957,7 @@ void PersistentConnection::RestoreOutstandingRequests() {
950957
}
951958

952959
void PersistentConnection::GetAuthToken(std::string* out) {
953-
assert(out);
960+
FIREBASE_DEV_ASSERT(out);
954961
app_->function_registry()->CallFunction(
955962
::firebase::internal::FnAuthGetCurrentToken, app_, nullptr, out);
956963
}
@@ -1010,7 +1017,7 @@ void PersistentConnection::SendUnauth() {
10101017
void PersistentConnection::HandleAuthTokenResponse(const Variant& message,
10111018
const ResponsePtr& response,
10121019
uint64_t outstanding_id) {
1013-
assert(response);
1020+
FIREBASE_DEV_ASSERT(response);
10141021

10151022
connection_state_ = kConnected;
10161023

database/src/desktop/connection/util_connection.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ namespace connection {
2323

2424
UniquePtr<WebSocketClientInterface> CreateWebSocketClient(
2525
const HostInfo& info, WebSocketClientEventHandler* delegate,
26-
const char* opt_last_session_id, Logger* logger) {
26+
const char* opt_last_session_id, Logger* logger,
27+
scheduler::Scheduler* scheduler) {
2728
// Currently we use uWebSockets implementation.
2829
std::string uri = info.GetConnectionUrl(opt_last_session_id);
2930
return MakeUnique<WebSocketClientImpl>(uri, info.user_agent(), logger,
30-
delegate);
31+
scheduler, delegate);
3132
}
3233

3334
} // namespace connection

0 commit comments

Comments
 (0)