Skip to content

Commit ea88fbc

Browse files
committed
Fix sentinel reconnect with update_oneshot
1 parent 0e99702 commit ea88fbc

File tree

9 files changed

+213
-49
lines changed

9 files changed

+213
-49
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

nativelink-redis-tester/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ rust_library(
1414
"src/fake_redis.rs",
1515
"src/lib.rs",
1616
"src/pubsub.rs",
17+
"src/read_only_redis.rs",
1718
],
1819
visibility = ["//visibility:public"],
1920
deps = [
2021
"//nativelink-util",
22+
"@crates//:either",
2123
"@crates//:redis",
2224
"@crates//:redis-protocol",
2325
"@crates//:redis-test",

nativelink-redis-tester/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ version = "1.0.0-rc2"
99
[dependencies]
1010
nativelink-util = { path = "../nativelink-util" }
1111

12+
either = { version = "1.15.0", default-features = false }
1213
redis = { version = "1.0.0", default-features = false }
1314
redis-protocol = { version = "6.0.0", default-features = false, features = [
1415
"bytes",

nativelink-redis-tester/src/dynamic_fake_redis.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ impl<S: SubscriptionManagerNotify + Send + 'static + Sync> FakeRedisBackend<S> {
354354
}
355355
output
356356
};
357-
fake_redis_internal(listener, inner).await;
357+
fake_redis_internal(listener, vec![inner]).await;
358358
}
359359

360360
pub async fn run(self) -> u16 {

nativelink-redis-tester/src/fake_redis.rs

Lines changed: 36 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,11 @@ pub fn fake_redis_sentinel_stream(master_name: &str, redis_port: u16) -> HashMap
190190
response
191191
}
192192

