
Commit 1ff902e

MB-27554: [BP] Move flushing code from KVBucket to EPBucket
Originally d440338.

flushVBucket and related code are only applicable to EP buckets, not to
Ephemeral buckets. As such, move all the flushing code from KVBucket to the
EPBucket class.

Change-Id: Ie0c55e7ff8e67d8ef0a3276bdc20c727ae554b16
Reviewed-on: http://review.couchbase.org/88379
Well-Formed: Build Bot <[email protected]>
Tested-by: Build Bot <[email protected]>
Reviewed-by: Dave Rigby <[email protected]>
1 parent 3ecac16 · commit 1ff902e

19 files changed: +608 -536 lines
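
For orientation before the per-file diffs, the split this commit produces looks roughly like the sketch below. It is a minimal, hypothetical outline with simplified type aliases and signatures, assembled from the commit message and the declarations added in ep_bucket.h further down; it is not a copy of the real headers, and the KVBucket/EphemeralBucket details are assumptions.

#include <cstdint>
#include <memory>

// Simplified stand-ins for engine types (assumptions, not the real definitions).
class KVStore;
class Item;
class PersistenceCallback;
class VBucket;
using VBucketPtr = std::shared_ptr<VBucket>;
using queued_item = std::shared_ptr<Item>;

// Base class: logic common to persistent (EP) and ephemeral buckets.
class KVBucket {
public:
    virtual ~KVBucket() = default;
    // ... shared vbucket / checkpoint / stats machinery, but no flushing ...
};

// Persistent bucket: the only subclass that talks to a KVStore, so the
// flushing entry points move here.
class EPBucket : public KVBucket {
public:
    int flushVBucket(uint16_t vbid);
    void commit(KVStore& kvstore, const Item* collectionsManifest);

protected:
    void flushOneDeleteAll();
    PersistenceCallback* flushOneDelOrSet(const queued_item& qi, VBucketPtr& vb);
};

// Ephemeral bucket: keeps everything in memory, so it simply has no
// flushVBucket() to implement any more.
class EphemeralBucket : public KVBucket {};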

engines/ep/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
@@ -200,6 +200,7 @@ ADD_LIBRARY(ep_objs OBJECT
             src/murmurhash3.cc
             src/mutation_log.cc
             src/mutation_log_entry.cc
+            src/persistence_callback.cc
             src/pre_link_document_context.cc
             src/pre_link_document_context.h
             src/progress_tracker.cc

engines/ep/src/ep_bucket.cc

Lines changed: 303 additions & 0 deletions
@@ -19,9 +19,11 @@
 
 #include "bgfetcher.h"
 #include "ep_engine.h"
+#include "ep_time.h"
 #include "ep_vb.h"
 #include "failover-table.h"
 #include "flusher.h"
+#include "persistence_callback.h"
 #include "replicationthrottle.h"
 #include "tasks.h"
 
@@ -207,6 +209,253 @@ void EPBucket::reset() {
     vbMap.getShard(EP_PRIMARY_SHARD)->getFlusher()->notifyFlushEvent();
 }
 
+int EPBucket::flushVBucket(uint16_t vbid) {
+    KVShard *shard = vbMap.getShardByVbId(vbid);
+    if (diskDeleteAll && !deleteAllTaskCtx.delay) {
+        if (shard->getId() == EP_PRIMARY_SHARD) {
+            flushOneDeleteAll();
+        } else {
+            // disk flush is pending just return
+            return 0;
+        }
+    }
+
+    int items_flushed = 0;
+    const hrtime_t flush_start = gethrtime();
+
+    VBucketPtr vb = vbMap.getBucket(vbid);
+    if (vb) {
+        std::unique_lock<std::mutex> lh(vb_mutexes[vbid], std::try_to_lock);
+        if (!lh.owns_lock()) { // Try another bucket if this one is locked
+            return RETRY_FLUSH_VBUCKET; // to avoid blocking flusher
+        }
+
+        std::vector<queued_item> items;
+        KVStore *rwUnderlying = getRWUnderlying(vbid);
+
+        while (!vb->rejectQueue.empty()) {
+            items.push_back(vb->rejectQueue.front());
+            vb->rejectQueue.pop();
+        }
+
+        // Append any 'backfill' items (mutations added by a DCP stream).
+        vb->getBackfillItems(items);
+
+        // Append all items outstanding for the persistence cursor.
+        snapshot_range_t range;
+        hrtime_t _begin_ = gethrtime();
+        range = vb->checkpointManager.getAllItemsForCursor(
+                CheckpointManager::pCursorName, items);
+        stats.persistenceCursorGetItemsHisto.add((gethrtime() - _begin_) / 1000);
+
+        if (!items.empty()) {
+            while (!rwUnderlying->begin()) {
+                ++stats.beginFailed;
+                LOG(EXTENSION_LOG_WARNING, "Failed to start a transaction!!! "
+                    "Retry in 1 sec ...");
+                sleep(1);
+            }
+            rwUnderlying->optimizeWrites(items);
+
+            Item *prev = NULL;
+            auto vbstate = vb->getVBucketState();
+            uint64_t maxSeqno = 0;
+            range.start = std::max(range.start, vbstate.lastSnapStart);
+
+            bool mustCheckpointVBState = false;
+            std::list<PersistenceCallback*>& pcbs = rwUnderlying->getPersistenceCbList();
+
+            SystemEventFlush sef;
+
+            for (const auto& item : items) {
+
+                if (!item->shouldPersist()) {
+                    continue;
+                }
+
+                // Pass the Item through the SystemEventFlush which may filter
+                // the item away (return Skip).
+                if (sef.process(item) == ProcessStatus::Skip) {
+                    // The item has no further flushing actions i.e. we've
+                    // absorbed it in the process function.
+                    // Update stats and carry-on
+                    --stats.diskQueueSize;
+                    vb->doStatsForFlushing(*item, item->size());
+                    continue;
+                }
+
+                if (item->getOperation() == queue_op::set_vbucket_state) {
+                    // No actual item explicitly persisted to (this op exists
+                    // to ensure a commit occurs with the current vbstate);
+                    // flag that we must trigger a snapshot even if there are
+                    // no 'real' items in the checkpoint.
+                    mustCheckpointVBState = true;
+
+                    // Update queuing stats how this item has logically been
+                    // processed.
+                    --stats.diskQueueSize;
+                    vb->doStatsForFlushing(*item, item->size());
+
+                } else if (!prev || prev->getKey() != item->getKey()) {
+                    prev = item.get();
+                    ++items_flushed;
+                    PersistenceCallback *cb = flushOneDelOrSet(item, vb);
+                    if (cb) {
+                        pcbs.push_back(cb);
+                    }
+
+                    maxSeqno = std::max(maxSeqno, (uint64_t)item->getBySeqno());
+                    vbstate.maxCas = std::max(vbstate.maxCas, item->getCas());
+                    if (item->isDeleted()) {
+                        vbstate.maxDeletedSeqno =
+                                std::max(vbstate.maxDeletedSeqno,
+                                         item->getRevSeqno());
+                    }
+                    ++stats.flusher_todo;
+
+                } else {
+                    // Item is the same key as the previous[1] one - don't need
+                    // to flush to disk.
+                    // [1] Previous here really means 'next' - optimizeWrites()
+                    //     above has actually re-ordered items such that items
+                    //     with the same key are ordered from high->low seqno.
+                    //     This means we only write the highest (i.e. newest)
+                    //     item for a given key, and discard any duplicate,
+                    //     older items.
+                    --stats.diskQueueSize;
+                    vb->doStatsForFlushing(*item, item->size());
+                }
+            }
+
+
+            {
+                ReaderLockHolder rlh(vb->getStateLock());
+                if (vb->getState() == vbucket_state_active) {
+                    if (maxSeqno) {
+                        range.start = maxSeqno;
+                        range.end = maxSeqno;
+                    }
+                }
+
+                // Update VBstate based on the changes we have just made,
+                // then tell the rwUnderlying the 'new' state
+                // (which will persisted as part of the commit() below).
+                vbstate.lastSnapStart = range.start;
+                vbstate.lastSnapEnd = range.end;
+
+                // Track the lowest seqno written in spock and record it as
+                // the HLC epoch, a seqno which we can be sure the value has a
+                // HLC CAS.
+                vbstate.hlcCasEpochSeqno = vb->getHLCEpochSeqno();
+                if (vbstate.hlcCasEpochSeqno == HlcCasSeqnoUninitialised) {
+                    vbstate.hlcCasEpochSeqno = range.start;
+                    vb->setHLCEpochSeqno(range.start);
+                }
+
+                // Track if the VB has xattrs present
+                vbstate.mightContainXattrs = vb->mightContainXattrs();
+
+                // Do we need to trigger a persist of the state?
+                // If there are no "real" items to flush, and we encountered
+                // a set_vbucket_state meta-item.
+                auto options = VBStatePersist::VBSTATE_CACHE_UPDATE_ONLY;
+                if ((items_flushed == 0) && mustCheckpointVBState) {
+                    options = VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT;
+                }
+
+                if (rwUnderlying->snapshotVBucket(vb->getId(), vbstate,
+                                                  options) != true) {
+                    return RETRY_FLUSH_VBUCKET;
+                }
+
+                if (vb->setBucketCreation(false)) {
+                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
+                }
+            }
+
+            /* Perform an explicit commit to disk if the commit
+             * interval reaches zero and if there is a non-zero number
+             * of items to flush.
+             * Or if there is a manifest item
+             */
+            if (items_flushed > 0 || sef.getCollectionsManifestItem()) {
+                commit(*rwUnderlying, sef.getCollectionsManifestItem());
+
+                // Now the commit is complete, vBucket file must exist.
+                if (vb->setBucketCreation(false)) {
+                    LOG(EXTENSION_LOG_INFO, "VBucket %" PRIu16 " created", vbid);
+                }
+            }
+
+            hrtime_t flush_end = gethrtime();
+            uint64_t trans_time = (flush_end - flush_start) / 1000000;
+
+            lastTransTimePerItem.store((items_flushed == 0) ? 0 :
+                                       static_cast<double>(trans_time) /
+                                       static_cast<double>(items_flushed));
+            stats.cumulativeFlushTime.fetch_add(trans_time);
+            stats.flusher_todo.store(0);
+            stats.totalPersistVBState++;
+
+            if (vb->rejectQueue.empty()) {
+                vb->setPersistedSnapshot(range.start, range.end);
+                uint64_t highSeqno = rwUnderlying->getLastPersistedSeqno(vbid);
+                if (highSeqno > 0 &&
+                    highSeqno != vb->getPersistenceSeqno()) {
+                    vb->setPersistenceSeqno(highSeqno);
+                }
+            }
+        }
+
+        rwUnderlying->pendingTasks();
+
+        if (vb->checkpointManager.getNumCheckpoints() > 1) {
+            wakeUpCheckpointRemover();
+        }
+
+        if (vb->rejectQueue.empty()) {
+            vb->checkpointManager.itemsPersisted();
+            uint64_t seqno = vb->getPersistenceSeqno();
+            uint64_t chkid = vb->checkpointManager.getPersistenceCursorPreChkId();
+            vb->notifyHighPriorityRequests(
+                    engine, seqno, HighPriorityVBNotify::Seqno);
+            vb->notifyHighPriorityRequests(
+                    engine, chkid, HighPriorityVBNotify::ChkPersistence);
+            if (chkid > 0 && chkid != vb->getPersistenceCheckpointId()) {
+                vb->setPersistenceCheckpointId(chkid);
+            }
+        } else {
+            return RETRY_FLUSH_VBUCKET;
+        }
+    }
+
+    return items_flushed;
+}
+
+void EPBucket::commit(KVStore& kvstore, const Item* collectionsManifest) {
+    std::list<PersistenceCallback*>& pcbs = kvstore.getPersistenceCbList();
+    BlockTimer timer(&stats.diskCommitHisto, "disk_commit", stats.timingLog);
+    hrtime_t commit_start = gethrtime();
+
+    while (!kvstore.commit(collectionsManifest)) {
+        ++stats.commitFailed;
+        LOG(EXTENSION_LOG_WARNING,
+            "KVBucket::commit: kvstore.commit failed!!! Retry in 1 sec...");
+        sleep(1);
+    }
+
+    while (!pcbs.empty()) {
+        delete pcbs.front();
+        pcbs.pop_front();
+    }
+
+    ++stats.flusherCommits;
+    hrtime_t commit_end = gethrtime();
+    uint64_t commit_time = (commit_end - commit_start) / 1000000;
+    stats.commit_time.store(commit_time);
+    stats.cumulativeCommitTime.fetch_add(commit_time);
+}
+
 void EPBucket::startFlusher() {
     for (const auto& shard : vbMap.shards) {
         shard->getFlusher()->start();
@@ -344,6 +593,60 @@ ENGINE_ERROR_CODE EPBucket::scheduleCompaction(uint16_t vbid,
     return ENGINE_EWOULDBLOCK;
 }
 
+void EPBucket::flushOneDeleteAll() {
+    for (VBucketMap::id_type i = 0; i < vbMap.getSize(); ++i) {
+        VBucketPtr vb = getVBucket(i);
+        // Reset the vBucket if it's non-null and not already in the middle of
+        // being created / destroyed.
+        if (vb && !(vb->isBucketCreation() || vb->isDeletionDeferred())) {
+            LockHolder lh(vb_mutexes[vb->getId()]);
+            getRWUnderlying(vb->getId())->reset(i);
+        }
+    }
+
+    --stats.diskQueueSize;
+    setDeleteAllComplete();
+}
+
+PersistenceCallback* EPBucket::flushOneDelOrSet(const queued_item &qi,
+                                                VBucketPtr &vb) {
+
+    if (!vb) {
+        --stats.diskQueueSize;
+        return NULL;
+    }
+
+    int64_t bySeqno = qi->getBySeqno();
+    rel_time_t queued(qi->getQueuedTime());
+
+    int dirtyAge = ep_current_time() - queued;
+    stats.dirtyAgeHisto.add(dirtyAge * 1000000);
+    stats.dirtyAge.store(dirtyAge);
+    stats.dirtyAgeHighWat.store(std::max(stats.dirtyAge.load(),
+                                         stats.dirtyAgeHighWat.load()));
+
+    KVStore *rwUnderlying = getRWUnderlying(qi->getVBucketId());
+    if (SystemEventFlush::isUpsert(*qi)) {
+        // TODO: Need to separate disk_insert from disk_update because
+        // bySeqno doesn't give us that information.
+        BlockTimer timer(bySeqno == -1 ?
+                         &stats.diskInsertHisto : &stats.diskUpdateHisto,
+                         bySeqno == -1 ? "disk_insert" : "disk_update",
+                         stats.timingLog);
+        PersistenceCallback *cb =
+                new PersistenceCallback(qi, vb, stats, qi->getCas());
+        rwUnderlying->set(*qi, *cb);
+        return cb;
+    } else {
+        BlockTimer timer(&stats.diskDelHisto, "disk_delete",
+                         stats.timingLog);
+        PersistenceCallback *cb =
+                new PersistenceCallback(qi, vb, stats, 0);
+        rwUnderlying->del(*qi, *cb);
+        return cb;
+    }
+}
+
 void EPBucket::compactInternal(compaction_ctx* ctx) {
     BloomFilterCBPtr filter(new BloomFilterCallback(*this));
     ctx->bloomFilterCallback = filter;
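
The long comment inside the flushVBucket() loop above explains why duplicate keys are skipped: optimizeWrites() groups items by key and orders them from highest to lowest seqno, so only the first (newest) item per key is written, while older duplicates are merely accounted for and dropped. The standalone snippet below is a simplified illustration of that de-duplication idea only, using hypothetical types rather than the engine's queued_item:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

struct QueuedItem {
    std::string key;
    uint64_t seqno;
};

// Mirrors the flusher's behaviour in spirit: sort so duplicates of a key sit
// together, newest (highest seqno) first, then keep only the first item seen
// for each key - analogous to the "prev->getKey() != item->getKey()" check.
std::vector<QueuedItem> selectNewestPerKey(std::vector<QueuedItem> items) {
    std::stable_sort(items.begin(), items.end(),
                     [](const QueuedItem& a, const QueuedItem& b) {
                         if (a.key != b.key) {
                             return a.key < b.key;
                         }
                         return a.seqno > b.seqno; // high -> low seqno per key
                     });

    std::vector<QueuedItem> toFlush;
    const std::string* prevKey = nullptr;
    for (const auto& item : items) {
        if (!prevKey || *prevKey != item.key) {
            toFlush.push_back(item); // newest version of this key
            prevKey = &toFlush.back().key;
        }
        // else: an older duplicate of the same key - skip it.
    }
    return toFlush;
}

int main() {
    auto flushed = selectNewestPerKey({{"a", 1}, {"b", 2}, {"a", 3}});
    for (const auto& item : flushed) {
        std::cout << item.key << ":" << item.seqno << "\n"; // prints a:3, b:2
    }
    return 0;
}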

engines/ep/src/ep_bucket.h

Lines changed: 14 additions & 0 deletions
@@ -38,6 +38,15 @@ class EPBucket : public KVBucket {
 
     void reset() override;
 
+    /**
+     * Flushes all items waiting for persistence in a given vbucket
+     * @param vbid The id of the vbucket to flush
+     * @return The number of items flushed
+     */
+    int flushVBucket(uint16_t vbid);
+
+    void commit(KVStore& kvstore, const Item* collectionsManifest);
+
     /// Start the Flusher for all shards in this bucket.
     void startFlusher();
 
@@ -124,6 +133,11 @@ class EPBucket : public KVBucket {
     }
 
 protected:
+    void flushOneDeleteAll();
+
+    PersistenceCallback* flushOneDelOrSet(const queued_item& qi,
+                                          VBucketPtr& vb);
+
     /**
      * Compaction of a database file
      *

engines/ep/src/flusher.cc

Lines changed: 14 additions & 1 deletion
@@ -18,12 +18,25 @@
 #include "flusher.h"
 
 #include "common.h"
+#include "ep_bucket.h"
 #include "tasks.h"
 
-#include <stdlib.h>
+#include <platform/timeutils.h>
 
+#include <stdlib.h>
 #include <sstream>
 
+Flusher::Flusher(EPBucket* st, KVShard* k)
+    : store(st),
+      _state(State::Initializing),
+      taskId(0),
+      minSleepTime(0.1),
+      forceShutdownReceived(false),
+      doHighPriority(false),
+      numHighPriority(0),
+      pendingMutation(false),
+      shard(k) {
+}
 
 bool Flusher::stop(bool isForceShutdown) {
     forceShutdownReceived = isForceShutdown;
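
flusher.h is not part of this commit's diff, so the members initialised by the new constructor are not shown here. The sketch below is a rough, assumed reconstruction of the relevant declarations, inferred only from the initializer list above; the member types and the contents of the State enum are guesses:

// Hypothetical excerpt of what the corresponding flusher.h members might look
// like after this change.
class EPBucket;
class KVShard;

class Flusher {
public:
    Flusher(EPBucket* st, KVShard* k);
    bool stop(bool isForceShutdown);

private:
    enum class State { Initializing /*, Running, Stopping, ... */ };

    EPBucket* store;          // was a KVBucket* before this change (assumed)
    State _state;
    size_t taskId;
    double minSleepTime;
    bool forceShutdownReceived;
    bool doHighPriority;
    size_t numHighPriority;
    bool pendingMutation;
    KVShard* shard;
};

Taking an EPBucket* rather than a KVBucket* in the constructor encodes at the type level that only persistent buckets own a Flusher, which matches the intent of moving the flushing code out of KVBucket.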
