Skip to content

Commit c54a463

Browse files
authored
Do not retry requests that have been dropped by the user. (#189)
valkey-io/valkey-glide#2138
1 parent 426bb99 commit c54a463

File tree

5 files changed

+55
-7
lines changed

5 files changed

+55
-7
lines changed

redis/benches/bench_basic.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ fn bench_simple_getsetdel_async(b: &mut Bencher) {
2828
runtime
2929
.block_on(async {
3030
let key = "test_key";
31-
redis::cmd("SET")
31+
() = redis::cmd("SET")
3232
.arg(key)
3333
.arg(42)
3434
.query_async(&mut con)
3535
.await?;
3636
let _: isize = redis::cmd("GET").arg(key).query_async(&mut con).await?;
37-
redis::cmd("DEL").arg(key).query_async(&mut con).await?;
37+
() = redis::cmd("DEL").arg(key).query_async(&mut con).await?;
3838
Ok::<_, RedisError>(())
3939
})
4040
.unwrap()

redis/benches/bench_cluster_async.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ fn bench_cluster_async(
2121
runtime
2222
.block_on(async {
2323
let key = "test_key";
24-
redis::cmd("SET").arg(key).arg(42).query_async(con).await?;
24+
() = redis::cmd("SET").arg(key).arg(42).query_async(con).await?;
2525
let _: isize = redis::cmd("GET").arg(key).query_async(con).await?;
26-
redis::cmd("DEL").arg(key).query_async(con).await?;
26+
() = redis::cmd("DEL").arg(key).query_async(con).await?;
2727

2828
Ok::<_, RedisError>(())
2929
})

redis/src/cluster_async/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -835,7 +835,9 @@ impl<C> Future for Request<C> {
835835

836836
fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
837837
let mut this = self.as_mut().project();
838-
if this.request.is_none() {
838+
// If the sender is closed, the caller is no longer waiting for the reply, and it is ambiguous
839+
// whether they expect the side-effect of the request to happen or not.
840+
if this.request.is_none() || this.request.as_ref().unwrap().sender.is_closed() {
839841
return Poll::Ready(Next::Done);
840842
}
841843
let future = match this.future.as_mut().project() {

redis/tests/support/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -653,7 +653,7 @@ pub fn build_keys_and_certs_for_tls(tempdir: &TempDir) -> TlsFilePaths {
653653
.arg("genrsa")
654654
.arg("-out")
655655
.arg(name)
656-
.arg(&format!("{size}"))
656+
.arg(format!("{size}"))
657657
.stdout(process::Stdio::null())
658658
.stderr(process::Stdio::null())
659659
.spawn()

redis/tests/test_cluster_async.rs

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ mod cluster_async {
1616
};
1717

1818
use futures::prelude::*;
19-
use futures_time::task::sleep;
19+
use futures_time::{future::FutureExt, task::sleep};
2020
use once_cell::sync::Lazy;
2121
use std::ops::Add;
2222

@@ -4142,6 +4142,52 @@ mod cluster_async {
41424142
.unwrap();
41434143
}
41444144

4145+
#[test]
4146+
fn test_async_cluster_do_not_retry_when_receiver_was_dropped() {
4147+
let name = "test_async_cluster_do_not_retry_when_receiver_was_dropped";
4148+
let cmd = cmd("FAKE_COMMAND");
4149+
let packed_cmd = cmd.get_packed_command();
4150+
let request_counter = Arc::new(AtomicU32::new(0));
4151+
let cloned_req_counter = request_counter.clone();
4152+
let MockEnv {
4153+
runtime,
4154+
async_connection: mut connection,
4155+
..
4156+
} = MockEnv::with_client_builder(
4157+
ClusterClient::builder(vec![&*format!("redis://{name}")])
4158+
.retries(5)
4159+
.max_retry_wait(2)
4160+
.min_retry_wait(2),
4161+
name,
4162+
move |received_cmd: &[u8], _| {
4163+
respond_startup(name, received_cmd)?;
4164+
4165+
if received_cmd == packed_cmd {
4166+
cloned_req_counter.fetch_add(1, Ordering::Relaxed);
4167+
return Err(Err((ErrorKind::TryAgain, "seriously, try again").into()));
4168+
}
4169+
4170+
Err(Ok(Value::Okay))
4171+
},
4172+
);
4173+
4174+
runtime.block_on(async move {
4175+
let err = cmd
4176+
.query_async::<_, Value>(&mut connection)
4177+
.timeout(futures_time::time::Duration::from_millis(1))
4178+
.await
4179+
.unwrap_err();
4180+
assert_eq!(err.kind(), std::io::ErrorKind::TimedOut);
4181+
4182+
// we sleep here, to allow the cluster connection time to retry. We expect it won't, but without this
4183+
// sleep the test will complete before the the runtime gave the connection time to retry, which would've made the
4184+
// test pass regardless of whether the connection tries retrying or not.
4185+
sleep(Duration::from_millis(10).into()).await;
4186+
});
4187+
4188+
assert_eq!(request_counter.load(Ordering::Relaxed), 1);
4189+
}
4190+
41454191
#[cfg(feature = "tls-rustls")]
41464192
mod mtls_test {
41474193
use crate::support::mtls_test::create_cluster_client_from_cluster;

0 commit comments

Comments
 (0)