Skip to content

Commit f626372

Browse files
RUST-1395 / RUST-1402 GridFS sync API and documentation (#780)
1 parent 71de04b commit f626372

File tree

12 files changed

+625
-100
lines changed

12 files changed

+625
-100
lines changed

src/db/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ impl Database {
584584
.await
585585
}
586586

587-
/// Creates a new GridFsBucket in the database with the given options.
587+
/// Creates a new [`GridFsBucket`] in the database with the given options.
588588
pub fn gridfs_bucket(&self, options: impl Into<Option<GridFsBucketOptions>>) -> GridFsBucket {
589589
GridFsBucket::new(self.clone(), options.into().unwrap_or_default())
590590
}

src/error.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -815,7 +815,8 @@ pub enum GridFsErrorKind {
815815
#[non_exhaustive]
816816
MissingChunk { n: u32 },
817817

818-
/// An operation was attempted on a `GridFsUploadStream` that has already been shut down.
818+
/// An operation was attempted on a [`GridFsUploadStream`](crate::gridfs::GridFsUploadStream)
819+
/// that has already been shut down.
819820
UploadStreamClosed,
820821

821822
/// The chunk was the incorrect size.
@@ -843,7 +844,8 @@ pub enum GridFsErrorKind {
843844
delete_error: Error,
844845
},
845846

846-
/// A close operation was attempted on a [`GridFsUploadStream`] while a write was still in
847+
/// A close operation was attempted on a
848+
/// [`GridFsUploadStream`](crate::gridfs::GridFsUploadStream) while a write was still in
847849
/// progress.
848850
WriteInProgress,
849851
}

src/gridfs.rs

Lines changed: 51 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
// TODO(RUST-1395) Remove these allows.
2-
#![allow(dead_code, unused_variables, missing_docs)]
1+
//! Contains the functionality for GridFS operations.
32
43
mod download;
5-
pub mod options;
4+
pub(crate) mod options;
65
mod upload;
76

87
use std::sync::{atomic::AtomicBool, Arc};
@@ -20,13 +19,13 @@ use crate::{
2019
};
2120

2221
pub use download::GridFsDownloadStream;
23-
pub use options::*;
22+
pub(crate) use options::*;
2423
pub use upload::GridFsUploadStream;
2524

26-
pub const DEFAULT_BUCKET_NAME: &str = "fs";
27-
pub const DEFAULT_CHUNK_SIZE_BYTES: u32 = 255 * 1024;
25+
const DEFAULT_BUCKET_NAME: &str = "fs";
26+
const DEFAULT_CHUNK_SIZE_BYTES: u32 = 255 * 1024;
2827

29-
// Contained in a "chunks" collection for each user file
28+
/// A model for the documents stored in the chunks collection.
3029
#[derive(Debug, Deserialize, Serialize)]
3130
pub(crate) struct Chunk<'a> {
3231
#[serde(rename = "_id")]
@@ -37,45 +36,57 @@ pub(crate) struct Chunk<'a> {
3736
data: RawBinaryRef<'a>,
3837
}
3938

40-
/// A collection in which information about stored files is stored. There will be one files
41-
/// collection document per stored file.
39+
/// A model for the documents stored in a GridFS bucket's files
40+
/// collection.
4241
#[derive(Clone, Debug, Deserialize, Serialize)]
4342
#[serde(rename_all = "camelCase")]
4443
#[skip_serializing_none]
4544
#[non_exhaustive]
4645
pub struct FilesCollectionDocument {
46+
/// The file's unique identifier.
4747
#[serde(rename = "_id")]
4848
pub id: Bson,
49+
50+
/// The length of the file in bytes.
4951
pub length: u64,
50-
pub chunk_size: u32,
52+
53+
/// The size of the file's chunks in bytes.
54+
#[serde(rename = "chunkSize")]
55+
pub chunk_size_bytes: u32,
56+
57+
/// The time at which the file was uploaded.
5158
pub upload_date: DateTime,
59+
60+
/// The name of the file.
5261
pub filename: Option<String>,
62+
63+
/// User-provided metadata associated with the file.
5364
pub metadata: Option<Document>,
5465
}
5566

