Skip to content
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
bb51db3
GODRIVER-3322 Apply client-level timeout to UploadFromStreamWithID
prestonvasquez Aug 30, 2024
b8a04d9
GODRIVER-3322 Add cluster URI
prestonvasquez Aug 30, 2024
3ad5a92
GODRIVER-3322 Add cluster URI
prestonvasquez Aug 30, 2024
420420e
GODRIVER-3322 Incease timeouts
prestonvasquez Aug 30, 2024
9d9476c
Merge branch 'master' into GODRIVER-3322
prestonvasquez Aug 30, 2024
403ad7b
GODRIVER-3322 Refactor context cancelation to close
prestonvasquez Aug 31, 2024
03b0666
Merge branch 'GODRIVER-3322' of github.com:prestonvasquez/mongo-go-dr…
prestonvasquez Aug 31, 2024
411d635
GODRIVER-3322 Update time from S to MS
prestonvasquez Aug 31, 2024
5e316f9
GODRIVER-3322 Lower from 500 to 10 ms
prestonvasquez Aug 31, 2024
40f5d47
GODRIVER-3322 allow more time for client-specific tests
prestonvasquez Aug 31, 2024
401fb07
GODRIVER-3322 Update t to mt
prestonvasquez Aug 31, 2024
a444deb
GODRIVER-3322 Revert timeouts back to prose
prestonvasquez Aug 31, 2024
b879cb1
GODRIVER-3322 Revert timeouts back to prose
prestonvasquez Aug 31, 2024
24a80e5
GODRIVER-3322 Add abort test
prestonvasquez Sep 3, 2024
fe97083
GODRIVER-3322 Use large timeouts for test 6
prestonvasquez Sep 3, 2024
9b0badb
GODRIVER-3322 Use mt over t
prestonvasquez Sep 3, 2024
b397aa2
GODRIVER-3322 Clean up comments
prestonvasquez Sep 3, 2024
ee199ea
GODRIVER-3322 Use only one mongos
prestonvasquez Sep 4, 2024
632a7f7
GODRIVER-3322 Use prose-specific timeouts
prestonvasquez Sep 4, 2024
7c38b9a
GODRIVER-3322 Raise timeout limits
prestonvasquez Sep 4, 2024
1e1787b
GODRIVER-3322 increase timeout
prestonvasquez Sep 4, 2024
caa5bfd
GODRIVER-3322 increase timeout
prestonvasquez Sep 4, 2024
bf50559
Merge branch 'master' into GODRIVER-3322
prestonvasquez Sep 5, 2024
93c3e45
Merge branch 'master' into GODRIVER-3322
prestonvasquez Sep 5, 2024
bbd0a16
GODRIVER-3322 De-parallelize CSOT tests
prestonvasquez Sep 5, 2024
a3e16d8
Merge branch 'GODRIVER-3322' of github.com:prestonvasquez/mongo-go-dr…
prestonvasquez Sep 5, 2024
cf49d4a
GODRIVER-3322 Increase more timeouts
prestonvasquez Sep 5, 2024
d45177e
Merge branch 'master' into GODRIVER-3322
prestonvasquez Sep 5, 2024
a83b295
GODRIVER-3322 Make GridFS it's own test
prestonvasquez Sep 5, 2024
dd9e3e7
Merge branch 'GODRIVER-3322' of github.com:prestonvasquez/mongo-go-dr…
prestonvasquez Sep 5, 2024
5230e01
GODRIVER-3322 Clean up test comments
prestonvasquez Sep 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 274 additions & 0 deletions internal/integration/csot_prose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package integration

import (
"bytes"
"context"
"strings"
"testing"
Expand All @@ -17,6 +18,7 @@ import (
"go.mongodb.org/mongo-driver/v2/internal/assert"
"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
"go.mongodb.org/mongo-driver/v2/internal/integtest"
"go.mongodb.org/mongo-driver/v2/internal/require"
"go.mongodb.org/mongo-driver/v2/mongo"
"go.mongodb.org/mongo-driver/v2/mongo/options"
)
Expand Down Expand Up @@ -83,6 +85,278 @@ func TestCSOTProse(t *testing.T) {
assert.Equal(mt, started[1].CommandName,
"insert", "expected a second insert event, got %v", started[1].CommandName)
})

mt.RunOpts("6. gridfs - upload", mtest.NewOptions().MinServerVersion("4.4"), func(mt *mtest.T) {
mt.Run("uploads via openUploadStream can be timed out", func(mt *mtest.T) {
// Drop and re-create the db.fs.files and db.fs.chunks collections.
err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background())
assert.NoError(mt, err, "failed to drop files")

err = mt.Client.Database("db").Collection("fs.chunks").Drop(context.Background())
assert.NoError(mt, err, "failed to drop chunks")

// Set a blocking "insert" fail point.
mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"insert"},
BlockConnection: true,
BlockTimeMS: 500,
},
})

