Skip to content

Commit a805faf

Browse files
committed
Do the readonly trigger not as a global variable
1 parent 07171c8 commit a805faf

File tree

4 files changed

+222
-117
lines changed

4 files changed

+222
-117
lines changed

nativelink-redis-tester/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,4 @@ pub use fake_redis::{
2424
make_fake_redis_with_responses,
2525
};
2626
pub use pubsub::MockPubSub;
27-
pub use read_only_redis::run as read_only_run;
27+
pub use read_only_redis::ReadOnlyRedis;

nativelink-redis-tester/src/read_only_redis.rs

Lines changed: 114 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use core::fmt::Write;
1616
use core::sync::atomic::{AtomicBool, Ordering};
17+
use std::sync::Arc;
1718

1819
use either::Either;
1920
use nativelink_util::background_spawn;
@@ -27,110 +28,129 @@ use crate::fake_redis::{arg_as_string, fake_redis_internal};
2728

2829
const FAKE_SCRIPT_SHA: &str = "b22b9926cbce9dd9ba97fa7ba3626f89feea1ed5";
2930

30-
// The first time we hit SETRANGE, we output a ReadOnly
31-
static SETRANGE_TRIGGERED: AtomicBool = AtomicBool::new(false);
32-
33-
async fn dynamic_fake_redis(listener: TcpListener) {
34-
let readonly_err_str = "READONLY You can't write against a read only replica.";
35-
let readonly_err = format!("!{}\r\n{readonly_err_str}\r\n", readonly_err_str.len());
36-
37-
let inner = move |buf: &[u8]| -> String {
38-
let mut output = String::new();
39-
let mut buf_index = 0;
40-
loop {
41-
let frame = match decode(&buf[buf_index..]).unwrap() {
42-
Some((frame, amt)) => {
43-
buf_index += amt;
44-
frame
45-
}
46-
None => {
47-
panic!("No frame!");
48-
}
49-
};
50-
let (cmd, args) = {
51-
if let OwnedFrame::Array(a) = frame {
52-
if let OwnedFrame::BulkString(s) = a.first().unwrap() {
53-
let args: Vec<_> = a[1..].to_vec();
54-
(str::from_utf8(s).unwrap().to_string(), args)
31+
#[derive(Clone, Debug)]
32+
pub struct ReadOnlyRedis {
33+
// The first time we hit SETRANGE/HMSET, we output a ReadOnly. Next time, we assume we're reconnected and do correct values
34+
readonly_triggered: Arc<AtomicBool>,
35+
}
36+
37+
impl Default for ReadOnlyRedis {
38+
fn default() -> Self {
39+
Self::new()
40+
}
41+
}
42+
43+
impl ReadOnlyRedis {
44+
pub fn new() -> Self {
45+
Self {
46+
readonly_triggered: Arc::new(AtomicBool::new(false)),
47+
}
48+
}
49+
50+
async fn dynamic_fake_redis(self, listener: TcpListener) {
51+
let readonly_err_str = "READONLY You can't write against a read only replica.";
52+
let readonly_err = format!("!{}\r\n{readonly_err_str}\r\n", readonly_err_str.len());
53+
54+
let inner = move |buf: &[u8]| -> String {
55+
let mut output = String::new();
56+
let mut buf_index = 0;
57+
loop {
58+
let frame = match decode(&buf[buf_index..]).unwrap() {
59+
Some((frame, amt)) => {
60+
buf_index += amt;
61+
frame
62+
}
63+
None => {
64+
panic!("No frame!");
65+
}
66+
};
67+
let (cmd, args) = {
68+
if let OwnedFrame::Array(a) = frame {
69+
if let OwnedFrame::BulkString(s) = a.first().unwrap() {
70+
let args: Vec<_> = a[1..].to_vec();
71+
(str::from_utf8(s).unwrap().to_string(), args)
72+
} else {
73+
panic!("Array not starting with cmd: {a:?}");
74+
}
5575
} else {
56-
panic!("Array not starting with cmd: {a:?}");
76+
panic!("Non array cmd: {frame:?}");
5777
}
58-
} else {
59-
panic!("Non array cmd: {frame:?}");
60-
}
61-
};
62-
63-
let ret: Either<Value, String> = match cmd.as_str() {
64-
"HELLO" => Either::Left(Value::Map(vec![(
65-
Value::SimpleString("server".into()),
66-
Value::SimpleString("redis".into()),
67-
)])),
68-
"CLIENT" => {
69-
// We can safely ignore these, as it's just setting the library name/version
70-
Either::Left(Value::Int(0))
71-
}
72-
"SCRIPT" => {
73-
assert_eq!(args[0], OwnedFrame::BulkString(b"LOAD".to_vec()));
78+
};
7479

75-
let OwnedFrame::BulkString(ref _script) = args[1] else {
76-
panic!("Script should be a bulkstring: {args:?}");
77-
};
78-
Either::Left(Value::SimpleString(FAKE_SCRIPT_SHA.to_string()))
79-
}
80-
"ROLE" => Either::Left(Value::Array(vec![
81-
Value::BulkString(b"master".to_vec()),
82-
Value::Int(0),
83-
Value::Array(vec![]),
84-
])),
85-
"SETRANGE" => {
86-
let value = SETRANGE_TRIGGERED.load(Ordering::Relaxed);
87-
if value {
88-
Either::Left(Value::Int(5))
89-
} else {
90-
SETRANGE_TRIGGERED.store(true, Ordering::Relaxed);
91-
Either::Right(readonly_err.clone())
80+
let ret: Either<Value, String> = match cmd.as_str() {
81+
"HELLO" => Either::Left(Value::Map(vec![(
82+
Value::SimpleString("server".into()),
83+
Value::SimpleString("redis".into()),
84+
)])),
85+
"CLIENT" => {
86+
// We can safely ignore these, as it's just setting the library name/version
87+
Either::Left(Value::Int(0))
9288
}
93-
}
94-
"STRLEN" => Either::Left(Value::Int(5)),
95-
"RENAME" | "HMSET" => {
96-
let value = SETRANGE_TRIGGERED.load(Ordering::Relaxed);
97-
if value {
98-
Either::Left(Value::Okay)
99-
} else {
100-
Either::Right(readonly_err.clone())
89+
"SCRIPT" => {
90+
assert_eq!(args[0], OwnedFrame::BulkString(b"LOAD".to_vec()));
91+
92+
let OwnedFrame::BulkString(ref _script) = args[1] else {
93+
panic!("Script should be a bulkstring: {args:?}");
94+
};
95+
Either::Left(Value::SimpleString(FAKE_SCRIPT_SHA.to_string()))
10196
}
102-
}
103-
actual => {
104-
panic!("Mock command not implemented! {actual:?}");
105-
}
106-
};
97+
"ROLE" => Either::Left(Value::Array(vec![
98+
Value::BulkString(b"master".to_vec()),
99+
Value::Int(0),
100+
Value::Array(vec![]),
101+
])),
102+
"SETRANGE" => {
103+
let value = self.readonly_triggered.load(Ordering::Relaxed);
104+
if value {
105+
Either::Left(Value::Int(5))
106+
} else {
107+
self.readonly_triggered.store(true, Ordering::Relaxed);
108+
Either::Right(readonly_err.clone())
109+
}
110+
}
111+
"STRLEN" => Either::Left(Value::Int(5)),
112+
"RENAME" | "HMSET" => {
113+
let value = self.readonly_triggered.load(Ordering::Relaxed);
114+
if value {
115+
Either::Left(Value::Okay)
116+
} else {
117+
self.readonly_triggered.store(true, Ordering::Relaxed);
118+
Either::Right(readonly_err.clone())
119+
}
120+
}
121+
"EVALSHA" => Either::Left(Value::Array(vec![Value::Int(1), Value::Int(0)])),
122+
actual => {
123+
panic!("Mock command not implemented! {actual:?}");
124+
}
125+
};
107126

108-
match ret {
109-
Either::Left(v) => {
110-
arg_as_string(&mut output, v);
111-
}
112-
Either::Right(s) => {
113-
write!(&mut output, "{s}").unwrap();
127+
match ret {
128+
Either::Left(v) => {
129+
arg_as_string(&mut output, v);
130+
}
131+
Either::Right(s) => {
132+
write!(&mut output, "{s}").unwrap();
133+
}
114134
}
115-
}
116135

117-
if buf_index == buf.len() {
118-
break;
136+
if buf_index == buf.len() {
137+
break;
138+
}
119139
}
120-
}
121-
output
122-
};
123-
fake_redis_internal(listener, vec![inner]).await;
124-
}
140+
output
141+
};
142+
fake_redis_internal(listener, vec![inner]).await;
143+
}
125144

126-
pub async fn run() -> u16 {
127-
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
128-
let port = listener.local_addr().unwrap().port();
129-
info!("Using port {port}");
145+
pub async fn run(self) -> u16 {
146+
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
147+
let port = listener.local_addr().unwrap().port();
148+
info!("Using port {port}");
130149

131-
background_spawn!("listener", async move {
132-
dynamic_fake_redis(listener).await;
133-
});
150+
background_spawn!("listener", async move {
151+
self.dynamic_fake_redis(listener).await;
152+
});
134153

135-
port
154+
port
155+
}
136156
}

nativelink-store/src/redis_store.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1583,11 +1583,30 @@ where
15831583
for (name, value) in maybe_index {
15841584
fields.push((name.into(), value.to_vec()));
15851585
}
1586-
client
1586+
match client
15871587
.connection_manager
15881588
.hset_multiple::<_, _, _, ()>(redis_key.as_ref(), &fields)
15891589
.await
1590-
.err_tip(|| format!("In RedisStore::update_data::noversion for {redis_key}"))?;
1590+
{
1591+
Ok(v) => v,
1592+
Err(err)
1593+
if err.kind() == redis::ErrorKind::Server(redis::ServerErrorKind::ReadOnly) =>
1594+
{
1595+
client.reconnect(&self.connection_manager).await?;
1596+
client
1597+
.connection_manager
1598+
.hset_multiple::<_, _, _, ()>(redis_key.as_ref(), &fields)
1599+
.await
1600+
.err_tip(|| format!("(after reconnect) In RedisStore::update_data::noversion for {redis_key}"))?
1601+
}
1602+
Err(err) => {
1603+
let mut error: Error = err.into();
1604+
error.messages.push(format!(
1605+
"In RedisStore::update_data::noversion for {redis_key}"
1606+
));
1607+
return Err(error);
1608+
}
1609+
};
15911610
// If we have a publish channel configured, send a notice that the key has been set.
15921611
if let Some(pub_sub_channel) = &self.pub_sub_channel {
15931612
return Ok(client

0 commit comments

Comments
 (0)