5667
impl FilesCollectionDocument {
5768
/// Returns the total number of chunks expected to be in the file.
5869
fn n(&self) -> u32 {
59-
Self::n_from_vals(self.length, self.chunk_size)
70+
Self::n_from_vals(self.length, self.chunk_size_bytes)
6071
}
6172

62-
fn n_from_vals(length: u64, chunk_size: u32) -> u32 {
63-
let chunk_size = chunk_size as u64;
64-
let n = length / chunk_size + u64::from(length % chunk_size != 0);
73+
fn n_from_vals(length: u64, chunk_size_bytes: u32) -> u32 {
74+
let chunk_size_bytes = chunk_size_bytes as u64;
75+
let n = length / chunk_size_bytes + u64::from(length % chunk_size_bytes != 0);
6576
n as u32
6677
}
6778

6879
/// Returns the expected length of a chunk given its index.
6980
fn expected_chunk_length(&self, n: u32) -> u32 {
70-
Self::expected_chunk_length_from_vals(self.length, self.chunk_size, n)
81+
Self::expected_chunk_length_from_vals(self.length, self.chunk_size_bytes, n)
7182
}
7283

73-
fn expected_chunk_length_from_vals(length: u64, chunk_size: u32, n: u32) -> u32 {
74-
let remainder = length % (chunk_size as u64);
75-
if n == Self::n_from_vals(length, chunk_size) - 1 && remainder != 0 {
84+
fn expected_chunk_length_from_vals(length: u64, chunk_size_bytes: u32, n: u32) -> u32 {
85+
let remainder = length % (chunk_size_bytes as u64);
86+
if n == Self::n_from_vals(length, chunk_size_bytes) - 1 && remainder != 0 {
7687
remainder as u32
7788
} else {
78-
chunk_size
89+
chunk_size_bytes
7990
}
8091
}
8192
}
@@ -89,7 +100,15 @@ struct GridFsBucketInner {
89100
created_indexes: AtomicBool,
90101
}
91102

92-
/// Struct for storing GridFS managed files within a [`Database`].
103+
/// A `GridFsBucket` provides the functionality for storing and retrieving binary BSON data that
104+
/// exceeds the 16 MiB size limit of a MongoDB document. Users may upload and download large amounts
105+
/// of data, called files, to the bucket. When a file is uploaded, its contents are divided into
106+
/// chunks and stored in a chunks collection. A corresponding [`FilesCollectionDocument`] is also
107+
/// stored in a files collection. When a user downloads a file, the bucket finds and returns the
108+
/// data stored in its chunks.
109+
///
110+
/// `GridFsBucket` uses [`std::sync::Arc`] internally, so it can be shared safely across threads or
111+
/// async tasks.
93112
#[derive(Debug, Clone)]
94113
pub struct GridFsBucket {
95114
inner: Arc<GridFsBucketInner>,
@@ -142,41 +161,42 @@ impl GridFsBucket {
142161
self.inner.files.client()
143162
}
144163

145-
/// Gets the read concern of the [`GridFsBucket`].
164+
/// Gets the read concern of the bucket.
146165
pub fn read_concern(&self) -> Option<&ReadConcern> {
147166
self.inner.options.read_concern.as_ref()
148167
}
149168

150-
/// Gets the write concern of the [`GridFsBucket`].
169+
/// Gets the write concern of the bucket.
151170
pub fn write_concern(&self) -> Option<&WriteConcern> {
152171
self.inner.options.write_concern.as_ref()
153172
}
154173

155-
/// Gets the selection criteria of the [`GridFsBucket`].
174+
/// Gets the selection criteria of the bucket.
156175
pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
157176
self.inner.options.selection_criteria.as_ref()
158177
}
159178

160-
/// Gets the chunk size in bytes for the [`GridFsBucket`].
179+
/// Gets the chunk size in bytes for the bucket.
161180
fn chunk_size_bytes(&self) -> u32 {
162181
self.inner
163182
.options
164183
.chunk_size_bytes
165184
.unwrap_or(DEFAULT_CHUNK_SIZE_BYTES)
166185
}
167186

168-
/// Gets a handle to the files collection for the [`GridFsBucket`].
187+
/// Gets a handle to the files collection for the bucket.
169188
pub(crate) fn files(&self) -> &Collection<FilesCollectionDocument> {
170189
&self.inner.files
171190
}
172191

173-
/// Gets a handle to the chunks collection for the [`GridFsBucket`].
192+
/// Gets a handle to the chunks collection for the bucket.
174193
pub(crate) fn chunks(&self) -> &Collection<Chunk<'static>> {
175194
&self.inner.chunks
176195
}
177196

178-
/// Deletes the [`FilesCollectionDocument`] with the given `id `and its associated chunks from
179-
/// this bucket.
197+
/// Deletes the [`FilesCollectionDocument`] with the given `id` and its associated chunks from
198+
/// this bucket. This method returns an error if the `id` does not match any files in the
199+
/// bucket.
180200
pub async fn delete(&self, id: Bson) -> Result<()> {
181201
let delete_result = self
182202
.files()
@@ -209,7 +229,8 @@ impl GridFsBucket {
209229
self.files().find(filter, find_options).await
210230
}
211231

212-
/// Renames the file with the given 'id' to the provided `new_filename`.
232+
/// Renames the file with the given `id` to the provided `new_filename`. This method returns an
233+
/// error if the `id` does not match any files in the bucket.
213234
pub async fn rename(&self, id: Bson, new_filename: impl AsRef<str>) -> Result<()> {
214235
self.files()
215236
.update_one(
@@ -222,7 +243,7 @@ impl GridFsBucket {
222243
Ok(())
223244
}
224245

225-
/// Drops all of the files and their associated chunks in this bucket.
246+
/// Removes all of the files and their associated chunks from this bucket.
226247
pub async fn drop(&self) -> Result<()> {
227248
self.files().drop(None).await?;
228249
self.chunks().drop(None).await?;

src/gridfs/download.rs

Lines changed: 93 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,29 @@ impl GridFsBucket {
7474

7575
// User functions for downloading to writers.
7676
impl GridFsBucket {
77-
/// Downloads the contents of the stored file specified by `id` and writes
78-
/// the contents to the `destination`.
77+
/// Downloads the contents of the stored file specified by `id` and writes the contents to the
78+
/// `destination`, which may be any type that implements the [`futures_io::AsyncWrite`] trait.
79+
///
80+
/// To download to a type that implements [`tokio::io::AsyncWrite`], use the
81+
/// [`tokio_util::compat`] module to convert between types.
82+
///
83+
/// ```rust
84+
/// # use mongodb::{bson::Bson, error::Result, gridfs::GridFsBucket};
85+
/// # async fn compat_example(
86+
/// # bucket: GridFsBucket,
87+
/// # tokio_writer: impl tokio::io::AsyncWrite + Unpin,
88+
/// # id: Bson,
89+
/// # ) -> Result<()> {
90+
/// use tokio_util::compat::TokioAsyncWriteCompatExt;
91+
///
92+
/// let futures_writer = tokio_writer.compat_write();
93+
/// bucket.download_to_futures_0_3_writer(id, futures_writer).await?;
94+
/// # Ok(())
95+
/// # }
96+
/// ```
97+
///
98+
/// Note that once an `AsyncWrite` trait is stabilized in the standard library, this method will
99+
/// be deprecated in favor of one that accepts a `std::io::AsyncWrite` destination.
79100
pub async fn download_to_futures_0_3_writer<T>(&self, id: Bson, destination: T) -> Result<()>
80101
where
81102
T: AsyncWrite + Unpin,
@@ -85,9 +106,34 @@ impl GridFsBucket {
85106
}
86107

87108
/// Downloads the contents of the stored file specified by `filename` and writes the contents to
88-
/// the `destination`. If there are multiple files with the same filename, the `revision` in the
89-
/// options provided is used to determine which one to download. If no `revision` is specified,
90-
/// the most recent file with the given filename is chosen.
109+
/// the `destination`, which may be any type that implements the [`futures_io::AsyncWrite`]
110+
/// trait.
111+
///
112+
/// If there are multiple files in the bucket with the given filename, the `revision` in the
113+
/// options provided is used to determine which one to download. See the documentation for
114+
/// [`GridFsDownloadByNameOptions`] for details on how to specify a revision. If no revision is
115+
/// provided, the file with `filename` most recently uploaded will be downloaded.
116+
///
117+
/// To download to a type that implements [`tokio::io::AsyncWrite`], use the
118+
/// [`tokio_util::compat`] module to convert between types.
119+
///
120+
/// ```rust
121+
/// # use mongodb::{bson::Bson, error::Result, gridfs::GridFsBucket};
122+
/// # async fn compat_example(
123+
/// # bucket: GridFsBucket,
124+
/// # tokio_writer: impl tokio::io::AsyncWrite + Unpin,
125+
/// # id: Bson,
126+
/// # ) -> Result<()> {
127+
/// use tokio_util::compat::TokioAsyncWriteCompatExt;
128+
///
129+
/// let futures_writer = tokio_writer.compat_write();
130+
/// bucket.download_to_futures_0_3_writer_by_name("example_file", futures_writer, None).await?;
131+
/// # Ok(())
132+
/// # }
133+
/// ```
134+
///
135+
/// Note that once an `AsyncWrite` trait is stabilized in the standard library, this method will
136+
/// be deprecated in favor of one that accepts a `std::io::AsyncWrite` destination.
91137
pub async fn download_to_futures_0_3_writer_by_name<T>(
92138
&self,
93139
filename: impl AsRef<str>,
@@ -154,6 +200,38 @@ impl GridFsBucket {
154200
}
155201
}
156202

203+
/// A stream from which a file stored in a GridFS bucket can be downloaded.
204+
///
205+
/// # Downloading from the Stream
206+
/// The `GridFsDownloadStream` type implements [`futures_io::AsyncRead`]. It is recommended that
207+
/// users call the utility methods in [`AsyncReadExt`](futures_util::io::AsyncReadExt) to interact
208+
/// with the stream.
209+
///
210+
/// ```rust
211+
/// # use mongodb::{bson::Bson, error::Result, gridfs::{GridFsBucket, GridFsDownloadStream}};
212+
/// # async fn download_example(bucket: GridFsBucket, id: Bson) -> Result<()> {
213+
/// use futures_util::io::AsyncReadExt;
214+
///
215+
/// let mut buf = Vec::new();
216+
/// let mut download_stream = bucket.open_download_stream(id).await?;
217+
/// download_stream.read_to_end(&mut buf).await?;
218+
/// # Ok(())
219+
/// # }
220+
/// ```
221+
///
222+
/// # Using [`tokio::io::AsyncRead`]
223+
/// Users who prefer to use tokio's `AsyncRead` trait can use the [`tokio_util::compat`] module.
224+
///
225+
/// ```rust
226+
/// # use mongodb::{bson::Bson, error::Result, gridfs::{GridFsBucket, GridFsUploadStream}};
227+
/// # async fn compat_example(bucket: GridFsBucket, id: Bson) -> Result<()> {
228+
/// use tokio_util::compat::FuturesAsyncReadCompatExt;
229+
///
230+
/// let futures_upload_stream = bucket.open_download_stream(id).await?;
231+
/// let tokio_upload_stream = futures_upload_stream.compat();
232+
/// # Ok(())
233+
/// # }
234+
/// ```
157235
pub struct GridFsDownloadStream {
158236
state: State,
159237
current_n: u32,
@@ -206,11 +284,6 @@ impl GridFsDownloadStream {
206284
file,
207285
})
208286
}
209-
210-
/// Gets the file `id` for the stream.
211-
pub fn files_id(&self) -> &Bson {
212-
&self.file.id
213-
}
214287
}
215288

216289
impl AsyncRead for GridFsDownloadStream {
@@ -230,7 +303,7 @@ impl AsyncRead for GridFsDownloadStream {
230303
} else {
231304
let chunks_in_buf = FilesCollectionDocument::n_from_vals(
232305
buf.len() as u64,
233-
stream.file.chunk_size,
306+
stream.file.chunk_size_bytes,
234307
);
235308
// We should read from current_n to chunks_in_buf + current_n, or, if that would
236309
// exceed the total number of chunks in the file, to the last chunk in the file.
@@ -244,7 +317,7 @@ impl AsyncRead for GridFsDownloadStream {
244317
cursor,
245318
buffer,
246319
n_range,
247-
stream.file.chunk_size,
320+
stream.file.chunk_size_bytes,
248321
stream.file.length,
249322
)
250323
.boxed(),
@@ -289,7 +362,7 @@ async fn get_bytes(
289362
mut cursor: Box<Cursor<Chunk<'static>>>,
290363
mut buffer: Vec<u8>,
291364
n_range: Range<u32>,
292-
chunk_size: u32,
365+
chunk_size_bytes: u32,
293366
file_len: u64,
294367
) -> Result<(Vec<u8>, Box<Cursor<Chunk<'static>>>)> {
295368
for n in n_range {
@@ -305,7 +378,7 @@ async fn get_bytes(
305378
}
306379

307380
let expected_len =
308-
FilesCollectionDocument::expected_chunk_length_from_vals(file_len, chunk_size, n);
381+
FilesCollectionDocument::expected_chunk_length_from_vals(file_len, chunk_size_bytes, n);
309382
if chunk_bytes.len() != (expected_len as usize) {
310383
return Err(ErrorKind::GridFs(GridFsErrorKind::WrongSizeChunk {
311384
actual_size: chunk_bytes.len(),
@@ -330,8 +403,12 @@ impl GridFsBucket {
330403
}
331404

332405
/// Opens and returns a [`GridFsDownloadStream`] from which the application can read
333-
/// the contents of the stored file specified by `filename` and the revision
334-
/// in `options`.
406+
/// the contents of the stored file specified by `filename`.
407+
///
408+
/// If there are multiple files in the bucket with the given filename, the `revision` in the
409+
/// options provided is used to determine which one to download. See the documentation for
410+
/// [`GridFsDownloadByNameOptions`] for details on how to specify a revision. If no revision is
411+
/// provided, the file with `filename` most recently uploaded will be downloaded.
335412
pub async fn open_download_stream_by_name(
336413
&self,
337414
filename: impl AsRef<str>,

0 commit comments

Comments
 (0)