@@ -21,6 +21,7 @@ use crate::auth::{AuthAction, BackendAuth, FrontendAuthenticator};
2121use crate :: backend:: client:: { ClientId , FrontConnectionGuard } ;
2222use crate :: backend:: pool:: { BackendNode , ConnectionPool , Connector , SessionCommand } ;
2323use crate :: config:: { ClusterConfig , ClusterRuntime , ConfigManager } ;
24+ use crate :: hotkey:: Hotkey ;
2425use crate :: info:: { InfoContext , ProxyMode } ;
2526use crate :: metrics;
2627use crate :: protocol:: redis:: {
@@ -54,6 +55,7 @@ pub struct ClusterProxy {
5455 runtime : Arc < ClusterRuntime > ,
5556 config_manager : Arc < ConfigManager > ,
5657 slowlog : Arc < Slowlog > ,
58+ hotkey : Arc < Hotkey > ,
5759 listen_port : u16 ,
5860 seed_nodes : usize ,
5961}
@@ -88,6 +90,9 @@ impl ClusterProxy {
8890 let slowlog = config_manager
8991 . slowlog_for ( & config. name )
9092 . ok_or_else ( || anyhow ! ( "missing slowlog state for cluster {}" , config. name) ) ?;
93+ let hotkey = config_manager
94+ . hotkey_for ( & config. name )
95+ . ok_or_else ( || anyhow ! ( "missing hotkey state for cluster {}" , config. name) ) ?;
9196 let proxy = Self {
9297 cluster : cluster. clone ( ) ,
9398 hash_tag,
@@ -100,6 +105,7 @@ impl ClusterProxy {
100105 runtime,
101106 config_manager,
102107 slowlog,
108+ hotkey,
103109 listen_port,
104110 seed_nodes : config. servers . len ( ) ,
105111 } ;
@@ -301,6 +307,18 @@ impl ClusterProxy {
301307 inflight += 1 ;
302308 continue ;
303309 }
310+ if let Some ( response) = self . try_handle_hotkey( & cmd) {
311+ let success = !response. is_error( ) ;
312+ metrics:: front_command(
313+ self . cluster. as_ref( ) ,
314+ cmd. kind_label( ) ,
315+ success,
316+ ) ;
317+ let fut = async move { response } ;
318+ pending. push_back( Box :: pin( fut) ) ;
319+ inflight += 1 ;
320+ continue ;
321+ }
304322 if let Some ( response) = self . try_handle_slowlog( & cmd) {
305323 let success = !response. is_error( ) ;
306324 metrics:: front_command(
@@ -367,6 +385,13 @@ impl ClusterProxy {
367385 ) )
368386 }
369387
388+ fn try_handle_hotkey ( & self , command : & RedisCommand ) -> Option < RespValue > {
389+ if !command. command_name ( ) . eq_ignore_ascii_case ( b"HOTKEY" ) {
390+ return None ;
391+ }
392+ Some ( crate :: hotkey:: handle_command ( & self . hotkey , command. args ( ) ) )
393+ }
394+
370395 fn try_handle_info ( & self , command : & RedisCommand ) -> Option < RespValue > {
371396 if !command. command_name ( ) . eq_ignore_ascii_case ( b"INFO" ) {
372397 return None ;
@@ -675,6 +700,7 @@ impl ClusterProxy {
675700 let fetch_trigger = self . fetch_trigger . clone ( ) ;
676701 let cluster = self . cluster . clone ( ) ;
677702 let slowlog = self . slowlog . clone ( ) ;
703+ let hotkey = self . hotkey . clone ( ) ;
678704 let kind_label = command. kind_label ( ) ;
679705 Box :: pin ( async move {
680706 match dispatch_with_context (
@@ -685,6 +711,7 @@ impl ClusterProxy {
685711 fetch_trigger,
686712 client_id,
687713 slowlog,
714+ hotkey,
688715 command,
689716 )
690717 . await
@@ -1263,6 +1290,7 @@ async fn dispatch_with_context(
12631290 fetch_trigger : mpsc:: UnboundedSender < ( ) > ,
12641291 client_id : ClientId ,
12651292 slowlog : Arc < Slowlog > ,
1293+ hotkey : Arc < Hotkey > ,
12661294 command : RedisCommand ,
12671295) -> Result < RespValue > {
12681296 let command_snapshot = command. clone ( ) ;
@@ -1292,6 +1320,7 @@ async fn dispatch_with_context(
12921320 . await
12931321 } ;
12941322 slowlog. maybe_record ( & command_snapshot, started. elapsed ( ) ) ;
1323+ hotkey. record_command ( & command_snapshot) ;
12951324 result
12961325}
12971326
0 commit comments