Skip to content

Commit 0198c3f

Browse files
author
Gerrit Code Review
committed
Merge "Merge branch 'cheshire-cat'"
2 parents c40de50 + 16212e9 commit 0198c3f

File tree

7 files changed

+475
-86
lines changed

7 files changed

+475
-86
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
### Prepare (0x60)
2+
3+
Tells the consumer that the message contains a prepare (first stage of a
4+
durable write).
5+
6+
The request:
7+
* Must have extras
8+
* Must have key
9+
* May have value
10+
11+
Extra looks like:
12+
13+
Byte/ 0 | 1 | 2 | 3 |
14+
/ | | | |
15+
|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
16+
+---------------+---------------+---------------+---------------+
17+
0| by_seqo |
18+
| |
19+
+---------------+---------------+---------------+---------------+
20+
8| rev seqno |
21+
| |
22+
+---------------+---------------+---------------+---------------+
23+
16| flags |
24+
+---------------+---------------+---------------+---------------+
25+
20| expiration |
26+
+---------------+---------------+---------------+---------------+
27+
24| lock_time |
28+
+---------------+---------------+---------------+---------------+
29+
28| NRU | deleted | durability |
30+
+---------------+---------------+---------------+
31+
Total 31 bytes
32+
33+
NRU is an internal field used by the server and may safely be ignored by other consumers.
34+
35+
The consumer should not send a reply to this command. The following example shows the breakdown of the message:
36+
37+
The following message is a prepare of key "hello" with durability level of 'Majority'
38+
39+
40+
Byte/ 0 | 1 | 2 | 3 |
41+
/ | | | |
42+
|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
43+
+---------------+---------------+---------------+---------------+
44+
0| 0x80 | 0x57 | 0x00 | 0x05 |
45+
+---------------+---------------+---------------+---------------+
46+
4| 0x1f | 0x00 | 0x02 | 0x10 |
47+
+---------------+---------------+---------------+---------------+
48+
8| 0x00 | 0x00 | 0x00 | 0x29 |
49+
+---------------+---------------+---------------+---------------+
50+
12| 0x00 | 0x00 | 0x12 | 0x10 |
51+
+---------------+---------------+---------------+---------------+
52+
16| 0x00 | 0x00 | 0x00 | 0x00 |
53+
+---------------+---------------+---------------+---------------+
54+
20| 0x00 | 0x00 | 0x00 | 0x00 |
55+
+---------------+---------------+---------------+---------------+
56+
24| 0x00 | 0x00 | 0x00 | 0x00 |
57+
+---------------+---------------+---------------+---------------+
58+
28| 0x00 | 0x00 | 0x00 | 0x04 |
59+
+---------------+---------------+---------------+---------------+
60+
32| 0x00 | 0x00 | 0x00 | 0x00 |
61+
+---------------+---------------+---------------+---------------+
62+
36| 0x00 | 0x00 | 0x00 | 0x01 |
63+
+---------------+---------------+---------------+---------------+
64+
40| 0x00 | 0x00 | 0x00 | 0x00 |
65+
+---------------+---------------+---------------+---------------+
66+
44| 0x00 | 0x00 | 0x00 | 0x00 |
67+
+---------------+---------------+---------------+---------------+
68+
48| 0x00 | 0x00 | 0x00 | 0x00 |
69+
+---------------+---------------+---------------+---------------+
70+
52| 0x00 | 0x00 | 0x01 | 0x68 ('h') |
71+
+---------------+---------------+---------------+---------------+
72+
56| 0x65 ('e') | 0x6c ('l') | 0x6c ('l') | 0x6f ('o') |
73+
+---------------+---------------+---------------+---------------+
74+
60| 0x77 ('w') | 0x6f ('o') | 0x72 ('r') | 0x6c ('l') |
75+
+---------------+---------------+---------------+---------------+
76+
64| 0x64 ('d') |
77+
+---------------+
78+
DCP_PREPARE command
79+
Field (offset) (value)
80+
Magic (0) : 0x80
81+
Opcode (1) : 0x60
82+
Key length (2,3) : 0x0005
83+
Extra length (4) : 0x1f
84+
Data type (5) : 0x00
85+
Vbucket (6,7) : 0x0210
86+
Total body (8-11) : 0x00000029
87+
Opaque (12-15): 0x00001210
88+
CAS (16-23): 0x0000000000000000
89+
by seqno (24-31): 0x0000000000000004
90+
rev seqno (32-39): 0x0000000000000001
91+
flags (40-43): 0x00000000
92+
expiration (44-47): 0x00000000
93+
lock time (48-51): 0x00000000
94+
nru (52) : 0
95+
deleted (53) : 0
96+
durability (54) : 1
97+
Key (55-59): hello
98+
Value (60-65): world
99+
100+
### Returns
101+
102+
This message will not return a response unless an error occurs.
103+
104+
### DCP buffer acknowledgement
105+
106+
When the producer sends a DCP prepare an issue (MB-46634) means extra 2 bytes are accounted for,
107+
these bytes are not sent to the consumer, but the consumer must include them
108+
in buffer acknowledgement messages.

