Skip to content

Commit 98e8d9d

Browse files
committed
MB-43205: Consumer enforces allowSanitizeValueInDeletion at DCP_DELETE
At DCP_DELETE, if the sanitizer is enabled then the consumer removes any invalid body in the payload. ENGINE_EINVAL is returned otherwise. Change-Id: I6ac22952221b130d6b14a0acb072dc293f27d0db Reviewed-on: http://review.couchbase.org/c/kv_engine/+/143712 Well-Formed: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent ed9bab4 commit 98e8d9d

File tree

4 files changed

+139
-20
lines changed

4 files changed

+139
-20
lines changed

engines/ep/src/dcp/consumer.cc

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -590,9 +590,9 @@ ENGINE_ERROR_CODE DcpConsumer::deletion(uint32_t opaque,
590590
vbucket,
591591
revSeqno));
592592

593-
// MB-37374: 6.6 Producers may legally send user-xattrs in deletion.
594-
// The existing validation for pre-6.6 Producers is still necessary.
595593
if (includeDeletedUserXattrs == IncludeDeletedUserXattrs::No) {
594+
// Case pre-6.6 connection: Body and UserXattrs are invalid in deletion
595+
596596
// MB-29040: Producer may send deleted doc with value that still has
597597
// the user xattrs and the body. Fix up that mistake by running the
598598
// expiry hook which will correctly process the document
@@ -609,16 +609,28 @@ ENGINE_ERROR_CODE DcpConsumer::deletion(uint32_t opaque,
609609
}
610610
}
611611
} else {
612-
// Some validation seems reasonable here, given that in the past we
613-
// already had issues in this area (see if-block above).
612+
// Case 6.6 connection: UserXattrs in deletion is legal since MB-37374,
613+
// Body still invalid.
614+
614615
if (cb::xattr::get_body_size(
615616
datatype,
616617
{reinterpret_cast<const char*>(value.data()),
617618
value.size()}) > 0) {
618-
logger->warn(
619-
"DcpConsumer::deletion: ({}) Value cannot contain a body",
620-
vbucket);
621-
return ENGINE_EINVAL;
619+
// MB-43205 shows that we cannot unconditionally fail here as 6.6
620+
// connections may still see Body in deletions (generated by pre-6.6
621+
// nodes) after an offline upgrade.
622+
// Note: (allowSanitizeValueInDeletion = true) by default, disabling
623+
// the sanitizer is left only for test purpose.
624+
625+
if (!allowSanitizeValueInDeletion) {
626+
logger->error(
627+
"DcpConsumer::deletion: ({}) Value cannot contain a "
628+
"body",
629+
vbucket);
630+
return ENGINE_EINVAL;
631+
}
632+
633+
item->removeBody();
622634
}
623635
}
624636

engines/ep/src/dcp/consumer.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,10 @@ class DcpConsumer : public ConnHandler,
332332
allowSanitizeValueInDeletion.store(value);
333333
}
334334

335+
bool isAllowSanitizeValueInDeletion() {
336+
return allowSanitizeValueInDeletion.load();
337+
}
338+
335339
protected:
336340
/**
337341
* Records when the consumer last received a message from producer.

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 107 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3447,8 +3447,14 @@ TEST_P(SingleThreadedActiveStreamTest, testExpirationRemovesBody_UserXa_SysXa) {
34473447
Xattrs::UserAndSys);
34483448
}
34493449

3450-
void SingleThreadedPassiveStreamTest::testConsumerRejectsBodyInDelete(
3450+
void SingleThreadedPassiveStreamTest::testConsumerRejectsBodyInDeletion(
34513451
const boost::optional<cb::durability::Requirements>& durReqs) {
3452+
auto& connMap = static_cast<MockDcpConnMap&>(engine->getDcpConnMap());
3453+
connMap.addConn(cookie, consumer);
3454+
ASSERT_TRUE(consumer->isAllowSanitizeValueInDeletion());
3455+
engine->getConfiguration().setAllowSanitizeValueInDeletion(false);
3456+
ASSERT_FALSE(consumer->isAllowSanitizeValueInDeletion());
3457+
34523458
consumer->public_setIncludeDeletedUserXattrs(IncludeDeletedUserXattrs::Yes);
34533459

34543460
// Send deletion in a single seqno snapshot
@@ -3462,7 +3468,8 @@ void SingleThreadedPassiveStreamTest::testConsumerRejectsBodyInDelete(
34623468
{} /*maxVisibleSeqno*/));
34633469

