Skip to content

Commit 8c63264

Browse files
RUST-1399 Implement AsyncRead for GridFsDownloadStream (#764)
1 parent e619a0c commit 8c63264

File tree

8 files changed

+421
-131
lines changed

8 files changed

+421
-131
lines changed

src/cursor/common.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ where
9696
self.state.as_mut().unwrap()
9797
}
9898

99-
fn state(&self) -> &CursorState {
99+
pub(super) fn state(&self) -> &CursorState {
100100
self.state.as_ref().unwrap()
101101
}
102102

src/cursor/mod.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,24 @@ impl<T> Cursor<T> {
141141
.and_then(|c| c.post_batch_resume_token())
142142
}
143143

144+
/// Whether this cursor has exhausted all of its getMore calls. The cursor may have more
145+
/// items remaining in the buffer.
144146
pub(crate) fn is_exhausted(&self) -> bool {
145147
self.wrapped_cursor.as_ref().unwrap().is_exhausted()
146148
}
147149

150+
/// Whether this cursor has any additional items to return.
151+
pub(crate) fn has_next(&self) -> bool {
152+
!self.is_exhausted()
153+
|| !self
154+
.wrapped_cursor
155+
.as_ref()
156+
.unwrap()
157+
.state()
158+
.buffer
159+
.is_empty()
160+
}
161+
148162
pub(crate) fn client(&self) -> &Client {
149163
&self.client
150164
}

src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -818,7 +818,7 @@ pub enum GridFsErrorKind {
818818
/// The chunk was the incorrect size.
819819
#[non_exhaustive]
820820
WrongSizeChunk {
821-
actual_size: u32,
821+
actual_size: usize,
822822
expected_size: u32,
823823
},
824824

src/gridfs.rs

Lines changed: 34 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,13 @@ use std::{
1212
};
1313

1414
use serde::{Deserialize, Serialize};
15-
use tokio::io::ReadBuf;
15+
use serde_with::skip_serializing_none;
1616

1717
use crate::{
1818
bson::{doc, oid::ObjectId, Bson, DateTime, Document, RawBinaryRef},
19-
concern::{ReadConcern, WriteConcern},
2019
cursor::Cursor,
2120
error::{ErrorKind, GridFsErrorKind, GridFsFileIdentifier, Result},
22-
options::{CollectionOptions, FindOptions, SelectionCriteria},
21+
options::{CollectionOptions, FindOptions, ReadConcern, SelectionCriteria, WriteConcern},
2322
Collection,
2423
Database,
2524
};
@@ -42,8 +41,9 @@ struct Chunk<'a> {
4241

4342
/// A collection in which information about stored files is stored. There will be one files
4443
/// collection document per stored file.
45-
#[derive(Debug, Deserialize, Serialize)]
44+
#[derive(Clone, Debug, Deserialize, Serialize)]
4645
#[serde(rename_all = "camelCase")]
46+
#[skip_serializing_none]
4747
#[non_exhaustive]
4848
pub struct FilesCollectionDocument {
4949
#[serde(rename = "_id")]
@@ -52,12 +52,39 @@ pub struct FilesCollectionDocument {
5252
pub chunk_size: u32,
5353
pub upload_date: DateTime,
5454
pub filename: Option<String>,
55-
#[serde(skip_serializing_if = "Option::is_none")]
5655
pub metadata: Option<Document>,
5756
}
5857

58+
impl FilesCollectionDocument {
59+
/// Returns the total number of chunks expected to be in the file.
60+
fn n(&self) -> u32 {
61+
Self::n_from_vals(self.length, self.chunk_size)
62+
}
63+
64+
fn n_from_vals(length: u64, chunk_size: u32) -> u32 {
65+
let chunk_size = chunk_size as u64;
66+
let n = length / chunk_size + u64::from(length % chunk_size != 0);
67+
n as u32
68+
}
69+
70+
/// Returns the expected length of a chunk given its index.
71+
fn expected_chunk_length(&self, n: u32) -> u32 {
72+
Self::expected_chunk_length_from_vals(self.length, self.chunk_size, n)
73+
}
74+
75+
fn expected_chunk_length_from_vals(length: u64, chunk_size: u32, n: u32) -> u32 {
76+
let remainder = length % (chunk_size as u64);
77+
if n == Self::n_from_vals(length, chunk_size) - 1 && remainder != 0 {
78+
remainder as u32
79+
} else {
80+
chunk_size
81+
}
82+
}
83+
}
84+
5985
#[derive(Debug)]
6086
struct GridFsBucketInner {
87+
db: Database,
6188
options: GridFsBucketOptions,
6289
files: Collection<FilesCollectionDocument>,
6390
chunks: Collection<Chunk<'static>>,
@@ -134,27 +161,6 @@ impl futures_util::AsyncWrite for GridFsUploadStream {
134161
}
135162
}
136163

137-
pub struct GridFsDownloadStream {
138-
files_id: Bson,
139-
}
140-
141-
impl GridFsDownloadStream {
142-
/// Gets the file `id` for the stream.
143-
pub fn files_id(&self) -> &Bson {
144-
&self.files_id
145-
}
146-
}
147-
148-
impl tokio::io::AsyncRead for GridFsDownloadStream {
149-
fn poll_read(
150-
self: Pin<&mut Self>,
151-
cx: &mut Context<'_>,
152-
buf: &mut ReadBuf<'_>,
153-
) -> Poll<tokio::io::Result<()>> {
154-
todo!()
155-
}
156-
}
157-
158164
impl GridFsBucket {
159165
pub(crate) fn new(db: Database, mut options: GridFsBucketOptions) -> GridFsBucket {
160166
if options.read_concern.is_none() {
@@ -188,6 +194,7 @@ impl GridFsBucket {
188194

189195
GridFsBucket {
190196
inner: Arc::new(GridFsBucketInner {
197+
db: db.clone(),
191198
options,
192199
files,
193200
chunks,
@@ -230,7 +237,7 @@ impl GridFsBucket {
230237
}
231238

232239
/// Gets a handle to the chunks collection for the [`GridFsBucket`].
233-
fn chunks(&self) -> &Collection<Chunk> {
240+
fn chunks(&self) -> &Collection<Chunk<'static>> {
234241
&self.inner.chunks
235242
}
236243

@@ -260,23 +267,6 @@ impl GridFsBucket {
260267
.await
261268
}
262269

263-
/// Opens and returns a [`GridFsDownloadStream`] from which the application can read
264-
/// the contents of the stored file specified by `id`.
265-
pub async fn open_download_stream(&self, id: Bson) -> Result<GridFsDownloadStream> {
266-
todo!()
267-
}
268-
269-
/// Opens and returns a [`GridFsDownloadStream`] from which the application can read
270-
/// the contents of the stored file specified by `filename` and the revision
271-
/// in `options`.
272-
pub async fn open_download_stream_by_name(
273-
&self,
274-
filename: String,
275-
options: impl Into<Option<GridFsDownloadByNameOptions>>,
276-
) -> Result<GridFsDownloadStream> {
277-
todo!()
278-
}
279-
280270
/// Deletes the [`FilesCollectionDocument`] with the given `id `and its associated chunks from
281271
/// this bucket.
282272
pub async fn delete(&self, id: Bson) -> Result<()> {

0 commit comments

Comments
 (0)