Skip to content

Commit 9e750e3

Browse files
committed
Merge remote-tracking branch 'ep-engine/watson' into kv_engine/watson_ep
* ep-engine/watson: MB-26979: ChkptProcessorTask should not own dcp stream objs MB-24142: Use correct unit for slowTask recording [BP] MB-25798: Don't schedule backfill until previous is complete [BP] MB-25798: Re-register a dropped cursor if we don't backfill [BP] MB-25798: Backfill task leave stream state unchanged [BP] MB-25798: Log when DcpProducer::Buffer log is full [BP] MB-25798: Log seqno data for when not scheduling backfill [BP] MB-25798: Improved logging for handle slow stream & scheduling backfill MB-25630: Include read-only KVStore stats in 'cbstats kvtimings'
2 parents 4136ffa + 20c6ce3 commit 9e750e3

File tree

14 files changed

+690
-259
lines changed

14 files changed

+690
-259
lines changed

src/checkpoint.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -678,9 +678,9 @@ CursorRegResult CheckpointManager::registerCursorBySeqno(
678678
* number we are looking for is higher than anything currently assigned
679679
* and there is already an assert above for this case.
680680
*/
681-
LOG(EXTENSION_LOG_WARNING, "Cursor not registered into vb %d "
682-
" for stream '%s' because seqno %" PRIu64 " is too high",
683-
vbucketId, name.c_str(), startBySeqno);
681+
throw std::logic_error(
682+
"CheckpointManager::registerCursorBySeqno the sequences number "
683+
"is higher than anything currently assigned");
684684
}
685685
return result;
686686
}

