Skip to content

Commit c0613cf

Browse files
authored
Index Manager handles index states (#9384)
1 parent 5856e5d commit c0613cf

31 files changed

+400
-105
lines changed

Firestore/core/src/local/leveldb_index_manager.cc

Lines changed: 53 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <utility>
2222
#include <vector>
2323

24+
#include "Firestore/core/src/credentials/user.h"
2425
#include "Firestore/core/src/local/leveldb_key.h"
2526
#include "Firestore/core/src/local/leveldb_persistence.h"
2627
#include "Firestore/core/src/local/local_serializer.h"
@@ -35,6 +36,7 @@ namespace firebase {
3536
namespace firestore {
3637
namespace local {
3738

39+
using credentials::User;
3840
using model::DocumentKey;
3941
using model::FieldIndex;
4042
using model::IndexState;
@@ -49,21 +51,15 @@ struct DbIndexState {
4951
int32_t nanos;
5052
std::string key;
5153
model::ListenSequenceNumber sequence_number;
54+
model::BatchId largest_batch_id;
5255
};
5356

54-
// TODO(wuandy): Uncomment this when needed.
55-
// void to_json(json& j, const DbIndexState& s) {
56-
// j = json{{"seconds", s.seconds},
57-
// {"nanos", s.nanos},
58-
// {"key", s.key},
59-
// {"seq_num", s.sequence_number}};
60-
//}
61-
6257
void from_json(const json& j, DbIndexState& s) {
6358
j.at("seconds").get_to(s.seconds);
6459
j.at("nanos").get_to(s.nanos);
6560
j.at("key").get_to(s.key);
6661
j.at("seq_num").get_to(s.sequence_number);
62+
j.at("largest_batch").get_to(s.largest_batch_id);
6763
}
6864

6965
IndexState DecodeIndexState(const std::string& encoded) {
@@ -72,16 +68,34 @@ IndexState DecodeIndexState(const std::string& encoded) {
7268
auto db_state = j.get<DbIndexState>();
7369
return {db_state.sequence_number,
7470
SnapshotVersion(Timestamp(db_state.seconds, db_state.nanos)),
75-
DocumentKey::FromPathString(db_state.key)};
71+
DocumentKey::FromPathString(db_state.key), db_state.largest_batch_id};
72+
}
73+
74+
std::string EncodeIndexState(const IndexState& state) {
75+
return json{
76+
{"seconds", state.index_offset().read_time().timestamp().seconds()},
77+
{"nanos", state.index_offset().read_time().timestamp().nanoseconds()},
78+
{"key", state.index_offset().document_key().ToString()},
79+
{"seq_num", state.sequence_number()},
80+
{"largest_batch", state.index_offset().largest_batch_id()}}
81+
.dump();
7682
}
7783

7884
} // namespace
7985

80-
LevelDbIndexManager::LevelDbIndexManager(LevelDbPersistence* db,
86+
LevelDbIndexManager::LevelDbIndexManager(const User& user,
87+
LevelDbPersistence* db,
8188
LocalSerializer* serializer)
82-
: db_(db), serializer_(serializer) {
89+
: db_(db), serializer_(serializer), uid_(user.uid()) {
90+
// The contract for this comparison expected by priority queue is
91+
// `std::less`, but std::priority_queue's default order is descending.
92+
// We change the order to be ascending by doing left >= right instead.
8393
auto cmp = [](FieldIndex* left, FieldIndex* right) {
84-
return left->index_state().sequence_number() <
94+
if (left->index_state().sequence_number() ==
95+
right->index_state().sequence_number()) {
96+
return left->collection_group() >= right->collection_group();
97+
}
98+
return left->index_state().sequence_number() >
8599
right->index_state().sequence_number();
86100
};
87101
next_index_to_update_ = std::priority_queue<
@@ -225,7 +239,8 @@ void LevelDbIndexManager::MemoizeIndex(FieldIndex index) {
225239
}
226240

227241
// Moves `index` into `existing_indexes`.
228-
existing_indexes.insert({index_id, std::move(index)});
242+
existing_indexes[index_id] = std::move(index);
243+
229244
// next_index_to_update_ holds a pointer to the index owned by
230245
// `existing_indexes`.
231246
next_index_to_update_.push(&existing_indexes.find(index_id)->second);
@@ -300,9 +315,11 @@ std::vector<FieldIndex> LevelDbIndexManager::GetFieldIndexes(
300315
HARD_ASSERT(started_, "IndexManager not started");
301316

302317
std::vector<FieldIndex> result;
303-
const auto& indexes = memoized_indexes_[collection_group];
304-
for (const auto& entry : indexes) {
305-
result.push_back(entry.second);
318+
const auto iter = memoized_indexes_.find(collection_group);
319+
if (iter != memoized_indexes_.end()) {
320+
for (const auto& entry : iter->second) {
321+
result.push_back(entry.second);
322+
}
306323
}
307324

308325
return result;
@@ -335,13 +352,30 @@ LevelDbIndexManager::GetDocumentsMatchingTarget(model::FieldIndex field_index,
335352

336353
absl::optional<std::string>
337354
LevelDbIndexManager::GetNextCollectionGroupToUpdate() {
338-
return {};
355+
if (next_index_to_update_.empty()) {
356+
return absl::nullopt;
357+
}
358+
359+
return next_index_to_update_.top()->collection_group();
339360
}
340361

341362
void LevelDbIndexManager::UpdateCollectionGroup(
342363
const std::string& collection_group, model::IndexOffset offset) {
343-
(void)collection_group;
344-
(void)offset;
364+
HARD_ASSERT(started_, "IndexManager not started");
365+
366+
++memoized_max_sequence_number_;
367+
for (const auto& field_index : GetFieldIndexes(collection_group)) {
368+
IndexState updated_state{memoized_max_sequence_number_, offset};
369+
370+
auto state_key = LevelDbIndexStateKey::Key(uid_, field_index.index_id());
371+
auto val = EncodeIndexState(updated_state);
372+
db_->current_transaction()->Put(std::move(state_key),
373+
EncodeIndexState(updated_state));
374+
375+
MemoizeIndex(FieldIndex{field_index.index_id(),
376+
field_index.collection_group(),
377+
field_index.segments(), std::move(updated_state)});
378+
}
345379
}
346380

347381
void LevelDbIndexManager::UpdateIndexEntries(

Firestore/core/src/local/leveldb_index_manager.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828

2929
namespace firebase {
3030
namespace firestore {
31+
32+
namespace credentials {
33+
class User;
34+
} // namespace credentials
35+
3136
namespace local {
3237

3338
class LevelDbPersistence;
@@ -36,7 +41,8 @@ class LocalSerializer;
3641
/** A persisted implementation of IndexManager. */
3742
class LevelDbIndexManager : public IndexManager {
3843
public:
39-
explicit LevelDbIndexManager(LevelDbPersistence* db,
44+
explicit LevelDbIndexManager(const credentials::User& user,
45+
LevelDbPersistence* db,
4046
LocalSerializer* serializer);
4147

4248
void Start() override;

Firestore/core/src/local/leveldb_mutation_queue.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,10 @@ BatchId LoadNextBatchIdFromDb(DB* db) {
126126

127127
LevelDbMutationQueue::LevelDbMutationQueue(const User& user,
128128
LevelDbPersistence* db,
129+
IndexManager* index_manager,
129130
LocalSerializer* serializer)
130131
: db_(NOT_NULL(db)),
132+
index_manager_(NOT_NULL(index_manager)),
131133
serializer_(NOT_NULL(serializer)),
132134
user_id_(user.is_authenticated() ? user.uid() : "") {
133135
}
@@ -177,8 +179,7 @@ MutationBatch LevelDbMutationQueue::AddMutationBatch(
177179
key = LevelDbDocumentMutationKey::Key(user_id_, mutation.key(), batch_id);
178180
db_->current_transaction()->Put(key, empty_buffer);
179181

180-
db_->index_manager()->AddToCollectionParentIndex(
181-
mutation.key().path().PopLast());
182+
index_manager_->AddToCollectionParentIndex(mutation.key().path().PopLast());
182183
}
183184

184185
return batch;

Firestore/core/src/local/leveldb_mutation_queue.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <vector>
2323

2424
#include "Firestore/Protos/nanopb/firestore/local/mutation.nanopb.h"
25+
#include "Firestore/core/src/local/leveldb_index_manager.h"
2526
#include "Firestore/core/src/local/mutation_queue.h"
2627
#include "Firestore/core/src/model/model_fwd.h"
2728
#include "Firestore/core/src/model/types.h"
@@ -53,6 +54,7 @@ class LevelDbMutationQueue : public MutationQueue {
5354
public:
5455
LevelDbMutationQueue(const credentials::User& user,
5556
LevelDbPersistence* db,
57+
IndexManager* index_manager,
5658
LocalSerializer* serializer);
5759

5860
void Start() override;
@@ -114,6 +116,7 @@ class LevelDbMutationQueue : public MutationQueue {
114116

115117
// The LevelDbMutationQueue instance is owned by LevelDbPersistence.
116118
LevelDbPersistence* db_;
119+
IndexManager* index_manager_;
117120

118121
// Owned by LevelDbPersistence.
119122
LocalSerializer* serializer_ = nullptr;

Firestore/core/src/local/leveldb_persistence.cc

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ LevelDbPersistence::LevelDbPersistence(std::unique_ptr<leveldb::DB> db,
114114
target_cache_ = absl::make_unique<LevelDbTargetCache>(this, &serializer_);
115115
document_cache_ =
116116
absl::make_unique<LevelDbRemoteDocumentCache>(this, &serializer_);
117-
index_manager_ = absl::make_unique<LevelDbIndexManager>(this, &serializer_);
118117
reference_delegate_ =
119118
absl::make_unique<LevelDbLruReferenceDelegate>(this, lru_params);
120119
bundle_cache_ = absl::make_unique<LevelDbBundleCache>(this, &serializer_);
@@ -225,11 +224,11 @@ void LevelDbPersistence::Shutdown() {
225224
db_.reset();
226225
}
227226

228-
LevelDbMutationQueue* LevelDbPersistence::GetMutationQueueForUser(
229-
const credentials::User& user) {
227+
LevelDbMutationQueue* LevelDbPersistence::GetMutationQueue(
228+
const credentials::User& user, IndexManager* manager) {
230229
users_.insert(user.uid());
231-
current_mutation_queue_ =
232-
absl::make_unique<LevelDbMutationQueue>(user, this, &serializer_);
230+
current_mutation_queue_ = absl::make_unique<LevelDbMutationQueue>(
231+
user, this, dynamic_cast<LevelDbIndexManager*>(manager), &serializer_);
233232
return current_mutation_queue_.get();
234233
}
235234

@@ -241,7 +240,11 @@ LevelDbRemoteDocumentCache* LevelDbPersistence::remote_document_cache() {
241240
return document_cache_.get();
242241
}
243242

244-
LevelDbIndexManager* LevelDbPersistence::index_manager() {
243+
LevelDbIndexManager* LevelDbPersistence::GetIndexManager(
244+
const credentials::User& user) {
245+
users_.insert(user.uid());
246+
index_manager_ =
247+
absl::make_unique<LevelDbIndexManager>(user, this, &serializer_);
245248
return index_manager_.get();
246249
}
247250

@@ -253,7 +256,7 @@ LevelDbBundleCache* LevelDbPersistence::bundle_cache() {
253256
return bundle_cache_.get();
254257
}
255258

256-
LevelDbDocumentOverlayCache* LevelDbPersistence::document_overlay_cache(
259+
LevelDbDocumentOverlayCache* LevelDbPersistence::GetDocumentOverlayCache(
257260
const User& user) {
258261
users_.insert(user.uid());
259262
current_document_overlay_cache_ =

Firestore/core/src/local/leveldb_persistence.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,17 @@ class LevelDbPersistence : public Persistence {
8181

8282
LevelDbBundleCache* bundle_cache() override;
8383

84-
LevelDbDocumentOverlayCache* document_overlay_cache(
84+
LevelDbDocumentOverlayCache* GetDocumentOverlayCache(
8585
const credentials::User& user) override;
8686

87-
LevelDbMutationQueue* GetMutationQueueForUser(
88-
const credentials::User& user) override;
87+
LevelDbMutationQueue* GetMutationQueue(const credentials::User& user,
88+
IndexManager* index_manager) override;
8989

9090
LevelDbTargetCache* target_cache() override;
9191

9292
LevelDbRemoteDocumentCache* remote_document_cache() override;
9393

94-
LevelDbIndexManager* index_manager() override;
94+
LevelDbIndexManager* GetIndexManager(const credentials::User& user) override;
9595

9696
LevelDbLruReferenceDelegate* reference_delegate() override;
9797

Firestore/core/src/local/leveldb_remote_document_cache.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,8 +115,8 @@ void LevelDbRemoteDocumentCache::Add(const MutableDocument& document,
115115
path.PopLast(), read_time, path.last_segment());
116116
db_->current_transaction()->Put(ldb_read_time_key, "");
117117

118-
db_->index_manager()->AddToCollectionParentIndex(
119-
document.key().path().PopLast());
118+
NOT_NULL(index_manager_);
119+
index_manager_->AddToCollectionParentIndex(document.key().path().PopLast());
120120
}
121121

122122
void LevelDbRemoteDocumentCache::Remove(const DocumentKey& key) {
@@ -285,6 +285,10 @@ MutableDocument LevelDbRemoteDocumentCache::DecodeMaybeDocument(
285285
return maybe_document;
286286
}
287287

288+
void LevelDbRemoteDocumentCache::SetIndexManager(IndexManager* manager) {
289+
index_manager_ = NOT_NULL(manager);
290+
}
291+
288292
} // namespace local
289293
} // namespace firestore
290294
} // namespace firebase

Firestore/core/src/local/leveldb_remote_document_cache.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <thread> // NOLINT(build/c++11)
2222
#include <vector>
2323

24+
#include "Firestore/core/src/local/leveldb_index_manager.h"
2425
#include "Firestore/core/src/local/remote_document_cache.h"
2526
#include "Firestore/core/src/model/model_fwd.h"
2627
#include "Firestore/core/src/model/types.h"
@@ -55,6 +56,8 @@ class LevelDbRemoteDocumentCache : public RemoteDocumentCache {
5556
const core::Query& query,
5657
const model::SnapshotVersion& since_read_time) override;
5758

59+
void SetIndexManager(IndexManager* manager) override;
60+
5861
private:
5962
/**
6063
* Looks up a set of entries in the cache, returning only existing entries of
@@ -67,6 +70,8 @@ class LevelDbRemoteDocumentCache : public RemoteDocumentCache {
6770

6871
// The LevelDbRemoteDocumentCache instance is owned by LevelDbPersistence.
6972
LevelDbPersistence* db_;
73+
// The LevelDbIndexManager instance is owned by LevelDbPersistence.
74+
IndexManager* index_manager_ = nullptr;
7075
// Owned by LevelDbPersistence.
7176
LocalSerializer* serializer_ = nullptr;
7277

Firestore/core/src/local/local_store.cc

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -82,15 +82,16 @@ LocalStore::LocalStore(Persistence* persistence,
8282
QueryEngine* query_engine,
8383
const User& initial_user)
8484
: persistence_(persistence),
85-
mutation_queue_(persistence->GetMutationQueueForUser(initial_user)),
8685
remote_document_cache_(persistence->remote_document_cache()),
8786
target_cache_(persistence->target_cache()),
8887
bundle_cache_(persistence->bundle_cache()),
89-
query_engine_(query_engine),
90-
local_documents_(
91-
absl::make_unique<LocalDocumentsView>(remote_document_cache_,
92-
mutation_queue_,
93-
persistence->index_manager())) {
88+
query_engine_(query_engine) {
89+
index_manager_ = persistence->GetIndexManager(initial_user);
90+
mutation_queue_ = persistence->GetMutationQueue(initial_user, index_manager_);
91+
local_documents_ = absl::make_unique<LocalDocumentsView>(
92+
remote_document_cache_, mutation_queue_, index_manager_);
93+
remote_document_cache_->SetIndexManager(index_manager_);
94+
9495
persistence->reference_delegate()->AddInMemoryPins(&local_view_references_);
9596
target_id_generator_ = TargetIdGenerator::TargetCacheTargetIdGenerator(0);
9697
query_engine_->SetLocalDocumentsView(local_documents_.get());
@@ -117,7 +118,9 @@ DocumentMap LocalStore::HandleUserChange(const User& user) {
117118

118119
// The old one has a reference to the mutation queue, so null it out first.
119120
local_documents_.reset();
120-
mutation_queue_ = persistence_->GetMutationQueueForUser(user);
121+
index_manager_ = persistence_->GetIndexManager(user);
122+
mutation_queue_ = persistence_->GetMutationQueue(user, index_manager_);
123+
remote_document_cache_->SetIndexManager(index_manager_);
121124

122125
StartMutationQueue();
123126

@@ -127,7 +130,7 @@ DocumentMap LocalStore::HandleUserChange(const User& user) {
127130

128131
// Recreate our LocalDocumentsView using the new MutationQueue.
129132
local_documents_ = absl::make_unique<LocalDocumentsView>(
130-
remote_document_cache_, mutation_queue_, persistence_->index_manager());
133+
remote_document_cache_, mutation_queue_, index_manager_);
131134
query_engine_->SetLocalDocumentsView(local_documents_.get());
132135

133136
// Union the old/new changed keys.

Firestore/core/src/local/local_store.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ class TargetChange;
5151
namespace local {
5252

5353
class BundleCache;
54+
class IndexManager;
5455
class LocalDocumentsView;
5556
class LocalViewChanges;
5657
class LocalWriteResult;
@@ -353,6 +354,11 @@ class LocalStore : public bundle::BundleCallback {
353354
*/
354355
QueryEngine* query_engine_ = nullptr;
355356

357+
/**
358+
* Manages indexes and support indexed queries.
359+
*/
360+
IndexManager* index_manager_ = nullptr;
361+
356362
/**
357363
* The "local" view of all documents (layering mutation queue on top of
358364
* remote_document_cache_).

0 commit comments

Comments
 (0)