Skip to content

Commit 8c3189f

Browse files
committed
MB-43205: Consumer enforces allowSanitizeValueInDeletion at DCP_PREPARE
At DCP_PREPARE, if the sanitizer is enabled then the consumer removes any invalid body in the payload for SyncDelete. ENGINE_EINVAL is returned otherwise. Change-Id: I33970e329517101ad6e7f93c90fca5e509cd2176 Reviewed-on: http://review.couchbase.org/c/kv_engine/+/143799 Well-Formed: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Tested-by: Build Bot <[email protected]>
1 parent 98e8d9d commit 8c3189f

File tree

3 files changed

+108
-43
lines changed

3 files changed

+108
-43
lines changed

engines/ep/src/dcp/consumer.cc

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1757,11 +1757,16 @@ ENGINE_ERROR_CODE DcpConsumer::prepare(uint32_t opaque,
17571757
datatype,
17581758
{reinterpret_cast<const char*>(value.data()),
17591759
value.size()}) > 0) {
1760-
logger->warn(
1761-
"DcpConsumer::prepare: ({}) Value cannot contain a body "
1762-
"for SyncDelete",
1763-
vbucket);
1764-
return ENGINE_EINVAL;
1760+
if (!allowSanitizeValueInDeletion) {
1761+
logger->error(
1762+
"DcpConsumer::prepare: ({}) Value cannot contain a "
1763+
"body "
1764+
"for SyncDelete",
1765+
vbucket);
1766+
return ENGINE_EINVAL;
1767+
}
1768+
1769+
item->removeBody();
17651770
}
17661771
}
17671772

engines/ep/tests/module_tests/dcp_stream_test.cc

Lines changed: 93 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3535,70 +3535,122 @@ TEST_P(SingleThreadedPassiveStreamTest, ConsumerRejectsBodyInSyncDeletion) {
35353535
testConsumerRejectsBodyInDeletion(cb::durability::Requirements());
35363536
}
35373537

