Skip to content

Commit 5111e00

Browse files
committed
fix(perf): stabilize memory tuning without throughput regressions
Keep disk-mode download buffering configurable while removing unnecessary stream adaptation so memory can drop without reducing upload/download throughput. Also export jemalloc config with the correct prefixed symbol so decay settings reliably take effect.
1 parent 064a9b3 commit 5111e00

File tree

2 files changed

+340
-5
lines changed

2 files changed

+340
-5
lines changed

src/bot.rs

Lines changed: 327 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ pub struct UploadCounters {
6262

6363
const SPEED_SAMPLE_WINDOW: usize = 20;
6464

65+
const MIN_DOWNLOAD_CHUNK_BYTES: usize = 64 * 1024;
66+
6567
#[derive(Debug)]
6668
pub struct RuntimeMetrics {
6769
started_at: Instant,
@@ -1154,18 +1156,20 @@ async fn download_and_send_music(
11541156

11551157
let mut stream = response.bytes_stream();
11561158
let downloaded = if audio_buffer.is_disk() {
1159+
let chunk_bytes = download_chunk_bytes(&state.config);
11571160
let file = audio_buffer
11581161
.disk_file_mut()
11591162
.ok_or_else(|| anyhow::anyhow!("Disk buffer missing file handle"))?;
1163+
let mut writer = tokio::io::BufWriter::with_capacity(chunk_bytes, file);
11601164
let mut downloaded = 0u64;
11611165
while let Some(chunk) = stream.next().await {
11621166
let chunk = chunk.context("Failed to read chunk from download stream")?;
1163-
tokio::io::AsyncWriteExt::write_all(file, &chunk)
1167+
tokio::io::AsyncWriteExt::write_all(&mut writer, &chunk)
11641168
.await
11651169
.context("Failed to write download chunk to disk")?;
11661170
downloaded += chunk.len() as u64;
11671171
}
1168-
tokio::io::AsyncWriteExt::flush(file)
1172+
tokio::io::AsyncWriteExt::flush(&mut writer)
11691173
.await
11701174
.context("Failed to flush disk writer")?;
11711175
downloaded
@@ -1797,6 +1801,13 @@ fn should_set_upload_pool_idle_timeout(secs: u64) -> bool {
17971801
secs > 0
17981802
}
17991803

1804+
fn download_chunk_bytes(config: &Config) -> usize {
1805+
config
1806+
.download_chunk_size_kb
1807+
.saturating_mul(1024)
1808+
.max(MIN_DOWNLOAD_CHUNK_BYTES)
1809+
}
1810+
18001811
fn append_search_result_line(results: &mut String, index: usize, song_name: &str, artists: &str) {
18011812
use std::fmt::Write;
18021813

@@ -1847,7 +1858,7 @@ struct UploadBotBundle {
18471858

18481859
/// Streaming chunk size for raw uploads (256 KiB).
18491860
/// Matches the benchmark script's chunk size that achieves ~14 MB/s.
1850-
const RAW_UPLOAD_CHUNK_SIZE: usize = 128 * 1024;
1861+
const RAW_UPLOAD_CHUNK_SIZE: usize = 256 * 1024;
18511862

18521863
/// Parameters for raw Telegram file upload.
18531864
struct RawUploadParams<'a> {
@@ -2441,6 +2452,171 @@ mod tests {
24412452
assert!(!should_download_cover(none));
24422453
}
24432454

2455+
#[test]
2456+
fn perf_timer_formats_label_and_duration() {
2457+
let label = "fetch_url";
2458+
let formatted = format_perf(label, std::time::Duration::from_millis(12));
2459+
assert!(formatted.contains("fetch_url"));
2460+
assert!(formatted.contains("12"));
2461+
}
2462+
2463+
#[test]
2464+
fn build_music_url_accepts_valid_base() {
2465+
let url = build_music_url("https://music.163.com", 123).expect("valid url");
2466+
assert_eq!(url.as_str(), "https://music.163.com/song?id=123");
2467+
}
2468+
2469+
#[test]
2470+
fn build_music_url_rejects_invalid_base() {
2471+
assert!(build_music_url("ht!tp:// bad", 1).is_err());
2472+
}
2473+
2474+
#[test]
2475+
fn parse_api_url_accepts_valid_base() {
2476+
let url = parse_api_url("https://api.telegram.org/").expect("valid url");
2477+
assert_eq!(url.as_str(), "https://api.telegram.org/");
2478+
}
2479+
2480+
#[test]
2481+
fn parse_api_url_rejects_invalid_base() {
2482+
assert!(parse_api_url("not a url").is_err());
2483+
}
2484+
2485+
#[tokio::test]
2486+
async fn local_file_uri_disabled_by_default() {
2487+
let mut config = crate::config::Config::default();
2488+
config.bot_api = "http://localhost:8081".to_string();
2489+
2490+
let path = create_temp_file();
2491+
let uri = super::maybe_local_file_uri(&config, false, &path).await;
2492+
fs::remove_file(&path).expect("remove temp file");
2493+
2494+
assert!(uri.is_none());
2495+
}
2496+
2497+
#[tokio::test]
2498+
async fn local_file_uri_skips_official_api() {
2499+
let mut config = crate::config::Config::default();
2500+
config.upload_local_file_uri = true;
2501+
2502+
let path = create_temp_file();
2503+
let uri = super::maybe_local_file_uri(&config, true, &path).await;
2504+
fs::remove_file(&path).expect("remove temp file");
2505+
2506+
assert!(uri.is_none());
2507+
}
2508+
2509+
#[tokio::test]
2510+
async fn local_file_uri_builds_from_existing_path() {
2511+
let mut config = crate::config::Config::default();
2512+
config.upload_local_file_uri = true;
2513+
2514+
let path = create_temp_file();
2515+
let uri = super::maybe_local_file_uri(&config, false, &path).await;
2516+
fs::remove_file(&path).expect("remove temp file");
2517+
2518+
let Some(uri) = uri else {
2519+
panic!("expected local file uri");
2520+
};
2521+
assert!(uri.starts_with("file://"));
2522+
}
2523+
2524+
#[tokio::test]
2525+
async fn local_file_uri_returns_none_for_missing_path() {
2526+
let mut config = crate::config::Config::default();
2527+
config.upload_local_file_uri = true;
2528+
2529+
let path = std::env::temp_dir().join(format!("missing_{}", Uuid::new_v4()));
2530+
if path.exists() {
2531+
fs::remove_file(&path).expect("remove temp file");
2532+
}
2533+
2534+
let uri = super::maybe_local_file_uri(&config, false, &path).await;
2535+
assert!(uri.is_none());
2536+
}
2537+
2538+
#[tokio::test]
2539+
async fn upload_target_defaults_to_multipart() {
2540+
let config = Config::default();
2541+
let path = std::path::Path::new("/tmp/test.mp3");
2542+
assert_eq!(
2543+
super::select_local_upload_target(&config, false, path).await,
2544+
super::UploadFileTarget::Multipart
2545+
);
2546+
}
2547+
2548+
#[tokio::test]
2549+
async fn upload_target_uses_local_uri_when_enabled() {
2550+
let mut config = Config::default();
2551+
config.upload_local_file_uri = true;
2552+
2553+
let path = create_temp_file();
2554+
let target = super::select_local_upload_target(&config, false, &path).await;
2555+
fs::remove_file(&path).expect("remove temp file");
2556+
2557+
match target {
2558+
super::UploadFileTarget::LocalUri(uri) => assert!(uri.starts_with("file://")),
2559+
super::UploadFileTarget::Multipart => panic!("expected local uri"),
2560+
}
2561+
}
2562+
2563+
#[test]
2564+
fn build_reqwest_client_returns_client() {
2565+
let client =
2566+
build_reqwest_client(reqwest::Client::builder()).expect("client should be built");
2567+
let _ = client;
2568+
}
2569+
2570+
#[test]
2571+
fn get_upload_bot_returns_error_when_missing() {
2572+
let state = UploadClientState {
2573+
bot: None,
2574+
raw_client: None,
2575+
upload_api_url: String::new(),
2576+
reuse_count: 0,
2577+
};
2578+
assert!(get_upload_bot(&state).is_err());
2579+
}
2580+
2581+
#[test]
2582+
fn get_upload_bot_returns_bot_when_present() {
2583+
let bot = Bot::new("token");
2584+
let state = UploadClientState {
2585+
bot: Some(bot),
2586+
raw_client: None,
2587+
upload_api_url: String::new(),
2588+
reuse_count: 0,
2589+
};
2590+
assert!(get_upload_bot(&state).is_ok());
2591+
}
2592+
2593+
#[tokio::test]
2594+
async fn acquire_download_permit_returns_error_when_closed() {
2595+
let semaphore = tokio::sync::Semaphore::new(1);
2596+
semaphore.close();
2597+
2598+
let err = match acquire_download_permit(&semaphore).await {
2599+
Ok(_) => panic!("expected error for closed semaphore"),
2600+
Err(err) => err,
2601+
};
2602+
let err_str = format!("{err}");
2603+
assert!(err_str.contains("download semaphore closed"));
2604+
}
2605+
2606+
#[tokio::test]
2607+
async fn fetch_detail_and_url_in_parallel() {
2608+
let (detail, url) = tokio::join!(async { 1 }, async { 2 });
2609+
assert_eq!(detail, 1);
2610+
assert_eq!(url, 2);
2611+
}
2612+
2613+
#[test]
2614+
fn append_search_result_line_formats_output() {
2615+
let mut results = String::new();
2616+
append_search_result_line(&mut results, 1, "Song", "Artist");
2617+
assert_eq!(results, "1.「Song」 - Artist\n");
2618+
}
2619+
24442620
#[test]
24452621
fn perf_log_includes_stage_label() {
24462622
let s = format_perf("download", std::time::Duration::from_millis(50));
@@ -2510,6 +2686,154 @@ mod tests {
25102686
assert!(super::should_set_upload_pool_idle_timeout(60));
25112687
}
25122688

2689+
#[test]
2690+
fn download_chunk_bytes_uses_configured_kib() {
2691+
let mut config = Config::default();
2692+
config.download_chunk_size_kb = 512;
2693+
2694+
assert_eq!(super::download_chunk_bytes(&config), 512 * 1024);
2695+
}
2696+
2697+
#[test]
2698+
fn download_chunk_bytes_clamps_zero_to_minimum() {
2699+
let mut config = Config::default();
2700+
config.download_chunk_size_kb = 0;
2701+
2702+
assert_eq!(
2703+
super::download_chunk_bytes(&config),
2704+
super::MIN_DOWNLOAD_CHUNK_BYTES
2705+
);
2706+
}
2707+
2708+
#[test]
2709+
fn upload_log_level_allows_thresholds() {
2710+
assert!(UploadLogLevel::Info.allows(UploadLogLevel::Error));
2711+
assert!(UploadLogLevel::Info.allows(UploadLogLevel::Warning));
2712+
assert!(UploadLogLevel::Info.allows(UploadLogLevel::Info));
2713+
assert!(!UploadLogLevel::Info.allows(UploadLogLevel::Debug));
2714+
assert!(!UploadLogLevel::None.allows(UploadLogLevel::Error));
2715+
}
2716+
2717+
#[test]
2718+
fn upload_limit_clamps_bounds() {
2719+
assert_eq!(super::upload_task_limit(0), 1);
2720+
assert_eq!(super::upload_task_limit(1), 1);
2721+
assert_eq!(super::upload_task_limit(4), 4);
2722+
assert_eq!(super::upload_task_limit(1000), 64);
2723+
}
2724+
2725+
#[test]
2726+
fn upload_client_refresh_decision_works() {
2727+
let has_bot = UploadClientState {
2728+
bot: Some(Bot::new("token")),
2729+
raw_client: None,
2730+
upload_api_url: String::new(),
2731+
reuse_count: 0,
2732+
};
2733+
let no_bot = UploadClientState {
2734+
bot: None,
2735+
raw_client: None,
2736+
upload_api_url: String::new(),
2737+
reuse_count: 0,
2738+
};
2739+
let exhausted = UploadClientState {
2740+
bot: Some(Bot::new("token")),
2741+
raw_client: None,
2742+
upload_api_url: String::new(),
2743+
reuse_count: 10,
2744+
};
2745+
2746+
assert!(super::should_refresh_upload_client(&no_bot, 10));
2747+
assert!(!super::should_refresh_upload_client(&has_bot, 10));
2748+
assert!(super::should_refresh_upload_client(&exhausted, 10));
2749+
assert!(!super::should_refresh_upload_client(&exhausted, 0));
2750+
}
2751+
2752+
#[tokio::test]
2753+
async fn upload_prewarm_failure_is_non_fatal() {
2754+
let config = crate::config::Config::default();
2755+
let ok = super::run_upload_prewarm(&config, || async {
2756+
Err::<(), crate::error::BotError>(crate::error::BotError::MusicApi(
2757+
"simulated prewarm failure".to_string(),
2758+
))
2759+
})
2760+
.await;
2761+
2762+
assert!(!ok);
2763+
}
2764+
2765+
#[tokio::test]
2766+
async fn upload_prewarm_runs_warmup_path() {
2767+
let config = crate::config::Config::default();
2768+
let ok =
2769+
super::run_upload_prewarm(&config, || async { Ok::<(), crate::error::BotError>(()) })
2770+
.await;
2771+
2772+
assert!(ok);
2773+
}
2774+
2775+
#[test]
2776+
fn parse_telegram_api_response_returns_error_when_http_200_ok_false() {
2777+
let body = r#"{"ok": false, "description": "chat not found"}"#;
2778+
let err = super::parse_telegram_api_response(body, reqwest::StatusCode::OK, "sendAudio")
2779+
.expect_err("ok=false should be treated as Telegram API error");
2780+
let err_msg = err.to_string();
2781+
2782+
assert!(err_msg.contains("chat not found"));
2783+
assert!(err_msg.contains("HTTP 200"));
2784+
}
2785+
2786+
#[test]
2787+
fn parse_telegram_api_response_returns_error_when_http_500_for_any_ok_flag() {
2788+
let cases = [
2789+
(r#"{"ok": true, "result": {}}"#, "unknown error"),
2790+
(
2791+
r#"{"ok": false, "description": "server failed"}"#,
2792+
"server failed",
2793+
),
2794+
];
2795+
2796+
for (body, expected_desc) in cases {
2797+
let err = super::parse_telegram_api_response(
2798+
body,
2799+
reqwest::StatusCode::INTERNAL_SERVER_ERROR,
2800+
"sendAudio",
2801+
)
2802+
.expect_err("non-2xx status should always be treated as error");
2803+
let err_msg = err.to_string();
2804+
2805+
assert!(err_msg.contains(expected_desc));
2806+
assert!(err_msg.contains("HTTP 500"));
2807+
}
2808+
}
2809+
2810+
#[test]
2811+
fn parse_telegram_api_response_returns_parse_error_for_non_json_body() {
2812+
let err = super::parse_telegram_api_response(
2813+
"<html>502 bad gateway</html>",
2814+
reqwest::StatusCode::OK,
2815+
"sendAudio",
2816+
)
2817+
.expect_err("non-JSON body should fail response parsing");
2818+
let err_msg = err.to_string();
2819+
2820+
assert!(err_msg.contains("Failed to parse upload response"));
2821+
}
2822+
2823+
#[test]
2824+
fn parse_telegram_api_response_uses_unknown_error_when_description_missing() {
2825+
let err = super::parse_telegram_api_response(
2826+
r#"{"ok": false}"#,
2827+
reqwest::StatusCode::OK,
2828+
"sendAudio",
2829+
)
2830+
.expect_err("missing description should still return a Telegram API error");
2831+
let err_msg = err.to_string();
2832+
2833+
assert!(err_msg.contains("unknown error"));
2834+
assert!(err_msg.contains("HTTP 200"));
2835+
}
2836+
25132837
#[test]
25142838
fn extract_file_id_reads_audio_field() {
25152839
let payload = serde_json::json!({

0 commit comments

Comments
 (0)