src/connmap.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,7 +1146,7 @@ void DcpConnMap::closeStreams(CookieToConnectionMap& map) {
11461146
DcpProducer* producer = dynamic_cast<DcpProducer*> (itr.second.get());
11471147
if (producer) {
11481148
producer->closeAllStreams();
1149-
producer->clearCheckpointProcessorTaskQueues();
1149+
producer->cancelCheckpointCreatorTask();
11501150
// The producer may be in EWOULDBLOCK (if it's idle), therefore
11511151
// notify him to ensure the front-end connection can close the TCP
11521152
// connection.
@@ -1207,7 +1207,7 @@ void DcpConnMap::disconnect(const void *cookie) {
12071207
DcpProducer* producer = dynamic_cast<DcpProducer*> (conn.get());
12081208
if (producer) {
12091209
producer->closeAllStreams();
1210-
producer->clearCheckpointProcessorTaskQueues();
1210+
producer->cancelCheckpointCreatorTask();
12111211
} else {
12121212
// Cancel consumer's processer task before closing all streams
12131213
static_cast<DcpConsumer*>(conn.get())->cancelTask();

src/couch-kvstore/couch-kvstore.cc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1011,9 +1011,6 @@ void CouchKVStore::addStats(const std::string &prefix,
10111011

10121012
void CouchKVStore::addTimingStats(const std::string &prefix,
10131013
ADD_STAT add_stat, const void *c) {
1014-
if (isReadOnly()) {
1015-
return;
1016-
}
10171014
const char *prefix_str = prefix.c_str();
10181015
addStat(prefix_str, "commit", st.commitHisto, add_stat, c);
10191016
addStat(prefix_str, "compact", st.compactHisto, add_stat, c);

src/dcp/producer.cc

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,16 @@ bool DcpProducer::BufferLog::pauseIfFull() {
8181

8282
void DcpProducer::BufferLog::unpauseIfSpaceAvailable() {
8383
ReaderLockHolder rlh(logLock);
84-
if (getState_UNLOCKED() != Full) {
84+
if (getState_UNLOCKED() == Full) {
85+
LOG(EXTENSION_LOG_NOTICE, "%s Unable to notify paused connection "
86+
"because DcpProducer::BufferLog is full; "
87+
"ackedBytes:%" PRIu64 ", bytesSent:%" PRIu64 ", "
88+
"maxBytes:%" PRIu64 ,
89+
producer.logHeader(),
90+
uint64_t(ackedBytes),
91+
uint64_t(bytesSent),
92+
uint64_t(maxBytes));
93+
} else {
8594
producer.notifyPaused(true);
8695
}
8796
}
@@ -93,6 +102,14 @@ void DcpProducer::BufferLog::acknowledge(size_t bytes) {
93102
release_UNLOCKED(bytes);
94103
ackedBytes += bytes;
95104
if (state == Full) {
105+
LOG(EXTENSION_LOG_NOTICE, "%s Notifying paused connection now that "
106+
"DcpProducer::Bufferlog is no longer full; "
107+
"ackedBytes:%" PRIu64 ", bytesSent:%" PRIu64 ", "
108+
"maxBytes:%" PRIu64 ,
109+
producer.logHeader(),
110+
uint64_t(ackedBytes),
111+
uint64_t(bytesSent),
112+
uint64_t(maxBytes));
96113
producer.notifyPaused(true);
97114
}
98115
}
@@ -167,14 +184,18 @@ DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
167184

168185
backfillMgr.reset(new BackfillManager(&engine_));
169186

170-
checkpointCreatorTask = new ActiveStreamCheckpointProcessorTask(e);
187+
checkpointCreatorTask = new ActiveStreamCheckpointProcessorTask(e, this);
171188
ExecutorPool::get()->schedule(checkpointCreatorTask, AUXIO_TASK_IDX);
172189
}
173190

174191
DcpProducer::~DcpProducer() {
175192
backfillMgr.reset();
176193
delete rejectResp;
194+
}
177195

196+
void DcpProducer::cancelCheckpointCreatorTask() {
197+
static_cast<ActiveStreamCheckpointProcessorTask*>(
198+
checkpointCreatorTask.get())->cancelTask();
178199
ExecutorPool::get()->cancel(checkpointCreatorTask->getId());
179200
}
180201

@@ -784,18 +805,8 @@ bool DcpProducer::handleSlowStream(uint16_t vbid,
784805
if (stream) {
785806
if (stream->getName().compare(name) == 0) {
786807
ActiveStream* as = static_cast<ActiveStream*>(stream.get());
787-
if (as) {
788-
LOG(EXTENSION_LOG_NOTICE, "%s (vb %" PRIu16 ") Producer "
789-
"is handling slow stream;"
790-
" state:%s lastReadSeqno:%" PRIu64
791-
" lastSentSeqno:%" PRIu64,
792-
logHeader(), vbid,
793-
Stream::stateName(as->getState()),
794-
as->getLastReadSeqno(),
795-
as->getLastSentSeqno());
796-
as->handleSlowStream();
797-
return true;
798-
}
808+
as->handleSlowStream();
809+
return true;
799810
}
800811
}
801812
}
@@ -1030,8 +1041,3 @@ void DcpProducer::scheduleCheckpointProcessorTask(stream_t s) {
10301041
static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
10311042
->schedule(s);
10321043
}
1033-
1034-
void DcpProducer::clearCheckpointProcessorTaskQueues() {
1035-
static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
1036-
->clearQueues();
1037-
}

src/dcp/producer.h

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,13 @@ class DcpProducer : public Producer {
3232
DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
3333
const std::string &n, bool notifyOnly);
3434

35-
~DcpProducer();
35+
virtual ~DcpProducer();
36+
37+
/**
38+
* Clears active stream checkpoint processor task's queue, resets its
39+
* shared reference to the producer and cancels the task.
40+
*/
41+
void cancelCheckpointCreatorTask();
3642

3743
ENGINE_ERROR_CODE streamRequest(uint32_t flags, uint32_t opaque,
3844
uint16_t vbucket, uint64_t start_seqno,
@@ -201,10 +207,14 @@ class DcpProducer : public Producer {
201207
*/
202208
void scheduleCheckpointProcessorTask(stream_t s);
203209

204-
/*
205-
Clears active stream checkpoint processor task's queue.
206-
*/
207-
void clearCheckpointProcessorTaskQueues();
210+
/**
211+
* Returns a shared reference to the stream associated with the vbucket
212+
*
213+
* @param vbid Vbucket id
214+
*
215+
* @return shared reference ptr to the stream associated
216+
*/
217+
stream_t findStreamByVbid(uint16_t vbid);
208218

209219
protected:
210220

@@ -218,13 +228,14 @@ class DcpProducer : public Producer {
218228
Couchbase::RelaxedAtomic<bool> enabled;
219229
} noopCtx;
220230

231+
ExTask checkpointCreatorTask;
232+
221233
private:
222234

223235

224236
DcpResponse* getNextItem();
225237

226238
size_t getItemsRemaining();
227-
stream_t findStreamByVbid(uint16_t vbid);
228239

229240
std::string priority;
230241

@@ -256,9 +267,7 @@ class DcpProducer : public Producer {
256267
AtomicValue<size_t> itemsSent;
257268
AtomicValue<size_t> totalBytesSent;
258269

259-
ExTask checkpointCreatorTask;
260270
static const uint32_t defaultNoopInerval;
261-
262271
};
263272

264273
#endif // SRC_DCP_PRODUCER_H_

0 commit comments

Comments
 (0)