// Create a new MongoClient with timeoutMS=250.
cliOptions := options.Client().SetTimeout(250 * time.Millisecond).ApplyURI(mtest.ClusterURI())
integtest.AddTestServerAPIVersion(cliOptions)

client, err := mongo.Connect(cliOptions)
assert.NoError(mt, err, "failed to connect to server")

// Create a GridFS bucket that wraps the db database.
bucket := client.Database("db").GridFSBucket()

uploadStream, err := bucket.OpenUploadStream(context.Background(), "filename")
require.NoError(mt, err, "failed to open upload stream")

_, err = uploadStream.Write([]byte{0x12})
require.NoError(mt, err, "failed to write to upload stream")

err = uploadStream.Close()
assert.Error(t, err, context.DeadlineExceeded)
})

mt.Run("Aborting an upload stream can be timed out", func(mt *mtest.T) {
// Drop and re-create the db.fs.files and db.fs.chunks collections.
err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background())
assert.NoError(mt, err, "failed to drop files")

err = mt.Client.Database("db").Collection("fs.chunks").Drop(context.Background())
assert.NoError(mt, err, "failed to drop chunks")

// Set a blocking "delete" fail point.
mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"delete"},
BlockConnection: true,
BlockTimeMS: 500,
},
})

// Create a new MongoClient with timeoutMS=250.
cliOptions := options.Client().SetTimeout(250 * time.Millisecond).ApplyURI(mtest.ClusterURI())
integtest.AddTestServerAPIVersion(cliOptions)

client, err := mongo.Connect(cliOptions)
assert.NoError(mt, err, "failed to connect to server")

// Create a GridFS bucket that wraps the db database.
bucket := client.Database("db").GridFSBucket(options.GridFSBucket().SetChunkSizeBytes(2))

// Call bucket.open_upload_stream() with the filename filename to create
// an upload stream (referred to as uploadStream).
uploadStream, err := bucket.OpenUploadStream(context.Background(), "filename")
require.NoError(mt, err)

// Using uploadStream, upload the bytes [0x01, 0x02, 0x03, 0x04].
_, err = uploadStream.Write([]byte{0x01, 0x02, 0x03, 0x04})
require.NoError(mt, err)

err = uploadStream.Abort()
assert.Error(mt, err, context.DeadlineExceeded)
})
})

const test61 = "6.1 gridfs - upload and download with non-expiring client-level timeout"
mt.RunOpts(test61, mtest.NewOptions().MinServerVersion("4.4"), func(mt *mtest.T) {
// Drop and re-create the db.fs.files and db.fs.chunks collections.
err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background())
assert.NoError(mt, err, "failed to drop files")

err = mt.Client.Database("db").Collection("fs.chunks").Drop(context.Background())
assert.NoError(mt, err, "failed to drop chunks")

// Create a new MongoClient with timeoutMS=500.
cliOptions := options.Client().SetTimeout(500 * time.Millisecond).ApplyURI(mtest.ClusterURI())
integtest.AddTestServerAPIVersion(cliOptions)

client, err := mongo.Connect(cliOptions)
assert.NoError(mt, err, "failed to connect to server")

// Create a GridFS bucket that wraps the db database.
bucket := client.Database("db").GridFSBucket()

mt.Run("UploadFromStream", func(mt *mtest.T) {
// Upload file and ensure it uploaded correctly.
fileID, err := bucket.UploadFromStream(context.Background(), "filename", bytes.NewReader([]byte{0x12}))
assert.NoError(mt, err, "failed to upload stream")

buf := bytes.Buffer{}

_, err = bucket.DownloadToStream(context.Background(), fileID, &buf)
assert.NoError(mt, err, "failed to download stream")
assert.Equal(mt, buf.Len(), 1)
assert.Equal(mt, buf.Bytes(), []byte{0x12})
})

mt.Run("OpenUploadStream", func(mt *mtest.T) {
// Upload file and ensure it uploaded correctly.
uploadStream, err := bucket.OpenUploadStream(context.Background(), "filename2")
require.NoError(mt, err, "failed to open upload stream")

_, err = uploadStream.Write([]byte{0x13})
require.NoError(mt, err, "failed to write data to upload stream")

err = uploadStream.Close()
require.NoError(mt, err, "failed to close upload stream")

buf := bytes.Buffer{}

_, err = bucket.DownloadToStream(context.Background(), uploadStream.FileID, &buf)
assert.NoError(mt, err, "failed to download stream")
assert.Equal(mt, buf.Len(), 1)
assert.Equal(mt, buf.Bytes(), []byte{0x13})
})
})

