@@ -73,8 +73,8 @@ func marshalCommand(cmd *bson.Document) (bson.Reader, error) {
73
73
return cmd .MarshalBSON ()
74
74
}
75
75
76
- // add a session ID to a BSON doc representing a command
77
- func addSessionID (cmd * bson.Document , desc description.SelectedServer , client * session.Client ) error {
76
+ // adds session related fields to a BSON doc representing a command
77
+ func addSessionFields (cmd * bson.Document , desc description.SelectedServer , client * session.Client ) error {
78
78
if client == nil || ! description .SessionsSupported (desc .WireVersion ) || desc .SessionTimeoutMinutes == 0 {
79
79
return nil
80
80
}
@@ -88,9 +88,27 @@ func addSessionID(cmd *bson.Document, desc description.SelectedServer, client *s
88
88
}
89
89
90
90
cmd .Append (bson .EC .SubDocument ("lsid" , client .SessionID ))
91
+
92
+ if client .TransactionRunning () ||
93
+ client .RetryingCommit {
94
+ addTransaction (cmd , client )
95
+ }
96
+
97
+ client .ApplyCommand () // advance the state machine based on a command executing
98
+
91
99
return nil
92
100
}
93
101
102
+ // if in a transaction, add the transaction fields
103
+ func addTransaction (cmd * bson.Document , client * session.Client ) {
104
+ cmd .Append (bson .EC .Int64 ("txnNumber" , client .TxnNumber ))
105
+ if client .TransactionStarting () {
106
+ // When starting transaction, always transition to the next state, even on error
107
+ cmd .Append (bson .EC .Boolean ("startTransaction" , true ))
108
+ }
109
+ cmd .Append (bson .EC .Boolean ("autocommit" , false ))
110
+ }
111
+
94
112
func addClusterTime (cmd * bson.Document , desc description.SelectedServer , sess * session.Client , clock * session.ClusterClock ) error {
95
113
if (clock == nil && sess == nil ) || ! description .SessionsSupported (desc .WireVersion ) {
96
114
return nil
@@ -122,6 +140,16 @@ func addClusterTime(cmd *bson.Document, desc description.SelectedServer, sess *s
122
140
123
141
// add a read concern to a BSON doc representing a command
124
142
func addReadConcern (cmd * bson.Document , desc description.SelectedServer , rc * readconcern.ReadConcern , sess * session.Client ) error {
143
+ // Starting transaction's read concern overrides all others
144
+ if sess != nil && sess .TransactionStarting () && sess .CurrentRc != nil {
145
+ rc = sess .CurrentRc
146
+ }
147
+
148
+ // start transaction must append afterclustertime IF causally consistent and operation time exists
149
+ if rc == nil && sess != nil && sess .TransactionStarting () && sess .Consistent && sess .OperationTime != nil {
150
+ rc = readconcern .New ()
151
+ }
152
+
125
153
if rc == nil {
126
154
return nil
127
155
}
@@ -168,6 +196,25 @@ func addWriteConcern(cmd *bson.Document, wc *writeconcern.WriteConcern) error {
168
196
return nil
169
197
}
170
198
199
+ // Get the error labels from a command response
200
+ func getErrorLabels (rdr * bson.Reader ) ([]string , error ) {
201
+ var labels []string
202
+ labelsElem , err := rdr .Lookup ("errorLabels" )
203
+ if err != bson .ErrElementNotFound {
204
+ return nil , err
205
+ }
206
+ if labelsElem != nil {
207
+ labelsIt , err := labelsElem .Value ().ReaderArray ().Iterator ()
208
+ if err != nil {
209
+ return nil , err
210
+ }
211
+ for labelsIt .Next () {
212
+ labels = append (labels , labelsIt .Element ().Value ().StringValue ())
213
+ }
214
+ }
215
+ return labels , nil
216
+ }
217
+
171
218
// Remove command arguments for insert, update, and delete commands from the BSON document so they can be encoded
172
219
// as a Section 1 payload in OP_MSG
173
220
func opmsgRemoveArray (cmdDoc * bson.Document ) (* bson.Array , string ) {
0 commit comments