Skip to content

Commit 0fc8773

Browse files
authored
feature: slowlog support (#44)
* feature: slowlog support * slowlog: refactor as ringbuffer
1 parent c78b1ee commit 0fc8773

File tree

7 files changed

+497
-9
lines changed

7 files changed

+497
-9
lines changed

default.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
password = "example-frontend-secret"
2323
# 后端 ACL:此处演示用户名 + 密码;如果后端是简单密码,可直接使用 backend_password
2424
backend_auth = { username = "proxy", password = "example-backend-secret" }
25+
# 慢查询日志阈值(微秒);默认 10000,设置为 -1 以关闭记录
26+
slowlog_log_slower_than = 10000
27+
# 慢查询日志最大保留条数;默认 128
28+
slowlog_max_len = 128
2529

2630
[[clusters]]
2731
name = "test-cluster"
@@ -50,3 +54,7 @@
5054
] }
5155
# 旧版写法依然兼容:backend_password = "raw-secret"
5256
backend_password = "cluster-backend-secret"
57+
# 慢查询日志阈值(微秒);默认 10000,设置为 -1 以关闭记录
58+
slowlog_log_slower_than = 10000
59+
# 慢查询日志最大保留条数;默认 128
60+
slowlog_max_len = 128

docs/usage.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,13 @@ cargo build --release
2626
- `hash_tag`:一致性 hash 标签,例如 `{}`
2727
- `read_timeout` / `write_timeout`:后端超时(毫秒)。
2828
- `read_from_slave`:Cluster 模式下允许从 replica 读取。
29+
- `slowlog_log_slower_than`:慢查询阈值(微秒,默认 `10000`,设为 `-1` 关闭记录)。
30+
- `slowlog_max_len`:慢查询日志最大保留条数(默认 `128`)。
2931
- `auth` / `password`:前端 ACL,详见下文。
3032
- `backend_auth` / `backend_password`:后端 ACL 认证,详见下文。
3133

34+
> 提示:代理原生支持 `SLOWLOG GET/LEN/RESET`,并按集群维度汇总慢查询;配置上述阈值和长度即可控制记录行为。
35+
3236
示例参见仓库根目录的 `default.toml`
3337

3438
### ACL 配置

src/cluster/mod.rs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::protocol::redis::{
2727
BlockingKind, MultiDispatch, RedisCommand, RespCodec, RespValue, SlotMap, SubCommand,
2828
SubResponse, SubscriptionKind, SLOT_COUNT,
2929
};
30+
use crate::slowlog::Slowlog;
3031
use crate::utils::{crc16, trim_hash_tag};
3132