34643470
const auto verifyDCPFailure =
3465-
[this, &durReqs](const cb::const_byte_buffer& value) -> void {
3471+
[this, &durReqs](const cb::const_byte_buffer& value,
3472+
protocol_binary_datatype_t datatype) -> void {
34663473
const uint32_t opaque = 1;
34673474
int64_t bySeqno = 1;
34683475
if (durReqs) {
@@ -3471,7 +3478,7 @@ void SingleThreadedPassiveStreamTest::testConsumerRejectsBodyInDelete(
34713478
{"key", DocKeyEncodesCollectionId::No},
34723479
value,
34733480
0 /*priv_bytes*/,
3474-
PROTOCOL_BINARY_RAW_BYTES,
3481+
datatype,
34753482
0 /*cas*/,
34763483
vbid,
34773484
0 /*flags*/,
@@ -3488,7 +3495,7 @@ void SingleThreadedPassiveStreamTest::testConsumerRejectsBodyInDelete(
34883495
{"key", DocKeyEncodesCollectionId::No},
34893496
value,
34903497
0 /*priv_bytes*/,
3491-
PROTOCOL_BINARY_RAW_BYTES,
3498+
datatype,
34923499
0 /*cas*/,
34933500
vbid,
34943501
bySeqno,
@@ -3503,7 +3510,7 @@ void SingleThreadedPassiveStreamTest::testConsumerRejectsBodyInDelete(
35033510
body.size()};
35043511
{
35053512
SCOPED_TRACE("");
3506-
verifyDCPFailure(value);
3513+
verifyDCPFailure(value, PROTOCOL_BINARY_RAW_BYTES);
35073514
}
35083515

35093516
// Verify the same for body + xattrs
@@ -3512,16 +3519,106 @@ void SingleThreadedPassiveStreamTest::testConsumerRejectsBodyInDelete(
35123519
xattrValue.size()};
35133520
{
35143521
SCOPED_TRACE("");
3515-
verifyDCPFailure(value);
3522+
verifyDCPFailure(
3523+
value,
3524+
PROTOCOL_BINARY_RAW_BYTES | PROTOCOL_BINARY_DATATYPE_XATTR);
35163525
}
3526+
3527+
connMap.removeConn(cookie);
3528+
}
3529+
3530+
TEST_P(SingleThreadedPassiveStreamTest, ConsumerRejectsBodyInDeletion) {
3531+
testConsumerRejectsBodyInDeletion({});
35173532
}
35183533

3519-
TEST_P(SingleThreadedPassiveStreamTest, ConsumerRejectsBodyInDelete) {
3520-
testConsumerRejectsBodyInDelete({});
3534+
TEST_P(SingleThreadedPassiveStreamTest, ConsumerRejectsBodyInSyncDeletion) {
3535+
testConsumerRejectsBodyInDeletion(cb::durability::Requirements());
3536+
}
3537+
3538+
void SingleThreadedPassiveStreamTest::testConsumerSanitizesBodyInDeletion() {
3539+
ASSERT_TRUE(consumer->isAllowSanitizeValueInDeletion());
3540+
consumer->public_setIncludeDeletedUserXattrs(IncludeDeletedUserXattrs::Yes);
3541+
3542+
EXPECT_EQ(ENGINE_SUCCESS,
3543+
consumer->snapshotMarker(1 /*opaque*/,
3544+
vbid,
3545+
1 /*startSeqno*/,
3546+
10 /*endSeqno*/,
3547+
MARKER_FLAG_CHK,
3548+
{} /*HCS*/,
3549+
{} /*maxVisibleSeqno*/));
3550+
3551+
// Build up a value with just raw body and verify that DCP deletion succeeds
3552+
// and the Body has been removed from the payload.
3553+
const std::string body = "body";
3554+
cb::const_byte_buffer value{reinterpret_cast<const uint8_t*>(body.data()),
3555+
body.size()};
3556+
const uint32_t opaque = 1;
3557+
const auto key = makeStoredDocKey("key");
3558+
EXPECT_EQ(ENGINE_SUCCESS,
3559+
consumer->deletion(opaque,
3560+
key,
3561+
value,
3562+
0 /*priv_bytes*/,
3563+
PROTOCOL_BINARY_RAW_BYTES,
3564+
0 /*cas*/,
3565+
vbid,
3566+
1 /*bySeqno*/,
3567+
0 /*revSeqno*/,
3568+
{} /*meta*/));
3569+
// Verify that the body has been removed
3570+
auto& vb = *store->getVBucket(vbid);
3571+
auto& ht = vb.ht;
3572+
{
3573+
const auto& res = ht.findForUpdate(key);
3574+
const auto* sv = res.committed;
3575+
EXPECT_TRUE(sv);
3576+
EXPECT_EQ(1, sv->getBySeqno());
3577+
EXPECT_FALSE(sv->getValue().get());
3578+
}
3579+
3580+
// Verify the same for body + user-xattrs + sys-xattrs
3581+
const auto xattrValue = createXattrValue(body);
3582+
value = {reinterpret_cast<const uint8_t*>(xattrValue.data()),
3583+
xattrValue.size()};
3584+
EXPECT_EQ(ENGINE_SUCCESS,
3585+
consumer->deletion(opaque,
3586+
key,
3587+
value,
3588+
0 /*priv_bytes*/,
3589+
PROTOCOL_BINARY_RAW_BYTES |
3590+
PROTOCOL_BINARY_DATATYPE_XATTR,
3591+
0 /*cas*/,
3592+
vbid,
3593+
2 /*bySeqno*/,
3594+
0 /*revSeqno*/,
3595+
{} /*meta*/));
3596+
3597+
{
3598+
const auto& res = ht.findForUpdate(key);
3599+
const auto* sv = res.committed;
3600+
EXPECT_TRUE(sv);
3601+
EXPECT_EQ(2, sv->getBySeqno());
3602+
EXPECT_TRUE(sv->getValue().get());
3603+
EXPECT_LT(sv->getValue()->valueSize(), xattrValue.size());
3604+
3605+
const auto finalValue =
3606+
cb::char_buffer(const_cast<char*>(sv->getValue()->getData()),
3607+
sv->getValue()->valueSize());
3608+
// No body
3609+
EXPECT_EQ(0, cb::xattr::get_body_size(sv->getDatatype(), finalValue));
3610+
// Must have user/sys xattrs (created at createXattrValue())
3611+
cb::xattr::Blob blob(finalValue, false /*compressed*/);
3612+
for (uint8_t i = 1; i <= 6; ++i) {
3613+
EXPECT_FALSE(blob.get("ABCuser" + std::to_string(i)).empty());
3614+
}
3615+
EXPECT_FALSE(blob.get("meta").empty());
3616+
EXPECT_FALSE(blob.get("_sync").empty());
3617+
}
35213618
}
35223619

3523-
TEST_P(SingleThreadedPassiveStreamTest, ConsumerRejectsBodyInSyncDelete) {
3524-
testConsumerRejectsBodyInDelete(cb::durability::Requirements());
3620+
TEST_P(SingleThreadedPassiveStreamTest, ConsumerSanitizesBodyInDeletion) {
3621+
testConsumerSanitizesBodyInDeletion();
35253622
}
35263623

35273624
void SingleThreadedPassiveStreamTest::testConsumerReceivesUserXattrsInDelete(

engines/ep/tests/module_tests/dcp_stream_test.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,13 +151,19 @@ class SingleThreadedPassiveStreamTest
151151
vbucket_state_t initialState);
152152

153153
/**
154-
* Test that the Consumer rejects body in deletion's value
154+
* Test that the Consumer rejects body in deletion's value abd fails.
155+
* That is the behaviour when (allowSanitizeValueInDeletion = false).
155156
*
156157
* @param durReqs (optional) The Dur Reqs, if we are testing SyncDelete
157158
*/
158-
void testConsumerRejectsBodyInDelete(
159+
void testConsumerRejectsBodyInDeletion(
159160
const boost::optional<cb::durability::Requirements>& durReqs);
160161

162+
/**
163+
* Test that the Consumer removes the body (if any) in deletion's value.
164+
*/
165+
void testConsumerSanitizesBodyInDeletion();
166+
161167
/**
162168
* Test that the Consumer accepts user-xattrs in deletion for enabled
163169
* connections.

0 commit comments

Comments
 (0)