Skip to content

Commit 3660a5a

Browse files
jimwwalkerdaverigby
authored andcommitted
MB-46628: Consumer must be able to call CommitSyncWrite::getMessageSize
An update to commit/abort handling added an exception to both CommitSyncWriteConsumer and AbortSyncWriteConsumer, this was incorrect and meant that when DCP messages are buffered, we hit the exception. This commit tweaks the CommitSyncWrite hierarchy and allows getMessageSize to be called for both producer/consumer. Test cases added in this commit noted that DcpPrepare has an incorrect buffer size, now tracked as MB-46634. This commit adds very basic documentation for dcp prepare to make this issue more evident. Change-Id: Ic504de6103f8bdbf9f9f258d6bcbf01b55a7e408 Reviewed-on: http://review.couchbase.org/c/kv_engine/+/154677 Well-Formed: Build Bot <[email protected]> Reviewed-by: Dave Rigby <[email protected]> Tested-by: Dave Rigby <[email protected]>
1 parent 4c56d34 commit 3660a5a

File tree

7 files changed

+475
-86
lines changed

7 files changed

+475
-86
lines changed
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
### Prepare (0x60)
2+
3+
Tells the consumer that the message contains a prepare (first stage of a
4+
durable write).
5+
6+
The request:
7+
* Must have extras
8+
* Must have key
9+
* May have value
10+
11+
Extra looks like:
12+
13+
Byte/ 0 | 1 | 2 | 3 |
14+
/ | | | |
15+
|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
16+
+---------------+---------------+---------------+---------------+
17+
0| by_seqo |
18+
| |
19+
+---------------+---------------+---------------+---------------+
20+
8| rev seqno |
21+
| |
22+
+---------------+---------------+---------------+---------------+
23+
16| flags |
24+
+---------------+---------------+---------------+---------------+
25+
20| expiration |
26+
+---------------+---------------+---------------+---------------+
27+
24| lock_time |
28+
+---------------+---------------+---------------+---------------+
29+
28| NRU | deleted | durability |
30+
+---------------+---------------+---------------+
31+
Total 31 bytes
32+
33+
NRU is an internal field used by the server and may safely be ignored by other consumers.
34+
35+
The consumer should not send a reply to this command. The following example shows the breakdown of the message:
36+
37+
The following message is a prepare of key "hello" with durability level of 'Majority'
38+
39+
40+
Byte/ 0 | 1 | 2 | 3 |
41+
/ | | | |
42+
|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
43+
+---------------+---------------+---------------+---------------+
44+
0| 0x80 | 0x57 | 0x00 | 0x05 |
45+
+---------------+---------------+---------------+---------------+
46+
4| 0x1f | 0x00 | 0x02 | 0x10 |
47+
+---------------+---------------+---------------+---------------+
48+
8| 0x00 | 0x00 | 0x00 | 0x29 |
49+
+---------------+---------------+---------------+---------------+
50+
12| 0x00 | 0x00 | 0x12 | 0x10 |
51+
+---------------+---------------+---------------+---------------+
52+
16| 0x00 | 0x00 | 0x00 | 0x00 |
53+
+---------------+---------------+---------------+---------------+
54+
20| 0x00 | 0x00 | 0x00 | 0x00 |
55+
+---------------+---------------+---------------+---------------+
56+
24| 0x00 | 0x00 | 0x00 | 0x00 |
57+
+---------------+---------------+---------------+---------------+
58+
28| 0x00 | 0x00 | 0x00 | 0x04 |
59+
+---------------+---------------+---------------+---------------+
60+
32| 0x00 | 0x00 | 0x00 | 0x00 |
61+
+---------------+---------------+---------------+---------------+
62+
36| 0x00 | 0x00 | 0x00 | 0x01 |
63+
+---------------+---------------+---------------+---------------+
64+
40| 0x00 | 0x00 | 0x00 | 0x00 |
65+
+---------------+---------------+---------------+---------------+
66+
44| 0x00 | 0x00 | 0x00 | 0x00 |
67+
+---------------+---------------+---------------+---------------+
68+
48| 0x00 | 0x00 | 0x00 | 0x00 |
69+
+---------------+---------------+---------------+---------------+
70+
52| 0x00 | 0x00 | 0x01 | 0x68 ('h') |
71+
+---------------+---------------+---------------+---------------+
72+
56| 0x65 ('e') | 0x6c ('l') | 0x6c ('l') | 0x6f ('o') |
73+
+---------------+---------------+---------------+---------------+
74+
60| 0x77 ('w') | 0x6f ('o') | 0x72 ('r') | 0x6c ('l') |
75+
+---------------+---------------+---------------+---------------+
76+
64| 0x64 ('d') |
77+
+---------------+
78+
DCP_PREPARE command
79+
Field (offset) (value)
80+
Magic (0) : 0x80
81+
Opcode (1) : 0x60
82+
Key length (2,3) : 0x0005
83+
Extra length (4) : 0x1f
84+
Data type (5) : 0x00
85+
Vbucket (6,7) : 0x0210
86+
Total body (8-11) : 0x00000029
87+
Opaque (12-15): 0x00001210
88+
CAS (16-23): 0x0000000000000000
89+
by seqno (24-31): 0x0000000000000004
90+
rev seqno (32-39): 0x0000000000000001
91+
flags (40-43): 0x00000000
92+
expiration (44-47): 0x00000000
93+
lock time (48-51): 0x00000000
94+
nru (52) : 0
95+
deleted (53) : 0
96+
durability (54) : 1
97+
Key (55-59): hello
98+
Value (60-65): world
99+
100+
### Returns
101+
102+
This message will not return a response unless an error occurs.
103+
104+
### DCP buffer acknowledgement
105+
106+
When the producer sends a DCP prepare an issue (MB-46634) means extra 2 bytes are accounted for,
107+
these bytes are not sent to the consumer, but the consumer must include them
108+
in buffer acknowledgement messages.

