Skip to content

Commit 41745bb

Browse files
RUST-645 / RUST-1245 Add Atlas planned maintenance testing workload executor (#657)
1 parent fa402be commit 41745bb

File tree

22 files changed

+1026
-367
lines changed

22 files changed

+1026
-367
lines changed

Cargo.toml

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,24 @@ exclude = [
2323
".evergreen/**",
2424
".gitignore",
2525
"src/test/**",
26-
"tests/**"
26+
"tests/**",
2727
]
2828

2929
[features]
3030
default = ["tokio-runtime"]
31-
tokio-runtime = ["tokio/macros", "tokio/net", "tokio/rt", "tokio/time", "serde_bytes"]
32-
async-std-runtime = ["async-std", "async-std/attributes", "async-std-resolver", "tokio-util/compat"]
31+
tokio-runtime = [
32+
"tokio/macros",
33+
"tokio/net",
34+
"tokio/rt",
35+
"tokio/time",
36+
"serde_bytes",
37+
]
38+
async-std-runtime = [
39+
"async-std",
40+
"async-std/attributes",
41+
"async-std-resolver",
42+
"tokio-util/compat",
43+
]
3344
sync = ["async-std-runtime"]
3445
tokio-sync = ["tokio-runtime"]
3546
openssl-tls = ["openssl", "openssl-probe", "tokio-openssl"]
@@ -76,7 +87,7 @@ rustls-pemfile = "0.3.0"
7687
serde_with = "1.3.1"
7788
sha-1 = "0.10.0"
7889
sha2 = "0.10.2"
79-
snap = { version = "1.0.5", optional = true}
90+
snap = { version = "1.0.5", optional = true }
8091
socket2 = "0.4.0"
8192
stringprep = "0.1.2"
8293
strsim = "0.10.0"
@@ -138,13 +149,15 @@ features = ["v4"]
138149
[dev-dependencies]
139150
approx = "0.5.1"
140151
async_once = "0.2.6"
152+
ctrlc = "3.2.2"
141153
derive_more = "0.99.13"
142154
function_name = "0.2.1"
143155
futures = "0.3"
144156
home = "0.5"
145157
pretty_assertions = "1.1.0"
146158
serde_json = "1.0.64"
147159
semver = "1.0.0"
160+
time = "0.3.9"
148161

149162
[package.metadata.docs.rs]
150163
rustdoc-args = ["--cfg", "docsrs"]

src/bson_util/mod.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use serde::{de::Error as SerdeDeError, ser, Deserialize, Deserializer, Serialize
99

1010
use crate::{
1111
bson::{doc, Bson, Document},
12-
error::{ErrorKind, Result},
12+
error::{Error, ErrorKind, Result},
1313
runtime::SyncLittleEndianRead,
1414
};
1515

@@ -217,6 +217,14 @@ pub(crate) fn read_document_bytes<R: Read>(mut reader: R) -> Result<Vec<u8>> {
217217
Ok(bytes)
218218
}
219219

220+
/// Serializes an Error as a string.
221+
pub(crate) fn serialize_error_as_string<S: Serializer>(
222+
val: &Error,
223+
serializer: S,
224+
) -> std::result::Result<S::Ok, S::Error> {
225+
serializer.serialize_str(&val.to_string())
226+
}
227+
220228
#[cfg(test)]
221229
mod test {
222230
use crate::bson_util::num_decimal_digits;

src/cmap/conn/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use std::{
88
};
99

1010
use derivative::Derivative;
11+
use serde::Serialize;
1112
use tokio::sync::{mpsc, Mutex};
1213

1314
use self::wire::Message;
@@ -37,7 +38,7 @@ pub(crate) use stream_description::StreamDescription;
3738
pub(crate) use wire::next_request_id;
3839

3940
/// User-facing information about a connection to the database.
40-
#[derive(Clone, Debug)]
41+
#[derive(Clone, Debug, Serialize)]
4142
pub struct ConnectionInfo {
4243
/// A driver-generated identifier that uniquely identifies the connection.
4344
pub id: u32,

src/cmap/test/event.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use std::{
33
time::Duration,
44
};
55

6-
use serde::{de::Unexpected, Deserialize, Deserializer};
6+
use serde::{de::Unexpected, Deserialize, Deserializer, Serialize};
77

88
use crate::{event::cmap::*, options::ServerAddress, runtime};
99
use tokio::sync::broadcast::error::{RecvError, SendError};
@@ -154,6 +154,27 @@ pub enum Event {
154154
ConnectionCheckedIn(ConnectionCheckedInEvent),
155155
}
156156

157+
impl Serialize for Event {
158+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
159+
where
160+
S: serde::Serializer,
161+
{
162+
match self {
163+
Self::PoolCreated(event) => event.serialize(serializer),
164+
Self::PoolClosed(event) => event.serialize(serializer),
165+
Self::PoolReady(event) => event.serialize(serializer),
166+
Self::ConnectionCreated(event) => event.serialize(serializer),
167+
Self::ConnectionReady(event) => event.serialize(serializer),
168+
Self::ConnectionClosed(event) => event.serialize(serializer),
169+
Self::ConnectionCheckOutStarted(event) => event.serialize(serializer),
170+
Self::ConnectionCheckOutFailed(event) => event.serialize(serializer),
171+
Self::ConnectionCheckedOut(event) => event.serialize(serializer),
172+
Self::PoolCleared(event) => event.serialize(serializer),
173+
Self::ConnectionCheckedIn(event) => event.serialize(serializer),
174+
}
175+
}
176+
}
177+
157178
impl Event {
158179
pub fn name(&self) -> &'static str {
159180
match self {
@@ -170,6 +191,24 @@ impl Event {
170191
Event::ConnectionCheckedIn(_) => "ConnectionCheckedIn",
171192
}
172193
}
194+
195+
// The names in drivers-atlas-testing tests are slightly different than those used in spec
196+
// tests.
197+
pub fn planned_maintenance_testing_name(&self) -> &'static str {
198+
match self {
199+
Event::PoolCreated(_) => "PoolCreatedEvent",
200+
Event::PoolReady(_) => "PoolReadyEvent",
201+
Event::PoolCleared(_) => "PoolClearedEvent",
202+
Event::PoolClosed(_) => "PoolClosedEvent",
203+
Event::ConnectionCreated(_) => "ConnectionCreatedEvent",
204+
Event::ConnectionReady(_) => "ConnectionReadyEvent",
205+
Event::ConnectionClosed(_) => "ConnectionClosedEvent",
206+
Event::ConnectionCheckOutStarted(_) => "ConnectionCheckOutStartedEvent",
207+
Event::ConnectionCheckOutFailed(_) => "ConnectionCheckOutFailedEvent",
208+
Event::ConnectionCheckedOut(_) => "ConnectionCheckedOutEvent",
209+
Event::ConnectionCheckedIn(_) => "ConnectionCheckedInEvent",
210+
}
211+
}
173212
}
174213

175214
#[derive(Debug, Deserialize)]

src/event/cmap.rs

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
44
use std::time::Duration;
55

6-
use serde::Deserialize;
6+
use serde::{Deserialize, Serialize};
77

88
use crate::{bson::oid::ObjectId, bson_util, options::ServerAddress};
99

@@ -20,12 +20,12 @@ fn empty_address() -> ServerAddress {
2020
}
2121

2222
/// Event emitted when a connection pool is created.
23-
#[derive(Clone, Debug, Deserialize, PartialEq)]
23+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
2424
#[non_exhaustive]
2525
pub struct PoolCreatedEvent {
2626
/// The address of the server that the pool's connections will connect to.
2727
#[serde(default = "self::empty_address")]
28-
#[serde(skip)]
28+
#[serde(skip_deserializing)]
2929
pub address: ServerAddress,
3030

3131
/// The options used for the pool.
@@ -34,7 +34,7 @@ pub struct PoolCreatedEvent {
3434

3535
/// Contains the options for creating a connection pool. While these options are specified at the
3636
/// client-level, `ConnectionPoolOptions` is exposed for the purpose of CMAP event handling.
37-
#[derive(Clone, Default, Deserialize, Debug, PartialEq)]
37+
#[derive(Clone, Default, Deserialize, Debug, PartialEq, Serialize)]
3838
#[serde(rename_all = "camelCase")]
3939
#[non_exhaustive]
4040
pub struct ConnectionPoolOptions {
@@ -62,46 +62,46 @@ pub struct ConnectionPoolOptions {
6262
}
6363

6464
/// Event emitted when a connection pool becomes ready.
65-
#[derive(Clone, Debug, Deserialize, PartialEq)]
65+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
6666
#[non_exhaustive]
6767
pub struct PoolReadyEvent {
6868
/// The address of the server that the pool's connections will connect to.
6969
#[serde(default = "self::empty_address")]
70-
#[serde(skip)]
70+
#[serde(skip_deserializing)]
7171
pub address: ServerAddress,
7272
}
7373

7474
/// Event emitted when a connection pool is cleared.
75-
#[derive(Clone, Debug, Deserialize, PartialEq)]
75+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
7676
#[non_exhaustive]
7777
pub struct PoolClearedEvent {
7878
/// The address of the server that the pool's connections will connect to.
7979
#[serde(default = "self::empty_address")]
80-
#[serde(skip)]
80+
#[serde(skip_deserializing)]
8181
pub address: ServerAddress,
8282

8383
/// If the connection is to a load balancer, the id of the selected backend.
8484
pub service_id: Option<ObjectId>,
8585
}
8686

8787
/// Event emitted when a connection pool is cleared.
88-
#[derive(Clone, Debug, Deserialize, PartialEq)]
88+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
8989
#[non_exhaustive]
9090
pub struct PoolClosedEvent {
9191
/// The address of the server that the pool's connections will connect to.
9292
#[serde(default = "self::empty_address")]
93-
#[serde(skip)]
93+
#[serde(skip_deserializing)]
9494
pub address: ServerAddress,
9595
}
9696

9797
/// Event emitted when a connection is created.
98-
#[derive(Clone, Debug, Deserialize, PartialEq)]
98+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
9999
#[serde(rename_all = "camelCase")]
100100
#[non_exhaustive]
101101
pub struct ConnectionCreatedEvent {
102102
/// The address of the server that the connection will connect to.
103103
#[serde(default = "self::empty_address")]
104-
#[serde(skip)]
104+
#[serde(skip_deserializing)]
105105
pub address: ServerAddress,
106106

107107
/// The unique ID of the connection. This is not used for anything internally, but can be used
@@ -112,13 +112,13 @@ pub struct ConnectionCreatedEvent {
112112

113113
/// Event emitted when a connection is ready to be used. This indicates that all the necessary
114114
/// prerequisites for using a connection (handshake, authentication, etc.) have been completed.
115-
#[derive(Clone, Debug, Deserialize, PartialEq)]
115+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
116116
#[serde(rename_all = "camelCase")]
117117
#[non_exhaustive]
118118
pub struct ConnectionReadyEvent {
119119
/// The address of the server that the connection is connected to.
120120
#[serde(default = "self::empty_address")]
121-
#[serde(skip)]
121+
#[serde(skip_deserializing)]
122122
pub address: ServerAddress,
123123

124124
/// The unique ID of the connection. This is not used for anything internally, but can be used
@@ -128,13 +128,13 @@ pub struct ConnectionReadyEvent {
128128
}
129129

130130
/// Event emitted when a connection is closed.
131-
#[derive(Clone, Debug, Deserialize, PartialEq)]
131+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
132132
#[serde(rename_all = "camelCase")]
133133
#[non_exhaustive]
134134
pub struct ConnectionClosedEvent {
135135
/// The address of the server that the connection was connected to.
136136
#[serde(default = "self::empty_address")]
137-
#[serde(skip)]
137+
#[serde(skip_deserializing)]
138138
pub address: ServerAddress,
139139

140140
/// The unique ID of the connection. This is not used for anything internally, but can be used
@@ -147,7 +147,7 @@ pub struct ConnectionClosedEvent {
147147
}
148148

149149
/// The reasons that a connection may be closed.
150-
#[derive(Clone, Debug, Deserialize, PartialEq)]
150+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
151151
#[serde(rename_all = "camelCase")]
152152
#[non_exhaustive]
153153
pub enum ConnectionClosedReason {
@@ -168,30 +168,30 @@ pub enum ConnectionClosedReason {
168168
}
169169

170170
/// Event emitted when a thread begins checking out a connection to use for an operation.
171-
#[derive(Clone, Debug, Deserialize, PartialEq)]
171+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
172172
#[non_exhaustive]
173173
pub struct ConnectionCheckoutStartedEvent {
174174
/// The address of the server that the connection will connect to.
175175
#[serde(default = "self::empty_address")]
176-
#[serde(skip)]
176+
#[serde(skip_deserializing)]
177177
pub address: ServerAddress,
178178
}
179179

180180
/// Event emitted when a thread is unable to check out a connection.
181-
#[derive(Clone, Debug, Deserialize, PartialEq)]
181+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
182182
#[non_exhaustive]
183183
pub struct ConnectionCheckoutFailedEvent {
184184
/// The address of the server that the connection would have connected to.
185185
#[serde(default = "self::empty_address")]
186-
#[serde(skip)]
186+
#[serde(skip_deserializing)]
187187
pub address: ServerAddress,
188188

189189
/// The reason a connection was unable to be checked out.
190190
pub reason: ConnectionCheckoutFailedReason,
191191
}
192192

193193
/// The reasons a connection may not be able to be checked out.
194-
#[derive(Clone, Debug, Deserialize, PartialEq)]
194+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
195195
#[serde(rename_all = "camelCase")]
196196
#[non_exhaustive]
197197
pub enum ConnectionCheckoutFailedReason {
@@ -204,13 +204,13 @@ pub enum ConnectionCheckoutFailedReason {
204204
}
205205

206206
/// Event emitted when a connection is successfully checked out.
207-
#[derive(Clone, Debug, Deserialize, PartialEq)]
207+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
208208
#[serde(rename_all = "camelCase")]
209209
#[non_exhaustive]
210210
pub struct ConnectionCheckedOutEvent {
211211
/// The address of the server that the connection will connect to.
212212
#[serde(default = "self::empty_address")]
213-
#[serde(skip)]
213+
#[serde(skip_deserializing)]
214214
pub address: ServerAddress,
215215

216216
/// The unique ID of the connection. This is not used for anything internally, but can be used
@@ -220,13 +220,13 @@ pub struct ConnectionCheckedOutEvent {
220220
}
221221

222222
/// Event emitted when a connection is checked back into a connection pool.
223-
#[derive(Clone, Debug, Deserialize, PartialEq)]
223+
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
224224
#[serde(rename_all = "camelCase")]
225225
#[non_exhaustive]
226226
pub struct ConnectionCheckedInEvent {
227227
/// The address of the server that the connection was connected to.
228228
#[serde(default = "self::empty_address")]
229-
#[serde(skip)]
229+
#[serde(skip_deserializing)]
230230
pub address: ServerAddress,
231231

232232
/// The unique ID of the connection. This is not used for anything internally, but can be used

0 commit comments

Comments
 (0)