Skip to content

Commit 5d6f17a

Browse files
committed
Reduce coupling between vbucket.h and other classes
Move a number of VBucket inline methods to vbucket.cc; so clients of vbucket don't need to know the details of the impementation (and hence include those header files). Do a similar thing for KVBucket and EPEngine. Note: This is mostly a preparation patch; subsequent patches will break the dependancy between vbucket.h and checkpoint.h. Change-Id: I1080d5a51666c8bb82791c417504a5a830724f2f Reviewed-on: http://review.couchbase.org/82693 Reviewed-by: James Harrison <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 6b453e9 commit 5d6f17a

File tree

10 files changed

+134
-105
lines changed

10 files changed

+134
-105
lines changed

engines/ep/src/access_scanner.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include "hash_table.h"
2828
#include "kv_bucket.h"
2929
#include "mutation_log.h"
30+
#include "stats.h"
3031
#include "vb_count_visitor.h"
3132

3233
#include <numeric>

engines/ep/src/checkpoint.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1858,7 +1858,7 @@ void CheckpointManager::itemsPersisted() {
18581858
}
18591859
}
18601860

1861-
size_t CheckpointManager::getMemoryUsage_UNLOCKED() {
1861+
size_t CheckpointManager::getMemoryUsage_UNLOCKED() const {
18621862
if (checkpointList.empty()) {
18631863
return 0;
18641864
}
@@ -1870,12 +1870,12 @@ size_t CheckpointManager::getMemoryUsage_UNLOCKED() {
18701870
return memUsage;
18711871
}
18721872

1873-
size_t CheckpointManager::getMemoryUsage() {
1873+
size_t CheckpointManager::getMemoryUsage() const {
18741874
LockHolder lh(queueLock);
18751875
return getMemoryUsage_UNLOCKED();
18761876
}
18771877

1878-
size_t CheckpointManager::getMemoryUsageOfUnrefCheckpoints() {
1878+
size_t CheckpointManager::getMemoryUsageOfUnrefCheckpoints() const {
18791879
LockHolder lh(queueLock);
18801880

18811881
if (checkpointList.empty()) {

engines/ep/src/checkpoint.h

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,6 @@ struct index_entry {
7171
int64_t mutation_id;
7272
};
7373

74-
typedef struct {
75-
uint64_t start;
76-
snapshot_range_t range;
77-
} snapshot_info_t;
78-
7974
/**
8075
* Flag indicating that we must send checkpoint end meta item for the cursor
8176
*/
@@ -551,7 +546,7 @@ class Checkpoint {
551546
* Returns the memory held by all the queued items which includes
552547
* key, metadata and the blob.
553548
*/
554-
size_t getMemConsumption() {
549+
size_t getMemConsumption() const {
555550
return effectiveMemUsage;
556551
}
557552

@@ -795,14 +790,14 @@ class CheckpointManager {
795790
/**
796791
* Return memory consumption of all the checkpoints managed
797792
*/
798-
size_t getMemoryUsage_UNLOCKED();
793+
size_t getMemoryUsage_UNLOCKED() const;
799794

800-
size_t getMemoryUsage();
795+
size_t getMemoryUsage() const;
801796

802797
/**
803798
* Return memory consumption of unreferenced checkpoints
804799
*/
805-
size_t getMemoryUsageOfUnrefCheckpoints();
800+
size_t getMemoryUsageOfUnrefCheckpoints() const;
806801

807802
/**
808803
* Function returns a list of cursors to drop so as to unreference

engines/ep/src/ep_engine.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2004,6 +2004,10 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::itemAllocate(
20042004
}
20052005
}
20062006

2007+
void EventuallyPersistentEngine::itemRelease(const void* cookie, item* itm) {
2008+
delete reinterpret_cast<Item*>(itm);
2009+
}
2010+
20072011
ENGINE_ERROR_CODE EventuallyPersistentEngine::flush(const void *cookie){
20082012
if (!deleteAllEnabled) {
20092013
return ENGINE_ENOTSUP;
@@ -3418,6 +3422,38 @@ void EventuallyPersistentEngine::addSeqnoVbStats(const void *cookie,
34183422
}
34193423
}
34203424

3425+
void EventuallyPersistentEngine::addLookupResult(const void* cookie,
3426+
std::unique_ptr<Item> result) {
3427+
LockHolder lh(lookupMutex);
3428+
auto it = lookups.find(cookie);
3429+
if (it != lookups.end()) {
3430+
if (it->second != NULL) {
3431+
LOG(EXTENSION_LOG_DEBUG,
3432+
"Cleaning up old lookup result for '%s'",
3433+
it->second->getKey().data());
3434+
} else {
3435+
LOG(EXTENSION_LOG_DEBUG, "Cleaning up old null lookup result");
3436+
}
3437+
lookups.erase(it);
3438+
}
3439+
lookups[cookie] = std::move(result);
3440+
}
3441+
3442+
bool EventuallyPersistentEngine::fetchLookupResult(const void* cookie,
3443+
std::unique_ptr<Item>& itm) {
3444+
// This will return *and erase* the lookup result for a connection.
3445+
// You look it up, you own it.
3446+
LockHolder lh(lookupMutex);
3447+
auto it = lookups.find(cookie);
3448+
if (it != lookups.end()) {
3449+
itm = std::move(it->second);
3450+
lookups.erase(it);
3451+
return true;
3452+
} else {
3453+
return false;
3454+
}
3455+
}
3456+
34213457
ENGINE_ERROR_CODE EventuallyPersistentEngine::doSeqnoStats(const void *cookie,
34223458
ADD_STAT add_stat,
34233459
const char* stat_key,

engines/ep/src/ep_engine.h

Lines changed: 5 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "connhandler.h"
2424
#include "kv_bucket.h"
2525
#include "storeddockey.h"
26+
#include "stats.h"
2627
#include "taskable.h"
2728
#include "vb_visitors.h"
2829

@@ -142,12 +143,7 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
142143
return ret;
143144
}
144145

145-
146-
void itemRelease(const void* cookie, item *itm)
147-
{
148-
(void)cookie;
149-
delete (Item*)itm;
150-
}
146+
void itemRelease(const void* cookie, item *itm);
151147

152148
ENGINE_ERROR_CODE get(const void* cookie,
153149
item** itm,
@@ -706,35 +702,9 @@ class EventuallyPersistentEngine : public ENGINE_HANDLE_V1 {
706702
void addSeqnoVbStats(const void *cookie, ADD_STAT add_stat,
707703
const VBucketPtr &vb);
708704

709-
void addLookupResult(const void* cookie, std::unique_ptr<Item> result) {
710-
LockHolder lh(lookupMutex);
711-
auto it = lookups.find(cookie);
712-
if (it != lookups.end()) {
713-
if (it->second != NULL) {
714-
LOG(EXTENSION_LOG_DEBUG,
715-
"Cleaning up old lookup result for '%s'",
716-
it->second->getKey().data());
717-
} else {
718-
LOG(EXTENSION_LOG_DEBUG, "Cleaning up old null lookup result");
719-
}
720-
lookups.erase(it);
721-
}
722-
lookups[cookie] = std::move(result);
723-
}
724-
725-
bool fetchLookupResult(const void* cookie, std::unique_ptr<Item>& itm) {
726-
// This will return *and erase* the lookup result for a connection.
727-
// You look it up, you own it.
728-
LockHolder lh(lookupMutex);
729-
auto it = lookups.find(cookie);
730-
if (it != lookups.end()) {
731-
itm = std::move(it->second);
732-
lookups.erase(it);
733-
return true;
734-
} else {
735-
return false;
736-
}
737-
}
705+
void addLookupResult(const void* cookie, std::unique_ptr<Item> result);
706+
707+
bool fetchLookupResult(const void* cookie, std::unique_ptr<Item>& itm);
738708

739709
// Initialize all required callbacks of this engine with the underlying
740710
// server.

engines/ep/src/ep_types.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ typedef struct {
5151
uint64_t end;
5252
} snapshot_range_t;
5353

54+
typedef struct {
55+
uint64_t start;
56+
snapshot_range_t range;
57+
} snapshot_info_t;
58+
5459
/**
5560
* The following options can be specified
5661
* for retrieving an item for get calls

engines/ep/src/kv_bucket.cc

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,21 @@ bool KVBucket::isMetaDataResident(VBucketPtr &vb, const DocKey& key) {
501501
}
502502
}
503503

504+
void KVBucket::logQTime(TaskId taskType, const ProcessClock::duration enqTime) {
505+
const auto ns_count =
506+
std::chrono::duration_cast<std::chrono::microseconds>(enqTime)
507+
.count();
508+
stats.schedulingHisto[static_cast<int>(taskType)].add(ns_count);
509+
}
510+
511+
void KVBucket::logRunTime(TaskId taskType,
512+
const ProcessClock::duration runTime) {
513+
const auto ns_count =
514+
std::chrono::duration_cast<std::chrono::microseconds>(runTime)
515+
.count();
516+
stats.taskRuntimeHisto[static_cast<int>(taskType)].add(ns_count);
517+
}
518+
504519
ENGINE_ERROR_CODE KVBucket::set(Item& itm,
505520
const void* cookie,
506521
cb::StoreIfPredicate predicate) {
@@ -2637,6 +2652,24 @@ void KVBucket::runVbStatePersistTask(int vbid) {
26372652
scheduleVBStatePersist(vbid);
26382653
}
26392654

2655+
bool KVBucket::compactionCanExpireItems() {
2656+
// Process expired items only if memory usage is lesser than
2657+
// compaction_exp_mem_threshold and disk queue is small
2658+
// enough (marked by replication_throttle_queue_cap)
2659+
2660+
bool isMemoryUsageOk =
2661+
(stats.getTotalMemoryUsed() <
2662+
(stats.getMaxDataSize() * compactionExpMemThreshold));
2663+
2664+
size_t queueSize = stats.diskQueueSize.load();
2665+
bool isQueueSizeOk =
2666+
((stats.replicationThrottleWriteQueueCap == -1) ||
2667+
(queueSize <
2668+
static_cast<size_t>(stats.replicationThrottleWriteQueueCap)));
2669+
2670+
return (isMemoryUsageOk && isQueueSizeOk);
2671+
}
2672+
26402673
void KVBucket::setCursorDroppingLowerUpperThresholds(size_t maxSize) {
26412674
Configuration &config = engine.getConfiguration();
26422675
stats.cursorDroppingLThreshold.store(static_cast<size_t>(maxSize *

engines/ep/src/kv_bucket.h

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -589,17 +589,9 @@ class KVBucket : public KVBucketIface {
589589

590590
bool isMetaDataResident(VBucketPtr &vb, const DocKey& key);
591591

592-
void logQTime(TaskId taskType, const ProcessClock::duration enqTime) {
593-
const auto ns_count = std::chrono::duration_cast
594-
<std::chrono::microseconds>(enqTime).count();
595-
stats.schedulingHisto[static_cast<int>(taskType)].add(ns_count);
596-
}
592+
void logQTime(TaskId taskType, const ProcessClock::duration enqTime);
597593

598-
void logRunTime(TaskId taskType, const ProcessClock::duration runTime) {
599-
const auto ns_count = std::chrono::duration_cast
600-
<std::chrono::microseconds>(runTime).count();
601-
stats.taskRuntimeHisto[static_cast<int>(taskType)].add(ns_count);
602-
}
594+
void logRunTime(TaskId taskType, const ProcessClock::duration runTime);
603595

604596
bool multiBGFetchEnabled() {
605597
StorageProperties storeProp = getStorageProperties();
@@ -707,20 +699,7 @@ class KVBucket : public KVBucketIface {
707699
compactionExpMemThreshold = static_cast<double>(to) / 100.0;
708700
}
709701

710-
bool compactionCanExpireItems() {
711-
// Process expired items only if memory usage is lesser than
712-
// compaction_exp_mem_threshold and disk queue is small
713-
// enough (marked by replication_throttle_queue_cap)
714-
715-
bool isMemoryUsageOk = (stats.getTotalMemoryUsed() <
716-
(stats.getMaxDataSize() * compactionExpMemThreshold));
717-
718-
size_t queueSize = stats.diskQueueSize.load();
719-
bool isQueueSizeOk = ((stats.replicationThrottleWriteQueueCap == -1) ||
720-
(queueSize < static_cast<size_t>(stats.replicationThrottleWriteQueueCap)));
721-
722-
return (isMemoryUsageOk && isQueueSizeOk);
723-
}
702+
bool compactionCanExpireItems();
724703

725704
void setCursorDroppingLowerUpperThresholds(size_t maxSize);
726705

engines/ep/src/vbucket.cc

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,18 @@ VBucket::~VBucket() {
247247
LOG(EXTENSION_LOG_INFO, "Destroying vbucket %d\n", id);
248248
}
249249

250+
int64_t VBucket::getHighSeqno() const {
251+
return checkpointManager.getHighSeqno();
252+
}
253+
254+
size_t VBucket::getChkMgrMemUsage() const {
255+
return checkpointManager.getMemoryUsage();
256+
}
257+
258+
size_t VBucket::getChkMgrMemUsageOfUnrefCheckpoints() const {
259+
return checkpointManager.getMemoryUsageOfUnrefCheckpoints();
260+
}
261+
250262
void VBucket::fireAllOps(EventuallyPersistentEngine &engine,
251263
ENGINE_ERROR_CODE code) {
252264
std::unique_lock<std::mutex> lh(pendingOpLock);
@@ -292,6 +304,17 @@ void VBucket::fireAllOps(EventuallyPersistentEngine &engine) {
292304
}
293305
}
294306

307+
void VBucket::getBackfillItems(std::vector<queued_item>& items) {
308+
LockHolder lh(backfill.mutex);
309+
size_t num_items = backfill.items.size();
310+
while (!backfill.items.empty()) {
311+
items.push_back(backfill.items.front());
312+
backfill.items.pop();
313+
}
314+
stats.vbBackfillQueueSize.fetch_sub(num_items);
315+
stats.memOverhead->fetch_sub(num_items * sizeof(queued_item));
316+
}
317+
295318
void VBucket::setState(vbucket_state_t to) {
296319
WriterLockHolder wlh(stateLock);
297320
setState_UNLOCKED(to, wlh);
@@ -431,6 +454,22 @@ void VBucket::handlePreExpiry(StoredValue& v) {
431454
}
432455
}
433456

457+
bool VBucket::addPendingOp(const void* cookie) {
458+
LockHolder lh(pendingOpLock);
459+
if (state != vbucket_state_pending) {
460+
// State transitioned while we were waiting.
461+
return false;
462+
}
463+
// Start a timer when enqueuing the first client.
464+
if (pendingOps.empty()) {
465+
pendingOpsStart = gethrtime();
466+
}
467+
pendingOps.push_back(cookie);
468+
++stats.pendingOps;
469+
++stats.pendingOpsTotal;
470+
return true;
471+
}
472+
434473
size_t VBucket::getNumNonResidentItems() const {
435474
if (eviction == VALUE_ONLY) {
436475
return ht.getNumInMemoryNonResItems();

0 commit comments

Comments
 (0)