const test62 = "6.2 gridfs - upload with operation-level timeout"
mt.RunOpts(test62, mtest.NewOptions().MinServerVersion("4.4"), func(mt *mtest.T) {
// Drop and re-create the db.fs.files and db.fs.chunks collections.
err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background())
assert.NoError(mt, err, "failed to drop files")

err = mt.Client.Database("db").Collection("fs.chunks").Drop(context.Background())
assert.NoError(mt, err, "failed to drop chunks")

// Set a blocking "insert" fail point.
mt.SetFailPoint(mtest.FailPoint{
ConfigureFailPoint: "failCommand",
Mode: mtest.FailPointMode{
Times: 1,
},
Data: mtest.FailPointData{
FailCommands: []string{"insert"},
BlockConnection: true,
BlockTimeMS: 15,
},
})

// Create a new MongoClient with timeoutMS=10.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will be better to update the comment to describe the code more accurately.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

@qingyang-hu qingyang-hu Sep 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, it looks good now.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may have missed something, but is the "timeoutMS=10" still accurate after bf50559? Where is the timeout set?

cliOptions := options.Client().SetTimeout(10 * time.Millisecond).ApplyURI(mtest.ClusterURI())
integtest.AddTestServerAPIVersion(cliOptions)

client, err := mongo.Connect(cliOptions)
assert.NoError(mt, err, "failed to connect to server")

// Create a GridFS bucket that wraps the db database.
bucket := client.Database("db").GridFSBucket()

mt.Run("UploadFromStream", func(mt *mtest.T) {

// If the operation-level context is not respected, then the client-level
// timeout will exceed deadline.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

// Upload file and ensure it uploaded correctly.
fileID, err := bucket.UploadFromStream(ctx, "filename", bytes.NewReader([]byte{0x12}))
require.NoError(mt, err, "failed to upload stream")

buf := bytes.Buffer{}

_, err = bucket.DownloadToStream(context.Background(), fileID, &buf)
assert.NoError(mt, err, "failed to download stream")
assert.Equal(mt, buf.Len(), 1)
assert.Equal(mt, buf.Bytes(), []byte{0x12})
})

mt.Run("OpenUploadStream", func(mt *mtest.T) {
// If the operation-level context is not respected, then the client-level
// timeout will exceed deadline.
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()

// Upload file and ensure it uploaded correctly.
uploadStream, err := bucket.OpenUploadStream(ctx, "filename2")
require.NoError(mt, err, "failed to open upload stream")

_, err = uploadStream.Write([]byte{0x13})
require.NoError(mt, err, "failed to write data to upload stream")

err = uploadStream.Close()
require.NoError(mt, err, "failed to close upload stream")

buf := bytes.Buffer{}

_, err = bucket.DownloadToStream(context.Background(), uploadStream.FileID, &buf)
assert.NoError(mt, err, "failed to download stream")
assert.Equal(mt, buf.Len(), 1)
assert.Equal(mt, buf.Bytes(), []byte{0x13})
})
})

const test63 = "6.3 gridfs - cancel context mid-stream"
mt.RunOpts(test63, mtest.NewOptions().MinServerVersion("4.4"), func(mt *mtest.T) {
// Drop and re-create the db.fs.files and db.fs.chunks collections.
err := mt.Client.Database("db").Collection("fs.files").Drop(context.Background())
assert.NoError(mt, err, "failed to drop files")

err = mt.Client.Database("db").Collection("fs.chunks").Drop(context.Background())
assert.NoError(mt, err, "failed to drop chunks")

// Create a new MongoClient with timeoutMS=10.
cliOptions := options.Client().ApplyURI(mtest.ClusterURI())
integtest.AddTestServerAPIVersion(cliOptions)

client, err := mongo.Connect(cliOptions)
assert.NoError(mt, err, "failed to connect to server")

// Create a GridFS bucket that wraps the db database.
bucket := client.Database("db").GridFSBucket()

mt.Run("Upload#Close", func(mt *mtest.T) {
// Upload file and ensure it uploaded correctly.
uploadStream, err := bucket.OpenUploadStream(context.Background(), "filename")
require.NoError(mt, err)

_ = uploadStream.Close()

_, err = uploadStream.Write([]byte{0x13})
assert.Error(mt, err, context.Canceled)
})

mt.Run("Upload#Abort", func(mt *mtest.T) {
// Upload file and ensure it uploaded correctly.
uploadStream, err := bucket.OpenUploadStream(context.Background(), "filename2")
require.NoError(mt, err)

_ = uploadStream.Abort()

_, err = uploadStream.Write([]byte{0x13})
assert.Error(mt, err, context.Canceled)
})

mt.Run("Download#Close", func(mt *mtest.T) {
// Upload file and ensure it uploaded correctly.
fileID, err := bucket.UploadFromStream(context.Background(), "filename3", bytes.NewReader([]byte{0x12}))
require.NoError(mt, err, "failed to upload stream")

downloadStream, err := bucket.OpenDownloadStream(context.Background(), fileID)
assert.NoError(mt, err)

_ = downloadStream.Close()

_, err = downloadStream.Read([]byte{})
assert.Error(mt, err, context.Canceled)
})
})

