Skip to content

Commit e9a8e2a

Browse files
RUST-1507 Implement AsyncWrite for GridFsUploadStream (#774)
1 parent e7ac897 commit e9a8e2a

File tree

9 files changed

+651
-118
lines changed

9 files changed

+651
-118
lines changed

src/error.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,9 @@ pub enum GridFsErrorKind {
815815
#[non_exhaustive]
816816
MissingChunk { n: u32 },
817817

818+
/// An operation was attempted on a `GridFsUploadStream` that has already been shut down.
819+
UploadStreamClosed,
820+
818821
/// The chunk was the incorrect size.
819822
#[non_exhaustive]
820823
WrongSizeChunk {
@@ -828,6 +831,21 @@ pub enum GridFsErrorKind {
828831
actual_number: u32,
829832
expected_number: u32,
830833
},
834+
835+
/// An error occurred when aborting a file upload.
836+
#[non_exhaustive]
837+
AbortError {
838+
/// The original error. Only present if the abort occurred because of an error during a
839+
/// GridFS operation.
840+
original_error: Option<Error>,
841+
842+
/// The error that occurred when attempting to remove any orphaned chunks.
843+
delete_error: Error,
844+
},
845+
846+
/// A close operation was attempted on a [`GridFsUploadStream`] while a write was still in
847+
/// progress.
848+
WriteInProgress,
831849
}
832850

833851
/// An identifier for a file stored in a GridFS bucket.

src/gridfs.rs

Lines changed: 15 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,34 @@
11
// TODO(RUST-1395) Remove these allows.
2-
#![allow(dead_code, unused_variables)]
2+
#![allow(dead_code, unused_variables, missing_docs)]
33

44
mod download;
55
pub mod options;
66
mod upload;
77

8-
use core::task::{Context, Poll};
9-
use std::{
10-
pin::Pin,
11-
sync::{atomic::AtomicBool, Arc},
12-
};
8+
use std::sync::{atomic::AtomicBool, Arc};
139

1410
use serde::{Deserialize, Serialize};
1511
use serde_with::skip_serializing_none;
1612

1713
use crate::{
1814
bson::{doc, oid::ObjectId, Bson, DateTime, Document, RawBinaryRef},
1915
cursor::Cursor,
20-
error::{ErrorKind, GridFsErrorKind, GridFsFileIdentifier, Result},
16+
error::{Error, ErrorKind, GridFsErrorKind, GridFsFileIdentifier, Result},
2117
options::{CollectionOptions, FindOptions, ReadConcern, SelectionCriteria, WriteConcern},
2218
Collection,
2319
Database,
2420
};
2521

26-
use options::*;
22+
pub use download::GridFsDownloadStream;
23+
pub use options::*;
24+
pub use upload::GridFsUploadStream;
2725

2826
pub const DEFAULT_BUCKET_NAME: &str = "fs";
2927
pub const DEFAULT_CHUNK_SIZE_BYTES: u32 = 255 * 1024;
3028

3129
// Contained in a "chunks" collection for each user file
3230
#[derive(Debug, Deserialize, Serialize)]
33-
struct Chunk<'a> {
31+
pub(crate) struct Chunk<'a> {
3432
#[serde(rename = "_id")]
3533
id: ObjectId,
3634
files_id: Bson,
@@ -97,70 +95,6 @@ pub struct GridFsBucket {
9795
inner: Arc<GridFsBucketInner>,
9896
}
9997

100-
// TODO: RUST-1395 Add documentation and example code for this struct.
101-
pub struct GridFsUploadStream {
102-
files_id: Bson,
103-
}
104-
105-
impl GridFsUploadStream {
106-
/// Gets the file `id` for the stream.
107-
pub fn files_id(&self) -> &Bson {
108-
&self.files_id
109-
}
110-
111-
/// Consumes the stream and uploads data in the stream to the server.
112-
pub async fn finish(self) {
113-
todo!()
114-
}
115-
116-
/// Aborts the upload and discards the upload stream.
117-
pub async fn abort(self) {
118-
todo!()
119-
}
120-
}
121-
122-
impl tokio::io::AsyncWrite for GridFsUploadStream {
123-
fn poll_write(
124-
self: Pin<&mut Self>,
125-
cx: &mut Context<'_>,
126-
buf: &[u8],
127-
) -> Poll<tokio::io::Result<usize>> {
128-
todo!()
129-
}
130-
131-
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<tokio::io::Result<()>> {
132-
todo!()
133-
}
134-
135-
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<tokio::io::Result<()>> {
136-
todo!()
137-
}
138-
}
139-
140-
impl futures_util::AsyncWrite for GridFsUploadStream {
141-
fn poll_write(
142-
self: Pin<&mut Self>,
143-
cx: &mut Context<'_>,
144-
buf: &[u8],
145-
) -> Poll<core::result::Result<usize, futures_util::io::Error>> {
146-
todo!()
147-
}
148-
149-
fn poll_flush(
150-
self: Pin<&mut Self>,
151-
cx: &mut Context<'_>,
152-
) -> Poll<core::result::Result<(), futures_util::io::Error>> {
153-
todo!()
154-
}
155-
156-
fn poll_close(
157-
self: Pin<&mut Self>,
158-
cx: &mut Context<'_>,
159-
) -> Poll<core::result::Result<(), futures_util::io::Error>> {
160-
todo!()
161-
}
162-
}
163-
16498
impl GridFsBucket {
16599
pub(crate) fn new(db: Database, mut options: GridFsBucketOptions) -> GridFsBucket {
166100
if options.read_concern.is_none() {
@@ -232,41 +166,15 @@ impl GridFsBucket {
232166
}
233167

234168
/// Gets a handle to the files collection for the [`GridFsBucket`].
235-
fn files(&self) -> &Collection<FilesCollectionDocument> {
169+
pub(crate) fn files(&self) -> &Collection<FilesCollectionDocument> {
236170
&self.inner.files
237171
}
238172

239173
/// Gets a handle to the chunks collection for the [`GridFsBucket`].
240-
fn chunks(&self) -> &Collection<Chunk<'static>> {
174+
pub(crate) fn chunks(&self) -> &Collection<Chunk<'static>> {
241175
&self.inner.chunks
242176
}
243177

244-
/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
245-
/// The application provides a custom file id.
246-
///
247-
/// Returns a [`GridFsUploadStream`] to which the application will write the contents.
248-
pub async fn open_upload_stream_with_id(
249-
&self,
250-
id: Bson,
251-
filename: String,
252-
options: impl Into<Option<GridFsUploadOptions>>,
253-
) -> Result<GridFsUploadStream> {
254-
todo!()
255-
}
256-
257-
/// Opens a [`GridFsUploadStream`] that the application can write the contents of the file to.
258-
/// The driver generates a unique [`Bson::ObjectId`] for the file id.
259-
///
260-
/// Returns a [`GridFsUploadStream`] to which the application will write the contents.
261-
pub async fn open_upload_stream(
262-
&self,
263-
filename: String,
264-
options: impl Into<Option<GridFsUploadOptions>>,
265-
) -> Result<GridFsUploadStream> {
266-
self.open_upload_stream_with_id(Bson::ObjectId(ObjectId::new()), filename, options)
267-
.await
268-
}
269-
270178
/// Deletes the [`FilesCollectionDocument`] with the given `id `and its associated chunks from
271179
/// this bucket.
272180
pub async fn delete(&self, id: Bson) -> Result<()> {
@@ -322,3 +230,9 @@ impl GridFsBucket {
322230
Ok(())
323231
}
324232
}
233+
234+
impl Error {
235+
fn into_futures_io_error(self) -> futures_io::Error {
236+
futures_io::Error::new(futures_io::ErrorKind::Other, self)
237+
}
238+
}

src/gridfs/download.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,8 @@ pub struct GridFsDownloadStream {
163163
type GetBytesFuture = BoxFuture<'static, Result<(Vec<u8>, Box<Cursor<Chunk<'static>>>)>>;
164164

165165
enum State {
166-
// Idle stores these fields as options so that they can be moved into a GetBytesFuture
167-
// without requiring ownership of the state. They will always store values and can be
168-
// unwrapped safely.
166+
// Idle is stored as an option so that its fields can be moved into a GetBytesFuture
167+
// without requiring ownership of the state. It can always be unwrapped safely.
169168
Idle(Option<Idle>),
170169
Busy(GetBytesFuture),
171170
Done,
@@ -280,8 +279,7 @@ impl AsyncRead for GridFsDownloadStream {
280279
}
281280
Err(error) => {
282281
stream.state = State::Done;
283-
let error = futures_io::Error::new(futures_io::ErrorKind::Other, error);
284-
Poll::Ready(Err(error))
282+
Poll::Ready(Err(error.into_futures_io_error()))
285283
}
286284
}
287285
}

src/gridfs/options.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub struct GridFsBucketOptions {
3333
/// [`GridFsBucket`].
3434
#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)]
3535
#[serde(rename_all = "camelCase")]
36-
#[builder(field_defaults(setter(into)))]
36+
#[builder(field_defaults(default, setter(into)))]
3737
#[non_exhaustive]
3838
pub struct GridFsUploadOptions {
3939
/// The number of bytes per chunk of this file. Defaults to the `chunk_size_bytes` specified
@@ -47,7 +47,7 @@ pub struct GridFsUploadOptions {
4747
/// Contains the options for creating a [`GridFsDownloadStream`] to retrieve a stored file
4848
/// from a [`GridFsBucket`].
4949
#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)]
50-
#[builder(field_defaults(setter(into)))]
50+
#[builder(field_defaults(default, setter(into)))]
5151
#[non_exhaustive]
5252
pub struct GridFsDownloadByNameOptions {
5353
/// Which revision (documents with the same filename and different `upload_date`)
@@ -65,7 +65,7 @@ pub struct GridFsDownloadByNameOptions {
6565

6666
/// Contains the options for performing a find operation on a files collection.
6767
#[derive(Clone, Debug, Default, Deserialize, TypedBuilder)]
68-
#[builder(field_defaults(setter(into)))]
68+
#[builder(field_defaults(default, setter(into)))]
6969
#[non_exhaustive]
7070
pub struct GridFsFindOptions {
7171
/// Enables writing to temporary files on the server. When set to true, the

0 commit comments

Comments
 (0)