Skip to content

Commit 8781988

Browse files
authored
RUST-947 Implement Index Management API (#420)
1 parent d5d43a8 commit 8781988

File tree

14 files changed

+1426
-3
lines changed

14 files changed

+1426
-3
lines changed

src/coll/mod.rs

Lines changed: 131 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ pub mod options;
22

33
use std::{borrow::Borrow, collections::HashSet, fmt, fmt::Debug, sync::Arc};
44

5-
use futures_util::stream::StreamExt;
5+
use futures_util::{
6+
future,
7+
stream::{StreamExt, TryStreamExt},
8+
};
69
use serde::{
710
de::{DeserializeOwned, Error as DeError},
811
Deserialize,
@@ -17,19 +20,30 @@ use crate::{
1720
client::session::TransactionState,
1821
concern::{ReadConcern, WriteConcern},
1922
error::{convert_bulk_errors, BulkWriteError, BulkWriteFailure, Error, ErrorKind, Result},
23+
index::IndexModel,
2024
operation::{
2125
Aggregate,
2226
Count,
2327
CountDocuments,
28+
CreateIndexes,
2429
Delete,
2530
Distinct,
2631
DropCollection,
32+
DropIndexes,
2733
Find,
2834
FindAndModify,
2935
Insert,
36+
ListIndexes,
3037
Update,
3138
},
32-
results::{DeleteResult, InsertManyResult, InsertOneResult, UpdateResult},
39+
results::{
40+
CreateIndexResult,
41+
CreateIndexesResult,
42+
DeleteResult,
43+
InsertManyResult,
44+
InsertOneResult,
45+
UpdateResult,
46+
},
3347
selection_criteria::SelectionCriteria,
3448
Client,
3549
ClientSession,
@@ -313,6 +327,46 @@ impl<T> Collection<T> {
313327
self.client().execute_operation(delete, session).await
314328
}
315329

330+
async fn create_indexes_common(
331+
&self,
332+
indexes: impl IntoIterator<Item = IndexModel>,
333+
options: impl Into<Option<CreateIndexOptions>>,
334+
session: impl Into<Option<&mut ClientSession>>,
335+
) -> Result<CreateIndexesResult> {
336+
let session = session.into();
337+
338+
let mut options = options.into();
339+
resolve_write_concern_with_session!(self, options, session.as_ref())?;
340+
341+
let indexes: Vec<IndexModel> = indexes.into_iter().collect();
342+
343+
let create_indexes = CreateIndexes::new(self.namespace(), indexes, options);
344+
self.client()
345+
.execute_operation(create_indexes, session)
346+
.await
347+
}
348+
349+
/// Creates the given index on this collection.
350+
pub async fn create_index(
351+
&self,
352+
index: IndexModel,
353+
options: impl Into<Option<CreateIndexOptions>>,
354+
) -> Result<CreateIndexResult> {
355+
let response = self
356+
.create_indexes_common(vec![index], options, None)
357+
.await?;
358+
Ok(response.into())
359+
}
360+
361+
/// Creates the given indexes on this collection.
362+
pub async fn create_indexes(
363+
&self,
364+
indexes: impl IntoIterator<Item = IndexModel>,
365+
options: impl Into<Option<CreateIndexOptions>>,
366+
) -> Result<CreateIndexesResult> {
367+
self.create_indexes_common(indexes, options, None).await
368+
}
369+
316370
/// Deletes all documents stored in the collection matching `query`.
317371
pub async fn delete_many(
318372
&self,
@@ -423,6 +477,81 @@ impl<T> Collection<T> {
423477
.await
424478
}
425479

480+
async fn drop_index_common(
481+
&self,
482+
name: impl Into<Option<&str>>,
483+
options: impl Into<Option<DropIndexOptions>>,
484+
session: impl Into<Option<&mut ClientSession>>,
485+
) -> Result<()> {
486+
let session = session.into();
487+
488+
let mut options = options.into();
489+
resolve_write_concern_with_session!(self, options, session.as_ref())?;
490+
491+
// If there is no provided name, that means we should drop all indexes.
492+
let index_name = name.into().unwrap_or("*").to_string();
493+
494+
let drop_index = DropIndexes::new(self.namespace(), index_name, options);
495+
self.client().execute_operation(drop_index, session).await
496+
}
497+
498+
/// Drops the index specified by `name` from this collection.
499+
pub async fn drop_index(
500+
&self,
501+
name: impl AsRef<str>,
502+
options: impl Into<Option<DropIndexOptions>>,
503+
) -> Result<()> {
504+
let name = name.as_ref();
505+
if name == "*" {
506+
return Err(ErrorKind::InvalidArgument {
507+
message: "Cannot pass name \"*\" to drop_index since more than one index would be \
508+
dropped."
509+
.to_string(),
510+
}
511+
.into());
512+
}
513+
self.drop_index_common(name, options, None).await
514+
}
515+
516+
/// Drops all indexes associated with this collection.
517+
pub async fn drop_indexes(&self, options: impl Into<Option<DropIndexOptions>>) -> Result<()> {
518+
self.drop_index_common(None, options, None).await
519+
}
520+
521+
/// Lists all indexes on this collection.
522+
pub async fn list_indexes(
523+
&self,
524+
options: impl Into<Option<ListIndexOptions>>,
525+
) -> Result<Cursor<IndexModel>> {
526+
let list_indexes = ListIndexes::new(self.namespace(), options.into());
527+
let client = self.client();
528+
client
529+
.execute_cursor_operation(list_indexes)
530+
.await
531+
.map(|(spec, session)| Cursor::new(client.clone(), spec, session))
532+
}
533+
534+
async fn list_index_names_common(
535+
&self,
536+
cursor: impl TryStreamExt<Ok = IndexModel, Error = Error>,
537+
) -> Result<Vec<String>> {
538+
cursor
539+
.try_filter_map(|index| future::ok(index.get_name()))
540+
.try_collect()
541+
.await
542+
}
543+
544+
/// Gets the names of all indexes on the collection.
545+
pub async fn list_index_names(&self) -> Result<Vec<String>> {
546+
let list_indexes = ListIndexes::new(self.namespace(), None);
547+
let client = self.client();
548+
let cursor: Cursor<IndexModel> = client
549+
.execute_cursor_operation(list_indexes)
550+
.await
551+
.map(|(spec, session)| Cursor::new(client.clone(), spec, session))?;
552+
self.list_index_names_common(cursor).await
553+
}
554+
426555
async fn update_many_common(
427556
&self,
428557
query: Document,

src/coll/options.rs

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::time::Duration;
22

3+
use bson::serde_helpers;
34
use serde::{de::Error, Deserialize, Deserializer, Serialize, Serializer};
45
use serde_with::skip_serializing_none;
56
use typed_builder::TypedBuilder;
@@ -831,6 +832,36 @@ pub struct FindOneOptions {
831832
pub sort: Option<Document>,
832833
}
833834

835+
/// Specifies the options to a
836+
/// [`Collection::create_index`](../struct.Collection.html#method.create_index) or [`Collection::
837+
/// create_indexes`](../struct.Collection.html#method.create_indexes) operation.
838+
///
839+
/// For more information, see [`createIndexes`](https://docs.mongodb.com/manual/reference/command/createIndexes/).
840+
#[serde_with::skip_serializing_none]
841+
#[derive(Clone, Debug, Default, TypedBuilder, Serialize)]
842+
#[builder(field_defaults(default, setter(into)))]
843+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
844+
#[non_exhaustive]
845+
pub struct CreateIndexOptions {
846+
/// Specify the commit quorum needed to mark an `index` as ready.
847+
pub commit_quorum: Option<CommitQuorum>,
848+
849+
/// The maximum amount of time to allow the index to build.
850+
///
851+
/// This option maps to the `maxTimeMS` MongoDB query option, so the duration will be sent
852+
/// across the wire as an integer number of milliseconds.
853+
#[serde(
854+
rename = "maxTimeMS",
855+
default,
856+
serialize_with = "bson_util::serialize_duration_option_as_int_millis",
857+
deserialize_with = "bson_util::deserialize_duration_option_from_u64_millis"
858+
)]
859+
pub max_time: Option<Duration>,
860+
861+
/// The write concern for the operation.
862+
pub write_concern: Option<WriteConcern>,
863+
}
864+
834865
/// Specifies the options to a [`Collection::drop`](../struct.Collection.html#method.drop)
835866
/// operation.
836867
#[serde_with::skip_serializing_none]
@@ -842,3 +873,114 @@ pub struct DropCollectionOptions {
842873
/// The write concern for the operation.
843874
pub write_concern: Option<WriteConcern>,
844875
}
876+
877+
/// Specifies the options to a
878+
/// [`Collection::drop_index`](../struct.Collection.html#method.drop_index) or
879+
/// [`Collection::drop_indexes`](../struct.Collection.html#method.drop_indexes) operation.
880+
#[serde_with::skip_serializing_none]
881+
#[derive(Clone, Debug, Default, Deserialize, TypedBuilder, Serialize)]
882+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
883+
#[builder(field_defaults(default, setter(into)))]
884+
#[non_exhaustive]
885+
pub struct DropIndexOptions {
886+
/// The maximum amount of time to allow the index to drop.
887+
///
888+
/// This option maps to the `maxTimeMS` MongoDB query option, so the duration will be sent
889+
/// across the wire as an integer number of milliseconds.
890+
#[serde(
891+
rename = "maxTimeMS",
892+
default,
893+
serialize_with = "bson_util::serialize_duration_option_as_int_millis",
894+
deserialize_with = "bson_util::deserialize_duration_option_from_u64_millis"
895+
)]
896+
pub max_time: Option<Duration>,
897+
898+
/// The write concern for the operation.
899+
pub write_concern: Option<WriteConcern>,
900+
}
901+
902+
/// Specifies the options to a
903+
/// [`Collection::list_indexes`](../struct.Collection.html#method.list_indexes) operation.
904+
#[serde_with::skip_serializing_none]
905+
#[derive(Clone, Debug, Default, Deserialize, TypedBuilder, Serialize)]
906+
#[serde(rename_all = "camelCase", deny_unknown_fields)]
907+
#[builder(field_defaults(default, setter(into)))]
908+
#[non_exhaustive]
909+
pub struct ListIndexOptions {
910+
/// The maximum amount of time to search for the index.
911+
///
912+
/// This option maps to the `maxTimeMS` MongoDB query option, so the duration will be sent
913+
/// across the wire as an integer number of milliseconds.
914+
#[serde(
915+
rename = "maxTimeMS",
916+
default,
917+
serialize_with = "bson_util::serialize_duration_option_as_int_millis",
918+
deserialize_with = "bson_util::deserialize_duration_option_from_u64_millis"
919+
)]
920+
pub max_time: Option<Duration>,
921+
922+
/// The number of indexes the server should return per cursor batch.
923+
#[serde(default, serialize_with = "bson_util::serialize_u32_option_as_i32")]
924+
pub batch_size: Option<u32>,
925+
}
926+
927+
/// The minimum number of data-bearing voting replica set members (i.e. commit quorum), including
928+
/// the primary, that must report a successful index build before the primary marks the indexes as
929+
/// ready.
930+
///
931+
/// For more information, see the [documentation](https://docs.mongodb.com/manual/reference/command/createIndexes/#definition)
932+
#[derive(Clone, Debug, PartialEq)]
933+
#[non_exhaustive]
934+
pub enum CommitQuorum {
935+
/// A specific number of voting replica set members. When set to 0, disables quorum voting.
936+
Nodes(u32),
937+
938+
/// All data-bearing voting replica set members (default).
939+
VotingMembers,
940+
941+
/// A simple majority of voting members.
942+
Majority,
943+
944+
/// A replica set tag name.
945+
Custom(String),
946+
}
947+
948+
impl Serialize for CommitQuorum {
949+
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
950+
where
951+
S: Serializer,
952+
{
953+
match self {
954+
CommitQuorum::Nodes(n) => serde_helpers::serialize_u32_as_i32(n, serializer),
955+
CommitQuorum::VotingMembers => serializer.serialize_str("votingMembers"),
956+
CommitQuorum::Majority => serializer.serialize_str("majority"),
957+
CommitQuorum::Custom(s) => serializer.serialize_str(s),
958+
}
959+
}
960+
}
961+
962+
impl<'de> Deserialize<'de> for CommitQuorum {
963+
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
964+
where
965+
D: Deserializer<'de>,
966+
{
967+
#[derive(Deserialize)]
968+
#[serde(untagged)]
969+
enum IntOrString {
970+
Int(u32),
971+
String(String),
972+
}
973+
match IntOrString::deserialize(deserializer)? {
974+
IntOrString::String(s) => {
975+
if s == "votingMembers" {
976+
Ok(CommitQuorum::VotingMembers)
977+
} else if s == "majority" {
978+
Ok(CommitQuorum::Majority)
979+
} else {
980+
Ok(CommitQuorum::Custom(s))
981+
}
982+
}
983+
IntOrString::Int(i) => Ok(CommitQuorum::Nodes(i)),
984+
}
985+
}
986+
}

