Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 37 additions & 2 deletions internal/integration/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ import (
"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
"go.mongodb.org/mongo-driver/v2/x/bsonx/bsoncore"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/wiremessage"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/xoptions"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -838,8 +840,41 @@ func TestClient_BulkWrite(t *testing.T) {
}

_, err := mt.Client.BulkWrite(context.Background(), writes)
require.NoError(t, err)
assert.Equal(t, 2, bulkWrites, "expected %d bulkWrites, got %d", 2, bulkWrites)
require.NoError(mt, err)
assert.Equal(mt, 2, bulkWrites, "expected %d bulkWrites, got %d", 2, bulkWrites)
})
mt.Run("test options callback", func(mt *mtest.T) {
mt.Parallel()

insertOneModel := mongo.NewClientInsertOneModel().SetDocument(bson.D{{"x", 1}})
writes := []mongo.ClientBulkWrite{{
Database: "foo",
Collection: "bar",
Model: insertOneModel,
}}

marshalValue := func(val interface{}) bson.RawValue {
t.Helper()

valType, data, err := bson.MarshalValue(val)
require.Nil(t, err, "MarshalValue error: %v", err)
return bson.RawValue{
Type: valType,
Value: data,
}
}

opts := options.ClientBulkWrite()
err := xoptions.SetInternalClientBulkWriteOptions(opts, "commandCallback", func(dst []byte, _ description.SelectedServer) ([]byte, error) {
dst = bsoncore.AppendStringElement(dst, "foo", "bar")
return dst, nil
})
require.NoError(mt, err)
_, _ = mt.Client.BulkWrite(context.Background(), writes, opts)
evt := mt.GetStartedEvent()
val := evt.Command.Lookup("foo")
expected := marshalValue("bar")
assert.Equal(mt, expected, val, "expected value to be %s", expected.String())
})
}

Expand Down
10 changes: 10 additions & 0 deletions mongo/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -963,6 +963,16 @@ func (c *Client) BulkWrite(ctx context.Context, writes []ClientBulkWrite,
op.rawData = &rawData
}
}
if bypassEmptyTsReplacementOpt := optionsutil.Value(bwo.Internal, "bypassEmptyTsReplacement"); bypassEmptyTsReplacementOpt != nil {
if bypassEmptyTsReplacement, ok := bypassEmptyTsReplacementOpt.(bool); ok {
op.bypassEmptyTsReplacement = &bypassEmptyTsReplacement
}
}
if commandCallbackOpt := optionsutil.Value(bwo.Internal, "commandCallback"); commandCallbackOpt != nil {
if commandCallback, ok := commandCallbackOpt.(func([]byte, description.SelectedServer) ([]byte, error)); ok {
op.commandCallback = commandCallback
}
}
if bwo.VerboseResults == nil || !(*bwo.VerboseResults) {
op.errorsOnly = true
} else if !acknowledged {
Expand Down
19 changes: 17 additions & 2 deletions mongo/client_bulk_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type clientBulkWrite struct {
selector description.ServerSelector
writeConcern *writeconcern.WriteConcern
rawData *bool
bypassEmptyTsReplacement *bool
commandCallback func([]byte, description.SelectedServer) ([]byte, error)

result ClientBulkWriteResult
}
Expand Down Expand Up @@ -122,7 +124,8 @@ func (bw *clientBulkWrite) execute(ctx context.Context) error {
}

func (bw *clientBulkWrite) newCommand() func([]byte, description.SelectedServer) ([]byte, error) {
return func(dst []byte, desc description.SelectedServer) ([]byte, error) {
return func(cmd []byte, desc description.SelectedServer) ([]byte, error) {
var dst []byte
dst = bsoncore.AppendInt32Element(dst, "bulkWrite", 1)

dst = bsoncore.AppendBooleanElement(dst, "errorsOnly", bw.errorsOnly)
Expand All @@ -148,7 +151,19 @@ func (bw *clientBulkWrite) newCommand() func([]byte, description.SelectedServer)
if bw.rawData != nil && desc.WireVersion != nil && driverutil.VersionRangeIncludes(*desc.WireVersion, 27) {
dst = bsoncore.AppendBooleanElement(dst, "rawData", *bw.rawData)
}
return dst, nil
if bw.bypassEmptyTsReplacement != nil {
dst = bsoncore.AppendBooleanElement(dst, "bypassEmptyTsReplacement", *bw.bypassEmptyTsReplacement)
}
if bw.commandCallback != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC this callback is designed to be a final mutator and so is kind of a dangerous addition to the code since it has to always come last in the function returned by newCommand. At the very least we should note that in a comment.

For long-term maintenance and to avoid bugs, it might be best to enforce this behavior. Perhaps we could use a builder pattern:

type clientBulkWrite struct {
	builder commandBuilder
}
type commandBuilder struct {
	base func([]byte, description.SelectedServer) ([]byte, error)
	callback func([]byte, description.ServerSelector) ([]byte, error)
}

func (b commandBuilder) build(dst []byte, desc description.SelectedServer) ([]byte, error) {
	out, err := b.base(dst, desc)
	if err != nil {
		return nil, err
	}

	// The callback is a final mutator.
	if b.callback != nil {
		out, err := b.callback(out, desc)
		if err != nil {
			return nil, err 
		}
	}

	return out, nil
}

Then in newCommand():

bw.builder.base = func(dst []byte, desc description.SelectedServer) ([]byte, error) {
	// ...
}

return bw.builder.build

var err error
dst, err = bw.commandCallback(dst, desc)
if err != nil {
return nil, err
}
}

cmd = append(cmd, dst...)
return cmd, nil
}
}

Expand Down
19 changes: 19 additions & 0 deletions x/mongo/driver/xoptions/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"go.mongodb.org/mongo-driver/v2/internal/optionsutil"
"go.mongodb.org/mongo-driver/v2/mongo/options"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
)

// SetInternalClientOptions sets internal options for ClientOptions.
Expand Down Expand Up @@ -101,6 +102,24 @@ func SetInternalClientBulkWriteOptions(a *options.ClientBulkWriteOptionsBuilder,
opts.Internal = optionsutil.WithValue(opts.Internal, key, b)
return nil
})
case "bypassEmptyTsReplacement":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we skip adding bypassEmptyTsReplacement, instead deferring to the user to set it using the command callback?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Despite being internal, I recommend retaining helper functions like bypassEmptyTsReplacement since the command callback shouldn't be used frequently.

b, ok := option.(bool)
if !ok {
return typeErrFunc("bool")
}
a.Opts = append(a.Opts, func(opts *options.ClientBulkWriteOptions) error {
opts.Internal = optionsutil.WithValue(opts.Internal, key, b)
return nil
})
case "commandCallback":
cb, ok := option.(func([]byte, description.SelectedServer) ([]byte, error))
if !ok {
return typeErrFunc("func([]byte, description.SelectedServer) ([]byte, error)")
}
a.Opts = append(a.Opts, func(opts *options.ClientBulkWriteOptions) error {
opts.Internal = optionsutil.WithValue(opts.Internal, key, cb)
return nil
})
default:
return fmt.Errorf("unsupported option: %q", key)
}
Expand Down
Loading