193-
pub(crate) async fn fake_redis_internal<H>(listener: TcpListener, handler: H)
193+
pub(crate) async fn fake_redis_internal<H>(listener: TcpListener, handlers: Vec<H>)
194194
where
195-
H: Fn(&[u8]) -> String + Send + Clone + 'static,
195+
H: Fn(&[u8]) -> String + Send + Clone + 'static + Sync,
196196
{
197+
let mut handler_iter = handlers.iter().cloned().cycle();
197198
loop {
198199
info!(
199200
"Waiting for connection on {}",
@@ -204,7 +205,7 @@ where
204205
panic!("error");
205206
};
206207
info!("Accepted new connection");
207-
let local_handler = handler.clone();
208+
let local_handler = handler_iter.next().unwrap();
208209
background_spawn!("thread", async move {
209210
loop {
210211
let mut buf = vec![0; 8192];
@@ -220,32 +221,38 @@ where
220221
}
221222
}
222223

223-
async fn fake_redis<B>(listener: TcpListener, responses: HashMap<String, String, B>)
224+
async fn fake_redis<B>(listener: TcpListener, all_responses: Vec<HashMap<String, String, B>>)
224225
where
225-
B: BuildHasher + Clone + Send + 'static,
226+
B: BuildHasher + Clone + Send + 'static + Sync,
226227
{
227-
info!("Responses are: {:?}", responses);
228-
let values = responses.clone();
229-
let inner = move |buf: &[u8]| -> String {
230-
let str_buf = str::from_utf8(buf);
231-
if let Ok(s) = str_buf {
232-
for (key, value) in &values {
233-
if s.starts_with(key) {
234-
info!("Responding to {}", s.replace("\r\n", "\\r\\n"));
235-
return value.clone();
228+
let funcs = all_responses
229+
.iter()
230+
.map(|responses| {
231+
info!("Responses are: {:?}", responses);
232+
let values = responses.clone();
233+
move |buf: &[u8]| -> String {
234+
let str_buf = String::from_utf8_lossy(buf).into_owned();
235+
for (key, value) in &values {
236+
if str_buf.starts_with(key) {
237+
info!("Responding to {}", str_buf.replace("\r\n", "\\r\\n"));
238+
return value.clone();
239+
}
236240
}
241+
warn!(
242+
"Unknown command: {}",
243+
str_buf.chars().take(1000).collect::<String>()
244+
);
245+
String::new()
237246
}
238-
warn!("Unknown command: {s}");
239-
} else {
240-
warn!("Bytes buffer: {:?}", &buf);
241-
}
242-
String::new()
243-
};
244-
fake_redis_internal(listener, inner).await;
247+
})
248+
.collect();
249+
fake_redis_internal(listener, funcs).await;
245250
}
246251

247-
pub async fn make_fake_redis_with_responses<B: BuildHasher + Clone + Send + 'static>(
248-
responses: HashMap<String, String, B>,
252+
pub async fn make_fake_redis_with_multiple_responses<
253+
B: BuildHasher + Clone + Send + 'static + Sync,
254+
>(
255+
responses: Vec<HashMap<String, String, B>>,
249256
) -> u16 {
250257
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
251258
let port = listener.local_addr().unwrap().port();
@@ -257,3 +264,9 @@ pub async fn make_fake_redis_with_responses<B: BuildHasher + Clone + Send + 'sta
257264

258265
port
259266
}
267+
268+
pub async fn make_fake_redis_with_responses<B: BuildHasher + Clone + Send + 'static + Sync>(
269+
responses: HashMap<String, String, B>,
270+
) -> u16 {
271+
make_fake_redis_with_multiple_responses(vec![responses]).await
272+
}

nativelink-redis-tester/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@
1515
mod dynamic_fake_redis;
1616
mod fake_redis;
1717
mod pubsub;
18+
mod read_only_redis;
1819

1920
pub use dynamic_fake_redis::{FakeRedisBackend, SubscriptionManagerNotify};
2021
pub use fake_redis::{
2122
add_lua_script, add_to_response, add_to_response_raw, fake_redis_sentinel_master_stream,
22-
fake_redis_sentinel_stream, fake_redis_stream, make_fake_redis_with_responses,
23+
fake_redis_sentinel_stream, fake_redis_stream, make_fake_redis_with_multiple_responses,
24+
make_fake_redis_with_responses,
2325
};
2426
pub use pubsub::MockPubSub;
27+
pub use read_only_redis::run as read_only_run;
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
// Copyright 2026 The NativeLink Authors. All rights reserved.
2+
//
3+
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// See LICENSE file for details
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use core::fmt::Write;
16+
use core::sync::atomic::{AtomicBool, Ordering};
17+
18+
use either::Either;
19+
use nativelink_util::background_spawn;
20+
use redis::Value;
21+
use redis_protocol::resp2::decode::decode;
22+
use redis_protocol::resp2::types::OwnedFrame;
23+
use tokio::net::TcpListener;
24+
use tracing::info;
25+
26+
use crate::fake_redis::{arg_as_string, fake_redis_internal};
27+
28+
const FAKE_SCRIPT_SHA: &str = "b22b9926cbce9dd9ba97fa7ba3626f89feea1ed5";
29+
30+
// The first SETRANGE we receive is answered with a READONLY error; every later one succeeds.
31+
static SETRANGE_TRIGGERED: AtomicBool = AtomicBool::new(false);
32+
33+
async fn dynamic_fake_redis(listener: TcpListener) {
34+
let readonly_err_str = "READONLY You can't write against a read only replica.";
35+
let readonly_err = format!("!{}\r\n{readonly_err_str}\r\n", readonly_err_str.len());
36+
37+
let inner = move |buf: &[u8]| -> String {
38+
let mut output = String::new();
39+
let mut buf_index = 0;
40+
loop {
41+
let frame = match decode(&buf[buf_index..]).unwrap() {
42+
Some((frame, amt)) => {
43+
buf_index += amt;
44+
frame
45+
}
46+
None => {
47+
panic!("No frame!");
48+
}
49+
};
50+
let (cmd, args) = {
51+
if let OwnedFrame::Array(a) = frame {
52+
if let OwnedFrame::BulkString(s) = a.first().unwrap() {
53+
let args: Vec<_> = a[1..].to_vec();
54+
(str::from_utf8(s).unwrap().to_string(), args)
55+
} else {
56+
panic!("Array not starting with cmd: {a:?}");
57+
}
58+
} else {
59+
panic!("Non array cmd: {frame:?}");
60+
}
61+
};
62+
63+
let ret: Either<Value, String> = match cmd.as_str() {
64+
"HELLO" => Either::Left(Value::Map(vec![(
65+
Value::SimpleString("server".into()),
66+
Value::SimpleString("redis".into()),
67+
)])),
68+
"CLIENT" => {
69+
// We can safely ignore these, as it's just setting the library name/version
70+
Either::Left(Value::Int(0))
71+
}
72+
"SCRIPT" => {
73+
assert_eq!(args[0], OwnedFrame::BulkString(b"LOAD".to_vec()));
74+
75+
let OwnedFrame::BulkString(ref _script) = args[1] else {
76+
panic!("Script should be a bulkstring: {args:?}");
77+
};
78+
Either::Left(Value::SimpleString(FAKE_SCRIPT_SHA.to_string()))
79+
}
80+
"ROLE" => Either::Left(Value::Array(vec![
81+
Value::BulkString(b"master".to_vec()),
82+
Value::Int(0),
83+
Value::Array(vec![]),
84+
])),
85+
"SETRANGE" => {
86+
let value = SETRANGE_TRIGGERED.load(Ordering::Relaxed);
87+
if value {
88+
Either::Left(Value::Int(5))
89+
} else {
90+
SETRANGE_TRIGGERED.store(true, Ordering::Relaxed);
91+
Either::Right(readonly_err.clone())
92+
}
93+
}
94+
"STRLEN" => Either::Left(Value::Int(5)),
95+
"RENAME" => {
96+
let value = SETRANGE_TRIGGERED.load(Ordering::Relaxed);
97+
if value {
98+
Either::Left(Value::Okay)
99+
} else {
100+
Either::Right(readonly_err.clone())
101+
}
102+
}
103+
actual => {
104+
panic!("Mock command not implemented! {actual:?}");
105+
}
106+
};
107+
108+
match ret {
109+
Either::Left(v) => {
110+
arg_as_string(&mut output, v);
111+
}
112+
Either::Right(s) => {
113+
write!(&mut output, "{s}").unwrap();
114+
}
115+
}
116+
117+
if buf_index == buf.len() {
118+
break;
119+
}
120+
}
121+
output
122+
};
123+
fake_redis_internal(listener, vec![inner]).await;
124+
}
125+
126+
/// Starts the read-only-once fake Redis on an ephemeral localhost port.
///
/// Binds `127.0.0.1:0`, logs the chosen port, spawns the server as a background
/// task, and returns that port.
///
/// # Panics
///
/// Panics if the socket cannot be bound or its local address cannot be read.
pub async fn run() -> u16 {
    let socket = TcpListener::bind("127.0.0.1:0").await.unwrap();
    let port = socket.local_addr().unwrap().port();
    info!("Using port {port}");

    // The server task takes ownership of the socket and keeps running after we return.
    background_spawn!("listener", async move {
        dynamic_fake_redis(socket).await;
    });

    port
}

nativelink-store/src/redis_store.rs

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use std::time::Instant;
2727
use async_trait::async_trait;
2828
use bytes::Bytes;
2929
use const_format::formatcp;
30-
use futures::stream::{self, FuturesUnordered};
30+
use futures::stream::FuturesUnordered;
3131
use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt, future};
3232
use itertools::izip;
3333
use nativelink_config::stores::{RedisMode, RedisSpec};
@@ -321,6 +321,7 @@ where
321321

322322
/// The maximum number of chunk uploads per update.
323323
/// This is used to limit the number of chunk uploads per update to prevent
324+
/// overloading when uploading large blocks of data
324325
#[metric(help = "The maximum number of chunk uploads per update")]
325326
max_chunk_uploads_per_update: usize,
326327

@@ -892,18 +893,35 @@ where
892893
Ok::<_, Error>((offset, *bytes_read, chunk))
893894
}),
894895
))
895-
}).zip(
896-
stream::repeat(client.connection_manager.clone()))
897-
.map(|(res, mut connection_manager)| {
896+
})
897+
.map(|res| {
898898
let (offset, end_pos, chunk) = res?;
899899
let temp_key_ref = &temp_key;
900900
Ok(async move {
901-
connection_manager
901+
let (mut connection_manager, connect_id) = self.connection_manager.get_connection().await?;
902+
match connection_manager
902903
.setrange::<_, _, usize>(temp_key_ref, offset, chunk.to_vec())
903-
.await
904-
.err_tip(
905-
|| format!("While appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"),
906-
)?;
904+
.await {
905+
Ok(_) => {},
906+
Err(err)
907+
if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
908+
{
909+
let (mut connection_manager, _connect_id) = self.connection_manager.reconnect(connect_id).await?;
910+
connection_manager
911+
.setrange::<_, _, usize>(temp_key_ref, offset, chunk.to_vec())
912+
.await
913+
.err_tip(
914+
|| format!("(after reconnect) while appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"),
915+
)?;
916+
}
917+
Err(err) => {
918+
let mut error: Error = err.into();
919+
error
920+
.messages
921+
.push(format!("While appending to temp key ({temp_key_ref}) in RedisStore::update. offset = {offset}. end_pos = {end_pos}"));
922+
return Err(error);
923+
}
924+
}
907925
Ok::<u32, Error>(end_pos)
908926
})
909927
})
@@ -1502,7 +1520,7 @@ where
15021520
.invoke_async(&mut client.connection_manager)
15031521
.await
15041522
{
1505-
Ok((success, new_version)) => (success, new_version),
1523+
Ok(v) => v,
15061524
Err(err)
15071525
if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
15081526
{
@@ -1513,7 +1531,6 @@ where
15131531
}
15141532
Err(err) => {
15151533
let mut error: Error = err.into();
1516-
error.code = Code::Internal;
15171534
error
15181535
.messages
15191536
.push(format!("In RedisStore::update_data::versioned for {key:?}"));

nativelink-store/tests/redis_store_test.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use nativelink_config::stores::{RedisMode, RedisSpec};
2323
use nativelink_error::{Code, Error, ResultExt, make_err};
2424
use nativelink_macro::nativelink_test;
2525
use nativelink_redis_tester::{
26-
add_lua_script, add_to_response_raw, fake_redis_sentinel_master_stream,
27-
fake_redis_sentinel_stream, fake_redis_stream, make_fake_redis_with_responses,
26+
add_lua_script, fake_redis_sentinel_master_stream, fake_redis_sentinel_stream,
27+
fake_redis_stream, make_fake_redis_with_responses, read_only_run,
2828
};
2929
use nativelink_store::cas_utils::ZERO_BYTE_DIGESTS;
3030
use nativelink_store::redis_store::{
@@ -751,17 +751,8 @@ async fn test_sentinel_connect_with_bad_master() {
751751
#[nativelink_test]
752752
async fn test_sentinel_connect_and_update_readonly() {
753753
let redis_span = info_span!("redis");
754-
let mut responses = add_lua_version_script(fake_redis_sentinel_master_stream());
755-
let temp_key = make_temp_key("abcd");
756-
let readonly_err = "READONLY You can't write against a read only replica.";
757-
add_to_response_raw(
758-
&mut responses,
759-
redis::cmd("SETRANGE").arg(&temp_key).arg(0).arg("hello"),
760-
format!("!{}\r\n{readonly_err}\r\n", readonly_err.len()),
761-
);
762-
let redis_port = make_fake_redis_with_responses(responses)
763-
.instrument(redis_span)
764-
.await;
754+
755+
let redis_port = read_only_run().instrument(redis_span).await;
765756
let sentinel_span = info_span!("sentinel");
766757
let sentinel_port =
767758
make_fake_redis_with_responses(fake_redis_sentinel_stream("master", redis_port))

0 commit comments

Comments
 (0)