Skip to content

Commit d740744

Browse files
committed
GODRIVER-1715 sum up results from batches (#506)
1 parent 35af242 commit d740744

26 files changed

+159
-39
lines changed

mongo/integration/collection_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515

1616
"go.mongodb.org/mongo-driver/bson"
1717
"go.mongodb.org/mongo-driver/bson/primitive"
18+
"go.mongodb.org/mongo-driver/event"
1819
"go.mongodb.org/mongo-driver/internal/testutil/assert"
1920
"go.mongodb.org/mongo-driver/mongo"
2021
"go.mongodb.org/mongo-driver/mongo/integration/mtest"
@@ -1154,6 +1155,105 @@ func TestCollection(t *testing.T) {
11541155
mt.Fatalf("expected BulkWrite error %v, got %v", mongo.ErrUnacknowledgedWrite, err)
11551156
}
11561157
})
1158+
mt.Run("insert and delete with batches", func(mt *mtest.T) {
1159+
// grouped together because delete requires the documents to be inserted
1160+
numDocs := 100050
1161+
var insertModels []mongo.WriteModel
1162+
var deleteModels []mongo.WriteModel
1163+
for i := 0; i < numDocs; i++ {
1164+
d := bson.D{
1165+
{"a", int32(i)},
1166+
{"b", int32(i * 2)},
1167+
{"c", int32(i * 3)},
1168+
}
1169+
insertModels = append(insertModels, mongo.NewInsertOneModel().SetDocument(d))
1170+
deleteModels = append(deleteModels, mongo.NewDeleteOneModel().SetFilter(bson.D{}))
1171+
}
1172+
mt.ClearEvents()
1173+
res, err := mt.Coll.BulkWrite(mtest.Background, insertModels)
1174+
assert.Nil(mt, err, "BulkWrite error: %v", err)
1175+
assert.Equal(mt, int64(numDocs), res.InsertedCount, "expected %v inserted documents, got %v", numDocs, res.InsertedCount)
1176+
mt.FilterStartedEvents(func(evt *event.CommandStartedEvent) bool {
1177+
return evt.CommandName == "insert"
1178+
})
1179+
// MaxWriteBatchSize changed between 3.4 and 3.6, so there isn't a given number of batches that this will be split into
1180+
inserts := len(mt.GetAllStartedEvents())
1181+
assert.True(mt, inserts > 1, "expected multiple batches, got %v", inserts)
1182+
1183+
mt.ClearEvents()
1184+
res, err = mt.Coll.BulkWrite(mtest.Background, deleteModels)
1185+
assert.Nil(mt, err, "BulkWrite error: %v", err)
1186+
assert.Equal(mt, int64(numDocs), res.DeletedCount, "expected %v deleted documents, got %v", numDocs, res.DeletedCount)
1187+
mt.FilterStartedEvents(func(evt *event.CommandStartedEvent) bool {
1188+
return evt.CommandName == "delete"
1189+
})
1190+
// MaxWriteBatchSize changed between 3.4 and 3.6, so there isn't a given number of batches that this will be split into
1191+
deletes := len(mt.GetAllStartedEvents())
1192+
assert.True(mt, deletes > 1, "expected multiple batches, got %v", deletes)
1193+
})
1194+
mt.Run("update with batches", func(mt *mtest.T) {
1195+
var models []mongo.WriteModel
1196+
numModels := 100050
1197+
1198+
// it's significantly faster to upsert one model and modify the rest than to upsert all of them
1199+
for i := 0; i < numModels-1; i++ {
1200+
update := bson.D{
1201+
{"$set", bson.D{
1202+
{"a", int32(i + 1)},
1203+
{"b", int32(i * 2)},
1204+
{"c", int32(i * 3)},
1205+
}},
1206+
}
1207+
model := mongo.NewUpdateOneModel().
1208+
SetFilter(bson.D{{"a", int32(i)}}).
1209+
SetUpdate(update).SetUpsert(true)
1210+
models = append(models, model)
1211+
}
1212+
// add one last upsert
1213+
models = append(models, mongo.NewUpdateOneModel().
1214+
SetFilter(bson.D{{"x", int32(1)}}).
1215+
SetUpdate(bson.D{{"$set", bson.D{{"x", int32(1)}}}}).
1216+
SetUpsert(true),
1217+
)
1218+
1219+
mt.ClearEvents()
1220+
res, err := mt.Coll.BulkWrite(mtest.Background, models)
1221+
assert.Nil(mt, err, "BulkWrite error: %v", err)
1222+
1223+
mt.FilterStartedEvents(func(evt *event.CommandStartedEvent) bool {
1224+
return evt.CommandName == "update"
1225+
})
1226+
// MaxWriteBatchSize changed between 3.4 and 3.6, so there isn't a given number of batches that this will be split into
1227+
updates := len(mt.GetAllStartedEvents())
1228+
assert.True(mt, updates > 1, "expected multiple batches, got %v", updates)
1229+
1230+
assert.Equal(mt, int64(numModels-2), res.ModifiedCount, "expected %v modified documents, got %v", numModels-2, res.ModifiedCount)
1231+
assert.Equal(mt, int64(numModels-2), res.MatchedCount, "expected %v matched documents, got %v", numModels-2, res.ModifiedCount)
1232+
assert.Equal(mt, int64(2), res.UpsertedCount, "expected %v upserted documents, got %v", 2, res.UpsertedCount)
1233+
assert.Equal(mt, 2, len(res.UpsertedIDs), "expected %v upserted ids, got %v", 2, len(res.UpsertedIDs))
1234+
1235+
// find the upserted documents and check their contents
1236+
id1, ok := res.UpsertedIDs[0]
1237+
assert.True(mt, ok, "expected id at key 0")
1238+
id2, ok := res.UpsertedIDs[int64(numModels-1)]
1239+
assert.True(mt, ok, "expected id at key %v", numModels-1)
1240+
1241+
doc, err := mt.Coll.FindOne(mtest.Background, bson.D{{"_id", id1}}).DecodeBytes()
1242+
a, ok := doc.Lookup("a").Int32OK()
1243+
assert.True(mt, ok, "expected a to be an int32")
1244+
assert.Equal(mt, int32(numModels-1), a, "expected a value %v, got %v", numModels-1, a)
1245+
b, ok := doc.Lookup("b").Int32OK()
1246+
assert.True(mt, ok, "expected b to be an int32")
1247+
assert.Equal(mt, int32((numModels-2)*2), b, "expected b value %v, got %v", (numModels-2)*2, b)
1248+
c, ok := doc.Lookup("c").Int32OK()
1249+
assert.True(mt, ok, "expected c to be an int32")
1250+
assert.Equal(mt, int32((numModels-2)*3), c, "expected b value %v, got %v", (numModels-2)*3, c)
1251+
1252+
doc, err = mt.Coll.FindOne(mtest.Background, bson.D{{"_id", id2}}).DecodeBytes()
1253+
x, ok := doc.Lookup("x").Int32OK()
1254+
assert.True(mt, ok, "expected x to be an int32")
1255+
assert.Equal(mt, int32(1), x, "expected a value 1, got %v", x)
1256+
})
11571257
})
11581258
}
11591259