src/index/mod.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
pub mod options;
2+
3+
use crate::bson::Document;
4+
5+
use self::options::*;
6+
use serde::{Deserialize, Serialize};
7+
8+
use typed_builder::TypedBuilder;
9+
10+
/// Specifies the fields and options for an index. For more information, see the [documentation](https://docs.mongodb.com/manual/indexes/).
11+
#[derive(Clone, Debug, Default, Deserialize, TypedBuilder, Serialize)]
12+
#[builder(field_defaults(default, setter(into)))]
13+
#[serde(rename_all = "camelCase")]
14+
#[non_exhaustive]
15+
pub struct IndexModel {
16+
/// Specifies the index’s fields. For each field, specify a key-value pair in which the key is
17+
/// the name of the field to index and the value is index type.
18+
#[serde(rename = "key")]
19+
keys: Document,
20+
21+
/// The options for the index.
22+
#[serde(flatten)]
23+
options: Option<IndexOptions>,
24+
}
25+
26+
impl IndexModel {
27+
/// If the client did not specify a name, generate and set it. Otherwise, do nothing.
28+
pub(crate) fn update_name(&mut self) {
29+
if self
30+
.options
31+
.as_ref()
32+
.and_then(|o| o.name.as_ref())
33+
.is_none()
34+
{
35+
let key_names: Vec<String> = self
36+
.keys
37+
.iter()
38+
.map(|(k, v)| format!("{}_{}", k, v.to_string()))
39+
.collect();
40+
self.options.get_or_insert(IndexOptions::default()).name = Some(key_names.join("_"));
41+
}
42+
}
43+
44+
pub(crate) fn get_name(&self) -> Option<String> {
45+
self.options.as_ref().and_then(|o| o.name.as_ref()).cloned()
46+
}
47+
48+
#[cfg(test)]
49+
pub(crate) fn is_unique(&self) -> bool {
50+
self.options
51+
.as_ref()
52+
.and_then(|o| o.unique)
53+
.unwrap_or(false)
54+
}
55+
}

0 commit comments

Comments
 (0)