Skip to content
Closed
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions mongo/bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,7 @@ func (bw *bulkWrite) runBatch(ctx context.Context, batch bulkWriteBatch) (BulkWr

func (bw *bulkWrite) runInsert(ctx context.Context, batch bulkWriteBatch) (operation.InsertResult, error) {
docs := make([]bsoncore.Document, len(batch.models))
var i int
for _, model := range batch.models {
for i, model := range batch.models {
converted := model.(*InsertOneModel)
doc, err := marshal(converted.Document, bw.collection.bsonOpts, bw.collection.registry)
if err != nil {
Expand All @@ -177,7 +176,6 @@ func (bw *bulkWrite) runInsert(ctx context.Context, batch bulkWriteBatch) (opera
}

docs[i] = doc
i++
}

op := operation.NewInsert(docs...).
Expand Down
84 changes: 74 additions & 10 deletions mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,15 @@ type Client struct {
logger *logger.Logger

// client-side encryption fields
keyVaultClientFLE *Client
keyVaultCollFLE *Collection
mongocryptdFLE *mongocryptdClient
cryptFLE driver.Crypt
metadataClientFLE *Client
internalClientFLE *Client
encryptedFieldsMap map[string]interface{}
authenticator driver.Authenticator
isAutoEncryptionSet bool
keyVaultClientFLE *Client
keyVaultCollFLE *Collection
mongocryptdFLE *mongocryptdClient
cryptFLE driver.Crypt
metadataClientFLE *Client
internalClientFLE *Client
encryptedFieldsMap map[string]interface{}
authenticator driver.Authenticator
}

// Connect creates a new Client and then initializes it using the Connect method. This is equivalent to calling
Expand Down Expand Up @@ -194,6 +195,7 @@ func NewClient(opts ...*options.ClientOptions) (*Client, error) {
}
// AutoEncryptionOptions
if clientOpt.AutoEncryptionOptions != nil {
client.isAutoEncryptionSet = true
if err := client.configureAutoEncryption(clientOpt); err != nil {
return nil, err
}
Expand Down Expand Up @@ -424,8 +426,6 @@ func (c *Client) StartSession(opts ...*options.SessionOptions) (Session, error)
return nil, replaceErrors(err)
}

// Writes are not retryable on standalones, so let operation determine whether to retry
sess.RetryWrite = false
sess.RetryRead = c.retryReads

return &sessionImpl{
Expand Down Expand Up @@ -851,6 +851,70 @@ func (c *Client) createBaseCursorOptions() driver.CursorOptions {
}
}

// BulkWrite performs a client-levelbulk write operation.
func (c *Client) BulkWrite(ctx context.Context, models *ClientWriteModels,
opts ...*options.ClientBulkWriteOptions) (*ClientBulkWriteResult, error) {
if c.isAutoEncryptionSet {
return nil, errors.New("bulkWrite does not currently support automatic encryption")
}
bwo := options.MergeClientBulkWriteOptions(opts...)

if ctx == nil {
ctx = context.Background()
}

sess := sessionFromContext(ctx)
if sess == nil && c.sessionPool != nil {
sess = session.NewImplicitClientSession(c.sessionPool, c.id)
defer sess.EndSession()
}

err := c.validSession(sess)
if err != nil {
return nil, err
}

transactionRunning := sess.TransactionRunning()
wc := c.writeConcern
if transactionRunning {
wc = nil
}
if bwo.WriteConcern != nil {
if transactionRunning {
return nil, errors.New("cannot set write concern after starting a transaction")
}
wc = bwo.WriteConcern
}
if !writeconcern.AckWrite(wc) {
sess = nil
}

writeSelector := description.CompositeSelector([]description.ServerSelector{
description.WriteSelector(),
description.LatencySelector(c.localThreshold),
})
selector := makePinnedSelector(sess, writeSelector)

op := clientBulkWrite{
models: models.models,
ordered: bwo.Ordered,
bypassDocumentValidation: bwo.BypassDocumentValidation,
comment: bwo.Comment,
let: bwo.Let,
session: sess,
client: c,
selector: selector,
writeConcern: wc,
}
if bwo.VerboseResults == nil || !(*bwo.VerboseResults) {
op.errorsOnly = true
}
if err = op.execute(ctx); err != nil {
return nil, replaceErrors(err)
}
return &op.result, nil
}

// newLogger will use the LoggerOptions to create an internal logger and publish
// messages using a LogSink.
func newLogger(opts *options.LoggerOptions) (*logger.Logger, error) {
Expand Down
Loading
Loading