engines/ep/src/dcp/passive_stream.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,10 +466,12 @@ process_items_error_t PassiveStream::processBufferedMessages(
466466
static_cast<MutationConsumerMessage*>(response.get()));
467467
break;
468468
case DcpResponse::Event::Commit:
469-
ret = processCommit(static_cast<CommitSyncWrite&>(*response));
469+
ret = processCommit(
470+
static_cast<CommitSyncWriteConsumer&>(*response));
470471
break;
471472
case DcpResponse::Event::Abort:
472-
ret = processAbort(dynamic_cast<AbortSyncWrite&>(*response));
473+
ret = processAbort(
474+
dynamic_cast<AbortSyncWriteConsumer&>(*response));
473475
break;
474476
case DcpResponse::Event::SnapshotMarker:
475477
processMarker(static_cast<SnapshotMarker*>(response.get()));

engines/ep/src/dcp/response.cc

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -139,32 +139,32 @@ std::ostream& operator<<(std::ostream& os, const DcpResponse& r) {
139139
return os;
140140
}
141141

142-
CommitSyncWriteConsumer::CommitSyncWriteConsumer(uint32_t opaque,
143-
Vbid vbucket,
144-
uint64_t preparedSeqno,
145-
uint64_t commitSeqno,
146-
const DocKey& key)
147-
: DcpResponse(Event::Commit, opaque, cb::mcbp::DcpStreamId{}),
148-
vbucket(vbucket),
149-
key(key),
150-
payload(preparedSeqno, commitSeqno) {
151-
}
152-
153-
uint32_t CommitSyncWriteConsumer::getMessageSize() const {
154-
throw std::logic_error(
155-
"CommitSyncWriteConsumer::getMessageSize should not be called");
156-
}
157-
158142
CommitSyncWrite::CommitSyncWrite(uint32_t opaque,
159143
Vbid vbucket,
160144
uint64_t preparedSeqno,
161145
uint64_t commitSeqno,
162146
const DocKey& key,
163147
DocKeyEncodesCollectionId includeCollectionID)
164-
: CommitSyncWriteConsumer(opaque, vbucket, preparedSeqno, commitSeqno, key),
148+
: DcpResponse(Event::Commit, opaque, cb::mcbp::DcpStreamId{}),
149+
vbucket(vbucket),
150+
key(key),
151+
payload(preparedSeqno, commitSeqno),
165152
includeCollectionID(includeCollectionID) {
166153
}
167154

