Skip to content

Commit 20c6ce3

Browse files
manudhundidaverigby
authored andcommitted
MB-26979: ChkptProcessorTask should not own dcp stream objs
Currently the ActiveStreamCheckpointProcessorTask co-owns the corresponding stream object. So if the producer connection (co-owner) quickly closes the stream and opens a new stream on the same vbucket (that is the streams map of the producer will contain new stream object), then the ActiveStreamCheckpointProcessorTask will contain any entry to the older stream. This could result in a case where the processor task is not run for the new stream which could result in a DCP hang. This commit fixes the issue by making sure that the processor task only contains the vbucket id and the stream is looked up from the producer streams map when needed. However this requires the ActiveStreamCheckpointProcessorTask to hold a shared reference to the producer though the producer obj holds a shared reference to the task (thereby resulting in a cyclic reference). Hence in the delete path, the cyclic reference is broken by manually deleting the producer reference. The problem of cyclic reference can be averted in the master branch by the use of shared/weak ptr. Also, to test the code, some refactoring of test code is done. Change-Id: I4b16bb81aac6f45a137affa9870be6f1416e9464 Reviewed-on: http://review.couchbase.org/86277 Well-Formed: Build Bot <[email protected]> Tested-by: Build Bot <[email protected]> Reviewed-by: Jim Walker <[email protected]>
1 parent e6286bd commit 20c6ce3

File tree

9 files changed

+385
-166
lines changed

9 files changed

+385
-166
lines changed