3538-
void SingleThreadedPassiveStreamTest::testConsumerSanitizesBodyInDeletion() {
3538+
void SingleThreadedPassiveStreamTest::testConsumerSanitizesBodyInDeletion(
3539+
const boost::optional<cb::durability::Requirements>& durReqs) {
35393540
ASSERT_TRUE(consumer->isAllowSanitizeValueInDeletion());
35403541
consumer->public_setIncludeDeletedUserXattrs(IncludeDeletedUserXattrs::Yes);
35413542

3543+
auto& vb = *store->getVBucket(vbid);
3544+
ASSERT_EQ(0, vb.getHighSeqno());
3545+
3546+
const uint64_t initialEndSeqno = 10;
35423547
EXPECT_EQ(ENGINE_SUCCESS,
35433548
consumer->snapshotMarker(1 /*opaque*/,
35443549
vbid,
35453550
1 /*startSeqno*/,
3546-
10 /*endSeqno*/,
3551+
initialEndSeqno,
35473552
MARKER_FLAG_CHK,
35483553
{} /*HCS*/,
35493554
{} /*maxVisibleSeqno*/));
35503555

3556+
const auto key = makeStoredDocKey("key");
3557+
const auto verifyDCPSuccess = [this, &key, &durReqs](
3558+
const cb::const_byte_buffer& value,
3559+
protocol_binary_datatype_t datatype,
3560+
int64_t bySeqno) -> void {
3561+
const uint32_t opaque = 1;
3562+
if (durReqs) {
3563+
EXPECT_EQ(ENGINE_SUCCESS,
3564+
consumer->prepare(opaque,
3565+
key,
3566+
value,
3567+
0 /*priv_bytes*/,
3568+
datatype,
3569+
0 /*cas*/,
3570+
vbid,
3571+
0 /*flags*/,
3572+
bySeqno,
3573+
0 /*revSeqno*/,
3574+
0 /*exp*/,
3575+
0 /*lockTime*/,
3576+
0 /*nru*/,
3577+
DocumentState::Deleted,
3578+
durReqs->getLevel()));
3579+
} else {
3580+
EXPECT_EQ(ENGINE_SUCCESS,
3581+
consumer->deletion(opaque,
3582+
key,
3583+
value,
3584+
0 /*priv_bytes*/,
3585+
datatype,
3586+
0 /*cas*/,
3587+
vbid,
3588+
bySeqno,
3589+
0 /*revSeqno*/,
3590+
{} /*meta*/));
3591+
}
3592+
};
3593+
35513594
// Build up a value with just raw body and verify that DCP deletion succeeds
35523595
// and the Body has been removed from the payload.
35533596
const std::string body = "body";
35543597
cb::const_byte_buffer value{reinterpret_cast<const uint8_t*>(body.data()),
35553598
body.size()};
3556-
const uint32_t opaque = 1;
3557-
const auto key = makeStoredDocKey("key");
3558-
EXPECT_EQ(ENGINE_SUCCESS,
3559-
consumer->deletion(opaque,
3560-
key,
3561-
value,
3562-
0 /*priv_bytes*/,
3563-
PROTOCOL_BINARY_RAW_BYTES,
3564-
0 /*cas*/,
3565-
vbid,
3566-
1 /*bySeqno*/,
3567-
0 /*revSeqno*/,
3568-
{} /*meta*/));
3569-
// Verify that the body has been removed
3570-
auto& vb = *store->getVBucket(vbid);
3599+
{
3600+
SCOPED_TRACE("");
3601+
verifyDCPSuccess(value, PROTOCOL_BINARY_RAW_BYTES, 1 /*bySeqno*/);
3602+
}
35713603
auto& ht = vb.ht;
35723604
{
3573-
const auto& res = ht.findForUpdate(key);
3574-
const auto* sv = res.committed;
3605+
auto res = ht.findForUpdate(key);
3606+
const auto* sv = durReqs ? res.pending.getSV() : res.committed;
35753607
EXPECT_TRUE(sv);
35763608
EXPECT_EQ(1, sv->getBySeqno());
3577-
EXPECT_FALSE(sv->getValue().get());
3609+
if (durReqs) {
3610+
EXPECT_EQ(0, sv->getValue()->valueSize());
3611+
} else {
3612+
// Note: Normal deletion with 0-size value goes through DelWithMeta
3613+
// that sets the value to nullptr.
3614+
EXPECT_FALSE(sv->getValue().get());
3615+
}
3616+
}
3617+
3618+
int64_t nextSeqno = 2;
3619+
if (durReqs) {
3620+
// Need to commit the first prepare for queuing a new one in the next
3621+
// steps.
3622+
EXPECT_EQ(ENGINE_SUCCESS,
3623+
vb.commit(key, 1, {}, vb.lockCollections(key)));
3624+
// Replica doesn't like 2 prepares for the same key into the same
3625+
// checkpoint.
3626+
const int64_t newStartSeqno = initialEndSeqno + 1;
3627+
EXPECT_EQ(ENGINE_SUCCESS,
3628+
consumer->snapshotMarker(1 /*opaque*/,
3629+
vbid,
3630+
newStartSeqno,
3631+
newStartSeqno + 10 /*endSeqno*/,
3632+
MARKER_FLAG_CHK,
3633+
{} /*HCS*/,
3634+
{} /*maxVisibleSeqno*/));
3635+
nextSeqno = newStartSeqno;
35783636
}
35793637

35803638
// Verify the same for body + user-xattrs + sys-xattrs
35813639
const auto xattrValue = createXattrValue(body);
35823640
value = {reinterpret_cast<const uint8_t*>(xattrValue.data()),
35833641
xattrValue.size()};
3584-
EXPECT_EQ(ENGINE_SUCCESS,
3585-
consumer->deletion(opaque,
3586-
key,
3587-
value,
3588-
0 /*priv_bytes*/,
3589-
PROTOCOL_BINARY_RAW_BYTES |
3590-
PROTOCOL_BINARY_DATATYPE_XATTR,
3591-
0 /*cas*/,
3592-
vbid,
3593-
2 /*bySeqno*/,
3594-
0 /*revSeqno*/,
3595-
{} /*meta*/));
3596-
35973642
{
3598-
const auto& res = ht.findForUpdate(key);
3599-
const auto* sv = res.committed;
3643+
SCOPED_TRACE("");
3644+
verifyDCPSuccess(
3645+
value,
3646+
PROTOCOL_BINARY_RAW_BYTES | PROTOCOL_BINARY_DATATYPE_XATTR,
3647+
nextSeqno);
3648+
}
3649+
{
3650+
auto res = ht.findForUpdate(key);
3651+
const auto* sv = durReqs ? res.pending.getSV() : res.committed;
36003652
EXPECT_TRUE(sv);
3601-
EXPECT_EQ(2, sv->getBySeqno());
3653+
EXPECT_EQ(nextSeqno, sv->getBySeqno());
36023654
EXPECT_TRUE(sv->getValue().get());
36033655
EXPECT_LT(sv->getValue()->valueSize(), xattrValue.size());
36043656

@@ -3618,7 +3670,11 @@ void SingleThreadedPassiveStreamTest::testConsumerSanitizesBodyInDeletion() {
36183670
}
36193671

36203672
TEST_P(SingleThreadedPassiveStreamTest, ConsumerSanitizesBodyInDeletion) {
3621-
testConsumerSanitizesBodyInDeletion();
3673+
testConsumerSanitizesBodyInDeletion({});
3674+
}
3675+
3676+
TEST_P(SingleThreadedPassiveStreamTest, ConsumerSanitizesBodyInSyncDeletion) {
3677+
testConsumerSanitizesBodyInDeletion(cb::durability::Requirements());
36223678
}
36233679

36243680
void SingleThreadedPassiveStreamTest::testConsumerReceivesUserXattrsInDelete(

engines/ep/tests/module_tests/dcp_stream_test.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,8 +161,12 @@ class SingleThreadedPassiveStreamTest
161161

162162
/**
163163
* Test that the Consumer removes the body (if any) in deletion's value.
164+
* That is the behaviour when (allowSanitizeValueInDeletion = true).
165+
*
166+
* @param durReqs (optional) The Dur Reqs, if we are testing SyncDelete
164167
*/
165-
void testConsumerSanitizesBodyInDeletion();
168+
void testConsumerSanitizesBodyInDeletion(
169+
const boost::optional<cb::durability::Requirements>& durReqs);
166170

167171
/**
168172
* Test that the Consumer accepts user-xattrs in deletion for enabled

0 commit comments

Comments
 (0)