155+
CommitSyncWriteConsumer::CommitSyncWriteConsumer(uint32_t opaque,
156+
Vbid vbucket,
157+
uint64_t preparedSeqno,
158+
uint64_t commitSeqno,
159+
const DocKey& key)
160+
: CommitSyncWrite(opaque,
161+
vbucket,
162+
preparedSeqno,
163+
commitSeqno,
164+
key,
165+
key.getEncoding()) {
166+
}
167+
168168
uint32_t CommitSyncWrite::getMessageSize() const {
169169
auto size = commitBaseMsgBytes;
170170
if (includeCollectionID == DocKeyEncodesCollectionId::Yes) {
@@ -175,32 +175,32 @@ uint32_t CommitSyncWrite::getMessageSize() const {
175175
return size;
176176
}
177177

178-
AbortSyncWriteConsumer::AbortSyncWriteConsumer(uint32_t opaque,
179-
Vbid vbucket,
180-
const DocKey& key,
181-
uint64_t preparedSeqno,
182-
uint64_t abortSeqno)
183-
: DcpResponse(Event::Abort, opaque, cb::mcbp::DcpStreamId{}),
184-
vbucket(vbucket),
185-
key(key),
186-
payload(preparedSeqno, abortSeqno) {
187-
}
188-
189-
uint32_t AbortSyncWriteConsumer::getMessageSize() const {
190-
throw std::logic_error(
191-
"AbortSyncWriteConsumer::getMessageSize should not be called");
192-
}
193-
194178
AbortSyncWrite::AbortSyncWrite(uint32_t opaque,
195179
Vbid vbucket,
196180
const DocKey& key,
197181
uint64_t preparedSeqno,
198182
uint64_t abortSeqno,
199183
DocKeyEncodesCollectionId includeCollectionID)
200-
: AbortSyncWriteConsumer(opaque, vbucket, key, preparedSeqno, abortSeqno),
184+
: DcpResponse(Event::Abort, opaque, cb::mcbp::DcpStreamId{}),
185+
vbucket(vbucket),
186+
key(key),
187+
payload(preparedSeqno, abortSeqno),
201188
includeCollectionID(includeCollectionID) {
202189
}
203190

191+
AbortSyncWriteConsumer::AbortSyncWriteConsumer(uint32_t opaque,
192+
Vbid vbucket,
193+
const DocKey& key,
194+
uint64_t preparedSeqno,
195+
uint64_t abortSeqno)
196+
: AbortSyncWrite(opaque,
197+
vbucket,
198+
key,
199+
preparedSeqno,
200+
abortSeqno,
201+
key.getEncoding()) {
202+
}
203+
204204
uint32_t AbortSyncWrite::getMessageSize() const {
205205
auto size = abortBaseMsgBytes;
206206
if (includeCollectionID == DocKeyEncodesCollectionId::Yes) {

engines/ep/src/dcp/response.h

Lines changed: 43 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -642,17 +642,20 @@ class SeqnoAcknowledgement : public DcpResponse {
642642
};
643643

644644
/**
645-
* Represents the Commit of a prepared SyncWrite, consumer side. Cannot call
646-
* getMessageSize
645+
* Represents the Commit of a prepared SyncWrite, producer side. Producer side
646+
* requires the collection mode of the producer to determine how keys are
647+
* encoded.
647648
*/
648-
class CommitSyncWriteConsumer : public DcpResponse {
649+
class CommitSyncWrite : public DcpResponse {
649650
public:
650-
CommitSyncWriteConsumer(uint32_t opaque,
651-
Vbid vbucket,
652-
uint64_t preparedSeqno,
653-
uint64_t commitSeqno,
654-
const DocKey& key);
651+
CommitSyncWrite(uint32_t opaque,
652+
Vbid vbucket,
653+
uint64_t preparedSeqno,
654+
uint64_t commitSeqno,
655+
const DocKey& key,
656+
DocKeyEncodesCollectionId includeCollectionID);
655657

658+
public:
656659
OptionalSeqno getBySeqno() const override {
657660
return OptionalSeqno{payload.getCommitSeqno()};
658661
}
@@ -672,48 +675,44 @@ class CommitSyncWriteConsumer : public DcpResponse {
672675
uint64_t getCommitSeqno() const {
673676
return payload.getCommitSeqno();
674677
}
678+
679+
static constexpr uint32_t commitBaseMsgBytes =
680+
sizeof(protocol_binary_request_header) +
681+
sizeof(cb::mcbp::request::DcpCommitPayload);
675682
uint32_t getMessageSize() const override;
676683

677-
protected:
684+
private:
678685
Vbid vbucket;
679686
StoredDocKey key;
680687
cb::mcbp::request::DcpCommitPayload payload;
688+
DocKeyEncodesCollectionId includeCollectionID;
681689
};
682690

683691
/**
684-
* Represents the Commit of a prepared SyncWrite, producer side. Provides
685-
* getMessageSize
692+
* Represents the Commit of a prepared SyncWrite, consumer side. The key will
693+
* define how getMessageSize calculates the 'ack' size
686694
*/
687-
class CommitSyncWrite : public CommitSyncWriteConsumer {
688-
public:
689-
CommitSyncWrite(uint32_t opaque,
690-
Vbid vbucket,
691-
uint64_t preparedSeqno,
692-
uint64_t commitSeqno,
693-
const DocKey& key,
694-
DocKeyEncodesCollectionId includeCollectionID);
695-
695+
class CommitSyncWriteConsumer : public CommitSyncWrite {
696696
public:
697-
static constexpr uint32_t commitBaseMsgBytes =
698-
sizeof(protocol_binary_request_header) +
699-
sizeof(cb::mcbp::request::DcpCommitPayload);
700-
uint32_t getMessageSize() const override;
701-
702-
private:
703-
DocKeyEncodesCollectionId includeCollectionID;
697+
CommitSyncWriteConsumer(uint32_t opaque,
698+
Vbid vbucket,
699+
uint64_t preparedSeqno,
700+
uint64_t commitSeqno,
701+
const DocKey& key);
704702
};
705703

706704
/**
707-
* Represents the Abort of a prepared SyncWrite. Consumer side, cannot call
708-
* getMessageSize
705+
* Represents the Abort of a prepared SyncWrite. Producer side requires
706+
* the collection mode of the producer to determine how keys are encoded.
709707
*/
710-
class AbortSyncWriteConsumer : public DcpResponse {
708+
class AbortSyncWrite : public DcpResponse {
711709
public:
712-
AbortSyncWriteConsumer(uint32_t opaque,
713-
Vbid vbucket,
714-
const DocKey& key,
715-
uint64_t preparedSeqno,
716-
uint64_t abortSeqno);
710+
AbortSyncWrite(uint32_t opaque,
711+
Vbid vbucket,
712+
const DocKey& key,
713+
uint64_t preparedSeqno,
714+
uint64_t abortSeqno,
715+
DocKeyEncodesCollectionId includeCollectionID);
717716

718717
Vbid getVbucket() const {
719718
return vbucket;
@@ -741,33 +740,27 @@ class AbortSyncWriteConsumer : public DcpResponse {
741740

742741
uint32_t getMessageSize() const override;
743742

744-
protected:
743+
private:
745744
Vbid vbucket;
746745
StoredDocKey key;
747746
cb::mcbp::request::DcpAbortPayload payload;
747+
DocKeyEncodesCollectionId includeCollectionID;
748748
};
749749

750750
/**
751-
* Represents the Abort of a prepared SyncWrite. Producer side, can call
752-
* getMessageSize
751+
* Represents the Abort of a prepared SyncWrite.
753752
*/
754-
class AbortSyncWrite : public AbortSyncWriteConsumer {
753+
class AbortSyncWriteConsumer : public AbortSyncWrite {
755754
public:
756-
AbortSyncWrite(uint32_t opaque,
757-
Vbid vbucket,
758-
const DocKey& key,
759-
uint64_t preparedSeqno,
760-
uint64_t abortSeqno,
761-
DocKeyEncodesCollectionId includeCollectionID);
755+
AbortSyncWriteConsumer(uint32_t opaque,
756+
Vbid vbucket,
757+
const DocKey& key,
758+
uint64_t preparedSeqno,
759+
uint64_t abortSeqno);
762760

763761
static constexpr uint32_t abortBaseMsgBytes =
764762
sizeof(protocol_binary_request_header) +
765763
sizeof(cb::mcbp::request::DcpAbortPayload);
766-
767-
uint32_t getMessageSize() const override;
768-
769-
private:
770-
DocKeyEncodesCollectionId includeCollectionID;
771764
};
772765

773766
/**

engines/ep/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ ADD_EXECUTABLE(ep-engine_ep_unit_tests
5959
module_tests/defragmenter_test.cc
6060
module_tests/dcp_durability_stream_test.cc
6161
module_tests/dcp_reflection_test.cc
62+
module_tests/dcp_response_test.cc
6263
module_tests/dcp_single_threaded_test.cc
6364
module_tests/dcp_stream_test.cc
6465
module_tests/dcp_stream_ephemeral_test.cc

0 commit comments

Comments
 (0)