Skip to content

Commit e782397

Browse files
committed
RUST-289 Expose operations' write concerns
1 parent 77c8ca9 commit e782397

File tree

15 files changed

+244
-15
lines changed

15 files changed

+244
-15
lines changed

src/client/executor.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ impl Client {
9393
op: &T,
9494
connection: &mut Connection,
9595
) -> Result<T::O> {
96+
if let Some(wc) = op.write_concern() {
97+
wc.validate()?;
98+
}
99+
96100
let mut cmd = op.build(connection.stream_description()?)?;
97101
self.inner
98102
.topology

src/concern.rs renamed to src/concern/mod.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
//! Contains the types for read concerns and write concerns.
22
3+
#[cfg(test)]
4+
mod test;
5+
36
use std::time::Duration;
47

58
use bson::doc;
6-
use serde::{Serialize, Serializer};
9+
use serde::{Deserialize, Deserializer, Serialize, Serializer};
710
use serde_with::skip_serializing_none;
811
use typed_builder::TypedBuilder;
912

@@ -72,7 +75,7 @@ impl Serialize for ReadConcern {
7275
/// See the documentation [here](https://docs.mongodb.com/manual/reference/write-concern/) for more
7376
/// information about write concerns.
7477
#[skip_serializing_none]
75-
#[derive(Clone, Debug, Default, PartialEq, TypedBuilder, Serialize)]
78+
#[derive(Clone, Debug, Default, PartialEq, TypedBuilder, Serialize, Deserialize)]
7679
pub struct WriteConcern {
7780
/// Requests acknowledgement that the operation has propagated to a specific number or variety
7881
/// of servers.
@@ -88,6 +91,8 @@ pub struct WriteConcern {
8891
#[builder(default)]
8992
#[serde(rename = "wtimeout")]
9093
#[serde(serialize_with = "bson_util::serialize_duration_as_i64_millis")]
94+
#[serde(deserialize_with = "bson_util::deserialize_duration_from_u64_millis")]
95+
#[serde(default)]
9196
pub w_timeout: Option<Duration>,
9297

9398
/// Requests acknowledgement that the operation has propagated to the on-disk journal.
@@ -121,6 +126,24 @@ impl Serialize for Acknowledgment {
121126
}
122127
}
123128

129+
impl<'de> Deserialize<'de> for Acknowledgment {
130+
fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
131+
where
132+
D: Deserializer<'de>,
133+
{
134+
#[derive(Deserialize)]
135+
#[serde(untagged)]
136+
enum IntOrString {
137+
Int(i32),
138+
String(String),
139+
}
140+
match IntOrString::deserialize(deserializer)? {
141+
IntOrString::String(s) => Ok(s.into()),
142+
IntOrString::Int(i) => Ok(i.into()),
143+
}
144+
}
145+
}
146+
124147
impl From<i32> for Acknowledgment {
125148
fn from(i: i32) -> Self {
126149
Acknowledgment::Nodes(i)
@@ -152,6 +175,11 @@ impl Acknowledgment {
152175
}
153176

154177
impl WriteConcern {
178+
#[allow(dead_code)]
179+
pub(crate) fn is_acknowledged(&self) -> bool {
180+
self.w != Some(Acknowledgment::Nodes(0)) || self.journal == Some(true)
181+
}
182+
155183
/// Validates that the write concern. A write concern is invalid if the `w` field is 0
156184
/// and the `j` field is `true`.
157185
pub fn validate(&self) -> Result<()> {

src/concern/test.rs

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
use std::time::Duration;
2+
3+
use bson::{doc, Bson};
4+
5+
use crate::{
6+
error::ErrorKind,
7+
options::{Acknowledgment, InsertOneOptions, WriteConcern},
8+
test::{TestClient, LOCK},
9+
};
10+
11+
#[test]
12+
fn write_concern_is_acknowledged() {
13+
let w_1 = WriteConcern::builder()
14+
.w(Acknowledgment::Nodes(1))
15+
.journal(false)
16+
.build();
17+
assert!(w_1.is_acknowledged());
18+
19+
let w_majority = WriteConcern::builder()
20+
.w(Acknowledgment::Majority)
21+
.journal(false)
22+
.build();
23+
assert!(w_majority.is_acknowledged());
24+
25+
let w_0 = WriteConcern::builder()
26+
.w(Acknowledgment::Nodes(0))
27+
.journal(false)
28+
.build();
29+
assert!(!w_0.is_acknowledged());
30+
31+
let w_0 = WriteConcern::builder().w(Acknowledgment::Nodes(0)).build();
32+
assert!(!w_0.is_acknowledged());
33+
34+
let empty = WriteConcern::builder().build();
35+
assert!(empty.is_acknowledged());
36+
37+
let empty = WriteConcern::builder().journal(false).build();
38+
assert!(empty.is_acknowledged());
39+
40+
let empty = WriteConcern::builder().journal(true).build();
41+
assert!(empty.is_acknowledged());
42+
}
43+
44+
#[test]
45+
fn write_concern_deserialize() {
46+
let w_1 = doc! { "w": 1 };
47+
let wc: WriteConcern = bson::from_bson(Bson::Document(w_1)).unwrap();
48+
assert_eq!(
49+
wc,
50+
WriteConcern {
51+
w: Acknowledgment::Nodes(1).into(),
52+
w_timeout: None,
53+
journal: None
54+
}
55+
);
56+
57+
let w_majority = doc! { "w": "majority" };
58+
let wc: WriteConcern = bson::from_bson(Bson::Document(w_majority)).unwrap();
59+
assert_eq!(
60+
wc,
61+
WriteConcern {
62+
w: Acknowledgment::Majority.into(),
63+
w_timeout: None,
64+
journal: None
65+
}
66+
);
67+
68+
let w_timeout = doc! { "w": "majority", "wtimeout": 100 };
69+
let wc: WriteConcern = bson::from_bson(Bson::Document(w_timeout)).unwrap();
70+
assert_eq!(
71+
wc,
72+
WriteConcern {
73+
w: Acknowledgment::Majority.into(),
74+
w_timeout: Duration::from_millis(100).into(),
75+
journal: None
76+
}
77+
);
78+
79+
let journal = doc! { "w": "majority", "j": true };
80+
let wc: WriteConcern = bson::from_bson(Bson::Document(journal)).unwrap();
81+
assert_eq!(
82+
wc,
83+
WriteConcern {
84+
w: Acknowledgment::Majority.into(),
85+
w_timeout: None,
86+
journal: true.into()
87+
}
88+
);
89+
}
90+
91+
#[cfg_attr(feature = "tokio-runtime", tokio::test)]
92+
#[cfg_attr(feature = "async-std-runtime", async_std::test)]
93+
#[function_name::named]
94+
async fn inconsistent_write_concern_rejected() {
95+
let _guard = LOCK.run_concurrently().await;
96+
97+
let client = TestClient::new().await;
98+
let db = client.database(function_name!());
99+
let error = db
100+
.run_command(
101+
doc! {
102+
"insert": function_name!(),
103+
"documents": [ {} ],
104+
"writeConcern": { "w": 0, "j": true }
105+
},
106+
None,
107+
)
108+
.await
109+
.expect_err("insert should fail");
110+
assert!(matches!(error.kind.as_ref(), ErrorKind::ArgumentError { .. }));
111+
112+
let coll = db.collection(function_name!());
113+
let wc = WriteConcern {
114+
w: Acknowledgment::Nodes(0).into(),
115+
journal: true.into(),
116+
w_timeout: None,
117+
};
118+
let options = InsertOneOptions::builder().write_concern(wc).build();
119+
let error = coll
120+
.insert_one(doc! {}, options)
121+
.await
122+
.expect_err("insert should fail");
123+
assert!(matches!(error.kind.as_ref(), ErrorKind::ArgumentError { .. }));
124+
}

src/db/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ impl Database {
231231
command: Document,
232232
selection_criteria: impl Into<Option<SelectionCriteria>>,
233233
) -> Result<Document> {
234-
let operation = RunCommand::new(self.name().into(), command, selection_criteria.into());
234+
let operation = RunCommand::new(self.name().into(), command, selection_criteria.into())?;
235235
self.client().execute_operation(&operation, None).await
236236
}
237237

src/operation/aggregate/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99
cursor::CursorSpecification,
1010
error::Result,
1111
operation::{append_options, CursorBody, Operation, WriteConcernOnlyBody},
12-
options::{AggregateOptions, SelectionCriteria},
12+
options::{AggregateOptions, SelectionCriteria, WriteConcern},
1313
Namespace,
1414
};
1515

@@ -87,6 +87,12 @@ impl Operation for Aggregate {
8787
.as_ref()
8888
.and_then(|opts| opts.selection_criteria.as_ref())
8989
}
90+
91+
fn write_concern(&self) -> Option<&WriteConcern> {
92+
self.options
93+
.as_ref()
94+
.and_then(|opts| opts.write_concern.as_ref())
95+
}
9096
}
9197

9298
impl Aggregate {

src/operation/create/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
cmap::{Command, CommandResponse, StreamDescription},
88
error::Result,
99
operation::{append_options, Operation, WriteConcernOnlyBody},
10-
options::CreateCollectionOptions,
10+
options::{CreateCollectionOptions, WriteConcern},
1111
Namespace,
1212
};
1313

@@ -54,4 +54,10 @@ impl Operation for Create {
5454
fn handle_response(&self, response: CommandResponse) -> Result<Self::O> {
5555
response.body::<WriteConcernOnlyBody>()?.validate()
5656
}
57+
58+
fn write_concern(&self) -> Option<&WriteConcern> {
59+
self.options
60+
.as_ref()
61+
.and_then(|opts| opts.write_concern.as_ref())
62+
}
5763
}

src/operation/delete/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99
collation::Collation,
1010
error::{convert_bulk_errors, Result},
1111
operation::{append_options, Operation, WriteResponseBody},
12-
options::DeleteOptions,
12+
options::{DeleteOptions, WriteConcern},
1313
results::DeleteResult,
1414
};
1515

@@ -88,4 +88,10 @@ impl Operation for Delete {
8888
deleted_count: body.n,
8989
})
9090
}
91+
92+
fn write_concern(&self) -> Option<&WriteConcern> {
93+
self.options
94+
.as_ref()
95+
.and_then(|opts| opts.write_concern.as_ref())
96+
}
9197
}

src/operation/drop_collection/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
cmap::{Command, CommandResponse, StreamDescription},
88
error::Result,
99
operation::{append_options, Operation, WriteConcernOnlyBody},
10-
options::DropCollectionOptions,
10+
options::{DropCollectionOptions, WriteConcern},
1111
Namespace,
1212
};
1313

