Commit 498dcf6

Allow GridFS fileID to be any type

GODRIVER-815 Change-Id: I93b413198e0bdd949e6cbccb9a9436e93e6c3cbf

1 parent: 233bb4b

File tree: 3 files changed, +80 −25 lines
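What the change enables: every fileID parameter in the GridFS API below moves from primitive.ObjectID to interface{}, so any value the driver's BSON library can marshal (a string, an int32, a custom struct) can serve as a GridFS file ID. A minimal usage sketch under stated assumptions — the connection URI, database name, file name, and ID value are placeholders, and the mongo.Connect and gridfs.NewBucket signatures are assumed for this pre-1.0 driver:

package main

import (
	"bytes"
	"context"
	"log"
	"strings"

	"github.com/mongodb/mongo-go-driver/mongo"
	"github.com/mongodb/mongo-go-driver/mongo/gridfs"
)

func main() {
	ctx := context.Background()

	// Assumed boilerplate: URI and database name are placeholders.
	client, err := mongo.Connect(ctx, "mongodb://localhost:27017")
	if err != nil {
		log.Fatal(err)
	}
	bucket, err := gridfs.NewBucket(client.Database("gridfs_demo"))
	if err != nil {
		log.Fatal(err)
	}

	// Before this commit the ID had to be a primitive.ObjectID;
	// after it, any BSON-marshalable value works, e.g. a plain string.
	fileID := "invoice-2019-001"
	err = bucket.UploadFromStreamWithID(fileID, "invoice.txt",
		strings.NewReader("hello gridfs"))
	if err != nil {
		log.Fatal(err)
	}

	// Download, Rename, and Delete accept the same arbitrary ID.
	var buf bytes.Buffer
	if _, err := bucket.DownloadToStream(fileID, &buf); err != nil {
		log.Fatal(err)
	}
	log.Printf("downloaded %d bytes", buf.Len())
}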

mongo/gridfs/bucket.go (53 additions, 13 deletions)
@@ -24,6 +24,7 @@ import (
 	"github.com/mongodb/mongo-go-driver/mongo/readpref"
 	"github.com/mongodb/mongo-go-driver/mongo/writeconcern"
 	"github.com/mongodb/mongo-go-driver/x/bsonx"
+	"github.com/mongodb/mongo-go-driver/x/bsonx/bsoncore"
 )

 // TODO: add sessions options
@@ -116,7 +117,7 @@ func (b *Bucket) OpenUploadStream(filename string, opts ...*options.UploadOption
 }

 // OpenUploadStreamWithID creates a new upload stream for a file given the file ID and filename.
-func (b *Bucket) OpenUploadStreamWithID(fileID primitive.ObjectID, filename string, opts ...*options.UploadOptions) (*UploadStream, error) {
+func (b *Bucket) OpenUploadStreamWithID(fileID interface{}, filename string, opts ...*options.UploadOptions) (*UploadStream, error) {
 	ctx, cancel := deadlineContext(b.writeDeadline)
 	if cancel != nil {
 		defer cancel()
@@ -142,7 +143,7 @@ func (b *Bucket) UploadFromStream(filename string, source io.Reader, opts ...*op
 }

 // UploadFromStreamWithID uploads a file given a source stream.
-func (b *Bucket) UploadFromStreamWithID(fileID primitive.ObjectID, filename string, source io.Reader, opts ...*options.UploadOptions) error {
+func (b *Bucket) UploadFromStreamWithID(fileID interface{}, filename string, source io.Reader, opts ...*options.UploadOptions) error {
 	us, err := b.OpenUploadStreamWithID(fileID, filename, opts...)
 	if err != nil {
 		return err
@@ -177,15 +178,19 @@ func (b *Bucket) UploadFromStreamWithID(fileID primitive.ObjectID, filename stri
 }

 // OpenDownloadStream creates a stream from which the contents of the file can be read.
-func (b *Bucket) OpenDownloadStream(fileID primitive.ObjectID) (*DownloadStream, error) {
+func (b *Bucket) OpenDownloadStream(fileID interface{}) (*DownloadStream, error) {
+	id, err := convertFileID(fileID)
+	if err != nil {
+		return nil, err
+	}
 	return b.openDownloadStream(bsonx.Doc{
-		{"_id", bsonx.ObjectID(fileID)},
+		{"_id", id},
 	})
 }

 // DownloadToStream downloads the file with the specified fileID and writes it to the provided io.Writer.
 // Returns the number of bytes written to the steam and an error, or nil if there was no error.
-func (b *Bucket) DownloadToStream(fileID primitive.ObjectID, stream io.Writer) (int64, error) {
+func (b *Bucket) DownloadToStream(fileID interface{}, stream io.Writer) (int64, error) {
 	ds, err := b.OpenDownloadStream(fileID)
 	if err != nil {
 		return 0, err
@@ -225,15 +230,19 @@ func (b *Bucket) DownloadToStreamByName(filename string, stream io.Writer, opts
 }

 // Delete deletes all chunks and metadata associated with the file with the given file ID.
-func (b *Bucket) Delete(fileID primitive.ObjectID) error {
+func (b *Bucket) Delete(fileID interface{}) error {
 	// delete document in files collection and then chunks to minimize race conditions

 	ctx, cancel := deadlineContext(b.writeDeadline)
 	if cancel != nil {
 		defer cancel()
 	}

-	res, err := b.filesColl.DeleteOne(ctx, bsonx.Doc{{"_id", bsonx.ObjectID(fileID)}})
+	id, err := convertFileID(fileID)
+	if err != nil {
+		return err
+	}
+	res, err := b.filesColl.DeleteOne(ctx, bsonx.Doc{{"_id", id}})
 	if err == nil && res.DeletedCount == 0 {
 		err = ErrFileNotFound
 	}
@@ -277,14 +286,18 @@ func (b *Bucket) Find(filter interface{}, opts ...*options.GridFSFindOptions) (*
 }

 // Rename renames the stored file with the specified file ID.
-func (b *Bucket) Rename(fileID primitive.ObjectID, newFilename string) error {
+func (b *Bucket) Rename(fileID interface{}, newFilename string) error {
 	ctx, cancel := deadlineContext(b.writeDeadline)
 	if cancel != nil {
 		defer cancel()
 	}

+	id, err := convertFileID(fileID)
+	if err != nil {
+		return err
+	}
 	res, err := b.filesColl.UpdateOne(ctx,
-		bsonx.Doc{{"_id", bsonx.ObjectID(fileID)}},
+		bsonx.Doc{{"_id", id}},
 		bsonx.Doc{{"$set", bsonx.Document(bsonx.Doc{{"filename", bsonx.String(newFilename)}})}},
 	)
 	if err != nil {
@@ -369,8 +382,12 @@ func (b *Bucket) downloadToStream(ds *DownloadStream, stream io.Writer) (int64,
 	return copied, ds.Close()
 }

-func (b *Bucket) deleteChunks(ctx context.Context, fileID primitive.ObjectID) error {
-	_, err := b.chunksColl.DeleteMany(ctx, bsonx.Doc{{"files_id", bsonx.ObjectID(fileID)}})
+func (b *Bucket) deleteChunks(ctx context.Context, fileID interface{}) error {
+	id, err := convertFileID(fileID)
+	if err != nil {
+		return err
+	}
+	_, err = b.chunksColl.DeleteMany(ctx, bsonx.Doc{{"files_id", id}})
 	return err
 }

@@ -388,9 +405,13 @@ func (b *Bucket) findFile(ctx context.Context, filter interface{}, opts ...*opti
 	return cursor, nil
 }

-func (b *Bucket) findChunks(ctx context.Context, fileID primitive.ObjectID) (*mongo.Cursor, error) {
+func (b *Bucket) findChunks(ctx context.Context, fileID interface{}) (*mongo.Cursor, error) {
+	id, err := convertFileID(fileID)
+	if err != nil {
+		return nil, err
+	}
 	chunksCursor, err := b.chunksColl.Find(ctx,
-		bsonx.Doc{{"files_id", bsonx.ObjectID(fileID)}},
+		bsonx.Doc{{"files_id", id}},
 		options.Find().SetSort(bsonx.Doc{{"n", bsonx.Int32(1)}})) // sort by chunk index
 	if err != nil {
 		return nil, err
@@ -510,3 +531,22 @@ func (b *Bucket) parseUploadOptions(opts ...*options.UploadOptions) (*Upload, er

 	return upload, nil
 }
+
+type _convertFileID struct {
+	ID interface{} `bson:"_id"`
+}
+
+func convertFileID(fileID interface{}) (bsonx.Val, error) {
+	id := _convertFileID{
+		ID: fileID,
+	}
+
+	b, err := bson.Marshal(id)
+	if err != nil {
+		return bsonx.Val{}, err
+	}
+	val := bsoncore.Document(b).Lookup("_id")
+	var res bsonx.Val
+	err = res.UnmarshalBSONValue(val.Type, val.Data)
+	return res, err
+}
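The convertFileID helper added above carries the whole change: instead of per-type switch cases, it wraps the caller's value in a struct whose only field is tagged `bson:"_id"`, runs it through bson.Marshal, then uses bsoncore to look the element back up and recover its BSON type and raw bytes for a bsonx.Val. A standalone sketch of that round trip, assuming the same pre-1.0 driver packages the diff imports (the string value is only an example):

package main

import (
	"fmt"

	"github.com/mongodb/mongo-go-driver/bson"
	"github.com/mongodb/mongo-go-driver/x/bsonx/bsoncore"
)

func main() {
	// Wrapping the arbitrary value makes bson.Marshal emit a full document.
	wrapped := struct {
		ID interface{} `bson:"_id"`
	}{ID: "my-string-id"}

	raw, err := bson.Marshal(wrapped)
	if err != nil {
		// Types the library cannot encode (func, chan, ...) fail here.
		panic(err)
	}

	// Look the element back up to recover its BSON type and value bytes —
	// the same pair convertFileID feeds into bsonx.Val.UnmarshalBSONValue.
	val := bsoncore.Document(raw).Lookup("_id")
	fmt.Println(val.Type, val.StringValue())
}

One conversion per operation also keeps the hot path cheap: uploadChunks in upload_stream.go (below) calls convertFileID a single time and reuses the result for every chunk document.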

mongo/gridfs/gridfs_spec_test.go (6 additions, 4 deletions)
@@ -268,7 +268,7 @@ func compareValues(expected bsonx.Val, actual bsonx.Val) bool {
 	return true // shouldn't get here
 }

-func compareGfsDoc(t *testing.T, expected bsonx.Doc, actual bsonx.Doc, filesID primitive.ObjectID) {
+func compareGfsDoc(t *testing.T, expected bsonx.Doc, actual bsonx.Doc, filesID interface{}) {
 	for _, elem := range expected {
 		key := elem.Key

@@ -289,8 +289,10 @@ func compareGfsDoc(t *testing.T, expected bsonx.Doc, actual bsonx.Doc, filesID p
 			expectedBytes := make([]byte, 12)
 			actualBytes := make([]byte, 12)

-			err = (&filesID).UnmarshalJSON(expectedBytes)
+			var oid primitive.ObjectID
+			err = (&oid).UnmarshalJSON(expectedBytes)
 			testhelpers.RequireNil(t, err, "error unmarshalling expected bytes: %s", err)
+			filesID = oid
 			actualID := actualVal.ObjectID()
 			err = (&actualID).UnmarshalJSON(actualBytes)
 			testhelpers.RequireNil(t, err, "error unmarshalling actual bytes: %s", err)
@@ -314,7 +316,7 @@ func compareGfsDoc(t *testing.T, expected bsonx.Doc, actual bsonx.Doc, filesID p
 }

 // compare chunks and expectedChunks collections
-func compareChunks(t *testing.T, filesID primitive.ObjectID) {
+func compareChunks(t *testing.T, filesID interface{}) {
 	actualCursor, err := chunks.Find(ctx, emptyDoc)
 	testhelpers.RequireNil(t, err, "error running Find for chunks: %s", err)
 	expectedCursor, err := expectedChunks.Find(ctx, emptyDoc)
@@ -378,7 +380,7 @@ func msgToDoc(t *testing.T, msg json.RawMessage) bsonx.Doc {
 	return doc
 }

-func runUploadAssert(t *testing.T, test test, fileID primitive.ObjectID) {
+func runUploadAssert(t *testing.T, test test, fileID interface{}) {
 	assert := test.Assert

 	for _, assertData := range assert.Data {

mongo/gridfs/upload_stream.go (21 additions, 8 deletions)
@@ -12,10 +12,11 @@ import (
 	"context"
 	"time"

+	"math"
+
 	"github.com/mongodb/mongo-go-driver/bson/primitive"
 	"github.com/mongodb/mongo-go-driver/mongo"
 	"github.com/mongodb/mongo-go-driver/x/bsonx"
-	"math"
 )

 // UploadBufferSize is the size in bytes of one stream batch. Chunks will be written to the db after the sum of chunk
@@ -28,7 +29,7 @@ var ErrStreamClosed = errors.New("stream is closed or aborted")
 // UploadStream is used to upload files in chunks.
 type UploadStream struct {
 	*Upload // chunk size and metadata
-	FileID primitive.ObjectID
+	FileID interface{}

 	chunkIndex int
 	chunksColl *mongo.Collection // collection to store file chunks
@@ -42,7 +43,7 @@
 }

 // NewUploadStream creates a new upload stream.
-func newUploadStream(upload *Upload, fileID primitive.ObjectID, filename string, chunks *mongo.Collection, files *mongo.Collection) *UploadStream {
+func newUploadStream(upload *Upload, fileID interface{}, filename string, chunks, files *mongo.Collection) *UploadStream {
 	return &UploadStream{
 		Upload: upload,
 		FileID: fileID,
@@ -134,7 +135,11 @@ func (us *UploadStream) Abort() error {
 		defer cancel()
 	}

-	_, err := us.chunksColl.DeleteMany(ctx, bsonx.Doc{{"files_id", bsonx.ObjectID(us.FileID)}})
+	id, err := convertFileID(us.FileID)
+	if err != nil {
+		return err
+	}
+	_, err = us.chunksColl.DeleteMany(ctx, bsonx.Doc{{"files_id", id}})
 	if err != nil {
 		return err
 	}
@@ -156,6 +161,10 @@ func (us *UploadStream) uploadChunks(ctx context.Context, uploadPartial bool) er

 	docs := make([]interface{}, int(numChunks))

+	id, err := convertFileID(us.FileID)
+	if err != nil {
+		return err
+	}
 	begChunkIndex := us.chunkIndex
 	for i := 0; i < us.bufferIndex; i += int(us.chunkSize) {
 		endIndex := i + int(us.chunkSize)
@@ -169,15 +178,15 @@
 		chunkData := us.buffer[i:endIndex]
 		docs[us.chunkIndex-begChunkIndex] = bsonx.Doc{
 			{"_id", bsonx.ObjectID(primitive.NewObjectID())},
-			{"files_id", bsonx.ObjectID(us.FileID)},
+			{"files_id", id},
 			{"n", bsonx.Int32(int32(us.chunkIndex))},
 			{"data", bsonx.Binary(0x00, chunkData)},
 		}
 		us.chunkIndex++
 		us.fileLen += int64(len(chunkData))
 	}

-	_, err := us.chunksColl.InsertMany(ctx, docs)
+	_, err = us.chunksColl.InsertMany(ctx, docs)
 	if err != nil {
 		return err
 	}
@@ -192,8 +201,12 @@ func (us *UploadStream) uploadChunks(ctx context.Context, uploadPartial bool) er
 }

 func (us *UploadStream) createFilesCollDoc(ctx context.Context) error {
+	id, err := convertFileID(us.FileID)
+	if err != nil {
+		return err
+	}
 	doc := bsonx.Doc{
-		{"_id", bsonx.ObjectID(us.FileID)},
+		{"_id", id},
 		{"length", bsonx.Int64(us.fileLen)},
 		{"chunkSize", bsonx.Int32(us.chunkSize)},
 		{"uploadDate", bsonx.DateTime(time.Now().UnixNano() / int64(time.Millisecond))},
@@ -204,7 +217,7 @@ func (us *UploadStream) createFilesCollDoc(ctx context.Context) error {
 		doc = append(doc, bsonx.Elem{"metadata", bsonx.Document(us.metadata)})
 	}

-	_, err := us.filesColl.InsertOne(ctx, doc)
+	_, err = us.filesColl.InsertOne(ctx, doc)
 	if err != nil {
 		return err
 	}
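A side effect of routing every ID through convertFileID: the compile-time guarantee of primitive.ObjectID is gone, and in exchange values the BSON library has no encoder for are rejected with an error at the API boundary. A hedged continuation of the first sketch's hypothetical bucket (a Go channel is deliberately unencodable):

// badID cannot be marshaled to BSON, so the upload fails fast
// with the error surfaced by convertFileID.
badID := make(chan int)
err = bucket.UploadFromStreamWithID(badID, "bad.txt", strings.NewReader("x"))
if err != nil {
	log.Println("rejected unsupported file ID:", err) // expected path
}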
