Skip to content

Commit 8768f63

Browse files
authored
RUST-981 Merge new load balancer spec tests (#480)
1 parent 7caebdf commit 8768f63

38 files changed

+5960
-212
lines changed

src/client/executor.rs

Lines changed: 45 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,13 @@ use std::{collections::HashSet, sync::Arc, time::Instant};
77
use super::{session::TransactionState, Client, ClientSession};
88
use crate::{
99
bson::Document,
10-
cmap::{conn::PinnedConnectionHandle, Connection, RawCommand, RawCommandResponse},
10+
cmap::{
11+
conn::PinnedConnectionHandle,
12+
Connection,
13+
ConnectionPool,
14+
RawCommand,
15+
RawCommandResponse,
16+
},
1117
cursor::{session::SessionCursor, Cursor, CursorSpecification},
1218
error::{
1319
Error,
@@ -158,11 +164,13 @@ impl Client {
158164
Op: Operation<O = CursorSpecification<T>>,
159165
T: DeserializeOwned + Unpin + Send + Sync,
160166
{
161-
let mut details = self.execute_operation_with_details(op, session).await?;
162-
let pinned = if details.output.connection.is_pinned() {
163-
// Cursor operations on load-balanced transactions will be pinned via the transaction
164-
// pin.
165-
None
167+
let mut details = self
168+
.execute_operation_with_details(op, &mut *session)
169+
.await?;
170+
171+
let pinned = if let Some(handle) = session.transaction.pinned_connection() {
172+
// Cursor operations on a transaction share the same pinned connection.
173+
Some(handle.replicate())
166174
} else {
167175
self.pin_connection_for_cursor(&mut details.output)?
168176
};
@@ -225,20 +233,17 @@ impl Client {
225233
}
226234
};
227235

228-
let mut conn = match op.pinned_connection() {
229-
Some(l) => l.take_connection().await?,
230-
None => match server.pool.check_out().await {
231-
Ok(c) => c,
232-
Err(mut err) => {
233-
err.add_labels_and_update_pin(None, &mut session, None)?;
236+
let mut conn = match get_connection(&session, &op, &server.pool).await {
237+
Ok(c) => c,
238+
Err(mut err) => {
239+
err.add_labels_and_update_pin(None, &mut session, None)?;
234240

235-
if err.is_pool_cleared() {
236-
return self.execute_retry(&mut op, &mut session, None, err).await;
237-
} else {
238-
return Err(err);
239-
}
241+
if err.is_pool_cleared() {
242+
return self.execute_retry(&mut op, &mut session, None, err).await;
243+
} else {
244+
return Err(err);
240245
}
241-
},
246+
}
242247
};
243248

244249
let retryability = self.get_retryability(&conn, &op, &session).await?;
@@ -326,23 +331,9 @@ impl Client {
326331
}
327332
};
328333

329-
let session_pinned = session
330-
.as_ref()
331-
.and_then(|s| s.transaction.pinned_connection());
332-
let mut conn = match (session_pinned, op.pinned_connection()) {
333-
(Some(c), None) | (None, Some(c)) => c.take_connection().await?,
334-
(Some(c), Some(_)) => {
335-
// An operation executing in a transaction should never have a pinned connection,
336-
// but in case it does, prefer the transaction's pin.
337-
if cfg!(debug_assertions) {
338-
panic!("pinned operation executing in pinned transaction");
339-
}
340-
c.take_connection().await?
341-
}
342-
(None, None) => match server.pool.check_out().await {
343-
Ok(c) => c,
344-
Err(_) => return Err(first_error),
345-
},
334+
let mut conn = match get_connection(session, op, &server.pool).await {
335+
Ok(c) => c,
336+
Err(_) => return Err(first_error),
346337
};
347338

348339
let retryability = self.get_retryability(&conn, op, session).await?;
@@ -776,6 +767,25 @@ impl Client {
776767
}
777768
}
778769

770+
async fn get_connection<T: Operation>(
771+
session: &Option<&mut ClientSession>,
772+
op: &T,
773+
pool: &ConnectionPool,
774+
) -> Result<Connection> {
775+
let session_pinned = session
776+
.as_ref()
777+
.and_then(|s| s.transaction.pinned_connection());
778+
match (session_pinned, op.pinned_connection()) {
779+
(Some(c), None) | (None, Some(c)) => c.take_connection().await,
780+
(Some(session_handle), Some(op_handle)) => {
781+
// An operation executing in a transaction should be sharing the same pinned connection.
782+
debug_assert_eq!(session_handle.id(), op_handle.id());
783+
session_handle.take_connection().await
784+
}
785+
(None, None) => pool.check_out().await,
786+
}
787+
}
788+
779789
impl Error {
780790
/// Adds the necessary labels to this Error, and unpins the session if needed.
781791
///

src/cmap/conn/mod.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -345,11 +345,6 @@ impl Connection {
345345
})
346346
}
347347

348-
/// Whether this connection has a live `PinnedConnectionHandle`.
349-
pub(crate) fn is_pinned(&self) -> bool {
350-
self.pinned_sender.is_some()
351-
}
352-
353348
/// Close this connection, emitting a `ConnectionClosedEvent` with the supplied reason.
354349
pub(super) fn close_and_drop(mut self, reason: ConnectionClosedReason) {
355350
self.close(reason);
@@ -470,12 +465,8 @@ impl PinnedConnectionHandle {
470465
})
471466
}
472467

473-
/// Return the pinned connection to the normal connection pool.
474-
pub(crate) async fn unpin_connection(&self) {
475-
let mut receiver = self.receiver.lock().await;
476-
receiver.close();
477-
// Ensure any connections buffered in the channel are dropped, returning them to the pool.
478-
while receiver.recv().await.is_some() {}
468+
pub(crate) fn id(&self) -> u32 {
469+
self.id
479470
}
480471
}
481472

src/cmap/test/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ const TEST_DESCRIPTIONS_TO_SKIP: &[&str] = &[
3939
// WaitQueueTimeoutMS is not supported
4040
"must aggressively timeout threads enqueued longer than waitQueueTimeoutMS",
4141
"waiting on maxConnecting is limited by WaitQueueTimeoutMS",
42+
// TODO DRIVERS-1785 remove this skip when test event order is fixed
43+
"error during minPoolSize population clears pool",
4244
];
4345

4446
/// Many different types of CMAP events are emitted from tasks spawned in the drop
@@ -165,9 +167,12 @@ impl Executor {
165167
let manager = pool.manager.clone();
166168
RUNTIME.execute(async move {
167169
while let Some(update) = update_receiver.recv().await {
168-
match update.into_message() {
169-
ServerUpdate::Error { error, .. } => manager.clear(error.cause, None).await,
170+
match update.message() {
171+
ServerUpdate::Error { error, .. } => {
172+
manager.clear(error.cause.clone(), None).await
173+
}
170174
}
175+
drop(update);
171176
}
172177
});
173178

@@ -426,6 +431,7 @@ async fn cmap_spec_tests() {
426431

427432
let mut options = CLIENT_OPTIONS.clone();
428433
if options.load_balanced.unwrap_or(false) {
434+
println!("skipping due to load balanced topology");
429435
return;
430436
}
431437
options.hosts.drain(1..);
@@ -434,6 +440,7 @@ async fn cmap_spec_tests() {
434440
if let Some(ref run_on) = test_file.run_on {
435441
let can_run_on = run_on.iter().any(|run_on| run_on.can_run_on(&client));
436442
if !can_run_on {
443+
println!("skipping due to runOn requirements");
437444
return;
438445
}
439446
}

src/cmap/worker.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,7 @@ async fn establish_connection(
646646

647647
match establish_result {
648648
Err(ref e) => {
649+
server_updater.handle_error(e.clone()).await;
649650
if let Some(handler) = event_handler {
650651
let event = ConnectionClosedEvent {
651652
address,
@@ -654,7 +655,6 @@ async fn establish_connection(
654655
};
655656
handler.handle_connection_closed_event(event);
656657
}
657-
server_updater.handle_error(e.clone()).await;
658658
manager.handle_connection_failed();
659659
}
660660
Ok(ref mut connection) => {

src/coll/options.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -920,7 +920,7 @@ pub struct ListIndexesOptions {
920920
pub max_time: Option<Duration>,
921921

922922
/// The number of indexes the server should return per cursor batch.
923-
#[serde(default, serialize_with = "bson_util::serialize_u32_option_as_i32")]
923+
#[serde(default, skip_serializing)]
924924
pub batch_size: Option<u32>,
925925
}
926926

src/cursor/common.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -283,12 +283,6 @@ impl PinnedConnection {
283283
matches!(self, Self::Invalid(_))
284284
}
285285

286-
async fn unpin(&self) {
287-
if let Some(h) = self.handle() {
288-
h.unpin_connection().await;
289-
}
290-
}
291-
292286
fn invalidate(&mut self) {
293287
take_mut::take(self, |self_| {
294288
if let Self::Valid(c) = self_ {
@@ -318,6 +312,5 @@ pub(super) fn kill_cursor(
318312
let _ = tx.send(());
319313
}
320314
}
321-
pinned_conn.unpin().await;
322315
});
323316
}

src/operation/list_indexes/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@ impl Operation for ListIndexes {
4848
let mut body = doc! {
4949
"listIndexes": self.ns.coll.clone(),
5050
};
51+
if let Some(size) = self.options.as_ref().and_then(|o| o.batch_size) {
52+
body.insert("cursor", doc! { "batchSize": size });
53+
}
5154
append_options(&mut body, self.options.as_ref())?;
5255

5356
Ok(Command::new(

src/operation/list_indexes/test.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ async fn build() {
3333
doc! {
3434
"listIndexes": "test_coll",
3535
"maxTimeMS": 42,
36-
"batchSize": 4,
36+
"cursor": doc! {
37+
"batchSize": 4,
38+
},
3739
}
3840
);
3941
}

src/runtime/acknowledged_message.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ impl<M, R> AcknowledgedMessage<M, R> {
2020
)
2121
}
2222

23-
/// Get the message.
24-
pub(crate) fn into_message(self) -> M {
25-
self.message
23+
/// Borrow the message.
24+
pub(crate) fn message(&self) -> &M {
25+
&self.message
2626
}
2727

2828
/// Send acknowledgement to the receiver.

src/sdam/description/topology/mod.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -261,12 +261,8 @@ impl TopologyDescription {
261261
match read_preference {
262262
ReadPreference::Secondary { .. }
263263
| ReadPreference::PrimaryPreferred { .. }
264-
| ReadPreference::Nearest { .. } => {
265-
command.set_read_preference(read_preference.clone())
266-
}
267-
ReadPreference::SecondaryPreferred { ref options }
268-
if options.max_staleness.is_some() || options.tag_sets.is_some() =>
269-
{
264+
| ReadPreference::Nearest { .. }
265+
| ReadPreference::SecondaryPreferred { .. } => {
270266
command.set_read_preference(read_preference.clone())
271267
}
272268
_ => {}

0 commit comments

Comments
 (0)