-
Notifications
You must be signed in to change notification settings - Fork 918
GODRIVER-3322 Apply client-level timeout to UploadFromStreamWithID #1782
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
bb51db3
b8a04d9
3ad5a92
420420e
9d9476c
403ad7b
03b0666
411d635
5e316f9
40f5d47
401fb07
a444deb
b879cb1
24a80e5
fe97083
9b0badb
b397aa2
ee199ea
632a7f7
7c38b9a
1e1787b
caa5bfd
bf50559
93c3e45
bbd0a16
a3e16d8
cf49d4a
d45177e
a83b295
dd9e3e7
5230e01
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,6 +7,7 @@ | |
package integration | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"strings" | ||
"testing" | ||
|
@@ -83,6 +84,126 @@ func TestCSOTProse(t *testing.T) { | |
assert.Equal(mt, started[1].CommandName, | ||
"insert", "expected a second insert event, got %v", started[1].CommandName) | ||
}) | ||
|
||
mt.RunOpts("6. gridfs - upload", mtest.NewOptions().MinServerVersion("4.4"), func(mt *mtest.T) { | ||
// Drop and re-create the db.fs.files and db.fs.chunks collections. | ||
err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background()) | ||
assert.NoError(t, err, "failed to drop files") | ||
|
||
err = mt.Client.Database("db").Collection("fs.chunks").Drop(context.Background()) | ||
assert.NoError(t, err, "failed to drop chunks") | ||
|
||
// Set a blocking "insert" fail point. | ||
mt.SetFailPoint(mtest.FailPoint{ | ||
ConfigureFailPoint: "failCommand", | ||
Mode: mtest.FailPointMode{ | ||
Times: 1, | ||
}, | ||
Data: mtest.FailPointData{ | ||
FailCommands: []string{"insert"}, | ||
BlockConnection: true, | ||
BlockTimeMS: 15, | ||
}, | ||
}) | ||
|
||
// Create a new MongoClient with timeoutMS=10. | ||
cliOptions := options.Client().SetTimeout(10).ApplyURI(mtest.ClusterURI()) | ||
integtest.AddTestServerAPIVersion(cliOptions) | ||
|
||
client, err := mongo.Connect(cliOptions) | ||
assert.NoError(t, err, "failed to connect to server") | ||
|
||
// Create a GridFS bucket that wraps the db database. | ||
bucket := client.Database("db").GridFSBucket() | ||
|
||
// Note that UploadFromStream accounts for the following steps: | ||
// - Call bucket.open_upload_stream() | ||
// - Using uploadStream, upload a single 0x12 byte | ||
// - Call uploadStream.close() to flush the stream and insert chunks | ||
_, err = bucket.UploadFromStream(context.Background(), "filename", bytes.NewReader([]byte{0x12})) | ||
assert.ErrorIs(t, err, context.DeadlineExceeded) | ||
}) | ||
|
||
const test61 = "6.1 gridfs - upload and download with non-expiring client-level timeout" | ||
mt.RunOpts(test61, mtest.NewOptions().MinServerVersion("4.4"), func(mt *mtest.T) { | ||
// Drop and re-create the db.fs.files and db.fs.chunks collections. | ||
err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background()) | ||
assert.NoError(t, err, "failed to drop files") | ||
|
||
err = mt.Client.Database("db").Collection("fs.chunks").Drop(context.Background()) | ||
assert.NoError(t, err, "failed to drop chunks") | ||
|
||
// Create a new MongoClient with timeoutMS=10. | ||
cliOptions := options.Client().SetTimeout(500 * time.Millisecond).ApplyURI(mtest.ClusterURI()) | ||
qingyang-hu marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
integtest.AddTestServerAPIVersion(cliOptions) | ||
|
||
client, err := mongo.Connect(cliOptions) | ||
assert.NoError(t, err, "failed to connect to server") | ||
|
||
// Create a GridFS bucket that wraps the db database. | ||
bucket := client.Database("db").GridFSBucket() | ||
|
||
// Upload file and ensure it uploaded correctly. | ||
fileID, err := bucket.UploadFromStream(context.Background(), "filename", bytes.NewReader([]byte{0x12})) | ||
assert.NoError(t, err, "failed to upload stream") | ||
|
||
buf := bytes.Buffer{} | ||
|
||
_, err = bucket.DownloadToStream(context.Background(), fileID, &buf) | ||
assert.NoError(t, err, "failed to download stream") | ||
assert.Equal(t, buf.Len(), 1) | ||
assert.Equal(t, buf.Bytes(), []byte{0x12}) | ||
}) | ||
|
||
const test62 = "6.2 gridfs - upload with operation-level timeout" | ||
mt.RunOpts(test62, mtest.NewOptions().MinServerVersion("4.4"), func(mt *mtest.T) { | ||
// Drop and re-create the db.fs.files and db.fs.chunks collections. | ||
err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background()) | ||
assert.NoError(t, err, "failed to drop files") | ||
|
||
err = mt.Client.Database("db").Collection("fs.chunks").Drop(context.Background()) | ||
assert.NoError(t, err, "failed to drop chunks") | ||
|
||
// Set a blocking "insert" fail point. | ||
mt.SetFailPoint(mtest.FailPoint{ | ||
ConfigureFailPoint: "failCommand", | ||
Mode: mtest.FailPointMode{ | ||
Times: 1, | ||
}, | ||
Data: mtest.FailPointData{ | ||
FailCommands: []string{"insert"}, | ||
BlockConnection: true, | ||
BlockTimeMS: 15, | ||
}, | ||
}) | ||
|
||
// Create a new MongoClient with timeoutMS=10. | ||
|
||
cliOptions := options.Client().SetTimeout(10 * time.Second).ApplyURI(mtest.ClusterURI()) | ||
integtest.AddTestServerAPIVersion(cliOptions) | ||
|
||
client, err := mongo.Connect(cliOptions) | ||
assert.NoError(t, err, "failed to connect to server") | ||
|
||
// Create a GridFS bucket that wraps the db database. | ||
bucket := client.Database("db").GridFSBucket() | ||
|
||
// If the operation-level context is not respected, then the client-level | ||
// timeout will exceed deadline. | ||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) | ||
|
||
defer cancel() | ||
|
||
// Upload file and ensure it uploaded correctly. | ||
fileID, err := bucket.UploadFromStream(ctx, "filename", bytes.NewReader([]byte{0x12})) | ||
assert.NoError(t, err, "failed to upload stream") | ||
|
||
buf := bytes.Buffer{} | ||
|
||
_, err = bucket.DownloadToStream(ctx, fileID, &buf) | ||
assert.NoError(t, err, "failed to download stream") | ||
assert.Equal(t, buf.Len(), 1) | ||
assert.Equal(t, buf.Bytes(), []byte{0x12}) | ||
}) | ||
|
||
mt.Run("8. server selection", func(mt *mtest.T) { | ||
cliOpts := options.Client().ApplyURI("mongodb://invalid/?serverSelectionTimeoutMS=100") | ||
mtOpts := mtest.NewOptions().ClientOptions(cliOpts).CreateCollection(false) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did you mean
?