Skip to content

Commit 7b62ff3

Browse files
RUST-1478 GridFS upload methods (#751)
1 parent 6164660 commit 7b62ff3

File tree

4 files changed

+202
-60
lines changed

4 files changed

+202
-60
lines changed

src/gridfs.rs

Lines changed: 1 addition & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
mod download;
55
pub mod options;
6+
mod upload;
67

78
use core::task::{Context, Poll};
89
use std::{
@@ -241,64 +242,6 @@ impl GridFsBucket {
241242
todo!()
242243
}
243244

244-
/// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the
245-
/// `tokio` crate's `AsyncRead` trait for the `source`.
246-
pub async fn upload_from_tokio_reader_with_id(
247-
&self,
248-
id: Bson,
249-
filename: String,
250-
source: impl tokio::io::AsyncRead,
251-
options: impl Into<Option<GridFsUploadOptions>>,
252-
) {
253-
todo!()
254-
}
255-
256-
/// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the
257-
/// `futures-0.3` crate's `AsyncRead` trait for the `source`.
258-
pub async fn upload_from_futures_0_3_reader_with_id(
259-
&self,
260-
id: Bson,
261-
filename: String,
262-
source: impl futures_util::AsyncRead,
263-
options: impl Into<Option<GridFsUploadOptions>>,
264-
) {
265-
todo!()
266-
}
267-
268-
/// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for
269-
/// the file id. Uses the `tokio` crate's `AsyncRead` trait for the `source`.
270-
pub async fn upload_from_tokio_reader(
271-
&self,
272-
filename: String,
273-
source: impl tokio::io::AsyncRead,
274-
options: impl Into<Option<GridFsUploadOptions>>,
275-
) {
276-
self.upload_from_tokio_reader_with_id(
277-
Bson::ObjectId(ObjectId::new()),
278-
filename,
279-
source,
280-
options,
281-
)
282-
.await
283-
}
284-
285-
/// Uploads a user file to a GridFS bucket. The driver generates a unique [`Bson::ObjectId`] for
286-
/// the file id. Uses the `futures-0.3` crate's `AsyncRead` trait for the `source`.
287-
pub async fn upload_from_futures_0_3_reader(
288-
&self,
289-
filename: String,
290-
source: impl futures_util::AsyncRead,
291-
options: impl Into<Option<GridFsUploadOptions>>,
292-
) {
293-
self.upload_from_futures_0_3_reader_with_id(
294-
Bson::ObjectId(ObjectId::new()),
295-
filename,
296-
source,
297-
options,
298-
)
299-
.await
300-
}
301-
302245
/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
303246
/// The driver generates a unique [`Bson::ObjectId`] for the file id.
304247
///

src/gridfs/upload.rs

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
use std::{marker::Unpin, sync::atomic::Ordering};
2+
3+
use futures_util::{
4+
io::{AsyncRead, AsyncReadExt},
5+
stream::TryStreamExt,
6+
};
7+
8+
use super::{options::GridFsUploadOptions, Chunk, FilesCollectionDocument, GridFsBucket};
9+
use crate::{
10+
bson::{doc, oid::ObjectId, spec::BinarySubtype, Bson, DateTime, Document, RawBinaryRef},
11+
bson_util::get_int,
12+
error::{ErrorKind, Result},
13+
index::IndexModel,
14+
options::{FindOneOptions, ReadPreference, SelectionCriteria},
15+
Collection,
16+
};
17+
18+
impl GridFsBucket {
19+
/// Uploads a user file to a GridFS bucket. Bytes are read from `source` and stored in chunks in
20+
/// the bucket's chunks collection. After all the chunks have been uploaded, a corresponding
21+
/// [`FilesCollectionDocument`] is stored in the bucket's files collection.
22+
///
23+
/// This method generates an [`ObjectId`] for the `files_id` field of the
24+
/// [`FilesCollectionDocument`] and returns it.
25+
pub async fn upload_from_futures_0_3_reader<T>(
26+
&self,
27+
filename: impl AsRef<str>,
28+
source: T,
29+
options: impl Into<Option<GridFsUploadOptions>>,
30+
) -> Result<ObjectId>
31+
where
32+
T: AsyncRead + Unpin,
33+
{
34+
let id = ObjectId::new();
35+
self.upload_from_futures_0_3_reader_with_id(id.into(), filename, source, options)
36+
.await?;
37+
Ok(id)
38+
}
39+
40+
/// Uploads a user file to a GridFS bucket with the given `files_id`. Bytes are read from
41+
/// `source` and stored in chunks in the bucket's chunks collection. After all the chunks have
42+
/// been uploaded, a corresponding [`FilesCollectionDocument`] is stored in the bucket's files
43+
/// collection.
44+
pub async fn upload_from_futures_0_3_reader_with_id<T>(
45+
&self,
46+
files_id: Bson,
47+
filename: impl AsRef<str>,
48+
mut source: T,
49+
options: impl Into<Option<GridFsUploadOptions>>,
50+
) -> Result<()>
51+
where
52+
T: AsyncRead + Unpin,
53+
{
54+
let options = options.into();
55+
56+
self.create_indexes().await?;
57+
58+
let chunk_size = options
59+
.as_ref()
60+
.and_then(|opts| opts.chunk_size_bytes)
61+
.unwrap_or_else(|| self.chunk_size_bytes());
62+
let mut length = 0u64;
63+
let mut n = 0;
64+
65+
let mut buf = vec![0u8; chunk_size as usize];
66+
loop {
67+
let bytes_read = match source.read(&mut buf).await {
68+
Ok(0) => break,
69+
Ok(n) => n,
70+
Err(error) => {
71+
self.chunks()
72+
.delete_many(doc! { "files_id": &files_id }, None)
73+
.await?;
74+
return Err(ErrorKind::Io(error.into()).into());
75+
}
76+
};
77+
78+
let chunk = Chunk {
79+
id: ObjectId::new(),
80+
files_id: files_id.clone(),
81+
n,
82+
data: RawBinaryRef {
83+
subtype: BinarySubtype::Generic,
84+
bytes: &buf[..bytes_read],
85+
},
86+
};
87+
self.chunks().insert_one(chunk, None).await?;
88+
89+
length += bytes_read as u64;
90+
n += 1;
91+
}
92+
93+
let file = FilesCollectionDocument {
94+
id: files_id,
95+
length,
96+
chunk_size,
97+
upload_date: DateTime::now(),
98+
filename: Some(filename.as_ref().to_string()),
99+
metadata: options.and_then(|opts| opts.metadata),
100+
};
101+
self.files().insert_one(file, None).await?;
102+
103+
Ok(())
104+
}
105+
106+
async fn create_indexes(&self) -> Result<()> {
107+
if !self.inner.created_indexes.load(Ordering::SeqCst) {
108+
let find_options = FindOneOptions::builder()
109+
.selection_criteria(SelectionCriteria::ReadPreference(ReadPreference::Primary))
110+
.projection(doc! { "_id": 1 })
111+
.build();
112+
if self
113+
.files()
114+
.clone_with_type::<Document>()
115+
.find_one(None, find_options)
116+
.await?
117+
.is_none()
118+
{
119+
Self::create_index(self.files(), doc! { "filename": 1, "uploadDate": 1 }).await?;
120+
Self::create_index(self.chunks(), doc! { "files_id": 1, "n": 1 }).await?;
121+
}
122+
self.inner.created_indexes.store(true, Ordering::SeqCst);
123+
}
124+
125+
Ok(())
126+
}
127+
128+
async fn create_index<T>(coll: &Collection<T>, keys: Document) -> Result<()> {
129+
// From the spec: Drivers MUST check whether the indexes already exist before attempting to
130+
// create them.
131+
let mut indexes = coll.list_indexes(None).await?;
132+
'outer: while let Some(index_model) = indexes.try_next().await? {
133+
if index_model.keys.len() != keys.len() {
134+
continue;
135+
}
136+
// Indexes should be considered equivalent regardless of numeric value type.
137+
// e.g. { "filename": 1, "uploadDate": 1 } is equivalent to
138+
// { "filename": 1.0, "uploadDate": 1.0 }
139+
let number_matches = |key: &str, value: &Bson| {
140+
if let Some(model_value) = index_model.keys.get(key) {
141+
match get_int(value) {
142+
Some(num) => get_int(model_value) == Some(num),
143+
None => model_value == value,
144+
}
145+
} else {
146+
false
147+
}
148+
};
149+
for (key, value) in keys.iter() {
150+
if !number_matches(key, value) {
151+
continue 'outer;
152+
}
153+
}
154+
return Ok(());
155+
}
156+
157+
let index_model = IndexModel::builder().keys(keys).build();
158+
coll.create_index(index_model, None).await?;
159+
160+
Ok(())
161+
}
162+
}

src/test/spec/unified_runner/operation.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ use crate::{
3838
coll::options::Hint,
3939
collation::Collation,
4040
error::{ErrorKind, Result},
41-
gridfs::options::GridFsDownloadByNameOptions,
41+
gridfs::options::{GridFsDownloadByNameOptions, GridFsUploadOptions},
4242
options::{
4343
AggregateOptions,
4444
CountOptions,
@@ -348,6 +348,7 @@ impl<'de> Deserialize<'de> for Operation {
348348
"download" => deserialize_op::<Download>(definition.arguments),
349349
"downloadByName" => deserialize_op::<DownloadByName>(definition.arguments),
350350
"delete" => deserialize_op::<Delete>(definition.arguments),
351+
"upload" => deserialize_op::<Upload>(definition.arguments),
351352
_ => Ok(Box::new(UnimplementedOperation) as Box<dyn TestOperation>),
352353
}
353354
.map_err(|e| serde::de::Error::custom(format!("{}", e)))?;
@@ -2730,6 +2731,43 @@ impl TestOperation for Delete {
27302731
.boxed()
27312732
}
27322733
}
2734+
/// Arguments of the unified test runner's `upload` operation, deserialized from the
/// operation's `arguments`.
///
/// NOTE(review): serde documents `deny_unknown_fields` as unsupported in combination with
/// `flatten` — confirm that unknown argument keys are rejected as intended here.
#[derive(Debug, Deserialize)]
2735+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
2736+
pub(super) struct Upload {
2737+
/// Document describing the file contents; the operation reads a hex string from its
/// `$$hexBytes` key.
source: Document,
2738+
/// Name under which the file is uploaded.
filename: String,
2739+
// `contentType` and `disableMD5` are deprecated and no longer supported.
2740+
// They are still declared so that test files containing them deserialize.
2741+
#[serde(rename = "contentType")]
2742+
_content_type: Option<String>,
2743+
#[serde(rename = "disableMD5")]
2744+
_disable_md5: Option<bool>,
2745+
/// Remaining arguments, collected into the upload options passed to the bucket.
#[serde(flatten)]
2746+
options: GridFsUploadOptions,
2747+
}
2748+
2749+
impl TestOperation for Upload {
2750+
fn execute_entity_operation<'a>(
2751+
&'a self,
2752+
id: &'a str,
2753+
test_runner: &'a TestRunner,
2754+
) -> BoxFuture<'a, Result<Option<Entity>>> {
2755+
async move {
2756+
let bucket = test_runner.get_bucket(id).await;
2757+
let hex_string = self.source.get("$$hexBytes").unwrap().as_str().unwrap();
2758+
let bytes = hex::decode(hex_string).unwrap();
2759+
let id = bucket
2760+
.upload_from_futures_0_3_reader(
2761+
self.filename.clone(),
2762+
&bytes[..],
2763+
self.options.clone(),
2764+
)
2765+
.await?;
2766+
Ok(Some(Entity::Bson(id.into())))
2767+
}
2768+
.boxed()
2769+
}
2770+
}
27332771

27342772
#[derive(Debug, Deserialize)]
27352773
pub(super) struct UnimplementedOperation;

src/test/spec/unified_runner/test_runner.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ const SKIPPED_OPERATIONS: &[&str] = &[
6767
"listCollectionObjects",
6868
"listDatabaseObjects",
6969
"mapReduce",
70-
"upload",
7170
"watch",
7271
];
7372

0 commit comments

Comments
 (0)