3233
const FETCH_INTERVAL: Duration = Duration::from_secs(10);
@@ -52,6 +53,7 @@ pub struct ClusterProxy {
5253
fetch_trigger: mpsc::UnboundedSender<()>,
5354
runtime: Arc<ClusterRuntime>,
5455
config_manager: Arc<ConfigManager>,
56+
slowlog: Arc<Slowlog>,
5557
listen_port: u16,
5658
seed_nodes: usize,
5759
}
@@ -83,6 +85,9 @@ impl ClusterProxy {
8385
.map(Arc::new);
8486

8587
let listen_port = config.listen_port()?;
88+
let slowlog = config_manager
89+
.slowlog_for(&config.name)
90+
.ok_or_else(|| anyhow!("missing slowlog state for cluster {}", config.name))?;
8691
let proxy = Self {
8792
cluster: cluster.clone(),
8893
hash_tag,
@@ -94,6 +99,7 @@ impl ClusterProxy {
9499
fetch_trigger: trigger_tx.clone(),
95100
runtime,
96101
config_manager,
102+
slowlog,
97103
listen_port,
98104
seed_nodes: config.servers.len(),
99105
};
@@ -295,6 +301,18 @@ impl ClusterProxy {
295301
inflight += 1;
296302
continue;
297303
}
304+
if let Some(response) = self.try_handle_slowlog(&cmd) {
305+
let success = !response.is_error();
306+
metrics::front_command(
307+
self.cluster.as_ref(),
308+
cmd.kind_label(),
309+
success,
310+
);
311+
let fut = async move { response };
312+
pending.push_back(Box::pin(fut));
313+
inflight += 1;
314+
continue;
315+
}
298316
let guard = self.prepare_dispatch(client_id, cmd);
299317
pending.push_back(Box::pin(guard));
300318
inflight += 1;
@@ -339,6 +357,16 @@ impl ClusterProxy {
339357
self.config_manager.handle_command(command).await
340358
}
341359

360+
fn try_handle_slowlog(&self, command: &RedisCommand) -> Option<RespValue> {
361+
if !command.command_name().eq_ignore_ascii_case(b"SLOWLOG") {
362+
return None;
363+
}
364+
Some(crate::slowlog::handle_command(
365+
&self.slowlog,
366+
command.args(),
367+
))
368+
}
369+
342370
fn try_handle_info(&self, command: &RedisCommand) -> Option<RespValue> {
343371
if !command.command_name().eq_ignore_ascii_case(b"INFO") {
344372
return None;
@@ -646,6 +674,7 @@ impl ClusterProxy {
646674
let pool = self.pool.clone();
647675
let fetch_trigger = self.fetch_trigger.clone();
648676
let cluster = self.cluster.clone();
677+
let slowlog = self.slowlog.clone();
649678
let kind_label = command.kind_label();
650679
Box::pin(async move {
651680
match dispatch_with_context(
@@ -655,6 +684,7 @@ impl ClusterProxy {
655684
pool,
656685
fetch_trigger,
657686
client_id,
687+
slowlog,
658688
command,
659689
)
660690
.await
@@ -1232,9 +1262,13 @@ async fn dispatch_with_context(
12321262
pool: Arc<ConnectionPool<RedisCommand>>,
12331263
fetch_trigger: mpsc::UnboundedSender<()>,
12341264
client_id: ClientId,
1265+
slowlog: Arc<Slowlog>,
12351266
command: RedisCommand,
12361267
) -> Result<RespValue> {
1237-
if let Some(multi) = command.expand_for_multi(hash_tag.as_deref()) {
1268+
let command_snapshot = command.clone();
1269+
let multi = command.expand_for_multi(hash_tag.as_deref());
1270+
let started = Instant::now();
1271+
let result = if let Some(multi) = multi {
12381272
dispatch_multi(
12391273
hash_tag,
12401274
read_from_slave,
@@ -1256,7 +1290,9 @@ async fn dispatch_with_context(
12561290
command,
12571291
)
12581292
.await
1259-
}
1293+
};
1294+
slowlog.maybe_record(&command_snapshot, started.elapsed());
1295+
result
12601296
}
12611297

12621298
async fn dispatch_multi(

src/config/mod.rs

Lines changed: 92 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,21 @@ use tracing::{info, warn};
1212

1313
use crate::auth::{AuthUserConfig, BackendAuthConfig, FrontendAuthConfig};
1414
use crate::protocol::redis::{RedisCommand, RespValue};
15+
use crate::slowlog::Slowlog;
1516

1617
/// Environment variable controlling the default worker thread count when a
1718
/// cluster omits the `thread` field.
1819
pub const ENV_DEFAULT_THREADS: &str = "ASTER_DEFAULT_THREAD";
1920
const DUMP_VALUE_DEFAULT: &str = "default";
2021

22+
fn default_slowlog_log_slower_than() -> i64 {
23+
10_000
24+
}
25+
26+
fn default_slowlog_max_len() -> usize {
27+
128
28+
}
29+
2130
#[derive(Debug, Clone, Deserialize, Serialize)]
2231
pub struct Config {
2332
#[serde(default)]
@@ -130,6 +139,10 @@ pub struct ClusterConfig {
130139
pub backend_auth: Option<BackendAuthConfig>,
131140
#[serde(default)]
132141
pub backend_password: Option<String>,
142+
#[serde(default = "default_slowlog_log_slower_than")]
143+
pub slowlog_log_slower_than: i64,
144+
#[serde(default = "default_slowlog_max_len")]
145+
pub slowlog_max_len: usize,
133146
}
134147

135148
impl ClusterConfig {
@@ -161,6 +174,12 @@ impl ClusterConfig {
161174
self.name
162175
);
163176
}
177+
if self.slowlog_log_slower_than < -1 {
178+
bail!(
179+
"cluster {} slowlog-log-slower-than must be >= -1",
180+
self.name
181+
);
182+
}
164183
Ok(())
165184
}
166185

@@ -290,6 +309,7 @@ fn atomic_to_option(value: i64) -> Option<u64> {
290309
struct ClusterEntry {
291310
index: usize,
292311
runtime: Arc<ClusterRuntime>,
312+
slowlog: Arc<Slowlog>,
293313
}
294314

295315
#[derive(Debug)]
@@ -304,14 +324,20 @@ impl ConfigManager {
304324
let mut clusters = HashMap::new();
305325
for (index, cluster) in config.clusters().iter().enumerate() {
306326
let key = cluster.name.to_ascii_lowercase();
327+
let runtime = Arc::new(ClusterRuntime::new(
328+
cluster.read_timeout,
329+
cluster.write_timeout,
330+
));
331+
let slowlog = Arc::new(Slowlog::new(
332+
cluster.slowlog_log_slower_than,
333+
cluster.slowlog_max_len,
334+
));
307335
clusters.insert(
308336
key,
309337
ClusterEntry {
310338
index,
311-
runtime: Arc::new(ClusterRuntime::new(
312-
cluster.read_timeout,
313-
cluster.write_timeout,
314-
)),
339+
runtime,
340+
slowlog,
315341
},
316342
);
317343
}
@@ -329,6 +355,12 @@ impl ConfigManager {
329355
.map(|entry| entry.runtime.clone())
330356
}
331357

358+
pub fn slowlog_for(&self, name: &str) -> Option<Arc<Slowlog>> {
359+
self.clusters
360+
.get(&name.to_ascii_lowercase())
361+
.map(|entry| entry.slowlog.clone())
362+
}
363+
332364
pub async fn handle_command(&self, command: &RedisCommand) -> Option<RespValue> {
333365
if !command.command_name().eq_ignore_ascii_case(b"CONFIG") {
334366
return None;
@@ -429,6 +461,28 @@ impl ConfigManager {
429461
"cluster write_timeout updated via CONFIG SET"
430462
);
431463
}
464+
ClusterField::SlowlogLogSlowerThan => {
465+
let parsed = parse_slowlog_threshold(value)?;
466+
entry.slowlog.set_threshold(parsed);
467+
let mut guard = self.config.write();
468+
guard.clusters_mut()[entry.index].slowlog_log_slower_than = parsed;
469+
info!(
470+
cluster = cluster_name,
471+
value = value,
472+
"cluster slowlog_log_slower_than updated via CONFIG SET"
473+
);
474+
}
475+
ClusterField::SlowlogMaxLen => {
476+
let parsed = parse_slowlog_len(value)?;
477+
entry.slowlog.set_max_len(parsed);
478+
let mut guard = self.config.write();
479+
guard.clusters_mut()[entry.index].slowlog_max_len = parsed;
480+
info!(
481+
cluster = cluster_name,
482+
value = value,
483+
"cluster slowlog_max_len updated via CONFIG SET"
484+
);
485+
}
432486
}
433487
Ok(())
434488
}
@@ -457,6 +511,14 @@ impl ConfigManager {
457511
format!("cluster.{}.write-timeout", name),
458512
option_to_string(runtime.write_timeout()),
459513
));
514+
entries.push((
515+
format!("cluster.{}.slowlog-log-slower-than", name),
516+
entry.slowlog.threshold().to_string(),
517+
));
518+
entries.push((
519+
format!("cluster.{}.slowlog-max-len", name),
520+
entry.slowlog.max_len().to_string(),
521+
));
460522
}
461523
}
462524
entries.sort_by(|a, b| a.0.cmp(&b.0));
@@ -491,6 +553,8 @@ fn parse_key(key: &str) -> Result<(String, ClusterField)> {
491553
let field = match field.to_ascii_lowercase().as_str() {
492554
"read-timeout" => ClusterField::ReadTimeout,
493555
"write-timeout" => ClusterField::WriteTimeout,
556+
"slowlog-log-slower-than" => ClusterField::SlowlogLogSlowerThan,
557+
"slowlog-max-len" => ClusterField::SlowlogMaxLen,
494558
unknown => bail!("unknown cluster field '{}'", unknown),
495559
};
496560
Ok((cluster.to_string(), field))
@@ -507,6 +571,28 @@ fn parse_timeout_value(value: &str) -> Result<Option<u64>> {
507571
Ok(Some(parsed))
508572
}
509573

574+
fn parse_slowlog_threshold(value: &str) -> Result<i64> {
575+
let parsed: i64 = value
576+
.trim()
577+
.parse()
578+
.with_context(|| format!("invalid slowlog-log-slower-than value '{}'", value))?;
579+
if parsed < -1 {
580+
bail!("slowlog-log-slower-than must be >= -1");
581+
}
582+
Ok(parsed)
583+
}
584+
585+
fn parse_slowlog_len(value: &str) -> Result<usize> {
586+
let parsed: i64 = value
587+
.trim()
588+
.parse()
589+
.with_context(|| format!("invalid slowlog-max-len value '{}'", value))?;
590+
if parsed < 0 {
591+
bail!("slowlog-max-len must be >= 0");
592+
}
593+
usize::try_from(parsed).map_err(|_| anyhow!("slowlog-max-len is too large"))
594+
}
595+
510596
fn option_to_string(value: Option<u64>) -> String {
511597
value
512598
.map(|v| v.to_string())
@@ -530,6 +616,8 @@ fn err_response<T: ToString>(message: T) -> RespValue {
530616
enum ClusterField {
531617
ReadTimeout,
532618
WriteTimeout,
619+
SlowlogLogSlowerThan,
620+
SlowlogMaxLen,
533621
}
534622

535623
fn wildcard_match(pattern: &str, target: &str) -> bool {

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ pub mod info;
2323
pub mod meta;
2424
pub mod metrics;
2525
pub mod protocol;
26+
pub mod slowlog;
2627
pub mod standalone;
2728
pub mod utils;
2829

0 commit comments

Comments
 (0)