Skip to content

Commit 27780ed

Browse files
jimwwalkertrondn
authored andcommitted
MB-29040: [2/2] Sanitise delete with value via DcpConsumer
1) Allow a wider range of datatypes to be received by adjusting the mcbp validator. 2) Update the DcpConsumer so that delete with values are checked and sanitised, this means even a deleted marked as 'xattr' needs checking because the source may be sending an incorrect xattr with raw body + user xattrs which will need deleting. 3) Add a DcpConsumer test which checks we strip the faulty input. Change-Id: I219f21df9a63bc6b1c004fa382bd1f32f94a3e90 Reviewed-on: http://review.couchbase.org/93041 Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 0e543aa commit 27780ed

File tree

6 files changed

+159
-26
lines changed

6 files changed

+159
-26
lines changed

daemon/mcbp_validators.cc

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -383,24 +383,38 @@ static protocol_binary_response_status dcp_mutation_validator(const Cookie& cook
383383
return verify_common_dcp_restrictions(cookie);
384384
}
385385

386+
/// @return true if the datatype is valid for a deletion
387+
static bool valid_dcp_delete_datatype(protocol_binary_datatype_t datatype) {
388+
// MB-29040: Allowing xattr + JSON. A bug in the producer means
389+
// it may send XATTR|JSON (with snappy possible). These are now allowed
390+
// so rebalance won't be failed and the consumer will sanitise the faulty
391+
// documents.
392+
std::array<const protocol_binary_datatype_t, 5> valid = {
393+
{PROTOCOL_BINARY_RAW_BYTES,
394+
PROTOCOL_BINARY_DATATYPE_XATTR,
395+
PROTOCOL_BINARY_DATATYPE_XATTR | PROTOCOL_BINARY_DATATYPE_SNAPPY,
396+
PROTOCOL_BINARY_DATATYPE_XATTR | PROTOCOL_BINARY_DATATYPE_JSON,
397+
PROTOCOL_BINARY_DATATYPE_XATTR | PROTOCOL_BINARY_DATATYPE_SNAPPY |
398+
PROTOCOL_BINARY_DATATYPE_JSON}};
399+
for (auto d : valid) {
400+
if (datatype == d) {
401+
return true;
402+
}
403+
}
404+
return false;
405+
}
406+
386407
static protocol_binary_response_status dcp_deletion_validator(const Cookie& cookie)
387408
{
388409
auto req = static_cast<protocol_binary_request_dcp_deletion*>(
389410
cookie.getPacketAsVoidPtr());
390-
const auto datatype = req->message.header.request.datatype;
391411

392412
if (req->message.header.request.magic != PROTOCOL_BINARY_REQ ||
393413
req->message.header.request.keylen == 0) {
394414
return PROTOCOL_BINARY_RESPONSE_EINVAL;
395415
}
396416

397-
// Check datatype - only allow raw, or iff XATTRs are enabled XATTR (with or
398-
// without snappy)
399-
if (!(mcbp::datatype::is_raw(datatype) ||
400-
((datatype == PROTOCOL_BINARY_DATATYPE_XATTR ||
401-
datatype == (PROTOCOL_BINARY_DATATYPE_XATTR |
402-
PROTOCOL_BINARY_DATATYPE_SNAPPY)) &&
403-
may_accept_xattr(cookie)))) {
417+
if (!valid_dcp_delete_datatype(req->message.header.request.datatype)) {
404418
return PROTOCOL_BINARY_RESPONSE_EINVAL;
405419
}
406420

engines/ep/src/dcp/consumer.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,16 @@ ENGINE_ERROR_CODE DcpConsumer::deletion(uint32_t opaque,
543543
revSeqno));
544544
item->setDeleted();
545545

546+
// MB-29040: Producer may send deleted doc with value that still has
547+
// the user xattrs and the body. Fix up that mistake by running the
548+
// expiry hook which will correctly process the document
549+
if (mcbp::datatype::is_xattr(datatype) && value.size()) {
550+
auto vb = engine_.getVBucket(vbucket);
551+
if (vb) {
552+
engine_.getKVBucket()->runPreExpiryHook(*vb, *item);
553+
}
554+
}
555+
546556
std::unique_ptr<ExtendedMetaData> emd;
547557
if (meta.size() > 0) {
548558
emd = std::make_unique<ExtendedMetaData>(meta.data(), meta.size());

engines/ep/src/kv_bucket.cc

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,22 @@ void KVBucket::getValue(Item& it) {
531531
}
532532
}
533533

534+
void KVBucket::runPreExpiryHook(VBucket& vb, Item& it) {
535+
it.decompressValue(); // A no-op for already decompressed items
536+
auto info =
537+
it.toItemInfo(vb.failovers->getLatestUUID(), vb.getHLCEpochSeqno());
538+
if (engine.getServerApi()->document->pre_expiry(info)) {
539+
// The payload is modified and contains data we should use
540+
it.replaceValue(Blob::New(static_cast<char*>(info.value[0].iov_base),
541+
info.value[0].iov_len));
542+
it.setDataType(info.datatype);
543+
} else {
544+
// Make the document empty and raw
545+
it.replaceValue(Blob::New(0));
546+
it.setDataType(PROTOCOL_BINARY_RAW_BYTES);
547+
}
548+
}
549+
534550
void KVBucket::deleteExpiredItem(Item& it,
535551
time_t startTime,
536552
ExpireBy source) {
@@ -546,19 +562,7 @@ void KVBucket::deleteExpiredItem(Item& it,
546562
// Process positive seqnos (ignoring special *temp* items) and only
547563
// those items with a value
548564
if (it.getBySeqno() >= 0 && it.getNBytes()) {
549-
auto info = it.toItemInfo(vb->failovers->getLatestUUID(),
550-
vb->getHLCEpochSeqno());
551-
if (engine.getServerApi()->document->pre_expiry(info)) {
552-
// The payload is modified and contains data we should use
553-
it.replaceValue(
554-
Blob::New(static_cast<char*>(info.value[0].iov_base),
555-
info.value[0].iov_len));
556-
it.setDataType(info.datatype);
557-
} else {
558-
// Make the document empty and raw
559-
it.replaceValue(Blob::New(0));
560-
it.setDataType(PROTOCOL_BINARY_RAW_BYTES);
561-
}
565+
runPreExpiryHook(*vb, it);
562566
}
563567

564568
// Obtain reader access to the VB state change lock so that

engines/ep/src/kv_bucket.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,18 @@ class KVBucket : public KVBucketIface {
572572
VBucket::id_type vbucket,
573573
const char** msg);
574574

575+
/**
576+
* Run the server-api pre-expiry hook against the Item - the function
577+
* may (if the pre-expiry hook dictates) mutate the Item value so that
578+
* xattrs and the value are removed. The method doesn't care for the Item
579+
* state (i.e. isDeleted) and the callers should be passing expired/deleted
580+
* items only.
581+
*
582+
* @param vb The vbucket it belongs to
583+
* @param it A reference to the Item to run the hook against and possibly
584+
* mutate.
585+
*/
586+
void runPreExpiryHook(VBucket& vb, Item& it);
575587
void deleteExpiredItem(Item& it, time_t startTime, ExpireBy source);
576588
void deleteExpiredItems(std::list<Item>&, ExpireBy);
577589

engines/ep/tests/module_tests/evp_store_single_threaded_test.cc

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2521,6 +2521,102 @@ TEST_F(MB_29287, dataloss_hole) {
25212521
EXPECT_EQ(1, vb->checkpointManager->getNumOfCursors());
25222522
}
25232523

2524+
class XattrCompressedTest
2525+
: public SingleThreadedEPBucketTest,
2526+
public ::testing::WithParamInterface<::testing::tuple<bool, bool>> {
2527+
public:
2528+
bool isXattrSystem() const {
2529+
return ::testing::get<0>(GetParam());
2530+
}
2531+
bool isSnappy() const {
2532+
return ::testing::get<1>(GetParam());
2533+
}
2534+
};
2535+
2536+
// Create a replica VB and consumer, then send it an xattr value which should
2537+
// of been stripped at the source, but wasn't because of MB29040. Then check
2538+
// the consumer sanitises the document. Run the test with user/system xattrs
2539+
// and snappy on/off
2540+
TEST_P(XattrCompressedTest, MB_29040_sanitise_input) {
2541+
setVBucketStateAndRunPersistTask(vbid, vbucket_state_replica);
2542+
2543+
auto consumer = std::make_shared<MockDcpConsumer>(
2544+
*engine, cookie, "MB_29040_sanitise_input");
2545+
int opaque = 1;
2546+
ASSERT_EQ(ENGINE_SUCCESS, consumer->addStream(opaque, vbid, /*flags*/ 0));
2547+
2548+
std::string body;
2549+
if (!isXattrSystem()) {
2550+
body.assign("value");
2551+
}
2552+
auto value = createXattrValue(body, isXattrSystem(), isSnappy());
2553+
2554+
// Send deletion in a single seqno snapshot
2555+
int64_t bySeqno = 1;
2556+
EXPECT_EQ(ENGINE_SUCCESS,
2557+
consumer->snapshotMarker(
2558+
opaque, vbid, bySeqno, bySeqno, MARKER_FLAG_CHK));
2559+
2560+
cb::const_byte_buffer valueBuf{
2561+
reinterpret_cast<const uint8_t*>(value.data()), value.size()};
2562+
EXPECT_EQ(
2563+
ENGINE_SUCCESS,
2564+
consumer->deletion(
2565+
opaque,
2566+
{"key", DocNamespace::DefaultCollection},
2567+
valueBuf,
2568+
/*priv_bytes*/ 0,
2569+
PROTOCOL_BINARY_DATATYPE_XATTR |
2570+
(isSnappy() ? PROTOCOL_BINARY_DATATYPE_SNAPPY : 0),
2571+
/*cas*/ 3,
2572+
vbid,
2573+
bySeqno,
2574+
/*revSeqno*/ 0,
2575+
/*meta*/ {}));
2576+
2577+
EXPECT_EQ(std::make_pair(false, size_t(1)),
2578+
getEPBucket().flushVBucket(vbid));
2579+
2580+
ASSERT_EQ(ENGINE_SUCCESS, consumer->closeStream(opaque, vbid));
2581+
2582+
// Switch to active
2583+
setVBucketStateAndRunPersistTask(vbid, vbucket_state_active);
2584+
2585+
get_options_t options = static_cast<get_options_t>(
2586+
QUEUE_BG_FETCH | HONOR_STATES | TRACK_REFERENCE | DELETE_TEMP |
2587+
HIDE_LOCKED_CAS | TRACK_STATISTICS | GET_DELETED_VALUE);
2588+
auto gv = store->get(
2589+
{"key", DocNamespace::DefaultCollection}, vbid, cookie, options);
2590+
EXPECT_EQ(ENGINE_EWOULDBLOCK, gv.getStatus());
2591+
2592+
// Manually run the bgfetch task.
2593+
MockGlobalTask mockTask(engine->getTaskable(), TaskId::MultiBGFetcherTask);
2594+
store->getVBucket(vbid)->getShard()->getBgFetcher()->run(&mockTask);
2595+
gv = store->get({"key", DocNamespace::DefaultCollection},
2596+
vbid,
2597+
cookie,
2598+
GET_DELETED_VALUE);
2599+
ASSERT_EQ(ENGINE_SUCCESS, gv.getStatus());
2600+
2601+
// This is the only system key test_helpers::createXattrValue gives us
2602+
cb::xattr::Blob blob;
2603+
blob.set("_sync", "{\"cas\":\"0xdeadbeefcafefeed\"}");
2604+
2605+
EXPECT_TRUE(gv.item->isDeleted());
2606+
EXPECT_EQ(0, gv.item->getFlags());
2607+
EXPECT_EQ(3, gv.item->getCas());
2608+
EXPECT_EQ(isXattrSystem() ? blob.size() : 0,
2609+
gv.item->getValue()->valueSize());
2610+
EXPECT_EQ(isXattrSystem() ? PROTOCOL_BINARY_DATATYPE_XATTR
2611+
: PROTOCOL_BINARY_RAW_BYTES,
2612+
gv.item->getDataType());
2613+
}
2614+
25242615
INSTANTIATE_TEST_CASE_P(XattrSystemUserTest,
25252616
XattrSystemUserTest,
25262617
::testing::Bool(), );
2618+
2619+
INSTANTIATE_TEST_CASE_P(XattrCompressedTest,
2620+
XattrCompressedTest,
2621+
::testing::Combine(::testing::Bool(),
2622+
::testing::Bool()), );

tests/mcbp/mcbp_test.cc

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1956,13 +1956,10 @@ TEST_P(DcpDeletionValidatorTest, ValidDatatype) {
19561956

19571957
TEST_P(DcpDeletionValidatorTest, InvalidDatatype) {
19581958
using cb::mcbp::Datatype;
1959-
const std::array<uint8_t, 5> datatypes = {
1959+
const std::array<uint8_t, 3> datatypes = {
19601960
{uint8_t(Datatype::JSON),
19611961
uint8_t(Datatype::Snappy),
1962-
uint8_t(Datatype::Snappy) | uint8_t(Datatype::JSON),
1963-
uint8_t(Datatype::Xattr) | uint8_t(Datatype::JSON),
1964-
(uint8_t(Datatype::Xattr) | uint8_t(Datatype::Snappy) |
1965-
uint8_t(Datatype::JSON))}};
1962+
uint8_t(Datatype::Snappy) | uint8_t(Datatype::JSON)}};
19661963

19671964
for (auto invalid : datatypes) {
19681965
header.request.datatype = invalid;

0 commit comments

Comments
 (0)