x/mongo/driver/batch_cursor.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ func (bc *BatchCursor) getMore(ctx context.Context) {
266266
},
267267
Database: bc.database,
268268
Deployment: SingleServerDeployment{Server: bc.server},
269-
ProcessResponseFn: func(response bsoncore.Document, srvr Server, desc description.Server) error {
269+
ProcessResponseFn: func(response bsoncore.Document, srvr Server, desc description.Server, currIndex int) error {
270270
id, ok := response.Lookup("cursor", "id").Int64OK()
271271
if !ok {
272272
return fmt.Errorf("cursor.id should be an int64 but is a BSON %s", response.Lookup("cursor", "id").Type)

x/mongo/driver/operation.go

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ type Operation struct {
113113
// ProcessResponseFn is called after a response to the command is returned. The server is
114114
// provided for types like Cursor that are required to run subsequent commands using the same
115115
// server.
116-
ProcessResponseFn func(response bsoncore.Document, srvr Server, desc description.Server) error
116+
ProcessResponseFn func(response bsoncore.Document, srvr Server, desc description.Server, currIndex int) error
117117

118118
// Selector is the server selector that's used during both initial server selection and
119119
// subsequent selection for retries. Depending on the Deployment implementation, the
@@ -375,9 +375,6 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
375375
op.publishFinishedEvent(ctx, finishedInfo)
376376

377377
var perr error
378-
if op.ProcessResponseFn != nil {
379-
perr = op.ProcessResponseFn(res, srvr, desc.Server)
380-
}
381378
switch tt := err.(type) {
382379
case WriteCommandError:
383380
if e := err.(WriteCommandError); retryable && op.Type == Write && e.UnsupportedStorageEngine() {
@@ -418,6 +415,11 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
418415
continue
419416
}
420417

418+
// If the operation isn't being retried, process the response
419+
if op.ProcessResponseFn != nil {
420+
perr = op.ProcessResponseFn(res, srvr, desc.Server, currIndex)
421+
}
422+
421423
if batching && len(tt.WriteErrors) > 0 && currIndex > 0 {
422424
for i := range tt.WriteErrors {
423425
tt.WriteErrors[i].Index += int64(currIndex)
@@ -498,6 +500,11 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
498500
continue
499501
}
500502

503+
// If the operation isn't being retried, process the response
504+
if op.ProcessResponseFn != nil {
505+
perr = op.ProcessResponseFn(res, srvr, desc.Server, currIndex)
506+
}
507+
501508
if op.Client != nil && op.Client.Committing && (retryableErr || tt.Code == 50) {
502509
// If we got a retryable error or MaxTimeMSExpired error, we add UnknownTransactionCommitResult.
503510
tt.Labels = append(tt.Labels, UnknownTransactionCommitResult)
@@ -507,10 +514,16 @@ func (op Operation) Execute(ctx context.Context, scratch []byte) error {
507514
if moreToCome {
508515
return ErrUnacknowledgedWrite
509516
}
517+
if op.ProcessResponseFn != nil {
518+
perr = op.ProcessResponseFn(res, srvr, desc.Server, currIndex)
519+
}
510520
if perr != nil {
511521
return perr
512522
}
513523
default:
524+
if op.ProcessResponseFn != nil {
525+
perr = op.ProcessResponseFn(res, srvr, desc.Server, currIndex)
526+
}
514527
return err
515528
}
516529

x/mongo/driver/operation/abort_transaction.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x/mongo/driver/operation/aggregate.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x/mongo/driver/operation/command.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func (c *Command) Execute(ctx context.Context) error {
5858
CommandFn: func(dst []byte, desc description.SelectedServer) ([]byte, error) {
5959
return append(dst, c.command[4:len(c.command)-1]...), nil
6060
},
61-
ProcessResponseFn: func(resp bsoncore.Document, srvr driver.Server, desc description.Server) error {
61+
ProcessResponseFn: func(resp bsoncore.Document, srvr driver.Server, desc description.Server, currIndex int) error {
6262
c.result = resp
6363
c.srvr = srvr
6464
c.desc = desc

x/mongo/driver/operation/commit_transaction.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x/mongo/driver/operation/count.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x/mongo/driver/operation/create.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

x/mongo/driver/operation/createIndexes.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)