Commit 465f125

MB-40267: ActiveStreamCheckpointProcessorTask should be a NonIO task

The ActiveStreamCheckpointProcessorTask is incorrectly assigned to the AuxIO thread pool (which is intended for IO-bound tasks not directly related to reading / flushing data).

This bug traces back to when ActiveStreamCheckpointProcessorTask was created on the 3.x branch - from the review comments of the original patch (http://review.couchbase.org/c/ep-engine/+/57148/5/src/dcp-producer.cc#152):

    Jim Walker 2015-11-24

    The initial reason is to try and distribute the load. NONIO is already running the DcpConsumer Processor(s), itempager, expirypager, checkpoint remover, hashtable resizer and many more (http://src.couchbase.org/source/search?q=&defs=&refs=NONIO_TASK_IDX&path=&hist=&type=&project=3.1.1) whereas AUXIO is running DcpBackfill and access scanner (other tasks appear to be TAP related).

    As stated in the comments, for future (master branch) this should be revisited, maybe creating a DCP task type or increasing NONIO threads?

There is arguably two problems with it running AuxIO:

1. AuxIO tasks can often have long runtimes (latencies) as they are performing synchronous disk IO - and hence could impact the scheduling latency of the ActiveStreamCheckpointProcessorTask which should ideally have low latency (given it directly affects overall replication latency).

2. The AuxIO thread pool is small (compared to the NonIO thread pool) - thread counts for a few different machine sizes:

       #CPUs   #AuxIO thread   #NonIO threads
       8       1               2
       16      2               4
       32      4               8

   As such, DCP streaming (for example for in-memory phase of rebalance) can be limited in how much CPU it can use and be prematurely CPU-bound.

Change-Id: I9e4549428dc3bd08dd33abe05da8dac521de9ef2
Reviewed-on: http://review.couchbase.org/c/kv_engine/+/140915
Reviewed-by: Ben Huddleston <[email protected]>
Tested-by: Dave Rigby <[email protected]>

1 parent a04f014 commit 465f125
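
The second point in the commit message can be made concrete with a little arithmetic. The sketch below copies the thread counts from the table in the message and computes the largest share of the machine's CPUs that each pool could keep busy, under the simplifying assumption that one thread saturates at most one CPU. The names `PoolSizes`, `kPoolSizes` and `maxCpuFraction` are hypothetical and exist only for this illustration.

```cpp
#include <cassert>

// Hypothetical record for one row of the commit message's table.
struct PoolSizes {
    int cpus;
    int auxIoThreads;
    int nonIoThreads;
};

// Values copied from the table in the commit message.
constexpr PoolSizes kPoolSizes[] = {{8, 1, 2}, {16, 2, 4}, {32, 4, 8}};

// Upper bound on the fraction of CPUs a pool can occupy, assuming
// (for illustration only) that each thread uses at most one CPU.
constexpr double maxCpuFraction(int threads, int cpus) {
    return static_cast<double>(threads) / cpus;
}
```

For every row in the table this gives 1/8 of the CPUs for AuxIO and 1/4 for NonIO, which is the headroom difference the commit message refers to.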

5 files changed: +80 -43 lines changed


engines/ep/src/tasks.def.h

Lines changed: 1 addition & 1 deletion
@@ -50,7 +50,6 @@ TASK(VBucketMemoryAndDiskDeletionTask, AUXIO_TASK_IDX, 0)
 TASK(PersistCollectionsManifest, AUXIO_TASK_IDX, 1)
 TASK(AccessScanner, AUXIO_TASK_IDX, 2)
 TASK(AccessScannerVisitor, AUXIO_TASK_IDX, 2)
-TASK(ActiveStreamCheckpointProcessorTask, AUXIO_TASK_IDX, 3)
 TASK(BackfillManagerTask, AUXIO_TASK_IDX, 4)

 // Read/Write IO tasks
@@ -71,6 +70,7 @@ TASK(DcpConsumerTask, NONIO_TASK_IDX, 2)
 TASK(DurabilityCompletionTask, NONIO_TASK_IDX, 1)
 TASK(DurabilityTimeoutTask, NONIO_TASK_IDX, 1)
 TASK(DurabilityTimeoutVisitor, NONIO_TASK_IDX, 1)
+TASK(ActiveStreamCheckpointProcessorTask, NONIO_TASK_IDX, 3)
 TASK(ConnNotifierCallback, NONIO_TASK_IDX, 5)
 TASK(ClosedUnrefCheckpointRemoverTask, NONIO_TASK_IDX, 6)
 TASK(ClosedUnrefCheckpointRemoverVisitorTask, NONIO_TASK_IDX, 6)
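
The `TASK(name, type, priority)` entries above look like an X-macro list, where a single definition is expanded several times to generate related code. The following is a minimal sketch of that pattern, not kv_engine's actual code: the `TASK_LIST` macro, the reduced `TaskType` and `TaskId` enums, and the `taskTypeOf` helper are all hypothetical stand-ins, and only three of the tasks are listed.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical thread-pool types; the real enum has more entries.
enum TaskType { AUXIO_TASK_IDX, NONIO_TASK_IDX };

// Hypothetical stand-in for a few tasks.def.h entries after this commit.
#define TASK_LIST(X)                          \
    X(AccessScanner, AUXIO_TASK_IDX, 2)       \
    X(BackfillManagerTask, AUXIO_TASK_IDX, 4) \
    X(ActiveStreamCheckpointProcessorTask, NONIO_TASK_IDX, 3)

// First expansion: declare one enumerator per task.
enum class TaskId {
#define AS_ENUMERATOR(name, type, priority) name,
    TASK_LIST(AS_ENUMERATOR)
#undef AS_ENUMERATOR
};

// Second expansion: look up the pool a task is assigned to.
inline TaskType taskTypeOf(TaskId id) {
    switch (id) {
#define AS_CASE(name, type, priority) \
    case TaskId::name:                \
        return type;
        TASK_LIST(AS_CASE)
#undef AS_CASE
    }
    throw std::invalid_argument("unknown TaskId");
}
```

With a layout like this, moving a task between pools is a one-line change to the list, which matches the shape of the diff above.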

engines/ep/src/vbucket_notify_context.h

Lines changed: 2 additions & 2 deletions
@@ -44,9 +44,9 @@ struct VBNotifyCtx {
  *
  * 4) When there are no items in an ActiveStream's ready queue the front end
  * worker stepping will schedule the
- * ActiveStreamCheckpointProcessorTask. This will run on an AuxIO thread
+ * ActiveStreamCheckpointProcessorTask. This will run on an NonIO thread
  * and enqueue nothing into the ActiveStream's ready queue if the only
- * item is a prepare. This will slow down other SyncWrites if AuxIO
+ * item is a prepare. This will slow down other SyncWrites if NonIO
  * threads are a bottleneck.
  *
  * 5) The ActiveStreamCheckpointProcessorTask would then notify the front

engines/ep/tests/module_tests/dcp_reflection_test.cc

Lines changed: 31 additions & 12 deletions
@@ -53,6 +53,12 @@ static TaskQueue* getLpAuxQ() {
     return task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
 }

+static TaskQueue* getLpNonIoQ() {
+    auto* task_executor =
+            reinterpret_cast<SingleThreadedExecutorPool*>(ExecutorPool::get());
+    return task_executor->getLpTaskQ()[NONIO_TASK_IDX];
+}
+
 /**
  * Test fixture which creates two ep-engine (bucket) instances, using one
  * as a source for DCP replication and the second as the destination.
@@ -385,17 +391,30 @@ std::unique_ptr<DcpResponse>
 DCPLoopbackStreamTest::DcpRoute::getNextProducerMsg(ActiveStream* stream) {
     std::unique_ptr<DcpResponse> producerMsg(stream->next());
     if (!producerMsg) {
-        auto queueSize = getLpAuxQ()->getReadyQueueSize() +
-                         getLpAuxQ()->getFutureQueueSize();
-        EXPECT_GE(queueSize, 1)
-                << "Expected to have at least "
-                   "ActiveStreamCheckpointProcessorTask "
-                   "in ready/future queue after null producerMsg";
-
-        // Run the next waiting task to populate the streams' items.
-        CheckedExecutor executor(ExecutorPool::get(), *getLpAuxQ());
-        executor.runCurrentTask();
-        executor.completeCurrentTask();
+        // Run the next ready task to populate the streams' items. This could
+        // either be a NonIO task (ActiveStreamCheckpointProcessorTask) or
+        // AuxIO task (
+
+        // Note that the actual count of ready tasks isn't just the reaadyQueue
+        // - tasks in the futureQ whose waketime is less than or equal to now
+        // can also be run.
+        const auto auxIoQueueSize = getLpAuxQ()->getReadyQueueSize() +
+                                    getLpAuxQ()->getFutureQueueSize();
+        const auto nonIoQueueSize = getLpNonIoQ()->getReadyQueueSize() +
+                                    getLpNonIoQ()->getFutureQueueSize();
+        if (auxIoQueueSize > 0) {
+            CheckedExecutor executor(ExecutorPool::get(), *getLpAuxQ());
+            executor.runCurrentTask();
+            executor.completeCurrentTask();
+        } else if (nonIoQueueSize > 0) {
+            CheckedExecutor executor(ExecutorPool::get(), *getLpNonIoQ());
+            executor.runCurrentTask();
+            executor.completeCurrentTask();
+        } else {
+            ADD_FAILURE() << "Expected to have at least one task in AuxIO / "
+                             "NonIO ready/future queues after null "
+                             "producerMsg, but both are zero";
+        }
         if (!stream->getItemsRemaining()) {
             return {};
         }
@@ -559,7 +578,7 @@ DCPLoopbackStreamTest::DcpRoute::doStreamRequest(int flags) {
               2)
             << "Should have both persistence and DCP producer cursor on "
                "producer VB";
-    EXPECT_GE(getLpAuxQ()->getFutureQueueSize(), 1);
+    EXPECT_GE(getLpNonIoQ()->getFutureQueueSize(), 1);
     // Finally the stream-request response sends the failover table back
     // to the consumer... simulate that
     auto failoverLog = producerVb->failovers->getFailoverLog();
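
The updated `getNextProducerMsg` in this file runs one task from the AuxIO queue if it has any pending work, otherwise one from the NonIO queue, and reports a failure if both are empty. The selection logic can be sketched in isolation as below. `FakeQueue`, `Ran` and `runNextFromEither` are hypothetical names invented for this illustration; they are not part of the kv_engine `TaskQueue` or `CheckedExecutor` APIs, and the sketch ignores wake times.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical stand-in for a task queue with ready and future sub-queues.
struct FakeQueue {
    std::deque<std::function<void()>> ready;
    std::deque<std::function<void()>> future;

    // Total number of tasks that could be run, as counted in the test.
    std::size_t pending() const {
        return ready.size() + future.size();
    }

    // Runs one task, preferring the ready queue. Requires pending() > 0.
    void runOne() {
        auto& q = !ready.empty() ? ready : future;
        auto task = std::move(q.front());
        q.pop_front();
        task();
    }
};

// Which queue, if any, a task was taken from.
enum class Ran { None, AuxIo, NonIo };

// Prefer AuxIO, fall back to NonIO, report None if both are empty.
inline Ran runNextFromEither(FakeQueue& auxIo, FakeQueue& nonIo) {
    if (auxIo.pending() > 0) {
        auxIo.runOne();
        return Ran::AuxIo;
    }
    if (nonIo.pending() > 0) {
        nonIo.runOne();
        return Ran::NonIo;
    }
    return Ran::None;
}
```

In the real test the `Ran::None` case corresponds to the `ADD_FAILURE()` branch.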

engines/ep/tests/module_tests/ephemeral_bucket_test.cc

Lines changed: 6 additions & 5 deletions
@@ -207,12 +207,12 @@ TEST_F(SingleThreadedEphemeralTest, RangeIteratorVBDeleteRaceTest) {
     EXPECT_FALSE(
             task_executor->isTaskScheduled(NONIO_TASK_IDX, vbDeleteTaskName));

-    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
     // Now bin the producer
     producer->cancelCheckpointCreatorTask();
     /* Checkpoint processor task finishes up and releases its producer
        reference */
-    runNextTask(lpAuxioQ, "Process checkpoint(s) for DCP producer " + testName);
+    auto& lpNonIoQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
+    runNextTask(lpNonIoQ, "Process checkpoint(s) for DCP producer " + testName);

     engine->getDcpConnMap().shutdownAllConnections();
     mock_stream.reset();
@@ -221,6 +221,7 @@ TEST_F(SingleThreadedEphemeralTest, RangeIteratorVBDeleteRaceTest) {
     // run the backfill task so the backfill can reach state
     // backfill_finished and be destroyed destroying the range iterator
     // in the process
+    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
     runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");

     // Now the backfill is gone, the evb can be deleted
@@ -429,13 +430,13 @@ TEST_F(SingleThreadedEphemeralPurgerTest, PurgeAcrossAllVbuckets) {
     bucket->enableTombstonePurgerTask();
     bucket->attemptToFreeMemory(); // this wakes up the HTCleaner task

-    auto& lpAuxioQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
+    auto& lpNonIoQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
     /* Run the HTCleaner and EphTombstoneStaleItemDeleter tasks. We expect
        pause and resume of EphTombstoneStaleItemDeleter atleast once and we run
        until all the deleted items across all the vbuckets are purged */
     int numPaused = 0;
     while (!checkAllPurged(expPurgeUpto)) {
-        runNextTask(lpAuxioQ);
+        runNextTask(lpNonIoQ);
         ++numPaused;
     }
     EXPECT_GT(numPaused, 2 /* 1 run of 'HTCleaner' and more than 1 run of
@@ -519,4 +520,4 @@ TEST_F(SingleThreadedEphemeralPurgerTest, HTCleanerSkipsPrepares) {
         ASSERT_FALSE(res.pending);
         ASSERT_FALSE(res.committed);
     }
-}
+}

engines/ep/tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 40 additions & 23 deletions
@@ -980,10 +980,10 @@ TEST_F(MB29369_SingleThreadedEPBucketTest,
     auto producer = createDcpProducer(cookie, IncludeDeleteTime::Yes);
     producer->scheduleCheckpointProcessorTask();

-    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
-    EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize()) << "Expected to have "
-                                                   "ActiveStreamCheckpointProce"
-                                                   "ssorTask in AuxIO Queue";
+    auto& lpNonIoQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
+    EXPECT_EQ(1, lpNonIoQ.getFutureQueueSize())
+            << "Expected to have ActiveStreamCheckpointProcessorTask in NonIO "
+               "Queue";

     // Create dcp_producer_snapshot_marker_yield_limit + 1 streams -
     // this means that we don't process all pending vBuckets on a single
@@ -1075,11 +1075,13 @@ TEST_F(MB29369_SingleThreadedEPBucketTest,
     // If the Backfilling task then runs, which returns a disk snapshot and
     // re-registers the cursor; we still have an
     // ActiveStreamCheckpointProcessorTask outstanding with the vb in the queue.
-    EXPECT_EQ(2, lpAuxioQ.getFutureQueueSize());
+    EXPECT_EQ(1, lpNonIoQ.getFutureQueueSize());
+    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
+    EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize());

     // Run the ActiveStreamCheckpointProcessorTask; which should re-schedule
     // due to having items outstanding.
-    runNextTask(lpAuxioQ,
+    runNextTask(lpNonIoQ,
                 "Process checkpoint(s) for DCP producer test_producer");

     // Now run backfilling task.
@@ -1101,7 +1103,7 @@ TEST_F(MB29369_SingleThreadedEPBucketTest,

     // Now run chkptProcessorTask to complete it's queue. With the bug, this
     // results in us discarding the last item we just added to vBucket.
-    runNextTask(lpAuxioQ,
+    runNextTask(lpNonIoQ,
                 "Process checkpoint(s) for DCP producer test_producer");

     // Let the backfill task complete running (it requires multiple steps to
@@ -1127,7 +1129,7 @@ TEST_F(MB29369_SingleThreadedEPBucketTest,

     // Now run chkptProcessorTask to complete it's queue, this will now be able
     // to access the checkpoint and get key2
-    runNextTask(lpAuxioQ,
+    runNextTask(lpNonIoQ,
                 "Process checkpoint(s) for DCP producer test_producer");

     result = stream->next();
@@ -1160,9 +1162,10 @@ TEST_P(STParamPersistentBucketTest, MB29585_backfilling_whilst_snapshot_runs) {
     setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);

     auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
-    EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize()) << "Expected to have "
-                                                   "ActiveStreamCheckpointProce"
-                                                   "ssorTask in AuxIO Queue";
+    auto& lpNonIoQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
+    EXPECT_EQ(1, lpNonIoQ.getFutureQueueSize())
+            << "Expected to have ActiveStreamCheckpointProcessorTask in NonIO "
+               "Queue";

     // Create first stream
     auto vb = store->getVBucket(vbid);
@@ -1224,7 +1227,7 @@ TEST_P(STParamPersistentBucketTest, MB29585_backfilling_whilst_snapshot_runs) {

     // Next we must deque, but not run the snapshot task, we will interleave it
     // with backfill later
-    CheckedExecutor checkpointTask(task_executor, lpAuxioQ);
+    CheckedExecutor checkpointTask(task_executor, lpNonIoQ);
     EXPECT_STREQ("Process checkpoint(s) for DCP producer test_producer",
                  checkpointTask.getTaskName().data());

@@ -1276,7 +1279,7 @@ TEST_P(STParamPersistentBucketTest, MB29585_backfilling_whilst_snapshot_runs) {
         FAIL() << "Expected Event::Mutation named 'key2'";
     }

-    runNextTask(lpAuxioQ,
+    runNextTask(lpNonIoQ,
                 "Process checkpoint(s) for DCP producer test_producer");

     result = stream->next();
@@ -1539,7 +1542,9 @@ TEST_P(STParamPersistentBucketTest, MB22960_cursor_dropping_data_loss) {
     ckpt_mgr.removeCursor(mock_stream->getCursor().lock().get());

     auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
-    EXPECT_EQ(2, lpAuxioQ.getFutureQueueSize());
+    EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize());
+    auto& lpNonIoQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
+    EXPECT_EQ(1, lpNonIoQ.getFutureQueueSize());
     // backfill:create()
     runNextTask(lpAuxioQ);
     // backfill:scan()
@@ -1743,7 +1748,9 @@ TEST_P(STParamPersistentBucketTest,
     mock_stream->next();

     auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
-    EXPECT_EQ(2, lpAuxioQ.getFutureQueueSize());
+    EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize());
+    auto& lpNonIoQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];
+    EXPECT_EQ(1, lpNonIoQ.getFutureQueueSize());
     // backfill:create()
     runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");
     // backfill:scan()
@@ -1783,7 +1790,7 @@ TEST_P(STParamPersistentBucketTest,
     ASSERT_EQ(1, registerCursorCount);

     // ActiveStreamCheckpointProcessorTask
-    runNextTask(lpAuxioQ, "Process checkpoint(s) for DCP producer " + testName);
+    runNextTask(lpNonIoQ, "Process checkpoint(s) for DCP producer " + testName);
     // BackfillManagerTask
     runNextTask(lpAuxioQ, "Backfilling items for a DCP Connection");

@@ -1917,7 +1924,7 @@ TEST_P(STParamPersistentBucketTest, MB19428_no_streams_against_dead_vbucket) {
               getEPBucket().flushVBucket(vbid));

     setVBucketStateAndRunPersistTask(vbid, vbucket_state_dead);
-    auto& lpAuxioQ = *task_executor->getLpTaskQ()[AUXIO_TASK_IDX];
+    auto& lpNonIoQ = *task_executor->getLpTaskQ()[NONIO_TASK_IDX];

     {
         // Create a Mock Dcp producer
@@ -1928,7 +1935,7 @@ TEST_P(STParamPersistentBucketTest, MB19428_no_streams_against_dead_vbucket) {

         // Creating a producer will not create an
         // ActiveStreamCheckpointProcessorTask until a stream is created.
-        EXPECT_EQ(0, lpAuxioQ.getFutureQueueSize());
+        EXPECT_EQ(0, lpNonIoQ.getFutureQueueSize());

         uint64_t rollbackSeqno;
         auto err = producer->streamRequest(
@@ -1948,7 +1955,7 @@ TEST_P(STParamPersistentBucketTest, MB19428_no_streams_against_dead_vbucket) {

         // The streamRequest failed and should not of created anymore tasks than
         // ActiveStreamCheckpointProcessorTask.
-        EXPECT_EQ(1, lpAuxioQ.getFutureQueueSize());
+        EXPECT_EQ(1, lpNonIoQ.getFutureQueueSize());

         // Stop Producer checkpoint processor task
         producer->cancelCheckpointCreatorTask();
@@ -1999,8 +2006,9 @@ TEST_F(SingleThreadedEPBucketTest, ReadyQueueMaintainsWakeTimeOrder) {
 TEST_F(SingleThreadedEPBucketTest, MB20235_wake_and_work_count) {
     class TestTask : public GlobalTask {
     public:
-        TestTask(EventuallyPersistentEngine *e, double s) :
-                 GlobalTask(e, TaskId::ActiveStreamCheckpointProcessorTask, s) {}
+        TestTask(EventuallyPersistentEngine* e, double s)
+            : GlobalTask(e, TaskId::AccessScanner, s) {
+        }
         bool run() override {
             return false;
         }
@@ -2504,6 +2512,12 @@ TEST_P(MB20054_SingleThreadedEPStoreTest,
     EXPECT_EQ(0, lpAuxioQ->getFutureQueueSize());
     EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());

+    auto lpNonIoQ = task_executor->getLpTaskQ()[NONIO_TASK_IDX];
+    // Initially have ConnNotifierCallback and ConnManagerTasks on NonIO queue.
+    const size_t numInitialNonIoTasks = 2;
+    EXPECT_EQ(numInitialNonIoTasks, lpNonIoQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpNonIoQ->getReadyQueueSize());
+
     // Directly flush the vbucket, ensuring data is on disk.
     //  (This would normally also wake up the checkpoint remover task, but
     //   as that task was never registered with the ExecutorPool in this test
@@ -2550,9 +2564,12 @@ TEST_P(MB20054_SingleThreadedEPStoreTest,
                                       dummy_dcp_add_failover_cb,
                                       {}));

-    // FutureQ should now have an additional DCPBackfill task.
-    EXPECT_EQ(2, lpAuxioQ->getFutureQueueSize());
+    // FutureQ should now have an additional DCPBackfill task /
+    // ActiveStreamCheckpointProcessorTask.
+    EXPECT_EQ(1, lpAuxioQ->getFutureQueueSize());
     EXPECT_EQ(0, lpAuxioQ->getReadyQueueSize());
+    EXPECT_EQ(numInitialNonIoTasks + 1, lpNonIoQ->getFutureQueueSize());
+    EXPECT_EQ(0, lpNonIoQ->getReadyQueueSize());

     // Create an executor 'thread' to obtain shared ownership of the next
     // AuxIO task (which should be BackfillManagerTask). As long as this
