@@ -37,7 +37,7 @@ pub struct BotState {
3737 pub download_semaphore : Arc < tokio:: sync:: Semaphore > ,
3838 pub upload_semaphore : Arc < tokio:: sync:: Semaphore > ,
3939 pub message_task_semaphore : Arc < tokio:: sync:: Semaphore > ,
40- pub maintenance_tx : tokio:: sync:: mpsc:: UnboundedSender < MaintenanceSignal > ,
40+ pub maintenance_tx : tokio:: sync:: mpsc:: Sender < MaintenanceSignal > ,
4141 pub bot_username : String ,
4242 pub upload_client_state : Arc < std:: sync:: Mutex < UploadClientState > > ,
4343 pub maintenance_counters : MaintenanceCounters ,
@@ -61,6 +61,7 @@ pub struct UploadCounters {
6161}
6262
6363const SPEED_SAMPLE_WINDOW : usize = 20 ;
64+ const MAINTENANCE_QUEUE_CAPACITY : usize = 64 ;
6465
6566const MIN_DOWNLOAD_CHUNK_BYTES : usize = 64 * 1024 ;
6667
@@ -468,7 +469,7 @@ pub async fn run(config: Config) -> Result<()> {
468469 let database = Database :: new ( & config. database ) . await ?;
469470 tracing:: info!( "Database initialized" ) ;
470471
471- let ( maintenance_tx, maintenance_rx) = tokio:: sync:: mpsc:: unbounded_channel ( ) ;
472+ let ( maintenance_tx, maintenance_rx) = tokio:: sync:: mpsc:: channel ( MAINTENANCE_QUEUE_CAPACITY ) ;
472473 let maintenance_database = database. clone ( ) ;
473474 tokio:: spawn ( async move {
474475 maintenance_worker ( maintenance_rx, maintenance_database) . await ;
@@ -489,11 +490,11 @@ pub async fn run(config: Config) -> Result<()> {
489490 tracing:: info!( "Using custom Telegram API URL: {}" , api_url) ;
490491
491492 // Create a custom HTTP client tuned for Cloudflare compatibility (mimic Go http client)
492- // pool_max_idle_per_host(64) keeps reasonable connection pool for API efficiency
493+ let pool_max_idle_per_host = telegram_pool_max_idle_per_host ( & config ) ;
493494 let client_builder = reqwest:: Client :: builder ( )
494495 . use_rustls_tls ( )
495496 . user_agent ( "Go-http-client/2.0" )
496- . pool_max_idle_per_host ( 64 )
497+ . pool_max_idle_per_host ( pool_max_idle_per_host )
497498 . pool_idle_timeout ( std:: time:: Duration :: from_secs ( 60 ) )
498499 . timeout ( std:: time:: Duration :: from_secs ( 30 ) )
499500 . no_gzip ( ) ;
@@ -550,9 +551,10 @@ pub async fn run(config: Config) -> Result<()> {
550551 } else {
551552 // 使用默认API URL,但配置连接池以提高效率
552553 tracing:: info!( "Using default Telegram API URL: https://api.telegram.org" ) ;
554+ let pool_max_idle_per_host = telegram_pool_max_idle_per_host ( & config) ;
553555 let client_builder = reqwest:: Client :: builder ( )
554556 . use_rustls_tls ( )
555- . pool_max_idle_per_host ( 64 )
557+ . pool_max_idle_per_host ( pool_max_idle_per_host )
556558 . pool_idle_timeout ( std:: time:: Duration :: from_secs ( 60 ) )
557559 . timeout ( std:: time:: Duration :: from_secs ( 30 ) ) ;
558560 let client = build_reqwest_client ( client_builder) ?;
@@ -1482,9 +1484,7 @@ async fn download_and_send_music(
14821484 } else {
14831485 state. database . save_song_info ( & song_info) . await ?;
14841486 for signal in collect_maintenance_signals ( & state. maintenance_counters , & state. config ) {
1485- if state. maintenance_tx . send ( signal) . is_err ( ) {
1486- tracing:: warn!( "Maintenance worker unavailable; skipping signal" ) ;
1487- }
1487+ let _ = enqueue_maintenance_signal ( & state. maintenance_tx , signal) ;
14881488 }
14891489 }
14901490
@@ -1725,6 +1725,23 @@ fn collect_maintenance_signals(
17251725 signals
17261726}
17271727
1728+ fn enqueue_maintenance_signal (
1729+ tx : & tokio:: sync:: mpsc:: Sender < MaintenanceSignal > ,
1730+ signal : MaintenanceSignal ,
1731+ ) -> bool {
1732+ match tx. try_send ( signal) {
1733+ Ok ( ( ) ) => true ,
1734+ Err ( tokio:: sync:: mpsc:: error:: TrySendError :: Full ( _) ) => {
1735+ tracing:: debug!( "Maintenance queue is full; dropping signal {:?}" , signal) ;
1736+ false
1737+ }
1738+ Err ( tokio:: sync:: mpsc:: error:: TrySendError :: Closed ( _) ) => {
1739+ tracing:: warn!( "Maintenance worker unavailable; skipping signal" ) ;
1740+ false
1741+ }
1742+ }
1743+ }
1744+
17281745async fn join_futures < F1 , F2 , T1 , T2 , E > (
17291746 f1 : F1 ,
17301747 f2 : F2 ,
@@ -1750,7 +1767,7 @@ async fn acquire_download_leader(
17501767}
17511768
17521769async fn maintenance_worker (
1753- mut rx : tokio:: sync:: mpsc:: UnboundedReceiver < MaintenanceSignal > ,
1770+ mut rx : tokio:: sync:: mpsc:: Receiver < MaintenanceSignal > ,
17541771 database : Database ,
17551772) {
17561773 while let Some ( signal) = rx. recv ( ) . await {
@@ -1801,6 +1818,10 @@ fn should_set_upload_pool_idle_timeout(secs: u64) -> bool {
18011818 secs > 0
18021819}
18031820
1821+ fn telegram_pool_max_idle_per_host ( config : & Config ) -> usize {
1822+ config. download_pool_max_idle_per_host . max ( 1 )
1823+ }
1824+
18041825fn download_chunk_bytes ( config : & Config ) -> usize {
18051826 config
18061827 . download_chunk_size_kb
@@ -2686,6 +2707,32 @@ mod tests {
26862707 assert ! ( super :: should_set_upload_pool_idle_timeout( 60 ) ) ;
26872708 }
26882709
2710+ #[ test]
2711+ fn telegram_pool_uses_configured_download_limit ( ) {
2712+ let mut config = Config :: default ( ) ;
2713+ config. download_pool_max_idle_per_host = 7 ;
2714+ assert_eq ! ( super :: telegram_pool_max_idle_per_host( & config) , 7 ) ;
2715+ }
2716+
2717+ #[ tokio:: test]
2718+ async fn enqueue_maintenance_signal_drops_when_queue_full ( ) {
2719+ let ( tx, mut rx) = tokio:: sync:: mpsc:: channel ( 1 ) ;
2720+ assert ! ( super :: enqueue_maintenance_signal(
2721+ & tx,
2722+ super :: MaintenanceSignal :: AnalyzeDb
2723+ ) ) ;
2724+ assert ! ( !super :: enqueue_maintenance_signal(
2725+ & tx,
2726+ super :: MaintenanceSignal :: ReleaseMemory
2727+ ) ) ;
2728+
2729+ assert_eq ! ( rx. recv( ) . await , Some ( super :: MaintenanceSignal :: AnalyzeDb ) ) ;
2730+ assert ! ( matches!(
2731+ rx. try_recv( ) ,
2732+ Err ( tokio:: sync:: mpsc:: error:: TryRecvError :: Empty )
2733+ ) ) ;
2734+ }
2735+
26892736 #[ test]
26902737 fn download_chunk_bytes_uses_configured_kib ( ) {
26912738 let mut config = Config :: default ( ) ;
0 commit comments