@@ -55,4 +55,10 @@ impl Operation for DropCollection {
5555
fn handle_response(&self, response: CommandResponse) -> Result<Self::O> {
5656
response.body::<WriteConcernOnlyBody>()?.validate()
5757
}
58+
59+
fn write_concern(&self) -> Option<&WriteConcern> {
60+
self.options
61+
.as_ref()
62+
.and_then(|opts| opts.write_concern.as_ref())
63+
}
5864
}

src/operation/drop_database/mod.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::{
77
cmap::{Command, CommandResponse, StreamDescription},
88
error::Result,
99
operation::{append_options, Operation, WriteConcernOnlyBody},
10-
options::DropDatabaseOptions,
10+
options::{DropDatabaseOptions, WriteConcern},
1111
};
1212

1313
#[derive(Debug)]
@@ -48,4 +48,10 @@ impl Operation for DropDatabase {
4848
fn handle_response(&self, response: CommandResponse) -> Result<Self::O> {
4949
response.body::<WriteConcernOnlyBody>()?.validate()
5050
}
51+
52+
fn write_concern(&self) -> Option<&WriteConcern> {
53+
self.options
54+
.as_ref()
55+
.and_then(|opts| opts.write_concern.as_ref())
56+
}
5157
}

src/operation/find_and_modify/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use crate::{
2020
},
2121
error::{ErrorKind, Result},
2222
operation::{append_options, Operation},
23+
options::WriteConcern,
2324
};
2425

2526
pub(crate) struct FindAndModify {
@@ -88,6 +89,7 @@ impl Operation for FindAndModify {
8889
body,
8990
))
9091
}
92+
9193
fn handle_response(&self, response: CommandResponse) -> Result<Self::O> {
9294
let body: ResponseBody = response.body()?;
9395
match body.value {
@@ -103,6 +105,10 @@ impl Operation for FindAndModify {
103105
.into()),
104106
}
105107
}
108+
109+
fn write_concern(&self) -> Option<&WriteConcern> {
110+
self.options.write_concern.as_ref()
111+
}
106112
}
107113

108114
#[derive(Debug, Deserialize)]

0 commit comments

Comments
 (0)