Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions nativelink-redis-tester/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ rust_library(
"src/fake_redis.rs",
"src/lib.rs",
"src/pubsub.rs",
"src/read_only_redis.rs",
],
visibility = ["//visibility:public"],
deps = [
"//nativelink-util",
"@crates//:either",
"@crates//:redis",
"@crates//:redis-protocol",
"@crates//:redis-test",
Expand Down
1 change: 1 addition & 0 deletions nativelink-redis-tester/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ version = "1.0.0-rc2"
[dependencies]
nativelink-util = { path = "../nativelink-util" }

either = { version = "1.15.0", default-features = false }
redis = { version = "1.0.0", default-features = false }
redis-protocol = { version = "6.0.0", default-features = false, features = [
"bytes",
Expand Down
6 changes: 5 additions & 1 deletion nativelink-redis-tester/src/dynamic_fake_redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ impl<S: SubscriptionManagerNotify + Send + 'static + Sync> FakeRedisBackend<S> {
};

let ret: Value = match cmd.as_str() {
"HELLO" => Value::Map(vec![(
Value::SimpleString("server".into()),
Value::SimpleString("redis".into()),
)]),
"CLIENT" => {
// We can safely ignore these, as it's just setting the library name/version
Value::Int(0)
Expand Down Expand Up @@ -350,7 +354,7 @@ impl<S: SubscriptionManagerNotify + Send + 'static + Sync> FakeRedisBackend<S> {
}
output
};
fake_redis_internal(listener, inner).await;
fake_redis_internal(listener, vec![inner]).await;
}

pub async fn run(self) -> u16 {
Expand Down
102 changes: 73 additions & 29 deletions nativelink-redis-tester/src/fake_redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,23 +78,54 @@ fn args_as_string(args: Vec<Value>) -> String {
output
}

fn add_to_response<B: BuildHasher>(
pub fn add_to_response<B: BuildHasher>(
response: &mut HashMap<String, String, B>,
cmd: &redis::Cmd,
args: Vec<Value>,
) {
response.insert(cmd_as_string(cmd), args_as_string(args));
add_to_response_raw(response, cmd, args_as_string(args));
}

pub fn add_to_response_raw<B: BuildHasher>(
response: &mut HashMap<String, String, B>,
cmd: &redis::Cmd,
args: String,
) {
response.insert(cmd_as_string(cmd), args);
}

fn setinfo(responses: &mut HashMap<String, String>) {
// Library sends both lib-name and lib-ver in one go, so we respond to both
add_to_response(
responses,
// We do raw inserts of command here, because the library sends 3/4 commands in one go
// They always start with HELLO, then optionally SELECT, so we use this to differentiate
let hello = cmd_as_string(redis::cmd("HELLO").arg("3"));
let setinfo = cmd_as_string(
redis::cmd("CLIENT")
.arg("SETINFO")
.arg("LIB-NAME")
.arg("redis-rs"),
vec![Value::Okay, Value::Okay],
);
responses.insert(
[hello.clone(), setinfo.clone()].join(""),
args_as_string(vec![
Value::Map(vec![(
Value::SimpleString("server".into()),
Value::SimpleString("redis".into()),
)]),
Value::Okay,
Value::Okay,
]),
);
responses.insert(
[hello, cmd_as_string(redis::cmd("SELECT").arg(3)), setinfo].join(""),
args_as_string(vec![
Value::Map(vec![(
Value::SimpleString("server".into()),
Value::SimpleString("redis".into()),
)]),
Value::Okay,
Value::Okay,
Value::Okay,
]),
);
}

Expand Down Expand Up @@ -159,10 +190,11 @@ pub fn fake_redis_sentinel_stream(master_name: &str, redis_port: u16) -> HashMap
response
}

pub(crate) async fn fake_redis_internal<H>(listener: TcpListener, handler: H)
pub(crate) async fn fake_redis_internal<H>(listener: TcpListener, handlers: Vec<H>)
where
H: Fn(&[u8]) -> String + Send + Clone + 'static,
H: Fn(&[u8]) -> String + Send + Clone + 'static + Sync,
{
let mut handler_iter = handlers.iter().cloned().cycle();
loop {
info!(
"Waiting for connection on {}",
Expand All @@ -173,7 +205,7 @@ where
panic!("error");
};
info!("Accepted new connection");
let local_handler = handler.clone();
let local_handler = handler_iter.next().unwrap();
background_spawn!("thread", async move {
loop {
let mut buf = vec![0; 8192];
Expand All @@ -189,32 +221,38 @@ where
}
}

async fn fake_redis<B>(listener: TcpListener, responses: HashMap<String, String, B>)
async fn fake_redis<B>(listener: TcpListener, all_responses: Vec<HashMap<String, String, B>>)
where
B: BuildHasher + Clone + Send + 'static,
B: BuildHasher + Clone + Send + 'static + Sync,
{
info!("Responses are: {:?}", responses);
let values = responses.clone();
let inner = move |buf: &[u8]| -> String {
let str_buf = str::from_utf8(buf);
if let Ok(s) = str_buf {
for (key, value) in &values {
if s.starts_with(key) {
info!("Responding to {}", s.replace("\r\n", "\\r\\n"));
return value.clone();
let funcs = all_responses
.iter()
.map(|responses| {
info!("Responses are: {:?}", responses);
let values = responses.clone();
move |buf: &[u8]| -> String {
let str_buf = String::from_utf8_lossy(buf).into_owned();
for (key, value) in &values {
if str_buf.starts_with(key) {
info!("Responding to {}", str_buf.replace("\r\n", "\\r\\n"));
return value.clone();
}
}
warn!(
"Unknown command: {}",
str_buf.chars().take(1000).collect::<String>()
);
String::new()
}
warn!("Unknown command: {s}");
} else {
warn!("Bytes buffer: {:?}", &buf);
}
String::new()
};
fake_redis_internal(listener, inner).await;
})
.collect();
fake_redis_internal(listener, funcs).await;
}

pub async fn make_fake_redis_with_responses<B: BuildHasher + Clone + Send + 'static>(
responses: HashMap<String, String, B>,
pub async fn make_fake_redis_with_multiple_responses<
B: BuildHasher + Clone + Send + 'static + Sync,
>(
responses: Vec<HashMap<String, String, B>>,
) -> u16 {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
Expand All @@ -226,3 +264,9 @@ pub async fn make_fake_redis_with_responses<B: BuildHasher + Clone + Send + 'sta

port
}

pub async fn make_fake_redis_with_responses<B: BuildHasher + Clone + Send + 'static + Sync>(
responses: HashMap<String, String, B>,
) -> u16 {
make_fake_redis_with_multiple_responses(vec![responses]).await
}
7 changes: 5 additions & 2 deletions nativelink-redis-tester/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
mod dynamic_fake_redis;
mod fake_redis;
mod pubsub;
mod read_only_redis;

pub use dynamic_fake_redis::{FakeRedisBackend, SubscriptionManagerNotify};
pub use fake_redis::{
add_lua_script, fake_redis_sentinel_master_stream, fake_redis_sentinel_stream,
fake_redis_stream, make_fake_redis_with_responses,
add_lua_script, add_to_response, add_to_response_raw, fake_redis_sentinel_master_stream,
fake_redis_sentinel_stream, fake_redis_stream, make_fake_redis_with_multiple_responses,
make_fake_redis_with_responses,
};
pub use pubsub::MockPubSub;
pub use read_only_redis::ReadOnlyRedis;
156 changes: 156 additions & 0 deletions nativelink-redis-tester/src/read_only_redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2026 The NativeLink Authors. All rights reserved.
//
// Licensed under the Functional Source License, Version 1.1, Apache 2.0 Future License (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// See LICENSE file for details
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use core::fmt::Write;
use core::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

use either::Either;
use nativelink_util::background_spawn;
use redis::Value;
use redis_protocol::resp2::decode::decode;
use redis_protocol::resp2::types::OwnedFrame;
use tokio::net::TcpListener;
use tracing::info;

use crate::fake_redis::{arg_as_string, fake_redis_internal};

const FAKE_SCRIPT_SHA: &str = "b22b9926cbce9dd9ba97fa7ba3626f89feea1ed5";

#[derive(Clone, Debug)]
pub struct ReadOnlyRedis {
// The first time we hit SETRANGE/HMSET, we output a ReadOnly. Next time, we assume we're reconnected and do correct values
readonly_triggered: Arc<AtomicBool>,
}

impl Default for ReadOnlyRedis {
fn default() -> Self {
Self::new()
}
}

impl ReadOnlyRedis {
pub fn new() -> Self {
Self {
readonly_triggered: Arc::new(AtomicBool::new(false)),
}
}

async fn dynamic_fake_redis(self, listener: TcpListener) {
let readonly_err_str = "READONLY You can't write against a read only replica.";
let readonly_err = format!("!{}\r\n{readonly_err_str}\r\n", readonly_err_str.len());

let inner = move |buf: &[u8]| -> String {
let mut output = String::new();
let mut buf_index = 0;
loop {
let frame = match decode(&buf[buf_index..]).unwrap() {
Some((frame, amt)) => {
buf_index += amt;
frame
}
None => {
panic!("No frame!");
}
};
let (cmd, args) = {
if let OwnedFrame::Array(a) = frame {
if let OwnedFrame::BulkString(s) = a.first().unwrap() {
let args: Vec<_> = a[1..].to_vec();
(str::from_utf8(s).unwrap().to_string(), args)
} else {
panic!("Array not starting with cmd: {a:?}");
}
} else {
panic!("Non array cmd: {frame:?}");
}
};

let ret: Either<Value, String> = match cmd.as_str() {
"HELLO" => Either::Left(Value::Map(vec![(
Value::SimpleString("server".into()),
Value::SimpleString("redis".into()),
)])),
"CLIENT" => {
// We can safely ignore these, as it's just setting the library name/version
Either::Left(Value::Int(0))
}
"SCRIPT" => {
assert_eq!(args[0], OwnedFrame::BulkString(b"LOAD".to_vec()));

let OwnedFrame::BulkString(ref _script) = args[1] else {
panic!("Script should be a bulkstring: {args:?}");
};
Either::Left(Value::SimpleString(FAKE_SCRIPT_SHA.to_string()))
}
"ROLE" => Either::Left(Value::Array(vec![
Value::BulkString(b"master".to_vec()),
Value::Int(0),
Value::Array(vec![]),
])),
"SETRANGE" => {
let value = self.readonly_triggered.load(Ordering::Relaxed);
if value {
Either::Left(Value::Int(5))
} else {
self.readonly_triggered.store(true, Ordering::Relaxed);
Either::Right(readonly_err.clone())
}
}
"STRLEN" => Either::Left(Value::Int(5)),
"RENAME" | "HMSET" => {
let value = self.readonly_triggered.load(Ordering::Relaxed);
if value {
Either::Left(Value::Okay)
} else {
self.readonly_triggered.store(true, Ordering::Relaxed);
Either::Right(readonly_err.clone())
}
}
"EVALSHA" => Either::Left(Value::Array(vec![Value::Int(1), Value::Int(0)])),
actual => {
panic!("Mock command not implemented! {actual:?}");
}
};

match ret {
Either::Left(v) => {
arg_as_string(&mut output, v);
}
Either::Right(s) => {
write!(&mut output, "{s}").unwrap();
}
}

if buf_index == buf.len() {
break;
}
}
output
};
fake_redis_internal(listener, vec![inner]).await;
}

pub async fn run(self) -> u16 {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let port = listener.local_addr().unwrap().port();
info!("Using port {port}");

background_spawn!("listener", async move {
self.dynamic_fake_redis(listener).await;
});

port
}
}
Loading
Loading