Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions mongo/bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type bulkWrite struct {
writeConcern *writeconcern.WriteConcern
result BulkWriteResult
let interface{}
bypassEmptyTsReplacement *bool
}

func (bw *bulkWrite) execute(ctx context.Context) error {
Expand Down Expand Up @@ -207,6 +208,10 @@ func (bw *bulkWrite) runInsert(ctx context.Context, batch bulkWriteBatch) (opera
}
op = op.Retry(retry)

if bw.bypassEmptyTsReplacement != nil {
op.BypassEmptyTsReplacement(*bw.bypassEmptyTsReplacement)
}

err := op.Execute(ctx)

return op.Result(), err
Expand Down Expand Up @@ -415,6 +420,10 @@ func (bw *bulkWrite) runUpdate(ctx context.Context, batch bulkWriteBatch) (opera
}
op = op.Retry(retry)

if bw.bypassEmptyTsReplacement != nil {
op.BypassEmptyTsReplacement(*bw.bypassEmptyTsReplacement)
}

err := op.Execute(ctx)

return op.Result(), err
Expand Down
17 changes: 17 additions & 0 deletions mongo/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ func (coll *Collection) BulkWrite(ctx context.Context, models []WriteModel,
selector: selector,
writeConcern: wc,
let: bwo.Let,
bypassEmptyTsReplacement: bwo.BypassEmptyTsReplacement,
}

err = op.execute(ctx)
Expand Down Expand Up @@ -307,6 +308,9 @@ func (coll *Collection) insert(ctx context.Context, documents []interface{},
if imo.Ordered != nil {
op = op.Ordered(*imo.Ordered)
}
if imo.BypassEmptyTsReplacement != nil {
op = op.BypassEmptyTsReplacement(*imo.BypassEmptyTsReplacement)
}
retry := driver.RetryNone
if coll.client.retryWrites {
retry = driver.RetryOncePerCommand
Expand Down Expand Up @@ -355,6 +359,9 @@ func (coll *Collection) InsertOne(ctx context.Context, document interface{},
if ioOpts.Comment != nil {
imOpts.SetComment(ioOpts.Comment)
}
if ioOpts.BypassEmptyTsReplacement != nil {
imOpts.BypassEmptyTsReplacement = ioOpts.BypassEmptyTsReplacement
}
res, err := coll.insert(ctx, []interface{}{document}, imOpts)

rr, err := processWriteError(err)
Expand Down Expand Up @@ -609,6 +616,9 @@ func (coll *Collection) updateOrReplace(ctx context.Context, filter bsoncore.Doc
}
op = op.Comment(comment)
}
if uo.BypassEmptyTsReplacement != nil {
op.BypassEmptyTsReplacement(*uo.BypassEmptyTsReplacement)
}
retry := driver.RetryNone
// retryable writes are only enabled updateOne/replaceOne operations
if !multi && coll.client.retryWrites {
Expand Down Expand Up @@ -760,6 +770,7 @@ func (coll *Collection) ReplaceOne(ctx context.Context, filter interface{},
uOpts.Hint = opt.Hint
uOpts.Let = opt.Let
uOpts.Comment = opt.Comment
uOpts.BypassEmptyTsReplacement = opt.BypassEmptyTsReplacement
updateOptions = append(updateOptions, uOpts)
}

Expand Down Expand Up @@ -1659,6 +1670,9 @@ func (coll *Collection) FindOneAndReplace(ctx context.Context, filter interface{
}
op = op.Let(let)
}
if fo.BypassEmptyTsReplacement != nil {
op = op.BypassEmptyTsReplacement(*fo.BypassEmptyTsReplacement)
}

return coll.findAndModify(ctx, op)
}
Expand Down Expand Up @@ -1765,6 +1779,9 @@ func (coll *Collection) FindOneAndUpdate(ctx context.Context, filter interface{}
}
op = op.Let(let)
}
if fo.BypassEmptyTsReplacement != nil {
op = op.BypassEmptyTsReplacement(*fo.BypassEmptyTsReplacement)
}

return coll.findAndModify(ctx, op)
}
Expand Down
Loading
Loading