Skip to content

Commit 986c539

Browse files
committed
GODRIVER-2962 Merge insert operation into mongo package.
1 parent ad40e61 commit 986c539

File tree

6 files changed

+210
-445
lines changed

6 files changed

+210
-445
lines changed

mongo/bulk_write.go

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -168,54 +168,64 @@ func (bw *bulkWrite) runBatch(ctx context.Context, batch bulkWriteBatch) (BulkWr
168168
return batchRes, batchErr, nil
169169
}
170170

171-
func (bw *bulkWrite) runInsert(ctx context.Context, batch bulkWriteBatch) (operation.InsertResult, error) {
171+
func (bw *bulkWrite) runInsert(ctx context.Context, batch bulkWriteBatch) (insertResult, error) {
172172
docs := make([]bsoncore.Document, len(batch.models))
173173
for i, model := range batch.models {
174174
converted := model.(*InsertOneModel)
175175
doc, err := marshal(converted.Document, bw.collection.bsonOpts, bw.collection.registry)
176176
if err != nil {
177-
return operation.InsertResult{}, err
177+
return insertResult{}, err
178178
}
179179
doc, _, err = ensureID(doc, bson.NilObjectID, bw.collection.bsonOpts, bw.collection.registry)
180180
if err != nil {
181-
return operation.InsertResult{}, err
181+
return insertResult{}, err
182182
}
183183

184184
docs[i] = doc
185185
}
186186

187-
op := operation.NewInsert(docs...).
188-
Session(bw.session).WriteConcern(bw.writeConcern).CommandMonitor(bw.collection.client.monitor).
189-
ServerSelector(bw.selector).ClusterClock(bw.collection.client.clock).
190-
Database(bw.collection.db.name).Collection(bw.collection.name).
191-
Deployment(bw.collection.client.deployment).Crypt(bw.collection.client.cryptFLE).
192-
ServerAPI(bw.collection.client.serverAPI).Timeout(bw.collection.client.timeout).
193-
Logger(bw.collection.client.logger).Authenticator(bw.collection.client.authenticator)
187+
op := insert{
188+
documents: docs,
189+
session: bw.session,
190+
writeConcern: bw.writeConcern,
191+
monitor: bw.collection.client.monitor,
192+
selector: bw.selector,
193+
clock: bw.collection.client.clock,
194+
database: bw.collection.db.name,
195+
collection: bw.collection.name,
196+
deployment: bw.collection.client.deployment,
197+
crypt: bw.collection.client.cryptFLE,
198+
serverAPI: bw.collection.client.serverAPI,
199+
timeout: bw.collection.client.timeout,
200+
logger: bw.collection.client.logger,
201+
authenticator: bw.collection.client.authenticator,
202+
}
203+
194204
if bw.comment != nil {
195205
comment, err := marshalValue(bw.comment, bw.collection.bsonOpts, bw.collection.registry)
196206
if err != nil {
197207
return op.Result(), err
198208
}
199-
op.Comment(comment)
209+
op.comment = comment
200210
}
201211
if bw.bypassDocumentValidation != nil && *bw.bypassDocumentValidation {
202-
op = op.BypassDocumentValidation(*bw.bypassDocumentValidation)
212+
op.bypassDocumentValidation = bw.bypassDocumentValidation
203213
}
204214
if bw.ordered != nil {
205-
op = op.Ordered(*bw.ordered)
215+
op.ordered = bw.ordered
206216
}
207217

208218
retry := driver.RetryNone
209219
if bw.collection.client.retryWrites && batch.canRetry {
210220
retry = driver.RetryOncePerCommand
211221
}
212-
op = op.Retry(retry)
222+
op.retry = &retry
213223

214224
if bw.rawData != nil {
215-
op.RawData(*bw.rawData)
225+
op.rawData = bw.rawData
216226
}
217227
if len(bw.additionalCmd) > 0 {
218-
op.AdditionalCmd(bw.additionalCmd)
228+
op.additionalCmd = bw.additionalCmd
219229
}
220230

221231
err := op.Execute(ctx)

mongo/collection.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"go.mongodb.org/mongo-driver/v2/internal/csfle"
1919
"go.mongodb.org/mongo-driver/v2/internal/mongoutil"
2020
"go.mongodb.org/mongo-driver/v2/internal/optionsutil"
21+
"go.mongodb.org/mongo-driver/v2/internal/ptrutil"
2122
"go.mongodb.org/mongo-driver/v2/internal/serverselector"
2223
"go.mongodb.org/mongo-driver/v2/mongo/options"
2324
"go.mongodb.org/mongo-driver/v2/mongo/readconcern"
@@ -306,42 +307,53 @@ func (coll *Collection) insert(
306307

307308
selector := makePinnedSelector(sess, coll.writeSelector)
308309

309-
op := operation.NewInsert(docs...).
310-
Session(sess).WriteConcern(wc).CommandMonitor(coll.client.monitor).
311-
ServerSelector(selector).ClusterClock(coll.client.clock).
312-
Database(coll.db.name).Collection(coll.name).
313-
Deployment(coll.client.deployment).Crypt(coll.client.cryptFLE).Ordered(true).
314-
ServerAPI(coll.client.serverAPI).Timeout(coll.client.timeout).Logger(coll.client.logger).Authenticator(coll.client.authenticator)
310+
op := insert{
311+
documents: docs,
312+
session: sess,
313+
writeConcern: wc,
314+
monitor: coll.client.monitor,
315+
selector: selector,
316+
clock: coll.client.clock,
317+
database: coll.db.name,
318+
collection: coll.name,
319+
deployment: coll.client.deployment,
320+
crypt: coll.client.cryptFLE,
321+
ordered: ptrutil.Ptr(true),
322+
serverAPI: coll.client.serverAPI,
323+
timeout: coll.client.timeout,
324+
logger: coll.client.logger,
325+
authenticator: coll.client.authenticator,
326+
}
315327

316328
args, err := mongoutil.NewOptions[options.InsertManyOptions](opts...)
317329
if err != nil {
318330
return nil, fmt.Errorf("failed to construct options from builder: %w", err)
319331
}
320332

321333
if args.BypassDocumentValidation != nil && *args.BypassDocumentValidation {
322-
op = op.BypassDocumentValidation(*args.BypassDocumentValidation)
334+
op.bypassDocumentValidation = args.BypassDocumentValidation
323335
}
324336
if args.Comment != nil {
325337
comment, err := marshalValue(args.Comment, coll.bsonOpts, coll.registry)
326338
if err != nil {
327339
return nil, err
328340
}
329-
op = op.Comment(comment)
341+
op.comment = comment
330342
}
331343
if args.Ordered != nil {
332-
op = op.Ordered(*args.Ordered)
344+
op.ordered = args.Ordered
333345
}
334346
if rawData, ok := optionsutil.Value(args.Internal, "rawData").(bool); ok {
335-
op = op.RawData(rawData)
347+
op.rawData = &rawData
336348
}
337349
if additionalCmd, ok := optionsutil.Value(args.Internal, "addCommandFields").(bson.D); ok {
338-
op = op.AdditionalCmd(additionalCmd)
350+
op.additionalCmd = additionalCmd
339351
}
340352
retry := driver.RetryNone
341353
if coll.client.retryWrites {
342354
retry = driver.RetryOncePerCommand
343355
}
344-
op = op.Retry(retry)
356+
op.retry = &retry
345357

346358
err = op.Execute(ctx)
347359
var wce driver.WriteCommandError

mongo/insert.go

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright (C) MongoDB, Inc. 2019-present.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License"); you may
4+
// not use this file except in compliance with the License. You may obtain
5+
// a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
6+
7+
package mongo
8+
9+
import (
10+
"context"
11+
"errors"
12+
"fmt"
13+
"time"
14+
15+
"go.mongodb.org/mongo-driver/v2/bson"
16+
"go.mongodb.org/mongo-driver/v2/event"
17+
"go.mongodb.org/mongo-driver/v2/internal/driverutil"
18+
"go.mongodb.org/mongo-driver/v2/internal/logger"
19+
"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
20+
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
21+
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
22+
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
23+
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/session"
24+
)
25+
26+
// insert performs an insert operation.
27+
type insert struct {
28+
authenticator driver.Authenticator
29+
bypassDocumentValidation *bool
30+
comment bsoncore.Value
31+
documents []bsoncore.Document
32+
ordered *bool
33+
session *session.Client
34+
clock *session.ClusterClock
35+
collection string
36+
monitor *event.CommandMonitor
37+
crypt driver.Crypt
38+
database string
39+
deployment driver.Deployment
40+
selector description.ServerSelector
41+
writeConcern *writeconcern.WriteConcern
42+
retry *driver.RetryMode
43+
result insertResult
44+
serverAPI *driver.ServerAPIOptions
45+
timeout *time.Duration
46+
rawData *bool
47+
additionalCmd bson.D
48+
logger *logger.Logger
49+
}
50+
51+
// insertResult represents an insert result returned by the server.
52+
type insertResult struct {
53+
// Number of documents successfully inserted.
54+
N int64
55+
}
56+
57+
func buildInsertResult(response bsoncore.Document) (insertResult, error) {
58+
elements, err := response.Elements()
59+
if err != nil {
60+
return insertResult{}, err
61+
}
62+
ir := insertResult{}
63+
for _, element := range elements {
64+
if element.Key() == "n" {
65+
var ok bool
66+
ir.N, ok = element.Value().AsInt64OK()
67+
if !ok {
68+
return ir, fmt.Errorf("response field 'n' is type int32 or int64, but received BSON type %s", element.Value().Type)
69+
}
70+
}
71+
}
72+
return ir, nil
73+
}
74+
75+
// Result returns the result of executing this operation.
76+
func (i *insert) Result() insertResult { return i.result }
77+
78+
func (i *insert) processResponse(_ context.Context, resp bsoncore.Document, _ driver.ResponseInfo) error {
79+
ir, err := buildInsertResult(resp)
80+
i.result.N += ir.N
81+
return err
82+
}
83+
84+
// Execute runs this operations and returns an error if the operation did not execute successfully.
85+
func (i *insert) Execute(ctx context.Context) error {
86+
if i.deployment == nil {
87+
return errors.New("the Insert operation must have a Deployment set before Execute can be called")
88+
}
89+
batches := &driver.Batches{
90+
Identifier: "documents",
91+
Documents: i.documents,
92+
Ordered: i.ordered,
93+
}
94+
95+
return driver.Operation{
96+
CommandFn: i.command,
97+
ProcessResponseFn: i.processResponse,
98+
Batches: batches,
99+
RetryMode: i.retry,
100+
Type: driver.Write,
101+
Client: i.session,
102+
Clock: i.clock,
103+
CommandMonitor: i.monitor,
104+
Crypt: i.crypt,
105+
Database: i.database,
106+
Deployment: i.deployment,
107+
Selector: i.selector,
108+
WriteConcern: i.writeConcern,
109+
ServerAPI: i.serverAPI,
110+
Timeout: i.timeout,
111+
Logger: i.logger,
112+
Name: driverutil.InsertOp,
113+
Authenticator: i.authenticator,
114+
}.Execute(ctx)
115+
116+
}
117+
118+
func (i *insert) command(dst []byte, desc description.SelectedServer) ([]byte, error) {
119+
dst = bsoncore.AppendStringElement(dst, "insert", i.collection)
120+
if i.bypassDocumentValidation != nil && (desc.WireVersion != nil &&
121+
driverutil.VersionRangeIncludes(*desc.WireVersion, 4)) {
122+
123+
dst = bsoncore.AppendBooleanElement(dst, "bypassDocumentValidation", *i.bypassDocumentValidation)
124+
}
125+
if i.comment.Type != bsoncore.Type(0) {
126+
dst = bsoncore.AppendValueElement(dst, "comment", i.comment)
127+
}
128+
if i.ordered != nil {
129+
dst = bsoncore.AppendBooleanElement(dst, "ordered", *i.ordered)
130+
}
131+
// Set rawData for 8.2+ servers.
132+
if i.rawData != nil && desc.WireVersion != nil && driverutil.VersionRangeIncludes(*desc.WireVersion, 27) {
133+
dst = bsoncore.AppendBooleanElement(dst, "rawData", *i.rawData)
134+
}
135+
if len(i.additionalCmd) > 0 {
136+
doc, err := bson.Marshal(i.additionalCmd)
137+
if err != nil {
138+
return nil, err
139+
}
140+
dst = append(dst, doc[4:len(doc)-1]...)
141+
}
142+
return dst, nil
143+
}

x/mongo/driver/integration/insert_test.go

Lines changed: 0 additions & 56 deletions
This file was deleted.

x/mongo/driver/integration/main_test.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"go.mongodb.org/mongo-driver/v2/internal/integtest"
1818
"go.mongodb.org/mongo-driver/v2/internal/require"
1919
"go.mongodb.org/mongo-driver/v2/internal/serverselector"
20+
"go.mongodb.org/mongo-driver/v2/mongo"
21+
"go.mongodb.org/mongo-driver/v2/mongo/options"
2022
"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
2123
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
2224
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
@@ -145,12 +147,20 @@ func autoInsertDocs(t *testing.T, writeConcern *writeconcern.WriteConcern, docs
145147

146148
// insertDocs inserts the docs into the test cluster.
147149
func insertDocs(t *testing.T, dbname, colname string, writeConcern *writeconcern.WriteConcern, docs ...bsoncore.Document) {
148-
err := operation.NewInsert(docs...).
149-
Collection(colname).
150-
Database(dbname).
151-
Deployment(integtest.Topology(t)).
152-
ServerSelector(&serverselector.Write{}).
153-
WriteConcern(writeConcern).
154-
Execute(context.Background())
150+
t.Helper()
151+
152+
// The initial call to integtest.Topology drops the database used by the
153+
// tests, so we have to call it here first to prevent the existing test code
154+
// from dropping the database after we've inserted data.
155+
integtest.Topology(t)
156+
157+
client, err := mongo.Connect(options.Client().ApplyURI(connectionString.Original).SetWriteConcern(writeConcern))
158+
require.NoError(t, err)
159+
defer func() {
160+
_ = client.Disconnect(context.Background())
161+
}()
162+
163+
coll := client.Database(dbname).Collection(colname)
164+
_, err = coll.InsertMany(context.Background(), docs)
155165
require.NoError(t, err)
156166
}

0 commit comments

Comments
 (0)