Skip to content

Commit 05db33e

Browse files
committed
MB-40493: ActiveStream doesn't try to inflate an inflated/empty value
An unnecessary warning is logged otherwise. Change-Id: I8416275ee71b1391616dea6bab053e47a2c0e090 Reviewed-on: http://review.couchbase.org/c/kv_engine/+/136710 Tested-by: Build Bot <[email protected]> Reviewed-by: Jim Walker <[email protected]> Well-Formed: Build Bot <[email protected]>
1 parent 473489d commit 05db33e

File tree

8 files changed

+148
-36
lines changed

8 files changed

+148
-36
lines changed

engines/ep/src/dcp/active_stream.cc

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1057,7 +1057,7 @@ std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
10571057
isForceValueCompressionEnabled(),
10581058
isSnappyEnabled())) {
10591059
auto finalItem = std::make_unique<Item>(*item);
1060-
finalItem->removeBodyAndOrXattrs(
1060+
const auto wasInflated = finalItem->removeBodyAndOrXattrs(
10611061
includeValue, includeXattributes, includeDeletedUserXattrs);
10621062

10631063
if (isSnappyEnabled()) {
@@ -1072,12 +1072,23 @@ std::unique_ptr<DcpResponse> ActiveStream::makeResponseFromItem(
10721072
}
10731073
}
10741074
} else {
1075-
if (mcbp::datatype::is_snappy(finalItem->getDataType())) {
1075+
// The purpose of this block is to uncompress compressed items
1076+
// as they are being streamed over a connection that doesn't
1077+
// support compression.
1078+
//
1079+
// MB-40493: IncludeValue::NoWithUnderlyingDatatype may reset
1080+
// datatype to SNAPPY, even if the value has been already
1081+
// decompressed (eg, the original value contained Body+Xattr
1082+
// and Body have been removed) or if there is no value at all
1083+
// (eg, the original value contained only a Body, now removed).
1084+
// We need to avoid the call to Item::decompress in both cases,
1085+
// we log an unnecessary warning otherwise.
1086+
if (mcbp::datatype::is_snappy(finalItem->getDataType()) &&
1087+
(wasInflated == Item::WasValueInflated::No) &&
1088+
(finalItem->getNBytes() > 0)) {
10761089
if (!finalItem->decompressValue()) {
10771090
log(spdlog::level::level_enum::warn,
1078-
1079-
"{} Failed to snappy uncompress a compressed "
1080-
"value",
1091+
"{} Failed to snappy uncompress a compressed value",
10811092
logPrefix);
10821093
}
10831094
}

engines/ep/src/item.cc

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -405,42 +405,44 @@ item_info Item::toItemInfo(uint64_t vb_uuid, int64_t hlcEpoch) const {
405405
return info;
406406
}
407407

408-
void Item::removeBody() {
408+
Item::WasValueInflated Item::removeBody() {
409409
if (!value) {
410410
// No value, nothing to do
411-
return;
411+
return WasValueInflated::No;
412412
}
413413

414414
if (!mcbp::datatype::is_xattr(getDataType())) {
415415
// We don't want the body and there are no xattrs, just set empty value
416416
setData(nullptr, 0);
417417
setDataType(PROTOCOL_BINARY_RAW_BYTES);
418-
return;
418+
return WasValueInflated::No;
419419
}
420420

421421
// No-op if already uncompressed
422-
decompressValue();
422+
const auto wasInflated = decompressValue();
423423

424424
// We want only xattrs.
425425
// Note: The following is no-op if no Body present.
426426
const cb::const_char_buffer valBuffer{value->getData(), value->valueSize()};
427427
setData(valBuffer.data(), cb::xattr::get_body_offset(valBuffer));
428428
setDataType(PROTOCOL_BINARY_DATATYPE_XATTR);
429+
430+
return wasInflated ? WasValueInflated::Yes : WasValueInflated::No;
429431
}
430432

431-
void Item::removeXattrs() {
433+
Item::WasValueInflated Item::removeXattrs() {
432434
if (!value) {
433435
// No value, nothing to do
434-
return;
436+
return WasValueInflated::No;
435437
}
436438

437439
if (!mcbp::datatype::is_xattr(getDataType())) {
438440
// No Xattrs, nothing to do
439-
return;
441+
return WasValueInflated::No;
440442
}
441443

442444
// No-op if already uncompressed
443-
decompressValue();
445+
const auto wasInflated = decompressValue();
444446

445447
// We want only the body
446448
const cb::const_char_buffer valBuffer{value->getData(), value->valueSize()};
@@ -454,21 +456,23 @@ void Item::removeXattrs() {
454456
// Subdoc logic for details. Here we have to rectify.
455457
setDataType(getDataType() & ~PROTOCOL_BINARY_DATATYPE_JSON);
456458
}
459+
460+
return wasInflated ? WasValueInflated::Yes : WasValueInflated::No;
457461
}
458462

459-
void Item::removeUserXattrs() {
463+
Item::WasValueInflated Item::removeUserXattrs() {
460464
if (!value) {
461465
// No value, nothing to do
462-
return;
466+
return WasValueInflated::No;
463467
}
464468

465469
if (!mcbp::datatype::is_xattr(getDataType())) {
466470
// No Xattrs, nothing to do
467-
return;
471+
return WasValueInflated::No;
468472
}
469473

470474
// No-op if already uncompressed
471-
decompressValue();
475+
const auto wasInflated = decompressValue();
472476

473477
// The function currently does not support value with body.
474478
// That is fine for now as this is introduced for MB-37374, thus is supposed
@@ -494,33 +498,37 @@ void Item::removeUserXattrs() {
494498
// Note: Doing this unconditionally as we reach this line iff there is no
495499
// body. We would need to do this conditionally otherwise.
496500
setDataType(getDataType() & ~PROTOCOL_BINARY_DATATYPE_JSON);
501+
502+
return wasInflated ? WasValueInflated::Yes : WasValueInflated::No;
497503
}
498504

499-
void Item::removeBodyAndOrXattrs(
505+
Item::WasValueInflated Item::removeBodyAndOrXattrs(
500506
IncludeValue includeVal,
501507
IncludeXattrs includeXattrs,
502508
IncludeDeletedUserXattrs includeDeletedUserXattrs) {
503509
if (!value) {
504510
// If no value (ie, no body and/or xattrs) then nothing to do
505-
return;
511+
return WasValueInflated::No;
506512
}
507513

508514
// Take a copy of the original datatype before proceeding, any modification
509515
// to the value may change the datatype.
510516
const auto originalDatatype = getDataType();
511517

518+
auto wasInflated = WasValueInflated::No;
519+
512520
// Note: IncludeValue acts like "include body"
513521
if (includeVal != IncludeValue::Yes) {
514-
removeBody();
522+
wasInflated = removeBody();
515523
}
516524

517525
if (includeXattrs == IncludeXattrs::No) {
518-
removeXattrs();
526+
wasInflated = removeXattrs();
519527
}
520528

521529
if (isDeleted() &&
522530
includeDeletedUserXattrs == IncludeDeletedUserXattrs::No) {
523-
removeUserXattrs();
531+
wasInflated = removeUserXattrs();
524532
}
525533

526534
// Datatype for no-value must be RAW
@@ -532,6 +540,8 @@ void Item::removeBodyAndOrXattrs(
532540
if (includeVal == IncludeValue::NoWithUnderlyingDatatype) {
533541
setDataType(originalDatatype);
534542
}
543+
544+
return wasInflated;
535545
}
536546

537547
item_info to_item_info(const ItemMetaData& itemMeta,

engines/ep/src/item.h

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -494,26 +494,34 @@ class Item : public RCValue {
494494
*/
495495
item_info toItemInfo(uint64_t vb_uuid, int64_t hlcEpoch) const;
496496

497+
enum class WasValueInflated : uint8_t { No, Yes };
498+
497499
/**
498500
* Remove the Body of this item's value.
499501
* No-op if no Value or no Body present.
500502
* Keeps the Xattrs chunk intact, if any.
503+
*
504+
* @return whether the value has been decompressed for processing
501505
*/
502-
void removeBody();
506+
WasValueInflated removeBody();
503507

504508
/**
505509
* Remove the Xattrs chunk of this item's value.
506510
* No-op if no Value or no Xattr present.
507511
* Keeps the Body intact, if any.
512+
*
513+
* @return whether the value has been decompressed for processing
508514
*/
509-
void removeXattrs();
515+
WasValueInflated removeXattrs();
510516

511517
/**
512518
* Remove user-xattrs from the Xattrs chunk of this item's value.
513519
* No-op if no Value or no user-xattr present.
514520
* Keeps the Body and the sys-xattrs intact, if any.
521+
*
522+
* @return whether the value has been decompressed for processing
515523
*/
516-
void removeUserXattrs();
524+
WasValueInflated removeUserXattrs();
517525

518526
/**
519527
* Removes Body and/or Xattrs from the item depending on the given params
@@ -522,8 +530,10 @@ class Item : public RCValue {
522530
* @param includeXattrs states whether the item should include xattrs
523531
* @param includeDeletedUserXattrs states whether a delete item should
524532
* include user-xattrs
533+
*
534+
* @return whether the value has been decompressed for processing
525535
*/
526-
void removeBodyAndOrXattrs(
536+
WasValueInflated removeBodyAndOrXattrs(
527537
IncludeValue includeVal,
528538
IncludeXattrs includeXattrs,
529539
IncludeDeletedUserXattrs includeDeletedUserXattrs);

engines/ep/tests/mock/mock_dcp_producer.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,12 @@
2020
#include "dcp/active_stream_checkpoint_processor_task.h"
2121
#include "dcp/msg_producers_border_guard.h"
2222
#include "dcp/response.h"
23+
#include "mock_bucket_logger.h"
2324
#include "mock_dcp.h"
2425
#include "mock_dcp_backfill_mgr.h"
2526
#include "mock_stream.h"
2627
#include "vbucket.h"
2728

28-
#include <folly/portability/GTest.h>
29-
3029
extern cb::mcbp::ClientOpcode last_op;
3130

3231
MockDcpProducer::MockDcpProducer(EventuallyPersistentEngine& theEngine,
@@ -154,3 +153,12 @@ BackfillScanBuffer& MockDcpProducer::public_getBackfillScanBuffer() {
154153
void MockDcpProducer::bytesForceRead(size_t bytes) {
155154
backfillMgr->bytesForceRead(bytes);
156155
}
156+
157+
void MockDcpProducer::setupMockLogger() {
158+
logger = std::make_shared<::testing::NiceMock<MockBucketLogger>>("prod");
159+
}
160+
161+
MockBucketLogger& MockDcpProducer::public_getLogger() const {
162+
EXPECT_TRUE(logger);
163+
return dynamic_cast<MockBucketLogger&>(*logger);
164+
}

engines/ep/tests/mock/mock_dcp_producer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
class ActiveStreamCheckpointProcessorTask;
2525
struct BackfillScanBuffer;
2626
class MockActiveStream;
27+
class MockBucketLogger;
2728
class MockDcpMessageProducers;
2829

2930
/*
@@ -236,4 +237,8 @@ class MockDcpProducer : public DcpProducer {
236237
IncludeDeletedUserXattrs public_getIncludeDeletedUserXattrs() const {
237238
return includeDeletedUserXattrs;
238239
}
240+
241+
void setupMockLogger();
242+
243+
MockBucketLogger& public_getLogger() const;
239244
};

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
*/
2424

2525
#include "dcp_test.h"
26+
#include "../mock/mock_bucket_logger.h"
2627
#include "../mock/mock_checkpoint_manager.h"
2728
#include "../mock/mock_dcp.h"
2829
#include "../mock/mock_dcp_conn_map.h"
@@ -49,6 +50,7 @@
4950
#include "vbucket.h"
5051
#include "warmup.h"
5152

53+
#include <folly/portability/GMock.h>
5254
#include <folly/portability/GTest.h>
5355
#include <memcached/server_cookie_iface.h>
5456
#include <platform/cbassert.h>
@@ -698,6 +700,61 @@ TEST_P(CompressionStreamTest, force_value_compression_enabled) {
698700
destroy_dcp_stream();
699701
}
700702

703+
TEST_P(CompressionStreamTest,
704+
NoWithUnderlyingDatatype_CompressionDisabled_ItemCompressed) {
705+
setup_dcp_stream(
706+
0, IncludeValue::NoWithUnderlyingDatatype, IncludeXattrs::Yes);
707+
ASSERT_FALSE(producer->isCompressionEnabled());
708+
ASSERT_EQ(IncludeValue::NoWithUnderlyingDatatype,
709+
stream->public_getIncludeValue());
710+
ASSERT_EQ(IncludeXattrs::Yes, stream->public_getIncludeXattrs());
711+
712+
// Create a compressed item
713+
auto item = makeCompressibleItem(vbid,
714+
makeStoredDocKey("key"),
715+
"body000000000000000000000000000000000000",
716+
isXattr() ? PROTOCOL_BINARY_DATATYPE_JSON
717+
: PROTOCOL_BINARY_RAW_BYTES,
718+
true, // compressed
719+
isXattr());
720+
721+
// ActiveStream::makeResponseFromItem is where we modify the item value (if
722+
// necessary) before pushing items into the Stream::readyQ. Here we just
723+
// pass the item in input to the function and check that we get the expected
724+
// DcpResponse.
725+
726+
// Core expectation of this test: we don't try to inflate an uncompressed or
727+
// empty value, which leads to logging a warning. Before the fix, this
728+
// expectation fails as GMock intercept 1 call to mlog.
729+
producer->setupMockLogger();
730+
using namespace testing;
731+
EXPECT_CALL(producer->public_getLogger(), mlog(_, _)).Times(0);
732+
733+
queued_item originalItem(std::move(item));
734+
const auto resp = stream->public_makeResponseFromItem(
735+
originalItem, SendCommitSyncWriteAs::Commit);
736+
737+
const auto* mut = dynamic_cast<MutationResponse*>(resp.get());
738+
ASSERT_TRUE(mut);
739+
740+
// Expecting a modified item, new allocation occurred.
741+
ASSERT_NE(originalItem.get(), mut->getItem().get());
742+
743+
const auto originalValueSize = originalItem->getNBytes();
744+
ASSERT_GT(originalValueSize, 0);
745+
const auto onTheWireValueSize = mut->getItem()->getNBytes();
746+
747+
if (isXattr()) {
748+
// Stream::makeResponseFromItem will have inflated the value for
749+
// removing Xattrs, and then not re-compressed as passive compression
750+
// is disabled.
751+
EXPECT_GT(onTheWireValueSize, originalValueSize);
752+
} else {
753+
// Body only, which must have been removed.
754+
EXPECT_EQ(0, onTheWireValueSize);
755+
}
756+
}
757+
701758
class ConnectionTest : public DCPTest,
702759
public ::testing::WithParamInterface<
703760
std::tuple<std::string, std::string>> {

engines/ep/tests/module_tests/evp_store_with_meta.cc

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -510,8 +510,11 @@ TEST_F(WithMetaTest, storeUncompressedInOffMode) {
510510
return;
511511
}
512512
std::string valueData{R"({"aaaaaaaaa":10000000000})"};
513-
auto item = makeCompressibleItem(vbid, makeStoredDocKey("key"), valueData,
514-
PROTOCOL_BINARY_RAW_BYTES, true);
513+
auto item = makeCompressibleItem(vbid,
514+
makeStoredDocKey("key"),
515+
valueData,
516+
PROTOCOL_BINARY_RAW_BYTES,
517+
true);
515518

516519
item->setCas();
517520

engines/ep/tests/module_tests/test_helpers.cc

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,27 +77,35 @@ queued_item makePendingItem(StoredDocKey key,
7777

7878
std::unique_ptr<Item> makeCompressibleItem(Vbid vbid,
7979
const DocKey& key,
80-
const std::string& value,
80+
const std::string& body,
8181
protocol_binary_datatype_t datatype,
8282
bool shouldCompress,
8383
bool makeXattrBody) {
8484
protocol_binary_datatype_t itemDataType = datatype;
85-
std::string v = value;
85+
std::string value = body;
8686
if (makeXattrBody) {
87-
v = createXattrValue(value, true, false);
87+
value = createXattrValue(body, true, false);
8888
itemDataType |= PROTOCOL_BINARY_DATATYPE_XATTR;
8989
}
9090
if (shouldCompress) {
9191
cb::compression::Buffer output;
92-
cb::compression::deflate(cb::compression::Algorithm::Snappy, v, output);
92+
cb::compression::deflate(
93+
cb::compression::Algorithm::Snappy, value, output);
9394
itemDataType |= PROTOCOL_BINARY_DATATYPE_SNAPPY;
9495
return std::make_unique<Item>(key, /*flags*/0, /*exp*/0,
9596
output.data(), output.size(),
9697
itemDataType);
9798
}
9899

99-
return std::make_unique<Item>(
100-
key, /*flags*/ 0, /*exp*/ 0, v.c_str(), v.length(), itemDataType);
100+
return std::make_unique<Item>(key,
101+
0 /*flags*/,
102+
0 /*exp*/,
103+
value.c_str(),
104+
value.length(),
105+
itemDataType,
106+
0 /*cas*/,
107+
-1 /*seqno*/,
108+
vbid);
101109
}
102110

103111
bool queueNewItem(VBucket& vbucket, const std::string& key) {

0 commit comments

Comments
 (0)