Skip to content

Commit 5e768d6

Browse files
committed
MB-29928: Refactor DefragmenterTask::run
Move the code which is executed when 'enabled' to its own function. Change-Id: I7c651d8ce82446c62858b9dcde7d4b9aa5aa4c9d Reviewed-on: http://review.couchbase.org/c/kv_engine/+/155178 Tested-by: Jim Walker <[email protected]> Reviewed-by: Trond Norbye <[email protected]>
1 parent 6b9b7d5 commit 5e768d6

File tree

2 files changed

+95
-97
lines changed

2 files changed

+95
-97
lines changed

engines/ep/src/defragmenter.cc

Lines changed: 93 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -31,112 +31,108 @@ DefragmenterTask::DefragmenterTask(EventuallyPersistentEngine* e,
3131
bool DefragmenterTask::run() {
3232
TRACE_EVENT0("ep-engine/task", "DefragmenterTask");
3333
if (engine->getConfiguration().isDefragmenterEnabled()) {
34-
// Get our pause/resume visitor. If we didn't finish the previous pass,
35-
// then resume from where we last were, otherwise create a new visitor
36-
// starting from the beginning.
37-
if (!prAdapter) {
38-
auto visitor =
39-
std::make_unique<DefragmentVisitor>(getMaxValueSize());
40-
41-
prAdapter =
42-
std::make_unique<PauseResumeVBAdapter>(std::move(visitor));
43-
epstore_position = engine->getKVBucket()->startPosition();
44-
}
34+
defrag();
35+
}
36+
snooze(getSleepTime());
37+
if (engine->getEpStats().isShutdown) {
38+
return false;
39+
}
40+
return true;
41+
}
4542

46-
// Print start status.
47-
if (getGlobalBucketLogger()->should_log(spdlog::level::debug)) {
48-
std::stringstream ss;
49-
ss << getDescription() << " for bucket '" << engine->getName()
50-
<< "'";
51-
if (epstore_position == engine->getKVBucket()->startPosition()) {
52-
ss << " starting. ";
53-
} else {
54-
ss << " resuming from " << epstore_position << ", ";
55-
ss << prAdapter->getHashtablePosition() << ".";
56-
}
57-
auto fragStats = cb::ArenaMalloc::getFragmentationStats(
58-
engine->getArenaMallocClient());
59-
ss << " Using chunk_duration=" << getChunkDuration().count()
60-
<< " ms."
61-
<< " mem_used=" << stats.getEstimatedTotalMemoryUsed() << ", "
62-
<< fragStats;
63-
EP_LOG_DEBUG("{}", ss.str());
64-
}
43+
void DefragmenterTask::defrag() {
44+
// Get our pause/resume visitor. If we didn't finish the previous pass,
45+
// then resume from where we last were, otherwise create a new visitor
46+
// starting from the beginning.
47+
if (!prAdapter) {
48+
auto visitor = std::make_unique<DefragmentVisitor>(getMaxValueSize());
6549

66-
// Disable thread-caching (as we are about to defragment, and hence don't
67-
// want any of the new Blobs in tcache).
68-
cb::ArenaMalloc::switchToClient(engine->getArenaMallocClient(),
69-
false /* no tcache*/);
70-
71-
// Prepare the underlying visitor.
72-
auto& visitor = getDefragVisitor();
73-
const auto start = std::chrono::steady_clock::now();
74-
const auto deadline = start + getChunkDuration();
75-
visitor.setDeadline(deadline);
76-
visitor.setBlobAgeThreshold(getAgeThreshold());
77-
// Only defragment StoredValues of persistent buckets because the
78-
// HashTable defrag method doesn't yet know how to maintain the
79-
// ephemeral seqno linked-list
80-
if (engine->getConfiguration().getBucketType() == "persistent") {
81-
visitor.setStoredValueAgeThreshold(getStoredValueAgeThreshold());
82-
}
83-
visitor.clearStats();
84-
85-
// Do it - set off the visitor.
86-
epstore_position = engine->getKVBucket()->pauseResumeVisit(
87-
*prAdapter, epstore_position);
88-
const auto end = std::chrono::steady_clock::now();
89-
90-
// Defrag complete. Restore thread caching.
91-
cb::ArenaMalloc::switchToClient(engine->getArenaMallocClient(),
92-
true /* tcache*/);
93-
94-
updateStats(visitor);
95-
96-
// Release any free memory we now have in the allocator back to the OS.
97-
// TODO: Benchmark this - is it necessary? How much of a slowdown does it
98-
// add? How much memory does it return?
99-
cb::ArenaMalloc::releaseMemory(engine->getArenaMallocClient());
100-
101-
// Check if the visitor completed a full pass.
102-
bool completed = (epstore_position ==
103-
engine->getKVBucket()->endPosition());
104-
105-
// Print status.
106-
if (getGlobalBucketLogger()->should_log(spdlog::level::debug)) {
107-
std::stringstream ss;
108-
ss << getDescription() << " for bucket '" << engine->getName()
109-
<< "'";
110-
if (completed) {
111-
ss << " finished.";
112-
} else {
113-
ss << " paused at position " << epstore_position << ".";
114-
}
115-
std::chrono::microseconds duration =
116-
std::chrono::duration_cast<std::chrono::microseconds>(
117-
end - start);
118-
auto fragStats = cb::ArenaMalloc::getFragmentationStats(
119-
engine->getArenaMallocClient());
120-
ss << " Took " << duration.count() << " us."
121-
<< " moved " << visitor.getDefragCount() << "/"
122-
<< visitor.getVisitedCount() << " visited documents."
123-
<< " mem_used=" << stats.getEstimatedTotalMemoryUsed() << ", "
124-
<< fragStats << ". Sleeping for " << getSleepTime()
125-
<< " seconds.";
126-
EP_LOG_DEBUG("{}", ss.str());
50+
prAdapter = std::make_unique<PauseResumeVBAdapter>(std::move(visitor));
51+
epstore_position = engine->getKVBucket()->startPosition();
52+
}
53+
54+
// Print start status.
55+
if (getGlobalBucketLogger()->should_log(spdlog::level::debug)) {
56+
std::stringstream ss;
57+
ss << getDescription() << " for bucket '" << engine->getName() << "'";
58+
if (epstore_position == engine->getKVBucket()->startPosition()) {
59+
ss << " starting. ";
60+
} else {
61+
ss << " resuming from " << epstore_position << ", ";
62+
ss << prAdapter->getHashtablePosition() << ".";
12763
}
64+
auto fragStats = cb::ArenaMalloc::getFragmentationStats(
65+
engine->getArenaMallocClient());
66+
ss << " Using chunk_duration=" << getChunkDuration().count() << " ms."
67+
<< " mem_used=" << stats.getEstimatedTotalMemoryUsed() << ", "
68+
<< fragStats;
69+
EP_LOG_DEBUG("{}", ss.str());
70+
}
71+
72+
// Disable thread-caching (as we are about to defragment, and hence don't
73+
// want any of the new Blobs in tcache).
74+
cb::ArenaMalloc::switchToClient(engine->getArenaMallocClient(),
75+
false /* no tcache*/);
76+
77+
// Prepare the underlying visitor.
78+
auto& visitor = getDefragVisitor();
79+
const auto start = std::chrono::steady_clock::now();
80+
const auto deadline = start + getChunkDuration();
81+
visitor.setDeadline(deadline);
82+
visitor.setBlobAgeThreshold(getAgeThreshold());
83+
// Only defragment StoredValues of persistent buckets because the
84+
// HashTable defrag method doesn't yet know how to maintain the
85+
// ephemeral seqno linked-list
86+
if (engine->getConfiguration().getBucketType() == "persistent") {
87+
visitor.setStoredValueAgeThreshold(getStoredValueAgeThreshold());
88+
}
89+
visitor.clearStats();
90+
91+
// Do it - set off the visitor.
92+
epstore_position = engine->getKVBucket()->pauseResumeVisit(
93+
*prAdapter, epstore_position);
94+
const auto end = std::chrono::steady_clock::now();
95+
96+
// Defrag complete. Restore thread caching.
97+
cb::ArenaMalloc::switchToClient(engine->getArenaMallocClient(),
98+
true /* tcache*/);
99+
100+
updateStats(visitor);
128101

129-
// Delete(reset) visitor if it finished.
102+
// Release any free memory we now have in the allocator back to the OS.
103+
// TODO: Benchmark this - is it necessary? How much of a slowdown does it
104+
// add? How much memory does it return?
105+
cb::ArenaMalloc::releaseMemory(engine->getArenaMallocClient());
106+
107+
// Check if the visitor completed a full pass.
108+
bool completed = (epstore_position == engine->getKVBucket()->endPosition());
109+
110+
// Print status.
111+
if (getGlobalBucketLogger()->should_log(spdlog::level::debug)) {
112+
std::stringstream ss;
113+
ss << getDescription() << " for bucket '" << engine->getName() << "'";
130114
if (completed) {
131-
prAdapter.reset();
115+
ss << " finished.";
116+
} else {
117+
ss << " paused at position " << epstore_position << ".";
132118
}
119+
std::chrono::microseconds duration =
120+
std::chrono::duration_cast<std::chrono::microseconds>(end -
121+
start);
122+
auto fragStats = cb::ArenaMalloc::getFragmentationStats(
123+
engine->getArenaMallocClient());
124+
ss << " Took " << duration.count() << " us."
125+
<< " moved " << visitor.getDefragCount() << "/"
126+
<< visitor.getVisitedCount() << " visited documents."
127+
<< " mem_used=" << stats.getEstimatedTotalMemoryUsed() << ", "
128+
<< fragStats << ". Sleeping for " << getSleepTime() << " seconds.";
129+
EP_LOG_DEBUG("{}", ss.str());
133130
}
134131

135-
snooze(getSleepTime());
136-
if (engine->getEpStats().isShutdown) {
137-
return false;
132+
// Delete(reset) visitor if it finished.
133+
if (completed) {
134+
prAdapter.reset();
138135
}
139-
return true;
140136
}
141137

142138
void DefragmenterTask::stop() {

engines/ep/src/defragmenter.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ class DefragmenterTask : public GlobalTask {
105105
static size_t getMaxValueSize();
106106

107107
private:
108+
/// Main function called from run when defragmenter is enabled
109+
void defrag();
108110

109111
/// Duration (in seconds) defragmenter should sleep for between iterations.
110112
double getSleepTime() const;

0 commit comments

Comments
 (0)