Skip to content

Commit 43fb9c2

Browse files
committed
Clarify logic.
1 parent b43f6e6 commit 43fb9c2

File tree

2 files changed

+32
-43
lines changed

2 files changed

+32
-43
lines changed

internal/integration/crud_prose_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,7 @@ func TestErrorsCodeNamePropagated(t *testing.T) {
416416
})
417417
}
418418

419-
func TestClientBulkWrite(t *testing.T) {
419+
func TestClientBulkWriteProse(t *testing.T) {
420420
mtOpts := mtest.NewOptions().MinServerVersion("8.0").AtlasDataLake(false).ClientType(mtest.Pinned)
421421
mt := mtest.New(t, mtOpts)
422422

@@ -918,6 +918,9 @@ func TestClientBulkWrite(t *testing.T) {
918918
})
919919

920920
mt.Run("13. MongoClient.bulkWrite returns an error if auto-encryption is configured", func(mt *mtest.T) {
921+
if !mtest.IsCSFLEEnabled() {
922+
mt.Skip("CSFLE is not enabled")
923+
}
921924
if os.Getenv("DOCKER_RUNNING") != "" {
922925
mt.Skip("skipping test in docker environment")
923926
}

x/mongo/driver/operation.go

Lines changed: 28 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ type startedInformation struct {
106106
cmd bsoncore.Document
107107
requestID int32
108108
cmdName string
109-
documentSequence []byte
109+
docArray bsoncore.Array
110110
processedBatches int
111111
connID string
112112
driverConnectionID int64
@@ -164,10 +164,10 @@ func redactStartedInformationCmd(info startedInformation) bson.Raw {
164164
cmdCopy = make([]byte, 0, len(info.cmd))
165165
cmdCopy = append(cmdCopy, info.cmd...)
166166

167-
if len(info.documentSequence) > 0 {
167+
if len(info.docArray) > 0 {
168168
// remove 0 byte at end
169169
cmdCopy = cmdCopy[:len(info.cmd)-1]
170-
cmdCopy = append(cmdCopy, info.documentSequence...)
170+
cmdCopy = append(cmdCopy, info.docArray...)
171171
// add back 0 byte and update length
172172
cmdCopy, _ = bsoncore.AppendDocumentEnd(cmdCopy, 0)
173173
}
@@ -1188,8 +1188,7 @@ func (op Operation) createLegacyHandshakeWireMessage(
11881188
maxTimeMS int64,
11891189
dst []byte,
11901190
desc description.SelectedServer,
1191-
cmdFn func([]byte, description.SelectedServer) ([]byte, error),
1192-
) ([]byte, []byte, error) {
1191+
) (int, []byte, []byte, error) {
11931192
flags := op.secondaryOK(desc)
11941193
dst = wiremessage.AppendQueryFlags(dst, flags)
11951194

@@ -1205,31 +1204,42 @@ func (op Operation) createLegacyHandshakeWireMessage(
12051204
wrapper := int32(-1)
12061205
rp, err := op.createReadPref(desc, true)
12071206
if err != nil {
1208-
return dst, nil, err
1207+
return 0, dst, nil, err
12091208
}
12101209
if len(rp) > 0 {
12111210
wrapper, dst = bsoncore.AppendDocumentStart(dst)
12121211
dst = bsoncore.AppendHeader(dst, bsoncore.TypeEmbeddedDocument, "$query")
12131212
}
12141213
idx, dst := bsoncore.AppendDocumentStart(dst)
1215-
dst, err = cmdFn(dst, desc)
1214+
1215+
dst, err = op.CommandFn(dst, desc)
12161216
if err != nil {
1217-
return dst, nil, err
1217+
return 0, dst, nil, err
1218+
}
1219+
var n int
1220+
if op.Batches != nil {
1221+
n, dst, err = op.Batches.AppendBatchArray(dst, int(desc.MaxBatchCount), int(desc.MaxMessageSize))
1222+
if err != nil {
1223+
return 0, dst, nil, err
1224+
}
1225+
if n == 0 {
1226+
return 0, dst, nil, ErrDocumentTooLarge
1227+
}
12181228
}
12191229

12201230
dst, err = op.addReadConcern(dst, desc)
12211231
if err != nil {
1222-
return dst, nil, err
1232+
return 0, dst, nil, err
12231233
}
12241234

12251235
dst, err = op.addWriteConcern(ctx, dst, desc)
12261236
if err != nil {
1227-
return dst, nil, err
1237+
return 0, dst, nil, err
12281238
}
12291239

12301240
dst, err = op.addSession(dst, desc, false)
12311241
if err != nil {
1232-
return dst, nil, err
1242+
return 0, dst, nil, err
12331243
}
12341244

12351245
dst = op.addClusterTime(dst, desc)
@@ -1248,11 +1258,11 @@ func (op Operation) createLegacyHandshakeWireMessage(
12481258
dst = bsoncore.AppendDocumentElement(dst, "$readPreference", rp)
12491259
dst, err = bsoncore.AppendDocumentEnd(dst, idx)
12501260
if err != nil {
1251-
return dst, nil, err
1261+
return 0, dst, nil, err
12521262
}
12531263
}
12541264

1255-
return dst, dst[idx:], nil
1265+
return n, dst, dst[idx:], nil
12561266
}
12571267

12581268
func (op Operation) createMsgWireMessage(
@@ -1345,13 +1355,9 @@ func (op Operation) createWireMessage(
13451355
isLegacy := isLegacyHandshake(op, desc)
13461356
switch {
13471357
case isLegacy:
1348-
cmdFn := func(dst []byte, desc description.SelectedServer) ([]byte, error) {
1349-
info.processedBatches, dst, err = op.addLegacyCommandFields(dst, desc)
1350-
return dst, err
1351-
}
13521358
requestID := wiremessage.NextRequestID()
13531359
wmindex, dst = wiremessage.AppendHeaderStart(dst, requestID, 0, wiremessage.OpQuery)
1354-
dst, info.cmd, err = op.createLegacyHandshakeWireMessage(ctx, maxTimeMS, dst, desc, cmdFn)
1360+
info.processedBatches, dst, info.cmd, err = op.createLegacyHandshakeWireMessage(ctx, maxTimeMS, dst, desc)
13551361
case op.shouldEncrypt():
13561362
if desc.WireVersion.Max < cryptMinWireVersion {
13571363
return dst, false, info, errors.New("auto-encryption requires a MongoDB version of 4.2")
@@ -1405,13 +1411,13 @@ func (op Operation) createWireMessage(
14051411
}
14061412
if err == nil && batchOffset > 0 {
14071413
for b := dst[batchOffset:]; len(b) > 0; /* nothing */ {
1408-
var seq []byte
1414+
var array bsoncore.Array
14091415
var ok bool
1410-
seq, b, ok = documentSequenceToArray(b)
1416+
array, b, ok = documentSequenceToArray(b)
14111417
if !ok {
14121418
break
14131419
}
1414-
info.documentSequence = append(info.documentSequence, seq...)
1420+
info.docArray = append(info.docArray, array...)
14151421
}
14161422
}
14171423
}
@@ -1471,26 +1477,6 @@ func (op Operation) addEncryptCommandFields(ctx context.Context, dst []byte, des
14711477
return n, dst, nil
14721478
}
14731479

1474-
func (op Operation) addLegacyCommandFields(dst []byte, desc description.SelectedServer) (int, []byte, error) {
1475-
var err error
1476-
dst, err = op.CommandFn(dst, desc)
1477-
if err != nil {
1478-
return 0, nil, err
1479-
}
1480-
if op.Batches == nil {
1481-
return 0, dst, nil
1482-
}
1483-
var n int
1484-
n, dst, err = op.Batches.AppendBatchArray(dst, int(desc.MaxBatchCount), int(desc.MaxMessageSize))
1485-
if err != nil {
1486-
return 0, nil, err
1487-
}
1488-
if n == 0 {
1489-
return 0, nil, ErrDocumentTooLarge
1490-
}
1491-
return n, dst, nil
1492-
}
1493-
14941480
// addServerAPI adds the relevant fields for server API specification to the wire message in dst.
14951481
func (op Operation) addServerAPI(dst []byte) []byte {
14961482
sa := op.ServerAPI
@@ -2233,7 +2219,7 @@ func retryWritesSupported(s description.Server) bool {
22332219
return s.SessionTimeoutMinutes != nil && s.Kind != description.ServerKindStandalone
22342220
}
22352221

2236-
func documentSequenceToArray(src []byte) (dst, rem []byte, ok bool) {
2222+
func documentSequenceToArray(src []byte) (bsoncore.Array, []byte, bool) {
22372223
stype, rem, ok := wiremessage.ReadMsgSectionType(src)
22382224
if !ok || stype != wiremessage.DocumentSequence {
22392225
return nil, src, false

0 commit comments

Comments
 (0)