Skip to content

Commit 30a1c9e

Browse files
BenHuddlestondaverigby
authored andcommitted
MB-42256: Handle seqno ack responses
We may receive a seqno ack response if a vBucket is deleted at the "wrong" time. The consumer node must be able to handle this instead of taking down the DCP connection or it can cause rebalances to fail that should not Change-Id: I26c4ca5301a64cffad8bcc8a5c4f547b8fd9d86f Reviewed-on: http://review.couchbase.org/c/kv_engine/+/139108 Well-Formed: Build Bot <[email protected]> Tested-by: Ben Huddleston <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent aee0fe1 commit 30a1c9e

File tree

4 files changed

+72
-0
lines changed

4 files changed

+72
-0
lines changed

daemon/mcbp_executors.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,8 @@ void initialize_mbcp_lookup_map() {
654654
process_bin_dcp_response);
655655
setup_response_handler(cb::mcbp::ClientOpcode::DcpAbort,
656656
process_bin_dcp_response);
657+
setup_response_handler(cb::mcbp::ClientOpcode::DcpSeqnoAcknowledged,
658+
process_bin_dcp_response);
657659
setup_response_handler(cb::mcbp::ClientOpcode::GetErrorMap,
658660
process_bin_dcp_response);
659661

engines/ep/src/dcp/consumer.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1035,6 +1035,12 @@ bool DcpConsumer::handleResponse(const protocol_binary_response_header* resp) {
10351035
producerIsVersion5orHigher = status != cb::mcbp::Status::UnknownCommand;
10361036
getErrorMapState = GetErrorMapState::Skip;
10371037
return true;
1038+
} else if (opcode == cb::mcbp::ClientOpcode::DcpSeqnoAcknowledged) {
1039+
// Seqno ack might respond in a non-success case if the vBucket has gone
1040+
// away on the producer. We don't really care if this happens, the
1041+
// stream has probably already gone away, but we don't want to take down
1042+
// the connection (return false) as it might cause a rebalance to fail.
1043+
return true;
10381044
}
10391045

10401046
logger->warn("Trying to handle an unknown response {}, disconnecting",

engines/ep/tests/module_tests/dcp_test.cc

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2988,6 +2988,22 @@ TEST_P(ConnectionTest, ReplicateJustBeforeThrottleThreshold) {
29882988
sendConsumerMutationsNearThreshold(false);
29892989
}
29902990

2991+
TEST_P(ConnectionTest, ConsumerHandlesSeqnoAckResponse) {
2992+
const void* cookie = create_mock_cookie();
2993+
2994+
/* Set up a consumer connection */
2995+
auto consumer =
2996+
std::make_shared<MockDcpConsumer>(*engine, cookie, "test_consumer");
2997+
2998+
protocol_binary_response_header resp{};
2999+
resp.response.setMagic(cb::mcbp::Magic::AltClientResponse);
3000+
resp.response.setOpcode(cb::mcbp::ClientOpcode::DcpSeqnoAcknowledged);
3001+
resp.response.setStatus(cb::mcbp::Status::NotMyVbucket);
3002+
EXPECT_TRUE(consumer->handleResponse(&resp));
3003+
3004+
destroy_mock_cookie(cookie);
3005+
}
3006+
29913007
void ConnectionTest::processConsumerMutationsNearThreshold(
29923008
bool beyondThreshold) {
29933009
const void* cookie = create_mock_cookie();

tests/testapp/testapp_durability.cc

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,54 @@ TEST_P(DurabilityTest, GetAndTouchNotSupported) {
168168
executeTouchOrGatCommand(ClientOpcode::Gat);
169169
}
170170

171+
TEST_P(DurabilityTest, AckResponseHandled) {
172+
if (mcd_env->getTestBucket().getName() != "default_engine") {
173+
// Will fail in EP engine if the DCP consumer isn't present
174+
return;
175+
}
176+
// Send our response to see if the executor accepts it. If not, it would
177+
// disconnect the connection
178+
std::vector<uint8_t> rspBuffer(1024);
179+
ResponseBuilder rspBuilder({rspBuffer.data(), rspBuffer.size()});
180+
rspBuilder.setOpcode(cb::mcbp::ClientOpcode::DcpSeqnoAcknowledged);
181+
rspBuilder.setMagic(Magic::AltClientResponse);
182+
rspBuilder.setFramingExtras(encode(cb::durability::Requirements()));
183+
rspBuilder.setKey(
184+
{reinterpret_cast<const uint8_t*>(name.data()), name.size()});
185+
rspBuffer.resize(rspBuilder.getFrame()->getFrame().size());
186+
187+
Frame rspFrame;
188+
rspFrame.payload = std::move(rspBuffer);
189+
190+
auto& conn = getConnection();
191+
conn.sendFrame(rspFrame);
192+
193+
// Send something else, a GAT in this case, to test that the connection is
194+
// still up
195+
std::vector<uint8_t> reqBuffer(1024);
196+
RequestBuilder reqBuilder({reqBuffer.data(), reqBuffer.size()});
197+
std::string value = "";
198+
reqBuilder.setOpcode(ClientOpcode::Gat);
199+
reqBuilder.setMagic(Magic::AltClientRequest);
200+
reqBuilder.setFramingExtras(encode(cb::durability::Requirements()));
201+
reqBuilder.setExtras(request::GatPayload().getBuffer());
202+
reqBuilder.setKey(
203+
{reinterpret_cast<const uint8_t*>(name.data()), name.size()});
204+
reqBuilder.setValue(
205+
{reinterpret_cast<const uint8_t*>(value.data()), value.size()});
206+
reqBuffer.resize(reqBuilder.getFrame()->getFrame().size());
207+
208+
Frame reqFrame;
209+
reqFrame.payload = std::move(reqBuffer);
210+
conn.sendFrame(reqFrame);
211+
212+
BinprotResponse resp;
213+
conn.recvResponse(resp);
214+
215+
EXPECT_EQ(Status::NotSupported, resp.getStatus());
216+
EXPECT_NE(0xdeadbeef, ntohll(resp.getCas()));
217+
}
218+
171219
class SubdocDurabilityTest : public DurabilityTest {
172220
protected:
173221
void SetUp() override {

0 commit comments

Comments
 (0)