Skip to content

Commit e6fe669

Browse files
committed
MB-47318: [BP] Add DcpConsumer related client connection methods
Add methods so a test can create and drive a DCP consumer with snapshot/mutate/delete. The formats of the snapshot and delete match the format used by a current producer (the V2 variants). Prepare/Commit not added as they're out of scope for the test which will utilise these. Change-Id: Ibd2ccce7c30b13ebb82b36de32273a2ea3500f21 Reviewed-on: https://review.couchbase.org/c/kv_engine/+/165017 Well-Formed: Restriction Checker Tested-by: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]>
1 parent 3010db4 commit e6fe669

File tree

4 files changed

+527
-3
lines changed

4 files changed

+527
-3
lines changed

protocol/connection/client_connection.cc

Lines changed: 157 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "frameinfo.h"
2020

2121
#include <cbsasl/client.h>
22+
#include <mcbp/dcp_snapshot_marker_codec.h>
2223
#include <mcbp/mcbp.h>
2324
#include <mcbp/protocol/framebuilder.h>
2425
#include <memcached/protocol_binary.h>
@@ -741,7 +742,7 @@ void MemcachedConnection::recvFrame(Frame& frame) {
741742
}
742743
}
743744

744-
void MemcachedConnection::sendCommand(const BinprotCommand& command) {
745+
size_t MemcachedConnection::sendCommand(const BinprotCommand& command) {
745746
traceData.reset();
746747

747748
auto encoded = command.encode();
@@ -765,14 +766,17 @@ void MemcachedConnection::sendCommand(const BinprotCommand& command) {
765766
iovec iov{};
766767
iov.iov_base = encoded.header.data();
767768
iov.iov_len = encoded.header.size();
769+
size_t sentBytes = iov.iov_len;
768770
message.push_back(iov);
769771
for (auto buf : encoded.bufs) {
770772
iov.iov_base = const_cast<uint8_t*>(buf.data());
771773
iov.iov_len = buf.size();
774+
sentBytes += iov.iov_len;
772775
message.push_back(iov);
773776
}
774777

775778
sendBuffer(message);
779+
return sentBytes;
776780
}
777781

778782
void MemcachedConnection::recvResponse(BinprotResponse& response) {
@@ -1484,6 +1488,158 @@ Document MemcachedConnection::getRandomKey(Vbid vbucket) {
14841488
return ret;
14851489
}
14861490

1491+
void MemcachedConnection::dcpOpenProducer(const std::string& name) {
1492+
BinprotDcpOpenCommand open{name,
1493+
cb::mcbp::request::DcpOpenPayload::Producer};
1494+
const auto response = BinprotResponse(execute(open));
1495+
if (!response.isSuccess()) {
1496+
throw ConnectionError("Failed dcpOpenProducer", response);
1497+
}
1498+
}
1499+
1500+
void MemcachedConnection::dcpOpenConsumer(const std::string& name) {
1501+
BinprotDcpOpenCommand open{name};
1502+
const auto response = BinprotResponse(execute(open));
1503+
if (!response.isSuccess()) {
1504+
throw ConnectionError("Failed dcpOpenConsumer", response);
1505+
}
1506+
}
1507+
1508+
void MemcachedConnection::dcpControl(const std::string& key,
1509+
const std::string& value) {
1510+
BinprotDcpControlCommand control;
1511+
control.setKey(key);
1512+
control.setValue(value);
1513+
const auto response = BinprotResponse(execute(control));
1514+
if (!response.isSuccess()) {
1515+
throw ConnectionError("Failed dcpControl", response);
1516+
}
1517+
}
1518+
1519+
void MemcachedConnection::dcpStreamRequest(Vbid vbid,
1520+
uint32_t flags,
1521+
uint64_t startSeq,
1522+
uint64_t endSeq,
1523+
uint64_t vbUuid,
1524+
uint64_t snapStart,
1525+
uint64_t snapEnd) {
1526+
BinprotDcpStreamRequestCommand stream(
1527+
vbid, flags, startSeq, endSeq, vbUuid, snapStart, snapEnd);
1528+
const auto response = BinprotResponse(execute(stream));
1529+
if (!response.isSuccess()) {
1530+
throw ConnectionError("Failed dcpStreamRequest", response);
1531+
}
1532+
}
1533+
1534+
void MemcachedConnection::dcpAddStream(Vbid vbid, uint32_t flags) {
1535+
sendCommand(BinprotDcpAddStreamCommand{flags}.setVBucket(vbid));
1536+
}
1537+
1538+
void MemcachedConnection::dcpStreamRequestResponse(
1539+
uint32_t opaque,
1540+
const std::vector<std::pair<uint64_t, uint64_t>>& failovers) {
1541+
BinprotCommandResponse rsp{cb::mcbp::ClientOpcode::DcpStreamReq, opaque};
1542+
1543+
// Turn the vector of pairs into a protocol failover table (in a string
1544+
// so we can attach to the response)
1545+
std::string table;
1546+
for (const auto& entry : failovers) {
1547+
auto wireUuid = htonll(entry.first);
1548+
auto wireSeqno = htonll(entry.second);
1549+
1550+
std::copy_n(reinterpret_cast<uint8_t*>(&wireUuid),
1551+
sizeof(uint64_t),
1552+
std::back_inserter(table));
1553+
1554+
std::copy_n(reinterpret_cast<uint8_t*>(&wireSeqno),
1555+
sizeof(uint64_t),
1556+
std::back_inserter(table));
1557+
}
1558+
1559+
rsp.setValue(table);
1560+
sendCommand(rsp);
1561+
}
1562+
1563+
size_t MemcachedConnection::dcpSnapshotMarkerV2(uint32_t opaque,
1564+
uint64_t start,
1565+
uint64_t end,
1566+
uint32_t flags) {
1567+
const auto size = sizeof(cb::mcbp::Request) +
1568+
sizeof(cb::mcbp::request::DcpSnapshotMarkerV2xPayload) +
1569+
sizeof(cb::mcbp::request::DcpSnapshotMarkerV2_0Value);
1570+
Frame buffer;
1571+
buffer.payload.resize(size);
1572+
1573+
cb::mcbp::FrameBuilder<cb::mcbp::Request> builder(
1574+
{buffer.payload.data(), buffer.payload.size()});
1575+
builder.setMagic(cb::mcbp::Magic::ClientRequest);
1576+
builder.setOpcode(cb::mcbp::ClientOpcode::DcpSnapshotMarker);
1577+
builder.setOpaque(opaque);
1578+
1579+
cb::mcbp::encodeDcpSnapshotMarker(builder, start, end, flags, {}, end);
1580+
sendFrame(buffer);
1581+
return buffer.payload.size();
1582+
}
1583+
1584+
size_t MemcachedConnection::dcpMutation(const Document& doc,
1585+
uint32_t opaque,
1586+
uint64_t seqno,
1587+
uint64_t revSeqno,
1588+
uint32_t lockTime,
1589+
uint8_t nru) {
1590+
// No reply expected
1591+
return sendCommand(BinprotDcpMutationCommand{doc.info.id,
1592+
doc.value,
1593+
opaque,
1594+
uint8_t(doc.info.datatype),
1595+
doc.info.expiration,
1596+
doc.info.cas,
1597+
seqno,
1598+
revSeqno,
1599+
doc.info.flags,
1600+
lockTime,
1601+
nru});
1602+
}
1603+
1604+
size_t MemcachedConnection::dcpDeletionV2(const Document& doc,
1605+
uint32_t opaque,
1606+
uint64_t seqno,
1607+
uint64_t revSeqno,
1608+
uint32_t deleteTime) {
1609+
// No reply expected
1610+
return sendCommand(BinprotDcpDeletionV2Command{doc.info.id,
1611+
doc.value,
1612+
opaque,
1613+
uint8_t(doc.info.datatype),
1614+
doc.info.cas,
1615+
seqno,
1616+
revSeqno,
1617+
deleteTime});
1618+
}
1619+
1620+
void MemcachedConnection::recvDcpBufferAck(uint32_t expected) {
1621+
Frame frame;
1622+
recvFrame(frame);
1623+
const auto* request = frame.getRequest();
1624+
if (request->getClientOpcode() !=
1625+
cb::mcbp::ClientOpcode::DcpBufferAcknowledgement) {
1626+
throw std::logic_error(
1627+
"MemcachedConnection::recvDcpBufferAck not a buffer ack "
1628+
"opcode");
1629+
}
1630+
auto* dcpBufferAck =
1631+
reinterpret_cast<const cb::mcbp::request::DcpBufferAckPayload*>(
1632+
request->getExtdata().data());
1633+
1634+
if (dcpBufferAck->getBufferBytes() != expected) {
1635+
throw std::logic_error(
1636+
"MemcachedConnection::recvDcpBufferAck: Unexpected buffer "
1637+
"bytes:" +
1638+
std::to_string(dcpBufferAck->getBufferBytes()) +
1639+
" expected:" + std::to_string(expected));
1640+
}
1641+
}
1642+
14871643
void MemcachedConnection::setUnorderedExecutionMode(ExecutionMode mode) {
14881644
switch (mode) {
14891645
case ExecutionMode::Ordered:

protocol/connection/client_connection.h

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,7 +576,7 @@ class MemcachedConnection {
576576
*/
577577
void recvFrame(Frame& frame);
578578

579-
void sendCommand(const BinprotCommand& command);
579+
size_t sendCommand(const BinprotCommand& command);
580580

581581
void recvResponse(BinprotResponse& response);
582582

@@ -860,6 +860,58 @@ class MemcachedConnection {
860860

861861
Document getRandomKey(Vbid vbid);
862862

863+
void dcpOpenProducer(const std::string& name);
864+
void dcpOpenConsumer(const std::string& name);
865+
void dcpControl(const std::string& key, const std::string& value);
866+
void dcpStreamRequest(Vbid vbid,
867+
uint32_t flags,
868+
uint64_t startSeq,
869+
uint64_t endSeq,
870+
uint64_t vbUuid,
871+
uint64_t snapStart,
872+
uint64_t snapEnd);
873+
void dcpStreamRequest(Vbid vbid,
874+
uint32_t flags,
875+
uint64_t startSeq,
876+
uint64_t endSeq,
877+
uint64_t vbUuid,
878+
uint64_t snapStart,
879+
uint64_t snapEnd,
880+
const nlohmann::json& value);
881+
882+
/* following dcp functions are for working with a consumer */
883+
void dcpAddStream(Vbid vbid, uint32_t flags = 0);
884+
885+
/**
886+
* Send a success response for a DcpStreamRequest
887+
* Includes a value encoding a failover table
888+
* @param opaque request/response opaque
889+
* @param failovers vector of pair representing failover table. The pair
890+
* encodes first = uuid, second = seqno
891+
*/
892+
void dcpStreamRequestResponse(
893+
uint32_t opaque,
894+
const std::vector<std::pair<uint64_t, uint64_t>>& failovers);
895+
/**
896+
* Send the V2 marker with max visible seqno set to end
897+
*/
898+
size_t dcpSnapshotMarkerV2(uint32_t opaque,
899+
uint64_t start,
900+
uint64_t end,
901+
uint32_t flags);
902+
size_t dcpMutation(const Document& doc,
903+
uint32_t opaque,
904+
uint64_t seqno,
905+
uint64_t revSeqno = 0,
906+
uint32_t lockTime = 0,
907+
uint8_t nru = 0);
908+
size_t dcpDeletionV2(const Document& doc,
909+
uint32_t opaque,
910+
uint64_t seqno,
911+
uint64_t revSeqno = 0,
912+
uint32_t deleteTime = 0);
913+
void recvDcpBufferAck(uint32_t expected);
914+
863915
protected:
864916
void read(Frame& frame, size_t bytes);
865917

0 commit comments

Comments
 (0)