Skip to content

Commit 36ba1bb

Browse files
committed
perf(memory): bound maintenance queue and prune stale api cache
1 parent 859031f commit 36ba1bb

File tree

2 files changed

+178
-13
lines changed

2 files changed

+178
-13
lines changed

src/bot.rs

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ use crate::utils::{
3232
pub struct BotState {
3333
pub config: Config,
3434
pub database: Database,
35-
pub music_api: MusicApi,
35+
pub music_api: Arc<MusicApi>,
3636
inflight_downloads: Arc<InflightDownloads>,
3737
pub download_semaphore: Arc<tokio::sync::Semaphore>,
3838
pub upload_semaphore: Arc<tokio::sync::Semaphore>,
3939
pub message_task_semaphore: Arc<tokio::sync::Semaphore>,
40-
pub maintenance_tx: tokio::sync::mpsc::UnboundedSender<MaintenanceSignal>,
40+
pub maintenance_tx: tokio::sync::mpsc::Sender<MaintenanceSignal>,
4141
pub bot_username: String,
4242
pub upload_client_state: Arc<Mutex<UploadClientState>>,
4343
pub maintenance_counters: MaintenanceCounters,
@@ -63,6 +63,8 @@ pub struct UploadCounters {
6363
const SPEED_SAMPLE_WINDOW: usize = 20;
6464
const STATUS_RESOURCE_REFRESH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(2);
6565
const MIN_DOWNLOAD_CHUNK_BYTES: usize = 64 * 1024;
66+
const MAINTENANCE_QUEUE_CAPACITY: usize = 32;
67+
const CACHE_PRUNE_INTERVAL_REQUESTS: u32 = 50;
6668

6769
#[derive(Debug)]
6870
pub struct RuntimeMetrics {
@@ -129,12 +131,14 @@ static STATUS_RESOURCE_CACHE: LazyLock<std::sync::Mutex<(System, Instant, Resour
129131
pub struct MaintenanceCounters {
130132
pub memory_release_requests: AtomicU32,
131133
pub db_analyze_requests: AtomicU32,
134+
pub api_cache_prune_requests: AtomicU32,
132135
}
133136

134137
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
135138
pub enum MaintenanceSignal {
136139
AnalyzeDb,
137140
ReleaseMemory,
141+
PruneApiCache,
138142
}
139143

140144
#[derive(Debug, Default)]
@@ -248,6 +252,7 @@ impl MaintenanceCounters {
248252
Self {
249253
memory_release_requests: AtomicU32::new(0),
250254
db_analyze_requests: AtomicU32::new(0),
255+
api_cache_prune_requests: AtomicU32::new(0),
251256
}
252257
}
253258

@@ -516,16 +521,17 @@ pub async fn run(config: Config) -> Result<()> {
516521
let database = Database::new(&config.database).await?;
517522
tracing::info!("Database initialized");
518523

519-
let (maintenance_tx, maintenance_rx) = tokio::sync::mpsc::unbounded_channel();
524+
// Initialize music API
525+
let music_api = Arc::new(MusicApi::new_with_config(&config));
526+
tracing::info!("Music API initialized");
527+
528+
let (maintenance_tx, maintenance_rx) = tokio::sync::mpsc::channel(MAINTENANCE_QUEUE_CAPACITY);
520529
let maintenance_database = database.clone();
530+
let maintenance_music_api = Arc::clone(&music_api);
521531
tokio::spawn(async move {
522-
maintenance_worker(maintenance_rx, maintenance_database).await;
532+
maintenance_worker(maintenance_rx, maintenance_database, maintenance_music_api).await;
523533
});
524534

525-
// Initialize music API
526-
let music_api = MusicApi::new_with_config(&config);
527-
tracing::info!("Music API initialized");
528-
529535
// Initialize bot with custom API URL support
530536
let bot = if !config.bot_api.is_empty() && config.bot_api != "https://api.telegram.org" {
531537
// 使用自定义API URL
@@ -625,7 +631,7 @@ pub async fn run(config: Config) -> Result<()> {
625631
let bot_state = Arc::new(BotState {
626632
config,
627633
database,
628-
music_api,
634+
music_api: Arc::clone(&music_api),
629635
inflight_downloads: Arc::new(InflightDownloads::default()),
630636
download_semaphore: Arc::new(tokio::sync::Semaphore::new(
631637
max_concurrent_downloads as usize,
@@ -1525,8 +1531,14 @@ async fn download_and_send_music(
15251531
} else {
15261532
state.database.save_song_info(&song_info).await?;
15271533
for signal in collect_maintenance_signals(&state.maintenance_counters, &state.config) {
1528-
if state.maintenance_tx.send(signal).is_err() {
1529-
tracing::warn!("Maintenance worker unavailable; skipping signal");
1534+
match state.maintenance_tx.try_send(signal) {
1535+
Ok(()) => {}
1536+
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
1537+
tracing::debug!("Maintenance queue full; dropping signal {:?}", signal);
1538+
}
1539+
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
1540+
tracing::warn!("Maintenance worker unavailable; skipping signal");
1541+
}
15301542
}
15311543
}
15321544
}
@@ -1747,7 +1759,7 @@ fn collect_maintenance_signals(
17471759
counters: &MaintenanceCounters,
17481760
config: &Config,
17491761
) -> Vec<MaintenanceSignal> {
1750-
let mut signals = Vec::with_capacity(2);
1762+
let mut signals = Vec::with_capacity(3);
17511763
for (counter, interval, signal) in [
17521764
(
17531765
&counters.db_analyze_requests,
@@ -1759,6 +1771,11 @@ fn collect_maintenance_signals(
17591771
config.memory_release_interval_requests,
17601772
MaintenanceSignal::ReleaseMemory,
17611773
),
1774+
(
1775+
&counters.api_cache_prune_requests,
1776+
CACHE_PRUNE_INTERVAL_REQUESTS,
1777+
MaintenanceSignal::PruneApiCache,
1778+
),
17621779
] {
17631780
if MaintenanceCounters::should_run(counter, interval) {
17641781
signals.push(signal);
@@ -1793,8 +1810,9 @@ async fn acquire_download_leader(
17931810
}
17941811

17951812
async fn maintenance_worker(
1796-
mut rx: tokio::sync::mpsc::UnboundedReceiver<MaintenanceSignal>,
1813+
mut rx: tokio::sync::mpsc::Receiver<MaintenanceSignal>,
17971814
database: Database,
1815+
music_api: Arc<MusicApi>,
17981816
) {
17991817
while let Some(signal) = rx.recv().await {
18001818
match signal {
@@ -1813,6 +1831,17 @@ async fn maintenance_worker(
18131831
tracing::warn!("Memory release background task failed: {}", e);
18141832
}
18151833
}
1834+
MaintenanceSignal::PruneApiCache => {
1835+
let stats = music_api.prune_expired_cache_entries();
1836+
if stats.total_removed() > 0 {
1837+
tracing::debug!(
1838+
"Pruned API cache entries: detail={}, url={}, lyric={}",
1839+
stats.song_detail_removed,
1840+
stats.song_url_removed,
1841+
stats.song_lyric_removed
1842+
);
1843+
}
1844+
}
18161845
}
18171846
}
18181847
}
@@ -2709,6 +2738,21 @@ mod tests {
27092738
assert_eq!(third, vec![super::MaintenanceSignal::ReleaseMemory]);
27102739
}
27112740

2741+
#[test]
2742+
fn maintenance_scheduler_emits_cache_prune_signal_on_interval() {
2743+
let counters = super::MaintenanceCounters::new();
2744+
let mut config = crate::config::Config::default();
2745+
config.db_analyze_interval_requests = 0;
2746+
config.memory_release_interval_requests = 0;
2747+
2748+
for _ in 0..super::CACHE_PRUNE_INTERVAL_REQUESTS.saturating_sub(1) {
2749+
let _ = super::collect_maintenance_signals(&counters, &config);
2750+
}
2751+
2752+
let signals = super::collect_maintenance_signals(&counters, &config);
2753+
assert_eq!(signals, vec![super::MaintenanceSignal::PruneApiCache]);
2754+
}
2755+
27122756
#[test]
27132757
fn upload_pool_idle_timeout_disabled_when_zero() {
27142758
assert!(!super::should_set_upload_pool_idle_timeout(0));

src/music_api.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,20 @@ fn song_url_has_download_url(song_url: &SongUrl) -> bool {
194194
!song_url.url.is_empty()
195195
}
196196

197+
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
198+
pub struct CachePruneStats {
199+
pub song_detail_removed: usize,
200+
pub song_url_removed: usize,
201+
pub song_lyric_removed: usize,
202+
}
203+
204+
impl CachePruneStats {
205+
#[must_use]
206+
pub fn total_removed(self) -> usize {
207+
self.song_detail_removed + self.song_url_removed + self.song_lyric_removed
208+
}
209+
}
210+
197211
impl MusicApi {
198212
#[must_use]
199213
pub fn new(music_u: Option<String>, base_url: String) -> Self {
@@ -354,6 +368,32 @@ impl MusicApi {
354368
.insert(song_id, TimedCacheEntry::new(lyric, SONG_LYRIC_CACHE_TTL));
355369
}
356370

371+
#[must_use]
372+
pub fn prune_expired_cache_entries(&self) -> CachePruneStats {
373+
let now = Instant::now();
374+
375+
let detail_before = self.song_detail_cache.len();
376+
self.song_detail_cache
377+
.retain(|_, entry| entry.is_fresh_at(now));
378+
let song_detail_removed = detail_before.saturating_sub(self.song_detail_cache.len());
379+
380+
let url_before = self.song_url_cache.len();
381+
self.song_url_cache
382+
.retain(|_, entry| entry.is_fresh_at(now));
383+
let song_url_removed = url_before.saturating_sub(self.song_url_cache.len());
384+
385+
let lyric_before = self.song_lyric_cache.len();
386+
self.song_lyric_cache
387+
.retain(|_, entry| entry.is_fresh_at(now));
388+
let song_lyric_removed = lyric_before.saturating_sub(self.song_lyric_cache.len());
389+
390+
CachePruneStats {
391+
song_detail_removed,
392+
song_url_removed,
393+
song_lyric_removed,
394+
}
395+
}
396+
357397
fn generate_eapi_cookie(music_u: Option<&str>) -> String {
358398
let device_id = Uuid::new_v4().simple().to_string();
359399
let appver = "9.3.40";
@@ -1378,6 +1418,87 @@ mod tests {
13781418
assert!(missing.is_none(), "uncached bitrate should return None");
13791419
}
13801420

1421+
#[test]
1422+
fn prune_expired_cache_entries_removes_stale_entries_only() {
1423+
let api = MusicApi::new(None, "http://localhost".to_string());
1424+
let now = std::time::Instant::now();
1425+
1426+
api.song_detail_cache.insert(
1427+
1,
1428+
super::TimedCacheEntry {
1429+
value: Arc::new(sample_song_detail(1)),
1430+
created_at: now - super::SONG_DETAIL_CACHE_TTL - Duration::from_secs(1),
1431+
ttl: super::SONG_DETAIL_CACHE_TTL,
1432+
},
1433+
);
1434+
api.song_detail_cache.insert(
1435+
2,
1436+
super::TimedCacheEntry {
1437+
value: Arc::new(sample_song_detail(2)),
1438+
created_at: now,
1439+
ttl: super::SONG_DETAIL_CACHE_TTL,
1440+
},
1441+
);
1442+
api.song_url_cache.insert(
1443+
super::song_url_cache_key(1, 320_000),
1444+
super::TimedCacheEntry {
1445+
value: Arc::new(sample_song_url(1, 320_000, "https://stale.example/1.mp3")),
1446+
created_at: now - super::SONG_URL_CACHE_TTL - Duration::from_secs(1),
1447+
ttl: super::SONG_URL_CACHE_TTL,
1448+
},
1449+
);
1450+
api.song_url_cache.insert(
1451+
super::song_url_cache_key(2, 320_000),
1452+
super::TimedCacheEntry {
1453+
value: Arc::new(sample_song_url(2, 320_000, "https://fresh.example/2.mp3")),
1454+
created_at: now,
1455+
ttl: super::SONG_URL_CACHE_TTL,
1456+
},
1457+
);
1458+
api.song_lyric_cache.insert(
1459+
1,
1460+
super::TimedCacheEntry {
1461+
value: "stale lyric".to_string(),
1462+
created_at: now - super::SONG_LYRIC_CACHE_TTL - Duration::from_secs(1),
1463+
ttl: super::SONG_LYRIC_CACHE_TTL,
1464+
},
1465+
);
1466+
api.song_lyric_cache.insert(
1467+
2,
1468+
super::TimedCacheEntry {
1469+
value: "fresh lyric".to_string(),
1470+
created_at: now,
1471+
ttl: super::SONG_LYRIC_CACHE_TTL,
1472+
},
1473+
);
1474+
1475+
let stats = api.prune_expired_cache_entries();
1476+
1477+
assert_eq!(
1478+
stats,
1479+
super::CachePruneStats {
1480+
song_detail_removed: 1,
1481+
song_url_removed: 1,
1482+
song_lyric_removed: 1,
1483+
}
1484+
);
1485+
assert_eq!(stats.total_removed(), 3);
1486+
assert!(api.song_detail_cache.get(&1).is_none());
1487+
assert!(
1488+
api.song_url_cache
1489+
.get(&super::song_url_cache_key(1, 320_000))
1490+
.is_none()
1491+
);
1492+
assert!(api.song_lyric_cache.get(&1).is_none());
1493+
assert!(api.song_detail_cache.get(&2).is_some());
1494+
assert!(
1495+
api.song_url_cache
1496+
.get(&super::song_url_cache_key(2, 320_000))
1497+
.is_some()
1498+
);
1499+
assert!(api.song_lyric_cache.get(&2).is_some());
1500+
}
1501+
13811502
#[tokio::test]
13821503
async fn get_song_detail_and_best_url_returns_cached_detail_and_cached_fallback_without_network()
13831504
{

0 commit comments

Comments
 (0)