Skip to content

Commit 23408eb

Browse files
committed
Merge branch 'mad-hatter'
* commit '05db33e71': MB-40493: ActiveStream doesn't try to inflate an inflated/empty value Change-Id: Ie2ea97f951ba452317ff8065aad9f078be147489
2 parents e70d261 + 05db33e commit 23408eb

File tree

8 files changed

+148
-36
lines changed

8 files changed

+148
-36
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,7 +1165,7 @@ std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
11651165
isForceValueCompressionEnabled(),
11661166
isSnappyEnabled())) {
11671167
auto finalItem = make_STRCPtr<Item>(*item);
1168-
finalItem->removeBodyAndOrXattrs(
1168+
const auto wasInflated = finalItem->removeBodyAndOrXattrs(
11691169
includeValue, includeXattributes, includeDeletedUserXattrs);
11701170

11711171
if (isSnappyEnabled()) {
@@ -1180,12 +1180,23 @@ std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
11801180
}
11811181
}
11821182
} else {
1183-
if (mcbp::datatype::is_snappy(finalItem->getDataType())) {
1183+
// The purpose of this block is to uncompress compressed items
1184+
// as they are being streamed over a connection that doesn't
1185+
// support compression.
1186+
//
1187+
// MB-40493: IncludeValue::NoWithUnderlyingDatatype may reset
1188+
// datatype to SNAPPY, even if the value has been already
1189+
// decompressed (eg, the original value contained Body+Xattr
1190+
// and Body have been removed) or if there is no value at all
1191+
// (eg, the original value contained only a Body, now removed).
1192+
// We need to avoid the call to Item::decompress in both cases,
1193+
// we log an unnecessary warning otherwise.
1194+
if (mcbp::datatype::is_snappy(finalItem->getDataType()) &&
1195+
(wasInflated == Item::WasValueInflated::No) &&
1196+
(finalItem->getNBytes() > 0)) {
11841197
if (!finalItem->decompressValue()) {
11851198
log(spdlog::level::level_enum::warn,
1186-
1187-
"{} Failed to snappy uncompress a compressed "
1188-
"value",
1199+
"{} Failed to snappy uncompress a compressed value",
11891200
logPrefix);
11901201
}
11911202
}

engines/ep/src/item.cc

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -416,42 +416,44 @@ item_info Item::toItemInfo(uint64_t vb_uuid, int64_t hlcEpoch) const {
416416
return info;
417417
}
418418

419-
void Item::removeBody() {
419+
Item::WasValueInflated Item::removeBody() {
420420
if (!value) {
421421
// No value, nothing to do
422-
return;
422+
return WasValueInflated::No;
423423
}
424424

425425
if (!mcbp::datatype::is_xattr(getDataType())) {
426426
// We don't want the body and there are no xattrs, just set empty value
427427
setData(nullptr, 0);
428428
setDataType(PROTOCOL_BINARY_RAW_BYTES);
429-
return;
429+
return WasValueInflated::No;
430430
}
431431

432432
// No-op if already uncompressed
433-
decompressValue();
433+
const auto wasInflated = decompressValue();
434434

435435
// We want only xattrs.
436436
// Note: The following is no-op if no Body present.
437437
std::string_view valBuffer{value->getData(), value->valueSize()};
438438
setData(valBuffer.data(), cb::xattr::get_body_offset(valBuffer));
439439
setDataType(PROTOCOL_BINARY_DATATYPE_XATTR);
440+
441+
return wasInflated ? WasValueInflated::Yes : WasValueInflated::No;
440442
}
441443

442-
void Item::removeXattrs() {
444+
Item::WasValueInflated Item::removeXattrs() {
443445
if (!value) {
444446
// No value, nothing to do
445-
return;
447+
return WasValueInflated::No;
446448
}
447449

448450
if (!mcbp::datatype::is_xattr(getDataType())) {
449451
// No Xattrs, nothing to do
450-
return;
452+
return WasValueInflated::No;
451453
}
452454

453455
// No-op if already uncompressed
454-
decompressValue();
456+
const auto wasInflated = decompressValue();
455457

456458
// We want only the body
457459
std::string_view valBuffer{value->getData(), value->valueSize()};
@@ -466,21 +468,23 @@ void Item::removeXattrs() {
466468
// Subdoc logic for details. Here we have to rectify.
467469
setDataType(getDataType() & ~PROTOCOL_BINARY_DATATYPE_JSON);
468470
}
471+
472+
return wasInflated ? WasValueInflated::Yes : WasValueInflated::No;
469473
}
470474

471-
void Item::removeUserXattrs() {
475+
Item::WasValueInflated Item::removeUserXattrs() {
472476
if (!value) {
473477
// No value, nothing to do
474-
return;
478+
return WasValueInflated::No;
475479
}
476480

477481
if (!mcbp::datatype::is_xattr(getDataType())) {
478482
// No Xattrs, nothing to do
479-
return;
483+
return WasValueInflated::No;
480484
}
481485

482486
// No-op if already uncompressed
483-
decompressValue();
487+
const auto wasInflated = decompressValue();
484488

485489
// The function currently does not support value with body.
486490
// That is fine for now as this is introduced for MB-37374, thus is supposed
@@ -507,33 +511,37 @@ void Item::removeUserXattrs() {
507511
// Note: Doing this unconditionally as we reach this line iff there is no
508512
// body. We would need to do this conditionally otherwise.
509513
setDataType(getDataType() & ~PROTOCOL_BINARY_DATATYPE_JSON);
514+
515+
return wasInflated ? WasValueInflated::Yes : WasValueInflated::No;
510516
}
511517

512-
void Item::removeBodyAndOrXattrs(
518+
Item::WasValueInflated Item::removeBodyAndOrXattrs(
513519
IncludeValue includeVal,
514520
IncludeXattrs includeXattrs,
515521
IncludeDeletedUserXattrs includeDeletedUserXattrs) {
516522
if (!value) {
517523
// If no value (ie, no body and/or xattrs) then nothing to do
518-
return;
524+
return WasValueInflated::No;
519525
}
520526

521527
// Take a copy of the original datatype before proceeding, any modification
522528
// to the value may change the datatype.
523529
const auto originalDatatype = getDataType();
524530

531+
auto wasInflated = WasValueInflated::No;
532+
525533
// Note: IncludeValue acts like "include body"
526534
if (includeVal != IncludeValue::Yes) {
527-
removeBody();
535+
wasInflated = removeBody();
528536
}
529537

530538
if (includeXattrs == IncludeXattrs::No) {
531-
removeXattrs();
539+
wasInflated = removeXattrs();
532540
}
533541

534542
if (isDeleted() &&
535543
includeDeletedUserXattrs == IncludeDeletedUserXattrs::No) {
536-
removeUserXattrs();
544+
wasInflated = removeUserXattrs();
537545
}
538546

539547
// Datatype for no-value must be RAW
@@ -545,6 +553,8 @@ void Item::removeBodyAndOrXattrs(
545553
if (includeVal == IncludeValue::NoWithUnderlyingDatatype) {
546554
setDataType(originalDatatype);
547555
}
556+
557+
return wasInflated;
548558
}
549559

550560
item_info to_item_info(const ItemMetaData& itemMeta,

engines/ep/src/item.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -508,26 +508,34 @@ class Item : public RCValue {
508508
*/
509509
item_info toItemInfo(uint64_t vb_uuid, int64_t hlcEpoch) const;
510510

511+
enum class WasValueInflated : uint8_t { No, Yes };
512+
511513
/**
512514
* Remove the Body of this item's value.
513515
* No-op if no Value or no Body present.
514516
* Keeps the Xattrs chunk intact, if any.
517+
*
518+
* @return whether the value has been decompressed for processing
515519
*/
516-
void removeBody();
520+
WasValueInflated removeBody();
517521

518522
/**
519523
* Remove the Xattrs chunk of this item's value.
520524
* No-op if no Value or no Xattr present.
521525
* Keeps the Body intact, if any.
526+
*
527+
* @return whether the value has been decompressed for processing
522528
*/
523-
void removeXattrs();
529+
WasValueInflated removeXattrs();
524530

525531
/**
526532
* Remove user-xattrs from the Xattrs chunk of this item's value.
527533
* No-op if no Value or no user-xattr present.
528534
* Keeps the Body and the sys-xattrs intact, if any.
535+
*
536+
* @return whether the value has been decompressed for processing
529537
*/
530-
void removeUserXattrs();
538+
WasValueInflated removeUserXattrs();
531539

532540
/**
533541
* Removes Body and/or Xattrs from the item depending on the given params
@@ -536,8 +544,10 @@ class Item : public RCValue {
536544
* @param includeXattrs states whether the item should include xattrs
537545
* @param includeDeletedUserXattrs states whether a delete item should
538546
* include user-xattrs
547+
*
548+
* @return whether the value has been decompressed for processing
539549
*/
540-
void removeBodyAndOrXattrs(
550+
WasValueInflated removeBodyAndOrXattrs(
541551
IncludeValue includeVal,
542552
IncludeXattrs includeXattrs,
543553
IncludeDeletedUserXattrs includeDeletedUserXattrs);

engines/ep/tests/mock/mock_dcp_producer.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,12 @@
2020
#include "dcp/active_stream_checkpoint_processor_task.h"
2121
#include "dcp/msg_producers_border_guard.h"
2222
#include "dcp/response.h"
23+
#include "mock_bucket_logger.h"
2324
#include "mock_dcp.h"
2425
#include "mock_dcp_backfill_mgr.h"
2526
#include "mock_stream.h"
2627
#include "vbucket.h"
2728

28-
#include <folly/portability/GTest.h>
29-
3029
extern cb::mcbp::ClientOpcode last_op;
3130

3231
MockDcpProducer::MockDcpProducer(EventuallyPersistentEngine& theEngine,
@@ -155,3 +154,12 @@ BackfillScanBuffer& MockDcpProducer::public_getBackfillScanBuffer() {
155154
return std::dynamic_pointer_cast<MockDcpBackfillManager>(backfillMgr.load())
156155
->public_getBackfillScanBuffer();
157156
}
157+
158+
void MockDcpProducer::setupMockLogger() {
159+
logger = std::make_shared<::testing::NiceMock<MockBucketLogger>>("prod");
160+
}
161+
162+
MockBucketLogger& MockDcpProducer::public_getLogger() const {
163+
EXPECT_TRUE(logger);
164+
return dynamic_cast<MockBucketLogger&>(*logger);
165+
}

engines/ep/tests/mock/mock_dcp_producer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
class ActiveStreamCheckpointProcessorTask;
2525
struct BackfillScanBuffer;
2626
class MockActiveStream;
27+
class MockBucketLogger;
2728
class MockDcpMessageProducers;
2829

2930
/*
@@ -245,4 +246,8 @@ class MockDcpProducer : public DcpProducer {
245246
void setTotalBtyesSent(size_t v) {
246247
totalBytesSent = v;
247248
}
249+
250+
void setupMockLogger();
251+
252+
MockBucketLogger& public_getLogger() const;
248253
};

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
*/
2424

2525
#include "dcp_test.h"
26+
#include "../mock/mock_bucket_logger.h"
2627
#include "../mock/mock_checkpoint_manager.h"
2728
#include "../mock/mock_dcp.h"
2829
#include "../mock/mock_dcp_conn_map.h"
@@ -47,6 +48,7 @@
4748
#include "test_helpers.h"
4849
#include "warmup.h"
4950

51+
#include <folly/portability/GMock.h>
5052
#include <folly/portability/GTest.h>
5153
#include <memcached/server_cookie_iface.h>
5254
#include <platform/cbassert.h>
@@ -798,6 +800,61 @@ TEST_P(CompressionStreamTest, force_value_compression_enabled) {
798800
destroy_dcp_stream();
799801
}
800802

803+
TEST_P(CompressionStreamTest,
804+
NoWithUnderlyingDatatype_CompressionDisabled_ItemCompressed) {
805+
setup_dcp_stream(
806+
0, IncludeValue::NoWithUnderlyingDatatype, IncludeXattrs::Yes);
807+
ASSERT_FALSE(producer->isCompressionEnabled());
808+
ASSERT_EQ(IncludeValue::NoWithUnderlyingDatatype,
809+
stream->public_getIncludeValue());
810+
ASSERT_EQ(IncludeXattrs::Yes, stream->public_getIncludeXattrs());
811+
812+
// Create a compressed item
813+
auto item = makeCompressibleItem(vbid,
814+
makeStoredDocKey("key"),
815+
"body000000000000000000000000000000000000",
816+
isXattr() ? PROTOCOL_BINARY_DATATYPE_JSON
817+
: PROTOCOL_BINARY_RAW_BYTES,
818+
true, // compressed
819+
isXattr());
820+
821+
// ActiveStream::makeResponseFromItem is where we modify the item value (if
822+
// necessary) before pushing items into the Stream::readyQ. Here we just
823+
// pass the item in input to the function and check that we get the expected
824+
// DcpResponse.
825+
826+
// Core expectation of this test: we don't try to inflate an uncompressed or
827+
// empty value, which leads to logging a warning. Before the fix, this
828+
// expectation fails as GMock intercept 1 call to mlog.
829+
producer->setupMockLogger();
830+
using namespace testing;
831+
EXPECT_CALL(producer->public_getLogger(), mlog(_, _)).Times(0);
832+
833+
queued_item originalItem(std::move(item));
834+
const auto resp = stream->public_makeResponseFromItem(
835+
originalItem, SendCommitSyncWriteAs::Commit);
836+
837+
const auto* mut = dynamic_cast<MutationResponse*>(resp.get());
838+
ASSERT_TRUE(mut);
839+
840+
// Expecting a modified item, new allocation occurred.
841+
ASSERT_NE(originalItem.get(), mut->getItem().get());
842+
843+
const auto originalValueSize = originalItem->getNBytes();
844+
ASSERT_GT(originalValueSize, 0);
845+
const auto onTheWireValueSize = mut->getItem()->getNBytes();
846+
847+
if (isXattr()) {
848+
// Stream::makeResponseFromItem will have inflated the value for
849+
// removing Xattrs, and then not re-compressed as passive compression
850+
// is disabled.
851+
EXPECT_GT(onTheWireValueSize, originalValueSize);
852+
} else {
853+
// Body only, which must have been removed.
854+
EXPECT_EQ(0, onTheWireValueSize);
855+
}
856+
}
857+
801858
class ConnectionTest : public DCPTest,
802859
public ::testing::WithParamInterface<
803860
std::tuple<std::string, std::string>> {

engines/ep/tests/module_tests/evp_store_with_meta.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -510,8 +510,11 @@ TEST_F(WithMetaTest, storeUncompressedInOffMode) {
510510
return;
511511
}
512512
std::string valueData{R"({"aaaaaaaaa":10000000000})"};
513-
auto item = makeCompressibleItem(vbid, makeStoredDocKey("key"), valueData,
514-
PROTOCOL_BINARY_RAW_BYTES, true);
513+
auto item = makeCompressibleItem(vbid,
514+
makeStoredDocKey("key"),
515+
valueData,
516+
PROTOCOL_BINARY_RAW_BYTES,
517+
true);
515518

516519
item->setCas();
517520

engines/ep/tests/module_tests/test_helpers.cc

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -82,27 +82,35 @@ queued_item makePendingItem(StoredDocKey key,
8282

8383
std::unique_ptr<Item> makeCompressibleItem(Vbid vbid,
8484
const DocKey& key,
85-
const std::string& value,
85+
const std::string& body,
8686
protocol_binary_datatype_t datatype,
8787
bool shouldCompress,
8888
bool makeXattrBody) {
8989
protocol_binary_datatype_t itemDataType = datatype;
90-
std::string v = value;
90+
std::string value = body;
9191
if (makeXattrBody) {
92-
v = createXattrValue(value, true, false);
92+
value = createXattrValue(body, true, false);
9393
itemDataType |= PROTOCOL_BINARY_DATATYPE_XATTR;
9494
}
9595
if (shouldCompress) {
9696
cb::compression::Buffer output;
97-
cb::compression::deflate(cb::compression::Algorithm::Snappy, v, output);
97+
cb::compression::deflate(
98+
cb::compression::Algorithm::Snappy, value, output);
9899
itemDataType |= PROTOCOL_BINARY_DATATYPE_SNAPPY;
99100
return std::make_unique<Item>(key, /*flags*/0, /*exp*/0,
100101
output.data(), output.size(),
101102
itemDataType);
102103
}
103104

104-
return std::make_unique<Item>(
105-
key, /*flags*/ 0, /*exp*/ 0, v.c_str(), v.length(), itemDataType);
105+
return std::make_unique<Item>(key,
106+
0 /*flags*/,
107+
0 /*exp*/,
108+
value.c_str(),
109+
value.length(),
110+
itemDataType,
111+
0 /*cas*/,
112+
-1 /*seqno*/,
113+
vbid);
106114
}
107115

108116
bool queueNewItem(VBucket& vbucket, const std::string& key) {

0 commit comments

Comments
 (0)