Skip to content

Commit 2657a2b

Browse files
committed
Merge remote-tracking branch 'couchbase/alice'
* couchbase/alice: MB-31495: Fix bug in getRandomKey MB-31175: Ephemeral HTTombstonePurger underflows age check MB-31327: Ephemeral backfill does not respect purge seqno. MB-31141: Don't reject snappy|raw DCP deletes MB-31141: Account for nmeta in deleteWithMeta Change-Id: I15638b53f28908581011a852798494c4e76c54d2
2 parents 0238430 + 9444e64 commit 2657a2b

18 files changed

+428
-86
lines changed

daemon/mcbp_validators.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,8 +420,12 @@ static bool valid_dcp_delete_datatype(protocol_binary_datatype_t datatype) {
420420
// it may send XATTR|JSON (with snappy possible). These are now allowed
421421
// so rebalance won't be failed and the consumer will sanitise the faulty
422422
// documents.
423-
std::array<const protocol_binary_datatype_t, 5> valid = {
423+
// MB-31141: Allowing RAW+Snappy. A bug in delWithMeta has allowed us to
424+
// create deletes with a non-zero value tagged as RAW, which when snappy
425+
// is enabled gets DCP shipped as RAW+Snappy.
426+
std::array<const protocol_binary_datatype_t, 6> valid = {
424427
{PROTOCOL_BINARY_RAW_BYTES,
428+
PROTOCOL_BINARY_RAW_BYTES | PROTOCOL_BINARY_DATATYPE_SNAPPY,
425429
PROTOCOL_BINARY_DATATYPE_XATTR,
426430
PROTOCOL_BINARY_DATATYPE_XATTR | PROTOCOL_BINARY_DATATYPE_SNAPPY,
427431
PROTOCOL_BINARY_DATATYPE_XATTR | PROTOCOL_BINARY_DATATYPE_JSON,

engines/ep/src/dcp/backfill_memory.cc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,21 @@ backfill_status_t DCPBackfillMemoryBuffered::create() {
210210
return backfill_snooze;
211211
}
212212

213+
/* Check startSeqno against the purge-seqno of the vb.
214+
* If the startSeqno != 1 (a 0 to n request) then startSeqno must be
215+
* greater than purgeSeqno. */
216+
if (startSeqno != 1 && (startSeqno <= evb->getPurgeSeqno())) {
217+
EP_LOG_WARN(
218+
"DCPBackfillMemoryBuffered::create(): "
219+
"({}) running backfill failed because the startSeqno:{} is < "
220+
"purgeSeqno:{}",
221+
getVBucketId(),
222+
startSeqno,
223+
evb->getPurgeSeqno());
224+
stream->setDead(END_STREAM_ROLLBACK);
225+
return backfill_finished;
226+
}
227+
213228
/* Advance the cursor till start, mark snapshot and update backfill
214229
remaining count */
215230
while (rangeItr.curr() != rangeItr.end()) {

engines/ep/src/dcp/consumer.cc

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -572,10 +572,16 @@ ENGINE_ERROR_CODE DcpConsumer::deletion(uint32_t opaque,
572572
// MB-29040: Producer may send deleted doc with value that still has
573573
// the user xattrs and the body. Fix up that mistake by running the
574574
// expiry hook which will correctly process the document
575-
if (mcbp::datatype::is_xattr(datatype) && value.size()) {
576-
auto vb = engine_.getVBucket(vbucket);
577-
if (vb) {
578-
engine_.getKVBucket()->runPreExpiryHook(*vb, *item);
575+
if (value.size()) {
576+
if (mcbp::datatype::is_xattr(datatype)) {
577+
auto vb = engine_.getVBucket(vbucket);
578+
if (vb) {
579+
engine_.getKVBucket()->runPreExpiryHook(*vb, *item);
580+
}
581+
} else {
582+
// MB-31141: Deletes cannot have a value
583+
item->replaceValue(Blob::New(0));
584+
item->setDataType(PROTOCOL_BINARY_RAW_BYTES);
579585
}
580586
}
581587

engines/ep/src/ep_engine.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5135,9 +5135,12 @@ ENGINE_ERROR_CODE EventuallyPersistentEngine::deleteWithMeta(
51355135
keyOffset += 2; // 2 bytes for nmeta
51365136
nmeta = ntohs(nmeta);
51375137
if (nmeta > 0) {
5138-
emd = cb::const_byte_buffer(
5139-
request->bytes + sizeof(request->bytes) + nkey + keyOffset,
5140-
nmeta);
5138+
// Correct the vallen
5139+
vallen -= nmeta;
5140+
emd = cb::const_byte_buffer(request->bytes +
5141+
sizeof(request->bytes) + nkey +
5142+
keyOffset + vallen,
5143+
nmeta);
51415144
}
51425145
}
51435146

engines/ep/src/ephemeral_tombstone_purger.cc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,13 @@ bool EphemeralVBucket::HTTombstonePurger::visit(
4444
const HashTable::HashBucketLock& hbl, StoredValue& v) {
4545
auto* osv = v.toOrderedStoredValue();
4646

47-
if (osv->isDeleted() && (now - osv->getDeletedTime() >= purgeAge)) {
47+
// MB-31175: Item must have been deleted before this task starts to ensure
48+
// that we do not get a -ve value when we check if the time difference
49+
// is >= purgeAge. This is preferable to updating the task start time for
50+
// every visit and has little impact as this task runs frequently.
51+
if (osv->isDeleted() &&
52+
(now >= osv->getDeletedTime()) &&
53+
(now - osv->getDeletedTime() >= purgeAge)) {
4854
// This item should be purged. Remove from the HashTable and move over
4955
// to being owned by the sequence list.
5056
auto ownedSV = vbucket->ht.unlocked_release(hbl, v.getKey());

engines/ep/src/kv_bucket.cc

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1407,24 +1407,24 @@ GetValue KVBucket::getInternal(const DocKey& key,
14071407
GetValue KVBucket::getRandomKey() {
14081408
size_t max = vbMap.getSize();
14091409

1410-
const Vbid::id_type start = labs(random()) % max;
1410+
const Vbid::id_type start = labs(getRandom()) % max;
14111411
Vbid::id_type curr = start;
14121412
std::unique_ptr<Item> itm;
14131413

14141414
while (itm == NULL) {
14151415
VBucketPtr vb = getVBucket(Vbid(curr++));
14161416
while (!vb || vb->getState() != vbucket_state_active) {
1417-
if (curr == start) {
1418-
return GetValue(NULL, ENGINE_KEY_ENOENT);
1419-
}
14201417
if (curr == max) {
14211418
curr = 0;
14221419
}
1420+
if (curr == start) {
1421+
return GetValue(NULL, ENGINE_KEY_ENOENT);
1422+
}
14231423

14241424
vb = getVBucket(Vbid(curr++));
14251425
}
14261426

1427-
if ((itm = vb->ht.getRandomKey(random()))) {
1427+
if ((itm = vb->ht.getRandomKey(getRandom()))) {
14281428
GetValue rv(std::move(itm), ENGINE_SUCCESS);
14291429
return rv;
14301430
}

engines/ep/src/kv_bucket.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,11 @@ class KVBucket : public KVBucketIface {
177177
options);
178178
}
179179

180+
/**
181+
* Retrieve a value randomly from the store.
182+
*
183+
* @return a GetValue representing the value retrieved
184+
*/
180185
GetValue getRandomKey(void);
181186

182187
/**
@@ -1015,6 +1020,12 @@ class KVBucket : public KVBucketIface {
10151020

10161021
std::atomic<size_t> maxTtl;
10171022

1023+
/**
1024+
* Allows us to override the random function. This is used for testing
1025+
* purposes where we want a constant number as opposed to a random one.
1026+
*/
1027+
std::function<long()> getRandom = random;
1028+
10181029
friend class KVBucketTest;
10191030

10201031
DISALLOW_COPY_AND_ASSIGN(KVBucket);

engines/ep/src/stored-value.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,7 @@ std::ostream& operator<<(std::ostream& os, const StoredValue& sv) {
421421

422422
// seqno, revid, expiry / purge time
423423
os << "seq:" << sv.getBySeqno() << " rev:" << sv.getRevSeqno();
424+
os << " cas:" << sv.getCas();
424425
os << " key:\"" << sv.getKey() << "\"";
425426
if (sv.isOrdered() && sv.isDeleted()) {
426427
os << " del_time:" << sv.lock_expiry_or_delete_time;

engines/ep/tests/mock/mock_stream.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ class MockActiveStream : public ActiveStream {
171171
void public_registerCursor(CheckpointManager& manager,
172172
const std::string& name,
173173
int64_t seqno);
174+
175+
bool isDead() { return ActiveStream::getState() == StreamState::Dead; };
174176
};
175177

176178
/**

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 30 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,6 +1248,23 @@ TEST_P(StreamTest, backfillGetsNoItems) {
12481248
}
12491249
}
12501250

1251+
TEST_P(StreamTest, bufferedMemoryBackfillPurgeGreaterThanStart) {
1252+
if (engine->getConfiguration().getBucketType() == "ephemeral") {
1253+
setup_dcp_stream(0, IncludeValue::No, IncludeXattrs::No);
1254+
auto evb = std::shared_ptr<EphemeralVBucket>(
1255+
std::dynamic_pointer_cast<EphemeralVBucket>(vb0));
1256+
1257+
// Force the purgeSeqno because it's easier than creating and
1258+
// deleting items
1259+
evb->setPurgeSeqno(3);
1260+
1261+
// Backfill with start != 1 and start != end and start < purge
1262+
DCPBackfillMemoryBuffered dcpbfm (evb, stream, 2, 4);
1263+
dcpbfm.run();
1264+
EXPECT_TRUE(stream->isDead());
1265+
}
1266+
}
1267+
12511268
/* Regression test for MB-17766 - ensure that when an ActiveStream is preparing
12521269
* queued items to be sent out via a DCP consumer, that nextCheckpointItem()
12531270
* doesn't incorrectly return false (meaning that there are no more checkpoint
@@ -3086,28 +3103,25 @@ TEST_P(ConnectionTest, test_mb24424_deleteResponse) {
30863103
ASSERT_TRUE(stream->isActive());
30873104

30883105
std::string key = "key";
3089-
std::string data = R"({"json":"yes"})";
30903106
const DocKey docKey{reinterpret_cast<const uint8_t*>(key.data()),
30913107
key.size(),
30923108
DocKeyEncodesCollectionId::No};
3093-
cb::const_byte_buffer value{reinterpret_cast<const uint8_t*>(data.data()),
3094-
data.size()};
30953109
uint8_t extMeta[1] = {uint8_t(PROTOCOL_BINARY_DATATYPE_JSON)};
30963110
cb::const_byte_buffer meta{extMeta, sizeof(uint8_t)};
30973111

3098-
consumer->deletion(/*opaque*/1,
3099-
/*key*/docKey,
3100-
/*values*/value,
3101-
/*priv_bytes*/0,
3102-
/*datatype*/PROTOCOL_BINARY_DATATYPE_JSON,
3103-
/*cas*/0,
3104-
/*vbucket*/vbid,
3105-
/*bySeqno*/1,
3106-
/*revSeqno*/0,
3107-
/*meta*/meta);
3108-
3109-
auto messageSize = MutationResponse::deletionBaseMsgBytes +
3110-
key.size() + data.size() + sizeof(extMeta);
3112+
consumer->deletion(/*opaque*/ 1,
3113+
/*key*/ docKey,
3114+
/*value*/ {},
3115+
/*priv_bytes*/ 0,
3116+
/*datatype*/ PROTOCOL_BINARY_RAW_BYTES,
3117+
/*cas*/ 0,
3118+
/*vbucket*/ vbid,
3119+
/*bySeqno*/ 1,
3120+
/*revSeqno*/ 0,
3121+
/*meta*/ meta);
3122+
3123+
auto messageSize = MutationResponse::deletionBaseMsgBytes + key.size() +
3124+
sizeof(extMeta);
31113125

31123126
EXPECT_EQ(messageSize, stream->responseMessageSize);
31133127

0 commit comments

Comments
 (0)