engines/ep/src/dcp/passive_stream.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -466,10 +466,12 @@ process_items_error_t PassiveStream::processBufferedMessages(
466466
static_cast<MutationConsumerMessage*>(response.get()));
467467
break;
468468
case DcpResponse::Event::Commit:
469-
ret = processCommit(static_cast<CommitSyncWrite&>(*response));
469+
ret = processCommit(
470+
static_cast<CommitSyncWriteConsumer&>(*response));
470471
break;
471472
case DcpResponse::Event::Abort:
472-
ret = processAbort(dynamic_cast<AbortSyncWrite&>(*response));
473+
ret = processAbort(
474+
dynamic_cast<AbortSyncWriteConsumer&>(*response));
473475
break;
474476
case DcpResponse::Event::SnapshotMarker:
475477
processMarker(static_cast<SnapshotMarker*>(response.get()));

engines/ep/src/dcp/response.cc

Lines changed: 34 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -139,32 +139,32 @@ std::ostream& operator<<(std::ostream& os, const DcpResponse& r) {
139139
return os;
140140
}
141141

142-
CommitSyncWriteConsumer::CommitSyncWriteConsumer(uint32_t opaque,
143-
Vbid vbucket,
144-
uint64_t preparedSeqno,
145-
uint64_t commitSeqno,
146-
const DocKey& key)
147-
: DcpResponse(Event::Commit, opaque, cb::mcbp::DcpStreamId{}),
148-
vbucket(vbucket),
149-
key(key),
150-
payload(preparedSeqno, commitSeqno) {
151-
}
152-
153-
uint32_t CommitSyncWriteConsumer::getMessageSize() const {
154-
throw std::logic_error(
155-
"CommitSyncWriteConsumer::getMessageSize should not be called");
156-
}
157-
158142
CommitSyncWrite::CommitSyncWrite(uint32_t opaque,
159143
Vbid vbucket,
160144
uint64_t preparedSeqno,
161145
uint64_t commitSeqno,
162146
const DocKey& key,
163147
DocKeyEncodesCollectionId includeCollectionID)
164-
: CommitSyncWriteConsumer(opaque, vbucket, preparedSeqno, commitSeqno, key),
148+
: DcpResponse(Event::Commit, opaque, cb::mcbp::DcpStreamId{}),
149+
vbucket(vbucket),
150+
key(key),
151+
payload(preparedSeqno, commitSeqno),
165152
includeCollectionID(includeCollectionID) {
166153
}
167154

