Skip to content

Commit 1248c58

Browse files
committed
GODRIVER-3841 Support concurrent *GridFSBucket.OpenUploadStream calls.
1 parent fcfea70 commit 1248c58

File tree

2 files changed

+45
-9
lines changed

2 files changed

+45
-9
lines changed

internal/integration/gridfs_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@ import (
1111
"context"
1212
"io"
1313
"math/rand"
14+
"os"
1415
"runtime"
16+
"sync"
1517
"testing"
1618
"time"
1719

1820
"go.mongodb.org/mongo-driver/v2/bson"
1921
"go.mongodb.org/mongo-driver/v2/event"
2022
"go.mongodb.org/mongo-driver/v2/internal/assert"
2123
"go.mongodb.org/mongo-driver/v2/internal/integration/mtest"
24+
"go.mongodb.org/mongo-driver/v2/internal/integtest"
2225
"go.mongodb.org/mongo-driver/v2/internal/israce"
2326
"go.mongodb.org/mongo-driver/v2/internal/require"
2427
"go.mongodb.org/mongo-driver/v2/mongo"
@@ -529,6 +532,41 @@ func TestGridFS(x *testing.T) {
529532
})
530533
}
531534

535+
func TestOpenUploadStreamConcurrently(t *testing.T) {
536+
t.Parallel()
537+
538+
uri, err := integtest.MongoDBURI()
539+
require.NoError(t, err, "error getting URI: %v", err)
540+
opts := options.Client().ApplyURI(uri)
541+
if os.Getenv("REQUIRE_API_VERSION") == "true" {
542+
opts.SetServerAPIOptions(options.ServerAPI(options.ServerAPIVersion1))
543+
}
544+
client, err := mongo.Connect(opts)
545+
require.NoError(t, err, "Connect error: %v", err)
546+
defer func() {
547+
_ = client.Disconnect(context.Background())
548+
}()
549+
550+
db := client.Database(mtest.TestDB)
551+
bucket := db.GridFSBucket()
552+
defer func() {
553+
_ = bucket.Drop(context.Background())
554+
}()
555+
556+
const size = 10_000
557+
558+
wg := sync.WaitGroup{}
559+
wg.Add(size)
560+
for i := 0; i < size; i++ {
561+
go func() {
562+
defer wg.Done()
563+
_, err := bucket.OpenUploadStream(context.Background(), "foo")
564+
assert.NoError(t, err, "OpenUploadStream error: %v", err)
565+
}()
566+
}
567+
wg.Wait()
568+
}
569+
532570
func assertGridFSCollectionState(mt *mtest.T, coll *mongo.Collection, expectedName string, expectedNumDocuments int64) {
533571
mt.Helper()
534572

mongo/gridfs_bucket.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"errors"
1313
"fmt"
1414
"io"
15+
"sync/atomic"
1516

1617
"go.mongodb.org/mongo-driver/v2/bson"
1718
"go.mongodb.org/mongo-driver/v2/internal/csot"
@@ -37,6 +38,8 @@ var ErrMissingGridFSChunkSize = errors.New("files collection document does not c
3738

3839
// GridFSBucket represents a GridFS bucket.
3940
type GridFSBucket struct {
41+
firstWriteDone uint32
42+
4043
db *Database
4144
chunksColl *Collection // collection to store file chunks
4245
filesColl *Collection // collection to store file metadata
@@ -47,9 +50,8 @@ type GridFSBucket struct {
4750
rc *readconcern.ReadConcern
4851
rp *readpref.ReadPref
4952

50-
firstWriteDone bool
51-
readBuf []byte
52-
writeBuf []byte
53+
readBuf []byte
54+
writeBuf []byte
5355
}
5456

5557
// upload contains options to upload a file to a bucket.
@@ -531,14 +533,10 @@ func (b *GridFSBucket) createIndexes(ctx context.Context) error {
531533
}
532534

533535
func (b *GridFSBucket) checkFirstWrite(ctx context.Context) error {
534-
if !b.firstWriteDone {
536+
if atomic.CompareAndSwapUint32(&b.firstWriteDone, 0, 1) {
535537
// before the first write operation, must determine if files collection is empty
536538
// if so, create indexes if they do not already exist
537-
538-
if err := b.createIndexes(ctx); err != nil {
539-
return err
540-
}
541-
b.firstWriteDone = true
539+
return b.createIndexes(ctx)
542540
}
543541

544542
return nil

0 commit comments

Comments
 (0)