
Commit 5a707cd

MB-41321: 1/4 Defer collection statistic updates until successful commit
Collection statistics (item count, disk size and persisted high-seqno) for persistent buckets are managed by the flusher thread. This is because we maintain statistic documents in the KVStore (e.g. _local documents), and as the flusher writes documents to the KVStore it can count them and update the statistic documents alongside. Warmup, for example, can open a KVStore and learn the count and size of every collection just by reading the statistic documents.

Prior to this change the visible collection statistics were updated by callbacks invoked by the KVStore before 'commit'. cmd_stat and similar readers were therefore reading counters that the flusher writes to. If the KVStore update fails, the flusher retries the batch of items, so the counters are updated multiple times, giving wrong values or, in the case of deletes, underflow exceptions.

As part of this fix it was also identified that the collections flusher code (collections/flush.cc) made decisions about documents in the flush batch, and about documents already stored on disk, by querying the VB::Manifest object. This is a flawed approach because the VB::Manifest contains changes that are not yet 'durable'. For example, we may update the statistics on disk because the VB::Manifest says a collection was dropped, while that drop is still in a yet-to-be-persisted checkpoint. If we then warm up, the collection drop is undone, but it is too late for the statistic updates and the values could now be wrong.

This commit changes collection statistic updates to occur in multiple steps:

1) As the items of the batch are processed, we update a flusher-owned map of collection-ID to collection statistics. This collects the 'deltas' of the changes being made by the flush batch.
2) Before commit, we read the current collection statistics and apply the collected deltas to generate the statistics for the '_local' updates.
3) If the commit is successful, we update the current in-memory statistics.
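The three steps above can be illustrated with a minimal, self-contained sketch. It is not the kv_engine implementation: `Stats`, `StatsMap`, `FlushBatch` and `tryFlush` are hypothetical, simplified stand-ins, and the commit outcome is passed in as a flag purely for illustration. The point it tries to show is that a failed commit leaves the visible counters untouched, so a retried batch is only counted once.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>

// Hypothetical, simplified types; the real code uses CollectionID,
// PersistedStats and VB::Manifest.
using CollectionID = uint32_t;

struct Stats {
    int64_t itemCount = 0;
    int64_t diskSize = 0;
    uint64_t highSeqno = 0;
};

using StatsMap = std::unordered_map<CollectionID, Stats>;

class FlushBatch {
public:
    // Step 1: collect per-collection deltas while the batch is processed.
    void recordInsert(CollectionID cid, uint64_t seqno, int64_t bytes) {
        record(cid, seqno, +1, bytes);
    }
    void recordRemove(CollectionID cid, uint64_t seqno, int64_t bytes) {
        record(cid, seqno, -1, bytes);
    }

    // Step 2: current stats + deltas = the values to persist.
    // The current (visible) stats are read, never modified.
    StatsMap statsToPersist(const StatsMap& current) const {
        StatsMap out;
        for (const auto& [cid, d] : deltas) {
            Stats base;
            if (auto it = current.find(cid); it != current.end()) {
                base = it->second;
            }
            out[cid] = Stats{base.itemCount + d.itemCount,
                             base.diskSize + d.diskSize,
                             d.highSeqno};
        }
        return out;
    }

    // Step 3: only after a successful commit, publish the deltas.
    void makeVisible(StatsMap& visible) const {
        for (const auto& [cid, d] : deltas) {
            auto& v = visible[cid];
            assert(v.itemCount + d.itemCount >= 0 && "item count underflow");
            v.itemCount += d.itemCount;
            v.diskSize += d.diskSize;
            if (d.highSeqno > v.highSeqno) {
                v.highSeqno = d.highSeqno;
            }
        }
    }

private:
    void record(CollectionID cid, uint64_t seqno, int64_t items, int64_t bytes) {
        auto& d = deltas[cid];
        d.itemCount += items;
        d.diskSize += bytes;
        if (seqno > d.highSeqno) {
            d.highSeqno = seqno;
        }
    }

    StatsMap deltas;
};

// One flush attempt. commitSucceeds stands in for the KVStore commit result.
bool tryFlush(const FlushBatch& batch,
              StatsMap& visible,
              StatsMap& onDisk,
              bool commitSucceeds) {
    StatsMap toPersist = batch.statsToPersist(visible);
    if (!commitSucceeds) {
        return false; // visible stats unchanged, the batch can be retried
    }
    for (const auto& [cid, s] : toPersist) {
        onDisk[cid] = s;
    }
    batch.makeVisible(visible);
    return true;
}
```

Calling `tryFlush` twice with `commitSucceeds = false` and then once with `true` should leave the visible item count increased by exactly the batch delta, which is the behaviour the retry bug described above violated.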
As part of these steps, the updates to persisted collection statistics no longer use the VB::Manifest 'map' of current collections, except for the final 'make visible' write. The functions doing the updates now make decisions about dropped collections based on two sources. The first is the current flush data, which knows whether collections were dropped in the batch (and the sequence number of each drop). The second (for existing items in the document-update case) is the set of collections that have already been dropped in the KVStore we are updating.

Finally, so that the flusher can change a collection's statistics without a cycle of "read statistics, update, write statistics", the VB::Manifest is modified so that it doesn't immediately discard the count/size/seqno of dropped collections. This allows the flusher to just do 'update and write' (which is what it has always done for collections).

Change-Id: Ib3065457057bbeca983849cef4c5e1cb2854343c
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/136878
Tested-by: Build Bot <[email protected]>
Reviewed-by: Dave Rigby <[email protected]>
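The dropped-collection decisions described in the commit message can be sketched as follows. This is a simplified illustration under stated assumptions, not the code from the commit: `DroppedMap`, `StatsAction` and `classify` are hypothetical names, and each map is assumed to hold the seqno of the drop event per collection. An item whose seqno is at or below the drop seqno is treated as logically deleted, and an update over a logically deleted old item is counted as an insert.

```cpp
#include <cstdint>
#include <unordered_map>

using CollectionID = uint32_t;

// Hypothetical: dropped collection -> seqno of the drop (its end seqno).
using DroppedMap = std::unordered_map<CollectionID, uint64_t>;

// An item at or below the drop seqno belongs to the dropped generation.
inline bool isLogicallyDeleted(const DroppedMap& dropped,
                               CollectionID cid,
                               uint64_t seqno) {
    auto it = dropped.find(cid);
    return it != dropped.end() && seqno <= it->second;
}

enum class StatsAction { Skip, Insert, Remove, Update };

// Decide how a flushed item that replaces an existing on-disk item should
// affect the collection's statistics.
inline StatsAction classify(const DroppedMap& droppedInBatch,
                            const DroppedMap& droppedInStore,
                            CollectionID cid,
                            uint64_t seqno,
                            bool isDelete,
                            uint64_t oldSeqno,
                            bool oldIsDelete) {
    // The new item is already covered by a drop in this batch: no stats.
    if (isLogicallyDeleted(droppedInBatch, cid, seqno)) {
        return StatsAction::Skip;
    }
    // The old item counts as deleted if it was a tombstone, or if its
    // collection was dropped in this batch or previously in the store.
    oldIsDelete = oldIsDelete ||
                  isLogicallyDeleted(droppedInBatch, cid, oldSeqno) ||
                  isLogicallyDeleted(droppedInStore, cid, oldSeqno);
    if (oldIsDelete) {
        return StatsAction::Insert;
    }
    return isDelete ? StatsAction::Remove : StatsAction::Update;
}
```

Note that neither lookup consults the in-memory manifest: both maps describe state that is either in the batch being written or already on disk.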
1 parent f285f2f commit 5a707cd

27 files changed, 1117 additions and 279 deletions

engines/ep/src/collections/collections_types.h

Lines changed: 7 additions & 0 deletions
@@ -354,6 +354,13 @@ using IsVisibleFunction =
 namespace VB {
 enum class ManifestUpdateStatus { Success, Behind, EqualUidWithDifferences };
 std::string to_string(ManifestUpdateStatus);
+
+/// values required by the flusher to calculate new collection statistics
+struct StatsForFlush {
+    uint64_t itemCount;
+    size_t diskSize;
+};
+
 } // namespace VB

 } // end namespace Collections

engines/ep/src/collections/flush.cc

Lines changed: 165 additions & 138 deletions
@@ -26,164 +26,126 @@

 namespace Collections::VB {

-// Helper class for doing collection stat updates
-class StatsUpdate {
-public:
-    explicit StatsUpdate(CachingReadHandle&& handle)
-        : handle(std::move(handle)) {
+Flush::StatisticsUpdate& Flush::getStatsAndMaybeSetPersistedHighSeqno(
+        CollectionID cid, uint64_t seqno) {
+    auto [itr, inserted] = stats.try_emplace(cid, StatisticsUpdate{seqno});
+    auto& [key, value] = *itr;
+    (void)key;
+    if (!inserted) {
+        value.maybeSetPersistedHighSeqno(seqno);
     }

-    /**
-     * An item is being inserted into the collection
-     * @param isCommitted the prepare/commit state of the item inserted
-     * @param isDelete alive/delete stats of the item inserted
-     * @param diskSizeDelta the +/- bytes the insert changes disk-used by
-     */
-    void insert(bool isCommitted, bool isDelete, ssize_t diskSizeDelta);
-
-    /**
-     * An item is being updated in the collection
-     * @param isCommitted the prepare/commit state of the item updated
-     * @param diskSizeDelta the +/- bytes the update changes disk-used by
-     */
-    void update(bool isCommitted, ssize_t diskSizeDelta);
-
-    /**
-     * An item is being removed (deleted) from the collection
-     * @param isCommitted the prepare/commit state of the item removed
-     * @param diskSizeDelta the +/- bytes the delete changes disk-used by
-     */
-    void remove(bool isCommitted, ssize_t diskSizeDelta);
-
-    /**
-     * @return true if the seqno represents a logically deleted item for the
-     * locked collection.
-     */
-    bool isLogicallyDeleted(uint64_t seqno) const;
-
-    /**
-     * Increment the item count for the collection associated with the key
-     */
-    void incrementItemCount();
-
-    /**
-     * Decrement the item count for the collection associated with the key
-     */
-    void decrementItemCount();
-
-    /**
-     * Update the on disk size (bytes) for the collection associated with
-     * the key
-     */
-    void updateDiskSize(ssize_t delta);
-
-private:
-    /// handle on the collection
-    CachingReadHandle handle;
-};
-
-bool StatsUpdate::isLogicallyDeleted(uint64_t seqno) const {
-    return handle.isLogicallyDeleted(seqno);
+    return value;
 }

-void StatsUpdate::incrementItemCount() {
-    if (!handle.getKey().isInSystemCollection()) {
-        handle.incrementItemCount();
+void Flush::StatisticsUpdate::maybeSetPersistedHighSeqno(uint64_t seqno) {
+    if (seqno > persistedHighSeqno) {
+        persistedHighSeqno = seqno;
     }
 }

-void StatsUpdate::decrementItemCount() {
-    if (!handle.getKey().isInSystemCollection()) {
-        handle.decrementItemCount();
-    }
+void Flush::StatisticsUpdate::incrementItemCount() {
+    itemCount++;
 }

-void StatsUpdate::updateDiskSize(ssize_t delta) {
-    handle.updateDiskSize(delta);
+void Flush::StatisticsUpdate::decrementItemCount() {
+    itemCount--;
 }

-static std::optional<StatsUpdate> tryTolockAndSetPersistedSeqno(
-        Flush& flush, const DocKey& key, uint64_t seqno, bool isCommitted) {
-    if (key.isInSystemCollection()) {
-        // Is it a collection system event?
-        auto [event, id] = SystemEventFactory::getTypeAndID(key);
-        switch (event) {
-        case SystemEvent::Collection: {
-            auto handle =
-                    flush.getManifest().lock(key, Manifest::AllowSystemKeys{});
-            if (handle.setPersistedHighSeqno(seqno)) {
-                // Update the 'mutated' set, stats are changing
-                flush.setMutated(CollectionID(id));
-            } else {
-                // Cannot set the seqno (flushing dropped items) no more updates
-                return {};
-            }
-            return StatsUpdate{std::move(handle)};
-        }
-        case SystemEvent::Scope:
-            break;
-        }
-        return {};
-    }
-
-    auto handle = flush.getManifest().lock(key);
-
-    if (isCommitted) {
-        if (handle.setPersistedHighSeqno(seqno)) {
-            // Update the 'mutated' set, stats are changing
-            flush.setMutated(key.getCollectionID());
-            return StatsUpdate{std::move(handle)};
-        } else {
-            // Cannot set the seqno (flushing dropped items) no more updates
-            return {};
-        }
-    }
-
-    return StatsUpdate{std::move(handle)};
+void Flush::StatisticsUpdate::updateDiskSize(ssize_t delta) {
+    diskSize += delta;
 }

-void StatsUpdate::insert(bool isCommitted,
-                         bool isDelete,
-                         ssize_t diskSizeDelta) {
-    if (!isDelete && isCommitted) {
+void Flush::StatisticsUpdate::insert(bool isSystem,
+                                     bool isCommitted,
+                                     bool isDelete,
+                                     ssize_t diskSizeDelta) {
+    if (!isSystem && !isDelete && isCommitted) {
         incrementItemCount();
     } // else inserting a tombstone or it's a prepare

-    if (isCommitted) {
+    if (isSystem || isCommitted) {
         updateDiskSize(diskSizeDelta);
     }
 }

-void StatsUpdate::update(bool isCommitted, ssize_t diskSizeDelta) {
-    if (isCommitted) {
+void Flush::StatisticsUpdate::update(bool isSystem,
+                                     bool isCommitted,
+                                     ssize_t diskSizeDelta) {
+    if (isSystem || isCommitted) {
         updateDiskSize(diskSizeDelta);
     }
 }

-void StatsUpdate::remove(bool isCommitted, ssize_t diskSizeDelta) {
+void Flush::StatisticsUpdate::remove(bool isSystem,
+                                     bool isCommitted,
+                                     ssize_t diskSizeDelta) {
     if (isCommitted) {
         decrementItemCount();
     } // else inserting a tombstone or it's a prepare

-    if (isCommitted) {
+    if (isSystem || isCommitted) {
         updateDiskSize(diskSizeDelta);
     }
 }

+// Called from KVStore during the flush process and before we consider the
+// data of the flush to be committed. This method iterates through the
+// statistics gathered by the Flush and uses the std::function callback to
+// have the KVStore implementation write them to storage, e.g. a local document.
 void Flush::saveCollectionStats(
-        std::function<void(CollectionID, PersistedStats)> cb) const {
-    for (CollectionID c : mutated) {
-        PersistedStats stats;
-        {
-            auto lock = manifest.lock(c);
-            if (!lock.valid()) {
-                // Can be flushing for a dropped collection (no longer in the
-                // manifest)
-                continue;
-            }
-            stats = lock.getPersistedStats();
+        std::function<void(CollectionID, const PersistedStats&)> cb) const {
+    // For each collection modified in the flush run ask the VBM for the
+    // current stats (using the high-seqno so we find the correct generation
+    // of stats)
+    for (const auto& [cid, flushStats] : stats) {
+        // Get the current stats of the collection (for the seqno)
+        auto lock = manifest.lock();
+        auto stats =
+                lock.getStatsForFlush(cid, flushStats.getPersistedHighSeqno());
+
+        // Generate new stats, add the deltas from this flush batch for count
+        // and size and set the high-seqno
+        PersistedStats ps(stats.itemCount + flushStats.getItemCount(),
+                          flushStats.getPersistedHighSeqno(),
+                          stats.diskSize + flushStats.getDiskSize());
+        cb(cid, ps);
+    }
+}
+
+// Called from KVStore after a successful commit.
+// This method will iterate through all of the collection stats that the Flush
+// gathered and attempt to update the VB::Manifest (which is where cmd_stat
+// reads from). This method has to consider that the VB::Manifest can
+// be modified by changes to the collections manifest during the flush, for
+// example by the time we've gathered statistics about a collection, the
+// VB::Manifest may have 1) dropped the collection 2) dropped and recreated
+// the collection - in either of these cases the gathered statistics are no
+// longer applicable and are not pushed to the VB::Manifest.
+void Flush::postCommitMakeStatsVisible() {
+    for (const auto& [cid, flushStats] : stats) {
+        auto lock = manifest.lock(cid);
+        if (!lock.valid() ||
+            lock.isLogicallyDeleted(flushStats.getPersistedHighSeqno())) {
+            // Can be flushing for a dropped collection (no longer in the
+            // manifest, or was flushed/recreated at a new seqno)
+            continue;
         }
-        cb(c, stats);
+        // update the stats with the changes collected by the flusher
+        lock.updateItemCount(flushStats.getItemCount());
+        lock.setPersistedHighSeqno(flushStats.getPersistedHighSeqno());
+        lock.updateDiskSize(flushStats.getDiskSize());
+    }
+}
+
+void Flush::flushSuccess(Vbid vbid, KVBucket& bucket) {
+    notifyManifestOfAnyDroppedCollections();
+    checkAndTriggerPurge(vbid, bucket);
+}
+
+void Flush::notifyManifestOfAnyDroppedCollections() {
+    for (const auto& [cid, droppedData] : droppedCollections) {
+        manifest.collectionDropPersisted(cid, droppedData.endSeqno);
     }
 }

@@ -199,15 +161,46 @@ void Flush::triggerPurge(Vbid vbid, KVBucket& bucket) {
     bucket.scheduleCompaction(config, nullptr);
 }

+static std::pair<bool, std::optional<CollectionID>> getCollectionID(
+        const DocKey& key) {
+    bool isSystemEvent = key.isInSystemCollection();
+    CollectionID cid;
+    if (isSystemEvent) {
+        auto [event, id] = SystemEventFactory::getTypeAndID(key);
+        switch (event) {
+        case SystemEvent::Collection: {
+            cid = CollectionID(id);
+            break;
+        }
+        case SystemEvent::Scope:
+            return {true, {}};
+        }
+    } else {
+        cid = key.getCollectionID();
+    }
+    return {isSystemEvent, cid};
+}
+
 void Flush::updateStats(const DocKey& key,
                         uint64_t seqno,
                         bool isCommitted,
                         bool isDelete,
                         size_t size) {
-    auto update = tryTolockAndSetPersistedSeqno(*this, key, seqno, isCommitted);
-    if (update) {
-        update->insert(isCommitted, isDelete, size);
+    auto [isSystemEvent, cid] = getCollectionID(key);
+
+    if (!cid || isLogicallyDeleted(cid.value(), seqno)) {
+        // 1) The key is not for a collection (could be a scope event) or
+        // 2) The key belongs to a collection now dropped, the drop is in this
+        // flush batch.
+        // The flusher still persists documents that are in this state but we
+        // do not gather statistics about them - this is because the current
+        // statistics will be wiped out by the flush (side effect of the drop
+        // going through the KVStore).
+        return;
     }
+
+    getStatsAndMaybeSetPersistedHighSeqno(cid.value(), seqno)
+            .insert(isSystemEvent, isCommitted, isDelete, size);
 }

 void Flush::updateStats(const DocKey& key,
@@ -218,20 +211,54 @@ void Flush::updateStats(const DocKey& key,
                         uint64_t oldSeqno,
                         bool oldIsDelete,
                         size_t oldSize) {
-    auto update = tryTolockAndSetPersistedSeqno(*this, key, seqno, isCommitted);
-    if (update) {
-        if (update->isLogicallyDeleted(oldSeqno) || oldIsDelete) {
-            update->insert(isCommitted, isDelete, size);
-        } else if (!oldIsDelete && isDelete) {
-            update->remove(isCommitted, size - oldSize);
-        } else {
-            update->update(isCommitted, size - oldSize);
-        }
+    // Same logic and comment as updateStats above.
+    auto [isSystemEvent, cid] = getCollectionID(key);
+    if (!cid || isLogicallyDeleted(cid.value(), seqno)) {
+        return;
+    }
+
+    // Next update the delete state for the old item.
+    // 1) Already deleted or
+    // 2) the key's collection is dropped in this flush batch
+    // 3) the key's collection is dropped in the snapshot we are writing to
+    // For 2 and 3 we are switching live documents of dropped collections into
+    // being deleted, thus any update flips to an insert
+    oldIsDelete =
+            oldIsDelete || (isLogicallyDeleted(cid.value(), oldSeqno) ||
+                            isLogicallyDeletedInStore(cid.value(), oldSeqno));
+
+    auto& stats = getStatsAndMaybeSetPersistedHighSeqno(cid.value(), seqno);
+
+    if (oldIsDelete) {
+        stats.insert(isSystemEvent, isCommitted, isDelete, size);
+    } else if (!oldIsDelete && isDelete) {
+        stats.remove(isSystemEvent, isCommitted, size - oldSize);
+    } else {
+        stats.update(isSystemEvent, isCommitted, size - oldSize);
     }
 }

-void Flush::setMutated(CollectionID cid) {
-    mutated.insert(cid);
+void Flush::setDroppedCollectionsForStore(
+        const std::vector<Collections::KVStore::DroppedCollection>& v) {
+    for (const auto& c : v) {
+        droppedInStore.emplace(c.collectionId, c);
+    }
+}
+
+bool Flush::isLogicallyDeleted(CollectionID cid, uint64_t seqno) const {
+    auto itr = droppedCollections.find(cid);
+    if (itr != droppedCollections.end()) {
+        return seqno <= itr->second.endSeqno;
+    }
+    return false;
+}
+
+bool Flush::isLogicallyDeletedInStore(CollectionID cid, uint64_t seqno) const {
+    auto itr = droppedInStore.find(cid);
+    if (itr != droppedInStore.end()) {
+        return seqno <= itr->second.endSeqno;
+    }
+    return false;
 }

 void Flush::recordSystemEvent(const Item& item) {