155+
CommitSyncWriteConsumer::CommitSyncWriteConsumer(uint32_t opaque,
156+
Vbid vbucket,
157+
uint64_t preparedSeqno,
158+
uint64_t commitSeqno,
159+
const DocKey& key)
160+
: CommitSyncWrite(opaque,
161+
vbucket,
162+
preparedSeqno,
163+
commitSeqno,
164+
key,
165+
key.getEncoding()) {
166+
}
167+
168168
uint32_t CommitSyncWrite::getMessageSize() const {
169169
auto size = commitBaseMsgBytes;
170170
if (includeCollectionID == DocKeyEncodesCollectionId::Yes) {
@@ -175,32 +175,32 @@ uint32_t CommitSyncWrite::getMessageSize() const {
175175
return size;
176176
}
177177

178-
AbortSyncWriteConsumer::AbortSyncWriteConsumer(uint32_t opaque,
179-
Vbid vbucket,
180-
const DocKey& key,
181-
uint64_t preparedSeqno,
182-
uint64_t abortSeqno)
183-
: DcpResponse(Event::Abort, opaque, cb::mcbp::DcpStreamId{}),
184-
vbucket(vbucket),
185-
key(key),
186-
payload(preparedSeqno, abortSeqno) {
187-
}
188-
189-
uint32_t AbortSyncWriteConsumer::getMessageSize() const {
190-
throw std::logic_error(
191-
"AbortSyncWriteConsumer::getMessageSize should not be called");
192-
}
193-
194178
AbortSyncWrite::AbortSyncWrite(uint32_t opaque,
195179
Vbid vbucket,
196180
const DocKey& key,
197181
uint64_t preparedSeqno,
198182
uint64_t abortSeqno,
199183
DocKeyEncodesCollectionId includeCollectionID)
200-
: AbortSyncWriteConsumer(opaque, vbucket, key, preparedSeqno, abortSeqno),
184+
: DcpResponse(Event::Abort, opaque, cb::mcbp::DcpStreamId{}),
185+
vbucket(vbucket),
186+
key(key),
187+
payload(preparedSeqno, abortSeqno),
201188
includeCollectionID(includeCollectionID) {
202189
}
203190

191+
AbortSyncWriteConsumer::AbortSyncWriteConsumer(uint32_t opaque,
192+
Vbid vbucket,
193+
const DocKey& key,
194+
uint64_t preparedSeqno,
195+
uint64_t abortSeqno)
196+
: AbortSyncWrite(opaque,
197+
vbucket,
198+
key,
199+
preparedSeqno,
200+
abortSeqno,
201+
key.getEncoding()) {
202+
}
203+
204204
uint32_t AbortSyncWrite::getMessageSize() const {
205205
auto size = abortBaseMsgBytes;
206206
if (includeCollectionID == DocKeyEncodesCollectionId::Yes) {

engines/ep/src/dcp/response.h

Lines changed: 43 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -642,17 +642,20 @@ class SeqnoAcknowledgement : public DcpResponse {
642642
};
643643

644644
/**
645-
* Represents the Commit of a prepared SyncWrite, consumer side. Cannot call
646-
* getMessageSize
645+
* Represents the Commit of a prepared SyncWrite, producer side. Producer side
646+
* requires the collection mode of the producer to determine how keys are
647+
* encoded.
647648
*/
648-
class CommitSyncWriteConsumer : public DcpResponse {
649+
class CommitSyncWrite : public DcpResponse {
649650
public:
650-
CommitSyncWriteConsumer(uint32_t opaque,
651-
Vbid vbucket,
652-
uint64_t preparedSeqno,
653-
uint64_t commitSeqno,
654-
const DocKey& key);
651+
CommitSyncWrite(uint32_t opaque,
652+
Vbid vbucket,
653+
uint64_t preparedSeqno,
654+
uint64_t commitSeqno,
655+
const DocKey& key,
656+
DocKeyEncodesCollectionId includeCollectionID);
655657

658+
public:
656659
OptionalSeqno getBySeqno() const override {
657660
return OptionalSeqno{payload.getCommitSeqno()};
658661
}
@@ -672,48 +675,44 @@ class CommitSyncWriteConsumer : public DcpResponse {
672675
uint64_t getCommitSeqno() const {
673676
return payload.getCommitSeqno();
674677
}
678+
679+
static constexpr uint32_t commitBaseMsgBytes =
680+
sizeof(protocol_binary_request_header) +
681+
sizeof(cb::mcbp::request::DcpCommitPayload);
675682
uint32_t getMessageSize() const override;
676683

677-
protected:
684+
private:
678685
Vbid vbucket;
679686
StoredDocKey key;
680687
cb::mcbp::request::DcpCommitPayload payload;
688+
DocKeyEncodesCollectionId includeCollectionID;
681689
};
682690

683691
/**
684-
* Represents the Commit of a prepared SyncWrite, producer side. Provides
685-
* getMessageSize
692+
* Represents the Commit of a prepared SyncWrite, consumer side. The key will
693+
* define how getMessageSize calculates the 'ack' size
686694
*/
687-
class CommitSyncWrite : public CommitSyncWriteConsumer {
688-
public:
689-
CommitSyncWrite(uint32_t opaque,
690-
Vbid vbucket,
691-
uint64_t preparedSeqno,
692-
uint64_t commitSeqno,
693-
const DocKey& key,
694-
DocKeyEncodesCollectionId includeCollectionID);
695-
695+
class CommitSyncWriteConsumer : public CommitSyncWrite {
696696
public:
697-
static constexpr uint32_t commitBaseMsgBytes =
698-
sizeof(protocol_binary_request_header) +
699-
sizeof(cb::mcbp::request::DcpCommitPayload);
700-
uint32_t getMessageSize() const override;
701-
702-
private:
703-
DocKeyEncodesCollectionId includeCollectionID;
697+
CommitSyncWriteConsumer(uint32_t opaque,
698+
Vbid vbucket,
699+
uint64_t preparedSeqno,
700+
uint64_t commitSeqno,
701+
const DocKey& key);
704702
};
705703

706704
/**
707-
* Represents the Abort of a prepared SyncWrite. Consumer side, cannot call
708-
* getMessageSize
705+
* Represents the Abort of a prepared SyncWrite. Producer side requires
706+
* the collection mode of the producer to determine how keys are encoded.
709707
*/
710-
class AbortSyncWriteConsumer : public DcpResponse {
708+
class AbortSyncWrite : public DcpResponse {
711709
public:
712-
AbortSyncWriteConsumer(uint32_t opaque,
713-
Vbid vbucket,
714-
const DocKey& key,
715-
uint64_t preparedSeqno,
716-
uint64_t abortSeqno);
710+
AbortSyncWrite(uint32_t opaque,
711+
Vbid vbucket,
712+
const DocKey& key,
713+
uint64_t preparedSeqno,
714+
uint64_t abortSeqno,
715+
DocKeyEncodesCollectionId includeCollectionID);
717716

718717
Vbid getVbucket() const {
719718
return vbucket;
@@ -741,33 +740,27 @@ class AbortSyncWriteConsumer : public DcpResponse {
741740

742741
uint32_t getMessageSize() const override;
743742

744-
protected:
743+
private:
745744
Vbid vbucket;
746745
StoredDocKey key;
747746
cb::mcbp::request::DcpAbortPayload payload;
747+
DocKeyEncodesCollectionId includeCollectionID;
748748
};
749749

750750
/**
751-
* Represents the Abort of a prepared SyncWrite. Producer side, can call
752-
* getMessageSize
751+
* Represents the Abort of a prepared SyncWrite.
753752
*/
754-
class AbortSyncWrite : public AbortSyncWriteConsumer {
753+
class AbortSyncWriteConsumer : public AbortSyncWrite {
755754
public:
756-
AbortSyncWrite(uint32_t opaque,
757-
Vbid vbucket,
758-
const DocKey& key,
759-
uint64_t preparedSeqno,
760-
uint64_t abortSeqno,
761-
DocKeyEncodesCollectionId includeCollectionID);
755+
AbortSyncWriteConsumer(uint32_t opaque,
756+
Vbid vbucket,
757+
const DocKey& key,
758+
uint64_t preparedSeqno,
759+
uint64_t abortSeqno);
762760

763761
static constexpr uint32_t abortBaseMsgBytes =
764762
sizeof(protocol_binary_request_header) +
765763
sizeof(cb::mcbp::request::DcpAbortPayload);
766-
767-
uint32_t getMessageSize() const override;
768-
769-
private:
770-
DocKeyEncodesCollectionId includeCollectionID;
771764
};
772765

773766
/**

engines/ep/tests/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ ADD_EXECUTABLE(ep-engine_ep_unit_tests
5858
module_tests/defragmenter_test.cc
5959
module_tests/dcp_durability_stream_test.cc
6060
module_tests/dcp_reflection_test.cc
61+
module_tests/dcp_response_test.cc
6162
module_tests/dcp_single_threaded_test.cc
6263
module_tests/dcp_stream_test.cc
6364
module_tests/dcp_stream_ephemeral_test.cc

0 commit comments

Comments
 (0)