Skip to content

Commit a5eef6e

Browse files
committed
MB-59189: Do not access the cursor without a queueLock
Logging added under MB-58961 in neo accesses the cursor's position without a checkpoint manager lock, which is unsafe, as the checkpoint manager might have to dedup the item this cursor is currently on at any time. Return the Item the cursor gets registered on in CursorRegResult, so the calling code can access it without going via a cursor deref to print the info log. Change-Id: Ie819af7fd12743d18579f17652eb00cf47e55c9e Reviewed-on: https://review.couchbase.org/c/kv_engine/+/200830 Reviewed-by: Paolo Cocchi <[email protected]> Tested-by: Vesko Karaganev <[email protected]> Well-Formed: Restriction Checker Reviewed-by: Jim Walker <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 7196bec commit a5eef6e

File tree

3 files changed

+18
-10
lines changed

3 files changed

+18
-10
lines changed

engines/ep/src/checkpoint_manager.cc

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,10 +328,12 @@ CursorRegResult CheckpointManager::registerCursorBySeqno(
328328
// Note: The legacy logic (below) for cursor-registering is based on
329329
// Checkpoint::getMinimumCursorSeqno()/getHighSeqno() that are
330330
// meaningless if all mutations have been expelled.
331+
auto pos = (*ckptIt)->begin();
331332
newCursor = std::make_shared<CheckpointCursor>(
332-
name, ckptIt, (*ckptIt)->begin(), droppable, 0);
333+
name, ckptIt, pos, droppable, 0);
333334
result.nextSeqno = lastBySeqno + 1;
334335
result.cursor.setCursor(newCursor);
336+
result.position = *pos;
335337

336338
// Trigger backfill only if there's a gap between lastProcessedSeqno and
337339
// nextSeqnoAvailableFromCheckpoint
@@ -366,10 +368,12 @@ CursorRegResult CheckpointManager::registerCursorBySeqno(
366368
if (lastProcessedSeqno < st) {
367369
// Requested sequence number is before the start of this
368370
// checkpoint, position cursor at the checkpoint begin.
371+
auto pos = (*ckptIt)->begin();
369372
newCursor = std::make_shared<CheckpointCursor>(
370-
name, ckptIt, (*ckptIt)->begin(), droppable, 0);
373+
name, ckptIt, pos, droppable, 0);
371374
result.nextSeqno = st;
372375
result.cursor.setCursor(newCursor);
376+
result.position = *pos;
373377
// Set tryBackfill:true only if lastProcessedSeqno to st is non
374378
// contiguous. Note this is likely the fix for MB-53616/MB-58302
375379
// which were noted in master only, and likely backported to the
@@ -424,6 +428,7 @@ CursorRegResult CheckpointManager::registerCursorBySeqno(
424428
newCursor = std::make_shared<CheckpointCursor>(
425429
name, ckptIt, pos, droppable, distance);
426430
result.cursor.setCursor(newCursor);
431+
result.position = *pos;
427432
break;
428433
}
429434
}

engines/ep/src/cursor.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111

1212
#pragma once
1313

14+
#include "ep_types.h"
15+
#include <platform/atomic.h>
16+
1417
#include <folly/SharedMutex.h>
1518
#include <memory>
1619
#include <mutex>
@@ -94,4 +97,6 @@ struct CursorRegResult {
9497
uint64_t nextSeqno;
9598
// The registered cursor
9699
Cursor cursor;
100+
// The item the cursor was registered on.
101+
queued_item position;
97102
};

engines/ep/src/dcp/active_stream.cc

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -261,8 +261,7 @@ void ActiveStream::registerCursor(CheckpointManager& chkptmgr,
261261
curChkSeqno = result.nextSeqno;
262262
cursor = result.cursor;
263263

264-
const auto lockedCursor = result.cursor.lock();
265-
Expects(lockedCursor);
264+
Expects(result.position);
266265
log(spdlog::level::level_enum::info,
267266
"{} ActiveStream::registerCursor name \"{}\", "
268267
"lastProcessedSeqno:{}, pendingBackfill:{}, "
@@ -272,8 +271,8 @@ void ActiveStream::registerCursor(CheckpointManager& chkptmgr,
272271
lastProcessedSeqno,
273272
pendingBackfill,
274273
result.tryBackfill,
275-
::to_string((*lockedCursor->getPos())->getOperation()),
276-
(*lockedCursor->getPos())->getBySeqno(),
274+
::to_string(result.position->getOperation()),
275+
result.position->getBySeqno(),
277276
result.nextSeqno);
278277
} catch (std::exception& error) {
279278
log(spdlog::level::level_enum::warn,
@@ -1813,8 +1812,7 @@ void ActiveStream::scheduleBackfill_UNLOCKED(DcpProducer& producer,
18131812
return;
18141813
}
18151814

1816-
const auto lockedCursor = registerResult.cursor.lock();
1817-
Expects(lockedCursor);
1815+
Expects(registerResult.position);
18181816
log(spdlog::level::level_enum::info,
18191817
"{} ActiveStream::scheduleBackfill_UNLOCKED register cursor with "
18201818
"name \"{}\" lastProcessedSeqno:{}, result{{tryBackfill:{}, op:{}, "
@@ -1823,8 +1821,8 @@ void ActiveStream::scheduleBackfill_UNLOCKED(DcpProducer& producer,
18231821
name_,
18241822
lastReadSeqno.load(),
18251823
registerResult.tryBackfill,
1826-
::to_string((*lockedCursor->getPos())->getOperation()),
1827-
(*lockedCursor->getPos())->getBySeqno(),
1824+
::to_string(registerResult.position->getOperation()),
1825+
registerResult.position->getBySeqno(),
18281826
registerResult.nextSeqno);
18291827

18301828
curChkSeqno = registerResult.nextSeqno;

0 commit comments

Comments
 (0)