Skip to content

Commit c9c4e3f

Browse files
RUST-1400 GridFS download methods (#747)
1 parent 87c3b97 commit c9c4e3f

25 files changed

+3951
-129
lines changed

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ bson = { git = "https://github.com/mongodb/bson-rust", branch = "main" }
7878
chrono = { version = "0.4.7", default-features = false, features = ["clock", "std"] }
7979
derivative = "2.1.1"
8080
flate2 = { version = "1.0", optional = true }
81+
futures-io = "0.3.21"
8182
futures-core = "0.3.14"
8283
futures-util = { version = "0.3.14", features = ["io"] }
8384
futures-executor = "0.3.14"
@@ -150,7 +151,7 @@ features = ["dangerous_configuration"]
150151

151152
[dependencies.tokio-util]
152153
version = "0.7.0"
153-
features = ["io"]
154+
features = ["io", "compat"]
154155

155156
[dependencies.uuid]
156157
version = "1.1.2"
@@ -163,6 +164,7 @@ ctrlc = "3.2.2"
163164
derive_more = "0.99.13"
164165
function_name = "0.2.1"
165166
futures = "0.3"
167+
hex = "0.4"
166168
home = "0.5"
167169
lambda_runtime = "0.6.0"
168170
pretty_assertions = "1.3.0"

src/client/csfle/state_machine.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ impl CryptExecutor {
184184
State::NeedKmsCredentials => {
185185
// TODO(RUST-1314, RUST-1417): support fetching KMS credentials.
186186
return Err(Error::internal("KMS credentials are not yet supported"));
187-
},
187+
}
188188
State::Ready => {
189189
let (tx, rx) = oneshot::channel();
190190
let mut thread_ctx = std::mem::replace(

src/db/mod.rs

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,7 @@ use crate::{
1919
concern::{ReadConcern, WriteConcern},
2020
cursor::Cursor,
2121
error::{Error, ErrorKind, Result},
22-
gridfs::{
23-
options::GridFsBucketOptions,
24-
GridFsBucket,
25-
DEFAULT_BUCKET_NAME,
26-
DEFAULT_CHUNK_SIZE_BYTES,
27-
},
22+
gridfs::{options::GridFsBucketOptions, GridFsBucket},
2823
operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand},
2924
options::{
3025
AggregateOptions,
@@ -573,23 +568,6 @@ impl Database {
573568

574569
/// Creates a new GridFsBucket in the database with the given options.
575570
pub fn gridfs_bucket(&self, options: impl Into<Option<GridFsBucketOptions>>) -> GridFsBucket {
576-
let mut options = options.into().unwrap_or_default();
577-
options.read_concern = options
578-
.read_concern
579-
.or_else(|| self.read_concern().cloned());
580-
options.write_concern = options
581-
.write_concern
582-
.or_else(|| self.write_concern().cloned());
583-
options.selection_criteria = options
584-
.selection_criteria
585-
.or_else(|| self.selection_criteria().cloned());
586-
options.bucket_name = options
587-
.bucket_name
588-
.or_else(|| Some(DEFAULT_BUCKET_NAME.to_string()));
589-
options.chunk_size_bytes = options.chunk_size_bytes.or(Some(DEFAULT_CHUNK_SIZE_BYTES));
590-
GridFsBucket {
591-
db: self.clone(),
592-
options,
593-
}
571+
GridFsBucket::new(self.clone(), options.into().unwrap_or_default())
594572
}
595573
}

src/error.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,11 @@ pub enum ErrorKind {
457457
#[non_exhaustive]
458458
DnsResolve { message: String },
459459

460+
/// A GridFS error occurred.
461+
#[error("{0:?}")]
462+
#[non_exhaustive]
463+
GridFs(GridFsErrorKind),
464+
460465
#[error("Internal error: {message}")]
461466
#[non_exhaustive]
462467
Internal { message: String },
@@ -693,6 +698,49 @@ impl WriteFailure {
693698
}
694699
}
695700

701+
/// An error that occurred during a GridFS operation.
702+
#[derive(Clone, Debug)]
703+
#[allow(missing_docs)]
704+
#[non_exhaustive]
705+
pub enum GridFsErrorKind {
706+
/// The file with the given identifier was not found.
707+
#[non_exhaustive]
708+
FileNotFound { identifier: GridFsFileIdentifier },
709+
710+
/// The file with the given revision was not found.
711+
#[non_exhaustive]
712+
RevisionNotFound { revision: i32 },
713+
714+
/// The chunk at index 'n' was missing.
715+
#[non_exhaustive]
716+
MissingChunk { n: u32 },
717+
718+
/// The chunk was the incorrect size.
719+
#[non_exhaustive]
720+
WrongSizeChunk {
721+
actual_size: u32,
722+
expected_size: u32,
723+
},
724+
725+
/// An incorrect number of chunks was present for the file.
726+
#[non_exhaustive]
727+
WrongNumberOfChunks {
728+
actual_number: u32,
729+
expected_number: u32,
730+
},
731+
}
732+
733+
/// An identifier for a file stored in a GridFS bucket.
734+
#[derive(Clone, Debug)]
735+
#[non_exhaustive]
736+
pub enum GridFsFileIdentifier {
737+
/// The name of the file. Not guaranteed to be unique.
738+
Filename(String),
739+
740+
/// The file's unique [`Bson`] ID.
741+
Id(Bson),
742+
}
743+
696744
/// Translates ErrorKind::BulkWriteError cases to ErrorKind::WriteErrors, leaving all other errors
697745
/// untouched.
698746
pub(crate) fn convert_bulk_errors(error: Error) -> Error {

src/gridfs.rs

Lines changed: 101 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,72 @@
1-
#![allow(dead_code, unused_variables)]
21
// TODO(RUST-1395) Remove these allows.
2+
#![allow(dead_code, unused_variables)]
33

4+
mod download;
45
pub mod options;
56

67
use core::task::{Context, Poll};
7-
use std::pin::Pin;
8+
use std::{
9+
pin::Pin,
10+
sync::{atomic::AtomicBool, Arc},
11+
};
12+
13+
use serde::{Deserialize, Serialize};
14+
use tokio::io::ReadBuf;
815

916
use crate::{
17+
bson::{doc, oid::ObjectId, Bson, DateTime, Document, RawBinaryRef},
1018
concern::{ReadConcern, WriteConcern},
1119
cursor::Cursor,
1220
error::Result,
13-
selection_criteria::SelectionCriteria,
21+
options::SelectionCriteria,
22+
Collection,
1423
Database,
1524
};
16-
use bson::{oid::ObjectId, Bson, DateTime, Document};
25+
1726
use options::*;
18-
use serde::{Deserialize, Serialize};
19-
use tokio::io::ReadBuf;
2027

2128
pub const DEFAULT_BUCKET_NAME: &str = "fs";
2229
pub const DEFAULT_CHUNK_SIZE_BYTES: u32 = 255 * 1024;
2330

2431
// Contained in a "chunks" collection for each user file
25-
struct Chunk {
32+
#[derive(Debug, Deserialize, Serialize)]
33+
struct Chunk<'a> {
34+
#[serde(rename = "_id")]
2635
id: ObjectId,
2736
files_id: Bson,
2837
n: u32,
29-
// default size is 255 KiB
30-
data: Vec<u8>,
38+
#[serde(borrow)]
39+
data: RawBinaryRef<'a>,
3140
}
3241

3342
/// A collection in which information about stored files is stored. There will be one files
3443
/// collection document per stored file.
35-
#[derive(Serialize, Deserialize)]
44+
#[derive(Debug, Deserialize, Serialize)]
45+
#[serde(rename_all = "camelCase")]
46+
#[non_exhaustive]
3647
pub struct FilesCollectionDocument {
37-
id: Bson,
38-
length: i64,
39-
chunk_size: u32,
40-
upload_date: DateTime,
41-
filename: String,
42-
metadata: Document,
48+
#[serde(rename = "_id")]
49+
pub id: Bson,
50+
pub length: u64,
51+
pub chunk_size: u32,
52+
pub upload_date: DateTime,
53+
pub filename: Option<String>,
54+
#[serde(skip_serializing_if = "Option::is_none")]
55+
pub metadata: Option<Document>,
56+
}
57+
58+
#[derive(Debug)]
59+
struct GridFsBucketInner {
60+
options: GridFsBucketOptions,
61+
files: Collection<FilesCollectionDocument>,
62+
chunks: Collection<Chunk<'static>>,
63+
created_indexes: AtomicBool,
4364
}
4465

4566
/// Struct for storing GridFS managed files within a [`Database`].
67+
#[derive(Debug, Clone)]
4668
pub struct GridFsBucket {
47-
// Contains a "chunks" collection
48-
pub(crate) db: Database,
49-
pub(crate) options: GridFsBucketOptions,
69+
inner: Arc<GridFsBucketInner>,
5070
}
5171

5272
// TODO: RUST-1395 Add documentation and example code for this struct.
@@ -134,30 +154,67 @@ impl tokio::io::AsyncRead for GridFsDownloadStream {
134154
}
135155
}
136156

137-
impl futures_util::io::AsyncRead for GridFsDownloadStream {
138-
fn poll_read(
139-
self: Pin<&mut Self>,
140-
cx: &mut Context<'_>,
141-
buf: &mut [u8],
142-
) -> Poll<core::result::Result<usize, futures_util::io::Error>> {
143-
todo!()
157+
impl GridFsBucket {
158+
pub(crate) fn new(db: Database, mut options: GridFsBucketOptions) -> GridFsBucket {
159+
if options.read_concern.is_none() {
160+
options.read_concern = db.read_concern().cloned();
161+
}
162+
if options.write_concern.is_none() {
163+
options.write_concern = db.write_concern().cloned();
164+
}
165+
if options.selection_criteria.is_none() {
166+
options.selection_criteria = db.selection_criteria().cloned();
167+
}
168+
169+
let bucket_name = options
170+
.bucket_name
171+
.as_deref()
172+
.unwrap_or(DEFAULT_BUCKET_NAME);
173+
174+
let files = db.collection::<FilesCollectionDocument>(&format!("{}.files", bucket_name));
175+
let chunks = db.collection::<Chunk>(&format!("{}.chunks", bucket_name));
176+
177+
GridFsBucket {
178+
inner: Arc::new(GridFsBucketInner {
179+
options,
180+
files,
181+
chunks,
182+
created_indexes: AtomicBool::new(false),
183+
}),
184+
}
144185
}
145-
}
146186

147-
impl GridFsBucket {
148187
/// Gets the read concern of the [`GridFsBucket`].
149188
pub fn read_concern(&self) -> Option<&ReadConcern> {
150-
self.options.read_concern.as_ref()
189+
self.inner.options.read_concern.as_ref()
151190
}
152191

153192
/// Gets the write concern of the [`GridFsBucket`].
154193
pub fn write_concern(&self) -> Option<&WriteConcern> {
155-
self.options.write_concern.as_ref()
194+
self.inner.options.write_concern.as_ref()
156195
}
157196

158197
/// Gets the selection criteria of the [`GridFsBucket`].
159198
pub fn selection_criteria(&self) -> Option<&SelectionCriteria> {
160-
self.options.selection_criteria.as_ref()
199+
self.inner.options.selection_criteria.as_ref()
200+
}
201+
202+
/// Gets the chunk size in bytes for the [`GridFsBucket`].
203+
fn chunk_size_bytes(&self) -> u32 {
204+
self.inner
205+
.options
206+
.chunk_size_bytes
207+
.unwrap_or(DEFAULT_CHUNK_SIZE_BYTES)
208+
}
209+
210+
/// Gets a handle to the files collection for the [`GridFsBucket`].
211+
fn files(&self) -> &Collection<FilesCollectionDocument> {
212+
&self.inner.files
213+
}
214+
215+
/// Gets a handle to the chunks collection for the [`GridFsBucket`].
216+
fn chunks(&self) -> &Collection<Chunk> {
217+
&self.inner.chunks
161218
}
162219

163220
/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
@@ -173,19 +230,6 @@ impl GridFsBucket {
173230
todo!()
174231
}
175232

176-
/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
177-
/// The driver generates a unique [`Bson::ObjectId`] for the file id.
178-
///
179-
/// Returns a [`GridFsUploadStream`] to which the application will write the contents.
180-
pub async fn open_upload_stream(
181-
&self,
182-
filename: String,
183-
options: impl Into<Option<GridFsUploadOptions>>,
184-
) -> Result<GridFsUploadStream> {
185-
self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options)
186-
.await
187-
}
188-
189233
/// Uploads a user file to a GridFS bucket. The application supplies a custom file id. Uses the
190234
/// `tokio` crate's `AsyncRead` trait for the `source`.
191235
pub async fn upload_from_tokio_reader_with_id(
@@ -244,6 +288,19 @@ impl GridFsBucket {
244288
.await
245289
}
246290

291+
/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
292+
/// The driver generates a unique [`Bson::ObjectId`] for the file id.
293+
///
294+
/// Returns a [`GridFsUploadStream`] to which the application will write the contents.
295+
pub async fn open_upload_stream(
296+
&self,
297+
filename: String,
298+
options: impl Into<Option<GridFsUploadOptions>>,
299+
) -> Result<GridFsUploadStream> {
300+
self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options)
301+
.await
302+
}
303+
247304
/// Opens and returns a [`GridFsDownloadStream`] from which the application can read
248305
/// the contents of the stored file specified by `id`.
249306
pub async fn open_download_stream(&self, id: Bson) -> Result<GridFsDownloadStream> {
@@ -261,52 +318,6 @@ impl GridFsBucket {
261318
todo!()
262319
}
263320

264-
/// Downloads the contents of the stored file specified by `id` and writes
265-
/// the contents to the `destination`. Uses the `tokio` crate's `AsyncWrite`
266-
/// trait for the `destination`.
267-
pub async fn download_to_tokio_writer(
268-
&self,
269-
id: Bson,
270-
destination: impl tokio::io::AsyncWrite,
271-
) {
272-
todo!()
273-
}
274-
275-
/// Downloads the contents of the stored file specified by `id` and writes
276-
/// the contents to the `destination`. Uses the `futures-0.3` crate's `AsyncWrite`
277-
/// trait for the `destination`.
278-
pub async fn download_to_futures_0_3_writer(
279-
&self,
280-
id: Bson,
281-
destination: impl futures_util::AsyncWrite,
282-
) {
283-
todo!()
284-
}
285-
286-
/// Downloads the contents of the stored file specified by `filename` and by
287-
/// the revision in `options` and writes the contents to the `destination`. Uses the
288-
/// `tokio` crate's `AsyncWrite` trait for the `destination`.
289-
pub async fn download_to_tokio_writer_by_name(
290-
&self,
291-
filename: String,
292-
destination: impl tokio::io::AsyncWrite,
293-
options: impl Into<Option<GridFsDownloadByNameOptions>>,
294-
) {
295-
todo!()
296-
}
297-
298-
/// Downloads the contents of the stored file specified by `filename` and by
299-
/// the revision in `options` and writes the contents to the `destination`. Uses the
300-
/// `futures-0.3` crate's `AsyncWrite` trait for the `destination`.
301-
pub async fn download_to_futures_0_3_writer_by_name(
302-
&self,
303-
filename: String,
304-
destination: impl futures_util::AsyncWrite,
305-
options: impl Into<Option<GridFsDownloadByNameOptions>>,
306-
) {
307-
todo!()
308-
}
309-
310321
/// Given an `id`, deletes the stored file's files collection document and
311322
/// associated chunks from a [`GridFsBucket`].
312323
pub async fn delete(&self, id: Bson) {

0 commit comments

Comments
 (0)