mt.Run("8. server selection", func(mt *mtest.T) {
cliOpts := options.Client().ApplyURI("mongodb://invalid/?serverSelectionTimeoutMS=100")
mtOpts := mtest.NewOptions().ClientOptions(cliOpts).CreateCollection(false)
Expand Down
11 changes: 6 additions & 5 deletions mongo/gridfs_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func (b *GridFSBucket) OpenUploadStreamWithID(
opts ...options.Lister[options.GridFSUploadOptions],
) (*GridFSUploadStream, error) {
ctx, cancel := csot.WithTimeout(ctx, b.db.client.timeout)
defer cancel()

if err := b.checkFirstWrite(ctx); err != nil {
return nil, err
Expand All @@ -96,7 +95,7 @@ func (b *GridFSBucket) OpenUploadStreamWithID(
return nil, err
}

return newUploadStream(ctx, upload, fileID, filename, b.chunksColl, b.filesColl), nil
return newUploadStream(ctx, cancel, upload, fileID, filename, b.chunksColl, b.filesColl), nil
}

// UploadFromStream creates a fileID and uploads a file given a source stream.
Expand Down Expand Up @@ -135,6 +134,9 @@ func (b *GridFSBucket) UploadFromStreamWithID(
source io.Reader,
opts ...options.Lister[options.GridFSUploadOptions],
) error {
ctx, cancel := csot.WithTimeout(ctx, b.db.client.timeout)
defer cancel()

us, err := b.OpenUploadStreamWithID(ctx, fileID, filename, opts...)
if err != nil {
return err
Expand Down Expand Up @@ -350,7 +352,6 @@ func (b *GridFSBucket) openDownloadStream(
opts ...options.Lister[options.FindOneOptions],
) (*GridFSDownloadStream, error) {
ctx, cancel := csot.WithTimeout(ctx, b.db.client.timeout)
defer cancel()

result := b.filesColl.FindOne(ctx, filter, opts...)

Expand All @@ -369,7 +370,7 @@ func (b *GridFSBucket) openDownloadStream(
foundFile := newFileFromResponse(resp)

if foundFile.Length == 0 {
return newGridFSDownloadStream(ctx, nil, foundFile.ChunkSize, foundFile), nil
return newGridFSDownloadStream(ctx, cancel, nil, foundFile.ChunkSize, foundFile), nil
}

// For a file with non-zero length, chunkSize must exist so we know what size to expect when downloading chunks.
Expand All @@ -384,7 +385,7 @@ func (b *GridFSBucket) openDownloadStream(

// The chunk size can be overridden for individual files, so the expected chunk size should be the "chunkSize"
// field from the files collection document, not the bucket's chunk size.
return newGridFSDownloadStream(ctx, chunksCursor, foundFile.ChunkSize, foundFile), nil
return newGridFSDownloadStream(ctx, cancel, chunksCursor, foundFile.ChunkSize, foundFile), nil
}

func (b *GridFSBucket) downloadToStream(ds *GridFSDownloadStream, stream io.Writer) (int64, error) {
Expand Down
16 changes: 15 additions & 1 deletion mongo/gridfs_download_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type GridFSDownloadStream struct {
expectedChunk int32 // index of next expected chunk
fileLen int64
ctx context.Context
cancel context.CancelFunc

// The pointer returned by GetFile. This should not be used in the actual GridFSDownloadStream code outside of the
// newGridFSDownloadStream constructor because the values can be mutated by the user after calling GetFile. Instead,
Expand Down Expand Up @@ -95,7 +96,13 @@ func newFileFromResponse(resp findFileResponse) *GridFSFile {
}
}

func newGridFSDownloadStream(ctx context.Context, cursor *Cursor, chunkSize int32, file *GridFSFile) *GridFSDownloadStream {
func newGridFSDownloadStream(
ctx context.Context,
cancel context.CancelFunc,
cursor *Cursor,
chunkSize int32,
file *GridFSFile,
) *GridFSDownloadStream {
numChunks := int32(math.Ceil(float64(file.Length) / float64(chunkSize)))

return &GridFSDownloadStream{
Expand All @@ -107,11 +114,18 @@ func newGridFSDownloadStream(ctx context.Context, cursor *Cursor, chunkSize int3
fileLen: file.Length,
file: file,
ctx: ctx,
cancel: cancel,
}
}

// Close closes this download stream.
func (ds *GridFSDownloadStream) Close() error {
defer func() {
if ds.cancel != nil {
ds.cancel()
}
}()

if ds.closed {
return ErrStreamClosed
}
Expand Down
Loading
Loading