Skip to content

Commit 16693ca

Browse files
authored
feat: add info command (#42)
1 parent fe4d786 commit 16693ca

File tree

5 files changed

+383
-0
lines changed

5 files changed

+383
-0
lines changed

src/cluster/mod.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::auth::{AuthAction, BackendAuth, FrontendAuthenticator};
2121
use crate::backend::client::{ClientId, FrontConnectionGuard};
2222
use crate::backend::pool::{BackendNode, ConnectionPool, Connector, SessionCommand};
2323
use crate::config::ClusterConfig;
24+
use crate::info::{InfoContext, ProxyMode};
2425
use crate::metrics;
2526
use crate::protocol::redis::{
2627
BlockingKind, MultiDispatch, RedisCommand, RespCodec, RespValue, SlotMap, SubCommand,
@@ -50,6 +51,8 @@ pub struct ClusterProxy {
5051
pool: Arc<ConnectionPool<RedisCommand>>,
5152
fetch_trigger: mpsc::UnboundedSender<()>,
5253
backend_timeout: Duration,
54+
listen_port: u16,
55+
seed_nodes: usize,
5356
}
5457

5558
impl ClusterProxy {
@@ -77,6 +80,7 @@ impl ClusterProxy {
7780
.transpose()?
7881
.map(Arc::new);
7982

83+
let listen_port = config.listen_port()?;
8084
let proxy = Self {
8185
cluster: cluster.clone(),
8286
hash_tag,
@@ -87,6 +91,8 @@ impl ClusterProxy {
8791
pool: pool.clone(),
8892
fetch_trigger: trigger_tx.clone(),
8993
backend_timeout: Duration::from_millis(timeout_ms),
94+
listen_port,
95+
seed_nodes: config.servers.len(),
9096
};
9197

9298
// trigger an immediate topology fetch
@@ -262,6 +268,17 @@ impl ClusterProxy {
262268
}
263269
}
264270
}
271+
if let Some(response) = self.try_handle_info(&cmd) {
272+
metrics::front_command(
273+
self.cluster.as_ref(),
274+
cmd.kind_label(),
275+
true,
276+
);
277+
let fut = async move { response };
278+
pending.push_back(Box::pin(fut));
279+
inflight += 1;
280+
continue;
281+
}
265282
let guard = self.prepare_dispatch(client_id, cmd);
266283
pending.push_back(Box::pin(guard));
267284
inflight += 1;
@@ -302,6 +319,24 @@ impl ClusterProxy {
302319
Ok(())
303320
}
304321

322+
fn try_handle_info(&self, command: &RedisCommand) -> Option<RespValue> {
323+
if !command.command_name().eq_ignore_ascii_case(b"INFO") {
324+
return None;
325+
}
326+
let section = command
327+
.args()
328+
.get(1)
329+
.map(|arg| String::from_utf8_lossy(arg).to_string());
330+
let context = InfoContext {
331+
cluster: self.cluster.as_ref(),
332+
mode: ProxyMode::Cluster,
333+
listen_port: self.listen_port,
334+
backend_nodes: self.seed_nodes,
335+
};
336+
let payload = crate::info::render_info(context, section.as_deref());
337+
Some(RespValue::BulkString(payload))
338+
}
339+
305340
async fn run_subscription(
306341
&self,
307342
parts: FramedParts<TcpStream, RespCodec>,

src/info.rs

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
use std::time::{SystemTime, UNIX_EPOCH};
2+
3+
use bytes::Bytes;
4+
use once_cell::sync::Lazy;
5+
6+
use crate::metrics::{self, FrontCommandStats};
7+
8+
static START_TIME: Lazy<SystemTime> = Lazy::new(SystemTime::now);
9+
10+
#[derive(Debug, Clone, Copy)]
11+
pub enum ProxyMode {
12+
Standalone,
13+
Cluster,
14+
}
15+
16+
impl ProxyMode {
17+
pub fn as_str(self) -> &'static str {
18+
match self {
19+
ProxyMode::Standalone => "standalone",
20+
ProxyMode::Cluster => "cluster",
21+
}
22+
}
23+
}
24+
25+
pub struct InfoContext<'a> {
26+
pub cluster: &'a str,
27+
pub mode: ProxyMode,
28+
pub listen_port: u16,
29+
pub backend_nodes: usize,
30+
}
31+
32+
pub fn render_info(context: InfoContext<'_>, section: Option<&str>) -> Bytes {
33+
let uptime = SystemTime::now()
34+
.duration_since(*START_TIME)
35+
.unwrap_or_default();
36+
let uptime_seconds = uptime.as_secs();
37+
let uptime_days = uptime_seconds / 86_400;
38+
let startup_time_unix = START_TIME
39+
.duration_since(UNIX_EPOCH)
40+
.unwrap_or_default()
41+
.as_secs();
42+
43+
let process_id = std::process::id();
44+
let version = env!("CARGO_PKG_VERSION");
45+
let arch_bits = (std::mem::size_of::<usize>() * 8) as u64;
46+
let build_target = format!("{}-{}", std::env::consts::OS, std::env::consts::ARCH);
47+
48+
let connected_clients = metrics::front_connections_current(context.cluster);
49+
let total_connections = metrics::front_connections_total(context.cluster);
50+
let command_stats = metrics::front_command_stats(context.cluster);
51+
let memory_bytes = metrics::memory_usage_bytes();
52+
let cpu_percent = metrics::cpu_usage_percent();
53+
let global_errors = metrics::global_error_count();
54+
55+
let sections = collect_sections(
56+
&context,
57+
uptime_seconds,
58+
uptime_days,
59+
startup_time_unix,
60+
process_id,
61+
version,
62+
arch_bits,
63+
&build_target,
64+
connected_clients,
65+
total_connections,
66+
command_stats,
67+
memory_bytes,
68+
cpu_percent,
69+
global_errors,
70+
);
71+
72+
let filter = section
73+
.map(|s| s.trim().to_ascii_lowercase())
74+
.filter(|s| !s.is_empty());
75+
76+
let mut output = String::new();
77+
for (name, entries) in sections {
78+
if !should_include(&filter, name) {
79+
continue;
80+
}
81+
if !output.is_empty() {
82+
output.push_str("\r\n");
83+
}
84+
output.push_str("# ");
85+
output.push_str(name);
86+
output.push_str("\r\n");
87+
for (key, value) in entries {
88+
output.push_str(&key);
89+
output.push(':');
90+
output.push_str(&value);
91+
output.push_str("\r\n");
92+
}
93+
}
94+
95+
if output.is_empty() {
96+
output.push_str("\r\n");
97+
}
98+
99+
Bytes::from(output)
100+
}
101+
102+
fn collect_sections(
103+
context: &InfoContext<'_>,
104+
uptime_seconds: u64,
105+
uptime_days: u64,
106+
startup_time_unix: u64,
107+
process_id: u32,
108+
version: &str,
109+
arch_bits: u64,
110+
build_target: &str,
111+
connected_clients: u64,
112+
total_connections: u64,
113+
command_stats: FrontCommandStats,
114+
memory_bytes: u64,
115+
cpu_percent: f64,
116+
global_errors: u64,
117+
) -> Vec<(&'static str, Vec<(String, String)>)> {
118+
let mut sections = Vec::new();
119+
120+
sections.push((
121+
"Server",
122+
vec![
123+
("aster_version".to_string(), version.to_string()),
124+
("aster_mode".to_string(), context.mode.as_str().to_string()),
125+
("cluster_name".to_string(), context.cluster.to_string()),
126+
("process_id".to_string(), process_id.to_string()),
127+
("tcp_port".to_string(), context.listen_port.to_string()),
128+
("arch_bits".to_string(), arch_bits.to_string()),
129+
("os".to_string(), std::env::consts::OS.to_string()),
130+
("build_target".to_string(), build_target.to_string()),
131+
(
132+
"startup_time_unix".to_string(),
133+
startup_time_unix.to_string(),
134+
),
135+
("uptime_in_seconds".to_string(), uptime_seconds.to_string()),
136+
("uptime_in_days".to_string(), uptime_days.to_string()),
137+
],
138+
));
139+
140+
sections.push((
141+
"Clients",
142+
vec![
143+
(
144+
"connected_clients".to_string(),
145+
connected_clients.to_string(),
146+
),
147+
(
148+
"total_connections_received".to_string(),
149+
total_connections.to_string(),
150+
),
151+
],
152+
));
153+
154+
sections.push((
155+
"Stats",
156+
vec![
157+
(
158+
"total_commands_processed".to_string(),
159+
command_stats.total().to_string(),
160+
),
161+
(
162+
"total_commands_succeeded".to_string(),
163+
command_stats.total_ok().to_string(),
164+
),
165+
(
166+
"total_commands_failed".to_string(),
167+
command_stats.total_fail().to_string(),
168+
),
169+
("global_error_count".to_string(), global_errors.to_string()),
170+
],
171+
));
172+
173+
sections.push((
174+
"Memory",
175+
vec![
176+
("used_memory".to_string(), memory_bytes.to_string()),
177+
("used_memory_human".to_string(), format_bytes(memory_bytes)),
178+
],
179+
));
180+
181+
sections.push((
182+
"CPU",
183+
vec![(
184+
"used_cpu_percent".to_string(),
185+
format!("{:.2}", cpu_percent),
186+
)],
187+
));
188+
189+
sections.push((
190+
"Proxy",
191+
vec![
192+
("proxy_mode".to_string(), context.mode.as_str().to_string()),
193+
(
194+
"backend_nodes".to_string(),
195+
context.backend_nodes.to_string(),
196+
),
197+
("cluster".to_string(), context.cluster.to_string()),
198+
],
199+
));
200+
201+
sections
202+
}
203+
204+
fn should_include(filter: &Option<String>, section: &str) -> bool {
205+
match filter.as_deref() {
206+
None => true,
207+
Some("all") | Some("default") | Some("everything") => true,
208+
Some(candidate) => candidate == section.to_ascii_lowercase(),
209+
}
210+
}
211+
212+
fn format_bytes(bytes: u64) -> String {
213+
const UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"];
214+
if bytes == 0 {
215+
return "0B".to_string();
216+
}
217+
218+
let mut value = bytes as f64;
219+
let mut unit = "B";
220+
for candidate in UNITS.iter() {
221+
unit = candidate;
222+
if value < 1024.0 {
223+
break;
224+
}
225+
if *candidate != "TB" {
226+
value /= 1024.0;
227+
}
228+
}
229+
if unit == "B" {
230+
format!("{bytes}{unit}")
231+
} else {
232+
format!("{value:.2}{unit}")
233+
}
234+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ pub mod auth;
1919
pub mod backend;
2020
pub mod cluster;
2121
pub mod config;
22+
pub mod info;
2223
pub mod meta;
2324
pub mod metrics;
2425
pub mod protocol;

0 commit comments

Comments
 (0)