Skip to content

Commit 7f84e84

Browse files
authored
Fixes for issues discovered in the Unity desktop build (#4795)
* Make Filesystem's destructor virtual * Avoid capturing a strong reference to FirestoreClient in GC tasks. This prevents this task from keeping the FirestoreClient alive even after the api::Firestore instance has been destroyed. * Ensure that FirestoreClient is terminated In the public C++ API, it's possible to delete the owning FirebaseApp which triggers the destruction of the api::Firestore object even though we haven't called Terminate. This makes it such that api::Firestore calls a blocking Terminate call on FirestoreClient in its destructor. Additionally, change AsyncQueue to always run the operation enqueued on shutdown, even if already shutdown. Make FirestoreClient capable of short circuiting the termination work for itself. This makes terminate idempotent in a more staightforward way. * Clean up excess logging * Remove defintion of weak_from_this * Review feedback * Make AsyncQueue extend std::enable_shared_from_this Also hide the AsyncQueue's public constructor and force all users to manage their reference to it via std::shared_ptr. * Handle Executor shutdown from within its own pool
1 parent d80ccb0 commit 7f84e84

File tree

14 files changed

+151
-93
lines changed

14 files changed

+151
-93
lines changed

Firestore/Source/API/FSTFirestoreComponent.mm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ - (FIRFirestore *)firestoreForDatabase:(NSString *)database {
9999
}
100100

101101
auto executor = Executor::CreateSerial(queue_name.c_str());
102-
auto workerQueue = absl::make_unique<AsyncQueue>(std::move(executor));
102+
auto workerQueue = AsyncQueue::Create(std::move(executor));
103103

104104
id<FIRAuthInterop> auth = FIR_COMPONENT(FIRAuthInterop, self.app.container);
105105
auto credentialsProvider = std::make_shared<FirebaseCredentialsProvider>(self.app, auth);

Firestore/core/src/firebase/firestore/api/firestore.cc

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,16 @@ Firestore::Firestore(model::DatabaseId database_id,
6262
extension_{extension} {
6363
}
6464

65+
Firestore::~Firestore() {
66+
std::lock_guard<std::mutex> lock{mutex_};
67+
68+
// If the client hasn't been configured yet we don't need to create it just
69+
// to tear it down.
70+
if (!client_) return;
71+
72+
client_->Terminate();
73+
}
74+
6575
const std::shared_ptr<FirestoreClient>& Firestore::client() {
6676
HARD_ASSERT(client_, "Client is not yet configured.");
6777
return client_;
@@ -133,7 +143,7 @@ void Firestore::Terminate(util::StatusCallback callback) {
133143
// The client must be initialized to ensure that all subsequent API usage
134144
// throws an exception.
135145
EnsureClientConfigured();
136-
client_->Terminate(std::move(callback));
146+
client_->TerminateAsync(std::move(callback));
137147
}
138148

139149
void Firestore::WaitForPendingWrites(util::StatusCallback callback) {

Firestore/core/src/firebase/firestore/api/firestore.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ class Firestore : public std::enable_shared_from_this<Firestore> {
6060
std::shared_ptr<util::AsyncQueue> worker_queue,
6161
void* extension);
6262

63+
~Firestore();
64+
6365
const model::DatabaseId& database_id() const {
6466
return database_id_;
6567
}

Firestore/core/src/firebase/firestore/core/firestore_client.cc

Lines changed: 32 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,12 @@ void FirestoreClient::Initialize(const User& user, const Settings& settings) {
213213
void FirestoreClient::ScheduleLruGarbageCollection() {
214214
std::chrono::milliseconds delay =
215215
gc_has_run_ ? regular_gc_delay_ : initial_gc_delay_;
216-
auto shared_this = shared_from_this();
216+
std::weak_ptr<FirestoreClient> weak_this = shared_from_this();
217217
lru_callback_ = worker_queue()->EnqueueAfterDelay(
218-
delay, TimerId::GarbageCollectionDelay, [shared_this] {
218+
delay, TimerId::GarbageCollectionDelay, [weak_this] {
219+
auto shared_this = weak_this.lock();
220+
if (!shared_this) return;
221+
219222
shared_this->local_store_->CollectGarbage(
220223
shared_this->lru_delegate_->garbage_collector());
221224
shared_this->gc_has_run_ = true;
@@ -245,30 +248,42 @@ void FirestoreClient::EnableNetwork(StatusCallback callback) {
245248
});
246249
}
247250

248-
void FirestoreClient::Terminate(StatusCallback callback) {
251+
void FirestoreClient::TerminateAsync(StatusCallback callback) {
249252
auto shared_this = shared_from_this();
250253
worker_queue()->EnqueueAndInitiateShutdown([shared_this, callback] {
251-
shared_this->credentials_provider_->SetCredentialChangeListener(nullptr);
254+
shared_this->TerminateInternal();
252255

253-
// If we've scheduled LRU garbage collection, cancel it.
254-
if (shared_this->lru_callback_) {
255-
shared_this->lru_callback_.Cancel();
256-
}
257-
shared_this->remote_store_->Shutdown();
258-
shared_this->persistence_->Shutdown();
259-
});
260-
261-
// This separate enqueue ensures if `terminate` is called multiple times
262-
// every time the callback is triggered. If it is in the above
263-
// enqueue, it might not get executed because after first `terminate`
264-
// all operations are not executed.
265-
worker_queue()->EnqueueEvenAfterShutdown([shared_this, callback] {
266256
if (callback) {
267257
shared_this->user_executor()->Execute([=] { callback(Status::OK()); });
268258
}
269259
});
270260
}
271261

262+
void FirestoreClient::Terminate() {
263+
std::promise<void> signal_terminated;
264+
worker_queue()->EnqueueAndInitiateShutdown([&, this] {
265+
TerminateInternal();
266+
signal_terminated.set_value();
267+
});
268+
signal_terminated.get_future().wait();
269+
}
270+
271+
void FirestoreClient::TerminateInternal() {
272+
if (!remote_store_) return;
273+
274+
credentials_provider_->SetCredentialChangeListener(nullptr);
275+
276+
// If we've scheduled LRU garbage collection, cancel it.
277+
if (lru_callback_) {
278+
lru_callback_.Cancel();
279+
}
280+
remote_store_->Shutdown();
281+
persistence_->Shutdown();
282+
283+
// Clear the remote store to indicate terminate is complete.
284+
remote_store_.reset();
285+
}
286+
272287
void FirestoreClient::WaitForPendingWrites(StatusCallback callback) {
273288
VerifyNotTerminated();
274289

Firestore/core/src/firebase/firestore/core/firestore_client.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,13 @@ class FirestoreClient : public std::enable_shared_from_this<FirestoreClient> {
8888
* Terminates this client, cancels all writes / listeners, and releases all
8989
* resources.
9090
*/
91-
void Terminate(util::StatusCallback callback);
91+
void TerminateAsync(util::StatusCallback callback);
92+
93+
/**
94+
* Synchronously terminates this client, cancels all writes / listeners, and
95+
* releases all resources.
96+
*/
97+
void Terminate();
9298

9399
/**
94100
* Passes a callback that is triggered when all the pending writes at the
@@ -183,6 +189,8 @@ class FirestoreClient : public std::enable_shared_from_this<FirestoreClient> {
183189

184190
void VerifyNotTerminated();
185191

192+
void TerminateInternal();
193+
186194
void ScheduleLruGarbageCollection();
187195

188196
DatabaseInfo database_info_;

Firestore/core/src/firebase/firestore/util/async_queue.cc

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,13 @@ namespace firebase {
2626
namespace firestore {
2727
namespace util {
2828

29+
std::shared_ptr<AsyncQueue> AsyncQueue::Create(
30+
std::unique_ptr<Executor> executor) {
31+
// Use new because make_shared cannot access a private constructor.
32+
auto queue = new AsyncQueue(std::move(executor));
33+
return std::shared_ptr<AsyncQueue>(queue);
34+
}
35+
2936
AsyncQueue::AsyncQueue(std::unique_ptr<Executor> executor)
3037
: executor_{std::move(executor)} {
3138
is_operation_in_progress_ = false;
@@ -72,9 +79,7 @@ void AsyncQueue::Enqueue(const Operation& operation) {
7279
void AsyncQueue::EnqueueAndInitiateShutdown(const Operation& operation) {
7380
std::lock_guard<std::mutex> lock{shut_down_mutex_};
7481
VerifySequentialOrder();
75-
if (is_shutting_down_) {
76-
return;
77-
}
82+
7883
is_shutting_down_ = true;
7984
executor_->Execute(Wrap(operation));
8085
}
@@ -109,7 +114,7 @@ DelayedOperation AsyncQueue::EnqueueAfterDelay(Milliseconds delay,
109114
return DelayedOperation();
110115
}
111116

112-
// Skip delays for timer_ids that have been overriden
117+
// Skip delays for timer_ids that have been overridden
113118
if (absl::c_linear_search(timer_ids_to_skip_, timer_id)) {
114119
delay = Milliseconds(0);
115120
}
@@ -123,7 +128,8 @@ AsyncQueue::Operation AsyncQueue::Wrap(const Operation& operation) {
123128
// ensure that it doesn't spawn any nested operations.
124129

125130
// Note: can't move `operation` into lambda until C++14.
126-
return [this, operation] { ExecuteBlocking(operation); };
131+
auto shared_this = shared_from_this();
132+
return [shared_this, operation] { shared_this->ExecuteBlocking(operation); };
127133
}
128134

129135
void AsyncQueue::VerifySequentialOrder() const {

Firestore/core/src/firebase/firestore/util/async_queue.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,12 +84,12 @@ enum class TimerId {
8484
//
8585
// A significant portion of `AsyncQueue` interface only exists for test purposes
8686
// and must *not* be used in regular code.
87-
class AsyncQueue {
87+
class AsyncQueue : public std::enable_shared_from_this<AsyncQueue> {
8888
public:
8989
using Operation = Executor::Operation;
9090
using Milliseconds = Executor::Milliseconds;
9191

92-
explicit AsyncQueue(std::unique_ptr<Executor> executor);
92+
static std::shared_ptr<AsyncQueue> Create(std::unique_ptr<Executor> executor);
9393

9494
// Asserts for the caller that it is being invoked as part of an operation on
9595
// the `AsyncQueue`.
@@ -186,6 +186,8 @@ class AsyncQueue {
186186
void SkipDelaysForTimerId(TimerId timer_id);
187187

188188
private:
189+
explicit AsyncQueue(std::unique_ptr<Executor> executor);
190+
189191
Operation Wrap(const Operation& operation);
190192

191193
// Asserts that the current invocation happens asynchronously on the queue.

Firestore/core/src/firebase/firestore/util/executor_std.cc

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ std::string ThreadIdToString(const std::thread::id thread_id) {
4040

4141
// MARK: - ExecutorStd
4242

43-
ExecutorStd::ExecutorStd(int threads) {
43+
ExecutorStd::ExecutorStd(int threads)
44+
: shutting_down_(std::make_shared<std::atomic<bool>>()) {
4445
HARD_ASSERT(threads > 0);
4546

4647
// Somewhat counter-intuitively, constructor of `std::atomic` assigns the
@@ -49,14 +50,14 @@ ExecutorStd::ExecutorStd(int threads) {
4950
// See [this thread](https://stackoverflow.com/questions/25609858) for context
5051
// on the constructor.
5152
current_id_ = 0;
52-
shutting_down_ = false;
53+
*shutting_down_ = false;
5354
for (int i = 0; i < threads; ++i) {
5455
worker_thread_pool_.emplace_back(&ExecutorStd::PollingThread, this);
5556
}
5657
}
5758

5859
ExecutorStd::~ExecutorStd() {
59-
shutting_down_ = true;
60+
*shutting_down_ = true;
6061

6162
// Make sure the worker threads are not blocked, so that the call to `join`
6263
// doesn't hang. It's not deterministic which thread will pick up an entry,
@@ -66,7 +67,14 @@ ExecutorStd::~ExecutorStd() {
6667
}
6768

6869
for (std::thread& thread : worker_thread_pool_) {
69-
thread.join();
70+
// If the current thread is running this destructor, we can't join the
71+
// thread. Instead detach it and rely on PollingThread to notice that
72+
// *shutting_down_ is now true.
73+
if (std::this_thread::get_id() == thread.get_id()) {
74+
thread.detach();
75+
} else {
76+
thread.join();
77+
}
7078
}
7179
}
7280

@@ -105,7 +113,10 @@ ExecutorStd::Id ExecutorStd::PushOnSchedule(Operation&& operation,
105113
}
106114

107115
void ExecutorStd::PollingThread() {
108-
while (!shutting_down_) {
116+
// Keep a local shared_ptr here to ensure that the atomic pointed to by
117+
// shutting_down_ remains valid even after the destruction of the executor.
118+
std::shared_ptr<std::atomic<bool>> local_shutting_down = shutting_down_;
119+
while (!*local_shutting_down) {
109120
Entry entry = schedule_.PopBlocking();
110121
if (entry.tagged.operation) {
111122
entry.tagged.operation();

Firestore/core/src/firebase/firestore/util/executor_std.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <atomic>
2222
#include <condition_variable> // NOLINT(build/c++11)
2323
#include <deque>
24+
#include <memory>
2425
#include <mutex> // NOLINT(build/c++11)
2526
#include <string>
2627
#include <thread> // NOLINT(build/c++11)
@@ -273,7 +274,7 @@ class ExecutorStd : public Executor {
273274

274275
std::vector<std::thread> worker_thread_pool_;
275276
// Used to stop the worker thread.
276-
std::atomic<bool> shutting_down_{false};
277+
std::shared_ptr<std::atomic<bool>> shutting_down_;
277278

278279
std::atomic<Id> current_id_{0};
279280
};

Firestore/core/src/firebase/firestore/util/filesystem.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ class StatusOr;
3737
*/
3838
class Filesystem {
3939
public:
40+
virtual ~Filesystem() = default;
41+
4042
Filesystem(const Filesystem&) = delete;
4143
Filesystem& operator=(const Filesystem&) = delete;
4244

0 commit comments

Comments
 (0)