src/connmap.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1146,7 +1146,7 @@ void DcpConnMap::closeStreams(CookieToConnectionMap& map) {
11461146
DcpProducer* producer = dynamic_cast<DcpProducer*> (itr.second.get());
11471147
if (producer) {
11481148
producer->closeAllStreams();
1149-
producer->clearCheckpointProcessorTaskQueues();
1149+
producer->cancelCheckpointCreatorTask();
11501150
// The producer may be in EWOULDBLOCK (if it's idle), therefore
11511151
// notify him to ensure the front-end connection can close the TCP
11521152
// connection.
@@ -1207,7 +1207,7 @@ void DcpConnMap::disconnect(const void *cookie) {
12071207
DcpProducer* producer = dynamic_cast<DcpProducer*> (conn.get());
12081208
if (producer) {
12091209
producer->closeAllStreams();
1210-
producer->clearCheckpointProcessorTaskQueues();
1210+
producer->cancelCheckpointCreatorTask();
12111211
} else {
12121212
// Cancel consumer's processer task before closing all streams
12131213
static_cast<DcpConsumer*>(conn.get())->cancelTask();

src/dcp/producer.cc

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -184,14 +184,18 @@ DcpProducer::DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
184184

185185
backfillMgr.reset(new BackfillManager(&engine_));
186186

187-
checkpointCreatorTask = new ActiveStreamCheckpointProcessorTask(e);
187+
checkpointCreatorTask = new ActiveStreamCheckpointProcessorTask(e, this);
188188
ExecutorPool::get()->schedule(checkpointCreatorTask, AUXIO_TASK_IDX);
189189
}
190190

191191
DcpProducer::~DcpProducer() {
192192
backfillMgr.reset();
193193
delete rejectResp;
194+
}
194195

196+
void DcpProducer::cancelCheckpointCreatorTask() {
197+
static_cast<ActiveStreamCheckpointProcessorTask*>(
198+
checkpointCreatorTask.get())->cancelTask();
195199
ExecutorPool::get()->cancel(checkpointCreatorTask->getId());
196200
}
197201

@@ -1037,8 +1041,3 @@ void DcpProducer::scheduleCheckpointProcessorTask(stream_t s) {
10371041
static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
10381042
->schedule(s);
10391043
}
1040-
1041-
void DcpProducer::clearCheckpointProcessorTaskQueues() {
1042-
static_cast<ActiveStreamCheckpointProcessorTask*>(checkpointCreatorTask.get())
1043-
->clearQueues();
1044-
}

src/dcp/producer.h

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,13 @@ class DcpProducer : public Producer {
3232
DcpProducer(EventuallyPersistentEngine &e, const void *cookie,
3333
const std::string &n, bool notifyOnly);
3434

35-
~DcpProducer();
35+
virtual ~DcpProducer();
36+
37+
/**
38+
* Clears active stream checkpoint processor task's queue, resets its
39+
* shared reference to the producer and cancels the task.
40+
*/
41+
void cancelCheckpointCreatorTask();
3642

3743
ENGINE_ERROR_CODE streamRequest(uint32_t flags, uint32_t opaque,
3844
uint16_t vbucket, uint64_t start_seqno,
@@ -201,10 +207,14 @@ class DcpProducer : public Producer {
201207
*/
202208
void scheduleCheckpointProcessorTask(stream_t s);
203209

204-
/*
205-
Clears active stream checkpoint processor task's queue.
206-
*/
207-
void clearCheckpointProcessorTaskQueues();
210+
/**
211+
* Returns a shared reference to the stream associated with the vbucket
212+
*
213+
* @param vbid Vbucket id
214+
*
215+
* @return shared reference ptr to the stream associated
216+
*/
217+
stream_t findStreamByVbid(uint16_t vbid);
208218

209219
protected:
210220

@@ -218,13 +228,14 @@ class DcpProducer : public Producer {
218228
Couchbase::RelaxedAtomic<bool> enabled;
219229
} noopCtx;
220230

231+
ExTask checkpointCreatorTask;
232+
221233
private:
222234

223235

224236
DcpResponse* getNextItem();
225237

226238
size_t getItemsRemaining();
227-
stream_t findStreamByVbid(uint16_t vbid);
228239

229240
std::string priority;
230241

@@ -256,9 +267,7 @@ class DcpProducer : public Producer {
256267
AtomicValue<size_t> itemsSent;
257268
AtomicValue<size_t> totalBytesSent;
258269

259-
ExTask checkpointCreatorTask;
260270
static const uint32_t defaultNoopInerval;
261-
262271
};
263272

264273
#endif // SRC_DCP_PRODUCER_H_

src/dcp/stream.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -798,20 +798,23 @@ void ActiveStreamCheckpointProcessorTask::wakeup() {
798798
}
799799

800800
void ActiveStreamCheckpointProcessorTask::schedule(stream_t stream) {
801-
pushUnique(stream);
801+
pushUnique(stream->getVBucket());
802802

803803
bool expected = false;
804804
if (notified.compare_exchange_strong(expected, true)) {
805805
wakeup();
806806
}
807807
}
808808

809-
void ActiveStreamCheckpointProcessorTask::clearQueues() {
809+
void ActiveStreamCheckpointProcessorTask::cancelTask() {
810810
LockHolder lh(workQueueLock);
811811
while (!queue.empty()) {
812812
queue.pop();
813813
}
814814
queuedVbuckets.clear();
815+
/* Reset the producer while holding the lock as it is a
816+
SingleThreadedRCPtr */
817+
producer.reset();
815818
}
816819

817820
void ActiveStream::nextCheckpointItemTask() {

src/dcp/stream.h

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -386,12 +386,14 @@ class ActiveStream : public Stream {
386386

387387
class ActiveStreamCheckpointProcessorTask : public GlobalTask {
388388
public:
389-
ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e)
389+
ActiveStreamCheckpointProcessorTask(EventuallyPersistentEngine& e,
390+
dcp_producer_t p)
390391
: GlobalTask(&e, TaskId::ActiveStreamCheckpointProcessorTask,
391392
INT_MAX, false),
392393
notified(false),
393394
iterationsBeforeYield(e.getConfiguration()
394-
.getDcpProducerSnapshotMarkerYieldLimit()) { }
395+
.getDcpProducerSnapshotMarkerYieldLimit()),
396+
producer(p) { }
395397

396398
std::string getDescription() {
397399
std::string rv("Process checkpoint(s) for DCP producer");
@@ -401,17 +403,38 @@ class ActiveStreamCheckpointProcessorTask : public GlobalTask {
401403
bool run();
402404
void schedule(stream_t stream);
403405
void wakeup();
404-
void clearQueues();
406+
407+
/* Clears the queues and resets the producer reference */
408+
void cancelTask();
409+
410+
/* Returns the number of unique streams waiting to be processed */
411+
size_t queueSize() {
412+
LockHolder lh(workQueueLock);
413+
return queue.size();
414+
}
405415

406416
private:
407417

408418
stream_t queuePop() {
409419
stream_t rval;
410-
LockHolder lh(workQueueLock);
411-
if (!queue.empty()) {
412-
rval = queue.front();
420+
uint16_t vbid = 0;
421+
dcp_producer_t producerRefCpy;
422+
{
423+
LockHolder lh(workQueueLock);
424+
if (queue.empty()) {
425+
return rval;
426+
}
427+
vbid = queue.front();
413428
queue.pop();
414-
queuedVbuckets.erase(rval->getVBucket());
429+
queuedVbuckets.erase(vbid);
430+
/* Get a copy of refPtr as we are releasing the workQueueLock */
431+
producerRefCpy = producer;
432+
}
433+
434+
/* findStreamByVbid acquires DcpProducer::streamsMutex, hence called
435+
without acquiring workQueueLock */
436+
if (producerRefCpy) {
437+
return producerRefCpy->findStreamByVbid(vbid);
415438
}
416439
return rval;
417440
}
@@ -421,25 +444,34 @@ class ActiveStreamCheckpointProcessorTask : public GlobalTask {
421444
return queue.empty();
422445
}
423446

424-
void pushUnique(stream_t stream) {
447+
void pushUnique(uint16_t vbid) {
425448
LockHolder lh(workQueueLock);
426-
if (queuedVbuckets.count(stream->getVBucket()) == 0) {
427-
queue.push(stream);
428-
queuedVbuckets.insert(stream->getVBucket());
449+
if (queuedVbuckets.count(vbid) == 0) {
450+
queue.push(vbid);
451+
queuedVbuckets.insert(vbid);
429452
}
430453
}
431454

432455
Mutex workQueueLock;
433456

434457
/**
435-
* Maintain a queue of unique stream_t
458+
* Maintain a queue of unique vbucket ids for which stream should be
459+
* processed.
436460
* There's no need to have the same stream in the queue more than once
461+
*
462+
* The streams are kept in the 'streams map' of the producer object. We
463+
* should not hold a shared reference to the stream object here in order to
464+
* avoid multiple stream ownership issues
437465
*/
438-
std::queue<stream_t> queue;
439-
std::set<uint16_t> queuedVbuckets;
466+
std::queue<uint16_t> queue;
467+
std::unordered_set<uint16_t> queuedVbuckets;
440468

441469
AtomicValue<bool> notified;
442470
size_t iterationsBeforeYield;
471+
472+
/* shared reference to the producer object, should be deleted when the task
473+
is stopped because the producer object contains a reference to this */
474+
dcp_producer_t producer;
443475
};
444476

445477
class NotifierStream : public Stream {

tests/mock/mock_dcp_producer.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#pragma once
1919

2020
#include "dcp/producer.h"
21+
#include "dcp/stream.h"
2122

2223
/*
2324
* Mock of the DcpProducer class. Wraps the real DcpProducer, but exposes
@@ -54,4 +55,11 @@ class MockDcpProducer: public DcpProducer {
5455
bool getNoopEnabled() {
5556
return noopCtx.enabled;
5657
}
57-
};
58+
59+
ActiveStreamCheckpointProcessorTask& getCheckpointSnapshotTask() const {
60+
return *static_cast<ActiveStreamCheckpointProcessorTask*>(
61+
checkpointCreatorTask.get());
62+
}
63+
};
64+
65+
using mock_dcp_producer_t = SingleThreadedRCPtr<MockDcpProducer>;

0 commit comments

Comments
 (0)