From 07a7ca62cdc1bd53a2745282d4358b7bab5edef3 Mon Sep 17 00:00:00 2001 From: Denis Cornehl Date: Sun, 22 Jun 2025 12:38:35 +0200 Subject: [PATCH] support streaming content from S3, start using it in the webserver --- ...0c477667d5f022349661385026c757df88cc.json} | 10 +- ...f4b3643b09cfea98ed8ba13e4dadb5b5e48c.json} | 15 +- Cargo.lock | 28 ++- Cargo.toml | 2 + src/storage/database.rs | 41 ++-- src/storage/mod.rs | 183 ++++++++++++++++-- src/storage/s3.rs | 32 ++- src/utils/html.rs | 9 +- src/web/file.rs | 36 +++- src/web/rustdoc.rs | 19 +- 10 files changed, 271 insertions(+), 104 deletions(-) rename .sqlx/{query-3bdc47a7b7457e290e2c63f9c22742d17a52940631caa0688d3c8b5e2c3765c8.json => query-2fd2aad681960b30ca5149dfc7050c477667d5f022349661385026c757df88cc.json} (73%) rename .sqlx/{query-f0239a895d0ef72aff8d99f77a35656d2642564a6a3c40d742fc1b62d1c80d59.json => query-ce9cf294c964be76b19585a35c37f4b3643b09cfea98ed8ba13e4dadb5b5e48c.json} (56%) diff --git a/.sqlx/query-3bdc47a7b7457e290e2c63f9c22742d17a52940631caa0688d3c8b5e2c3765c8.json b/.sqlx/query-2fd2aad681960b30ca5149dfc7050c477667d5f022349661385026c757df88cc.json similarity index 73% rename from .sqlx/query-3bdc47a7b7457e290e2c63f9c22742d17a52940631caa0688d3c8b5e2c3765c8.json rename to .sqlx/query-2fd2aad681960b30ca5149dfc7050c477667d5f022349661385026c757df88cc.json index 5c8d99956..9510799b3 100644 --- a/.sqlx/query-3bdc47a7b7457e290e2c63f9c22742d17a52940631caa0688d3c8b5e2c3765c8.json +++ b/.sqlx/query-2fd2aad681960b30ca5149dfc7050c477667d5f022349661385026c757df88cc.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT\n path, mime, date_updated, compression,\n substring(content from $2 for $3) as content,\n FALSE as \"is_too_big!\"\n FROM files\n WHERE path = $1;", + "query": "SELECT\n path, mime, date_updated, compression,\n substring(content from $2 for $3) as content\n FROM files\n WHERE path = $1;", "describe": { "columns": [ { @@ -27,11 +27,6 @@ "ordinal": 4, "name": "content", "type_info": "Bytea" - }, - { - "ordinal": 5, - "name": "is_too_big!", - "type_info": "Bool" } ], "parameters": { @@ -46,9 +41,8 @@ false, false, true, - null, null ] }, - "hash": "3bdc47a7b7457e290e2c63f9c22742d17a52940631caa0688d3c8b5e2c3765c8" + "hash": "2fd2aad681960b30ca5149dfc7050c477667d5f022349661385026c757df88cc" } diff --git a/.sqlx/query-f0239a895d0ef72aff8d99f77a35656d2642564a6a3c40d742fc1b62d1c80d59.json b/.sqlx/query-ce9cf294c964be76b19585a35c37f4b3643b09cfea98ed8ba13e4dadb5b5e48c.json similarity index 56% rename from .sqlx/query-f0239a895d0ef72aff8d99f77a35656d2642564a6a3c40d742fc1b62d1c80d59.json rename to .sqlx/query-ce9cf294c964be76b19585a35c37f4b3643b09cfea98ed8ba13e4dadb5b5e48c.json index 6476873e5..62e26518c 100644 --- a/.sqlx/query-f0239a895d0ef72aff8d99f77a35656d2642564a6a3c40d742fc1b62d1c80d59.json +++ b/.sqlx/query-ce9cf294c964be76b19585a35c37f4b3643b09cfea98ed8ba13e4dadb5b5e48c.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT\n path, mime, date_updated, compression,\n (CASE WHEN LENGTH(content) <= $2 THEN content ELSE NULL END) AS content,\n (LENGTH(content) > $2) AS \"is_too_big!\"\n FROM files\n WHERE path = $1;", + "query": "SELECT\n path,\n mime,\n date_updated,\n compression,\n content\n FROM files\n WHERE path = $1;", "describe": { "columns": [ { @@ -27,17 +27,11 @@ "ordinal": 4, "name": "content", "type_info": "Bytea" - }, - { - "ordinal": 5, - "name": "is_too_big!", - "type_info": "Bool" } ], "parameters": { "Left": [ - "Text", - "Int4" + "Text" ] }, "nullable": [ @@ -45,9 +39,8 @@ false, false, 
true, - null, - null + true ] }, - "hash": "f0239a895d0ef72aff8d99f77a35656d2642564a6a3c40d742fc1b62d1c80d59" + "hash": "ce9cf294c964be76b19585a35c37f4b3643b09cfea98ed8ba13e4dadb5b5e48c" } diff --git a/Cargo.lock b/Cargo.lock index db656e681..0ee412118 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -370,15 +370,18 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.24" +version = "0.4.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d615619615a650c571269c00dca41db04b9210037fa76ed8239f70404ab56985" +checksum = "40f6024f3f856663b45fd0c9b6f2024034a702f453549449e0d84a305900dad4" dependencies = [ + "bzip2 0.6.0", "flate2", "futures-core", "memchr", "pin-project-lite", "tokio", + "zstd", + "zstd-safe", ] [[package]] @@ -1213,6 +1216,15 @@ dependencies = [ "bzip2-sys", ] +[[package]] +name = "bzip2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bea8dcd42434048e4f7a304411d9273a411f647446c1234a65ce0554923f4cff" +dependencies = [ + "libbz2-rs-sys", +] + [[package]] name = "bzip2-sys" version = "0.1.13+1.0.8" @@ -1944,6 +1956,7 @@ version = "0.6.0" dependencies = [ "anyhow", "askama", + "async-compression", "async-stream", "async-trait", "aws-config", @@ -1957,7 +1970,7 @@ dependencies = [ "axum-extra", "backtrace", "base64 0.22.1", - "bzip2", + "bzip2 0.5.2", "chrono", "clap", "comrak", @@ -2018,6 +2031,7 @@ dependencies = [ "thread_local", "time", "tokio", + "tokio-util", "toml", "tower", "tower-http", @@ -4088,6 +4102,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "libbz2-rs-sys" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "775bf80d5878ab7c2b1080b5351a48b2f737d9f6f8b383574eebcc22be0dfccb" + [[package]] name = "libc" version = "0.2.173" @@ -7930,7 +7950,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "153a6fff49d264c4babdcfa6b4d534747f520e56e8f0f384f3b808c4b64cc1fd" dependencies = [ "arbitrary", - "bzip2", + "bzip2 0.5.2", "crc32fast", "indexmap 2.9.0", "memchr", diff --git a/Cargo.toml b/Cargo.toml index 3b5342ba0..4635a17cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,8 +64,10 @@ derive_more = { version = "2.0.0", features = ["display"] } # Async tokio = { version = "1.0", features = ["rt-multi-thread", "signal", "macros"] } +tokio-util = { version = "0.7.15", default-features = false, features = ["io"] } futures-util = "0.3.5" async-stream = "0.3.5" +async-compression = { version = "0.4.25", features = ["tokio", "bzip2", "zstd", "gzip"] } aws-config = "1.0.0" aws-sdk-s3 = "1.3.0" aws-sdk-cloudfront = "1.3.0" diff --git a/src/storage/database.rs b/src/storage/database.rs index 0e03075fd..275b30409 100644 --- a/src/storage/database.rs +++ b/src/storage/database.rs @@ -1,9 +1,9 @@ -use super::{Blob, FileRange}; +use super::{Blob, FileRange, StreamingBlob}; use crate::{InstanceMetrics, db::Pool, error::Result}; use chrono::{DateTime, Utc}; use futures_util::stream::{Stream, TryStreamExt}; use sqlx::Acquire; -use std::sync::Arc; +use std::{io, sync::Arc}; pub(crate) struct DatabaseBackend { pool: Pool, @@ -58,38 +58,27 @@ impl DatabaseBackend { } } - pub(super) async fn get( + pub(super) async fn get_stream( &self, path: &str, - max_size: usize, range: Option, - ) -> Result { - // The maximum size for a BYTEA (the type used for `content`) is 1GB, so this cast is safe: - 
// https://www.postgresql.org/message-id/162867790712200946i7ba8eb92v908ac595c0c35aee%40mail.gmail.com - let max_size = max_size.min(i32::MAX as usize) as i32; - + ) -> Result { struct Result { path: String, mime: String, date_updated: DateTime, compression: Option, content: Option>, - is_too_big: bool, } let result = if let Some(r) = range { - // when we only want to get a range we can validate already if the range is small enough - if (r.end() - r.start() + 1) > max_size as u64 { - return Err(std::io::Error::other(crate::error::SizeLimitReached).into()); - } let range_start = i32::try_from(*r.start())?; sqlx::query_as!( Result, r#"SELECT path, mime, date_updated, compression, - substring(content from $2 for $3) as content, - FALSE as "is_too_big!" + substring(content from $2 for $3) as content FROM files WHERE path = $1;"#, path, @@ -105,35 +94,35 @@ impl DatabaseBackend { sqlx::query_as!( Result, r#"SELECT - path, mime, date_updated, compression, - (CASE WHEN LENGTH(content) <= $2 THEN content ELSE NULL END) AS content, - (LENGTH(content) > $2) AS "is_too_big!" + path, + mime, + date_updated, + compression, + content FROM files WHERE path = $1;"#, path, - max_size, ) .fetch_optional(&self.pool) .await? .ok_or(super::PathNotFoundError)? }; - if result.is_too_big { - return Err(std::io::Error::other(crate::error::SizeLimitReached).into()); - } - let compression = result.compression.map(|i| { i.try_into() .expect("invalid compression algorithm stored in database") }); - Ok(Blob { + let content = result.content.unwrap_or_default(); + let content_len = content.len(); + Ok(StreamingBlob { path: result.path, mime: result .mime .parse() .unwrap_or(mime::APPLICATION_OCTET_STREAM), date_updated: result.date_updated, - content: result.content.unwrap_or_default(), + content: Box::new(io::Cursor::new(content)), + content_length: content_len, compression, }) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 11692a7d6..c680e2996 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -25,14 +25,17 @@ use path_slash::PathExt; use std::{ fmt, fs::{self, File}, - io::{self, BufReader}, + io::{self, BufReader, Write as _}, num::ParseIntError, ops::RangeInclusive, path::{Path, PathBuf}, sync::Arc, }; use std::{iter, str::FromStr}; -use tokio::{io::AsyncWriteExt, runtime::Runtime}; +use tokio::{ + io::{AsyncRead, AsyncReadExt as _, AsyncWriteExt}, + runtime::Runtime, +}; use tracing::{error, info_span, instrument, trace}; use walkdir::WalkDir; @@ -57,6 +60,80 @@ impl Blob { } } +pub(crate) struct StreamingBlob { + pub(crate) path: String, + pub(crate) mime: Mime, + pub(crate) date_updated: DateTime, + pub(crate) compression: Option, + pub(crate) content_length: usize, + pub(crate) content: Box, +} + +impl std::fmt::Debug for StreamingBlob { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("StreamingBlob") + .field("path", &self.path) + .field("mime", &self.mime) + .field("date_updated", &self.date_updated) + .field("compression", &self.compression) + .finish() + } +} + +impl StreamingBlob { + /// wrap the content stream in a streaming decompressor according to the + /// algorithm found in `compression` attribute. 
+ pub(crate) fn decompress(mut self) -> Self { + let Some(alg) = self.compression else { + return self; + }; + + match alg { + CompressionAlgorithm::Zstd => { + self.content = Box::new(async_compression::tokio::bufread::ZstdDecoder::new( + tokio::io::BufReader::new(self.content), + )) + } + CompressionAlgorithm::Bzip2 => { + self.content = Box::new(async_compression::tokio::bufread::BzDecoder::new( + tokio::io::BufReader::new(self.content), + )) + } + CompressionAlgorithm::Gzip => { + self.content = Box::new(async_compression::tokio::bufread::GzipDecoder::new( + tokio::io::BufReader::new(self.content), + )) + } + }; + self.compression = None; + self + } + + pub(crate) async fn materialize(mut self, max_size: usize) -> Result { + self = self.decompress(); + + let mut content = crate::utils::sized_buffer::SizedBuffer::new(max_size); + content.reserve(self.content_length); + + let mut buf = [0u8; 8 * 1024]; + loop { + let n = self.content.read(&mut buf).await?; + if n == 0 { + break; + } + content.write_all(&buf[..n])?; + } + + Ok(Blob { + path: self.path, + mime: self.mime, + date_updated: self.date_updated, + content: content.into_inner(), + compression: self.compression, + }) + } +} + pub fn get_file_list>(path: P) -> Box>> { let path = path.as_ref().to_path_buf(); if path.is_file() { @@ -210,6 +287,35 @@ impl AsyncStorage { }) } + /// Fetch a rustdoc file from our blob storage. + /// * `name` - the crate name + /// * `version` - the crate version + /// * `latest_build_id` - the id of the most recent build. used purely to invalidate the local archive + /// index cache, when `archive_storage` is `true.` Without it we wouldn't know that we have + /// to invalidate the locally cached file after a rebuild. + /// * `path` - the wanted path inside the documentation. + /// * `archive_storage` - if `true`, we will assume we have a remove ZIP archive and an index + /// where we can fetch the requested path from inside the ZIP file. + #[instrument] + pub(crate) async fn stream_rustdoc_file( + &self, + name: &str, + version: &str, + latest_build_id: Option, + path: &str, + archive_storage: bool, + ) -> Result { + trace!("fetch rustdoc file"); + Ok(if archive_storage { + self.stream_from_archive(&rustdoc_archive_path(name, version), latest_build_id, path) + .await? + } else { + // Add rustdoc prefix, name and version to the path for accessing the file stored in the database + let remote_path = format!("rustdoc/{name}/{version}/{path}"); + self.get_stream(&remote_path).await? 
+ }) + } + #[context("fetching {path} from {name} {version} (archive: {archive_storage})")] pub(crate) async fn fetch_source_file( &self, @@ -282,15 +388,16 @@ impl AsyncStorage { #[instrument] pub(crate) async fn get(&self, path: &str, max_size: usize) -> Result { - let mut blob = match &self.backend { - StorageBackend::Database(db) => db.get(path, max_size, None).await, - StorageBackend::S3(s3) => s3.get(path, max_size, None).await, + self.get_stream(path).await?.materialize(max_size).await + } + + #[instrument] + pub(crate) async fn get_stream(&self, path: &str) -> Result { + let blob = match &self.backend { + StorageBackend::Database(db) => db.get_stream(path, None).await, + StorageBackend::S3(s3) => s3.get_stream(path, None).await, }?; - if let Some(alg) = blob.compression { - blob.content = decompress(blob.content.as_slice(), alg, max_size)?; - blob.compression = None; - } - Ok(blob) + Ok(blob.decompress()) } #[instrument] @@ -301,18 +408,28 @@ impl AsyncStorage { range: FileRange, compression: Option, ) -> Result { + self.get_range_stream(path, range, compression) + .await? + .materialize(max_size) + .await + } + + #[instrument] + pub(super) async fn get_range_stream( + &self, + path: &str, + range: FileRange, + compression: Option, + ) -> Result { let mut blob = match &self.backend { - StorageBackend::Database(db) => db.get(path, max_size, Some(range)).await, - StorageBackend::S3(s3) => s3.get(path, max_size, Some(range)).await, + StorageBackend::Database(db) => db.get_stream(path, Some(range)).await, + StorageBackend::S3(s3) => s3.get_stream(path, Some(range)).await, }?; // `compression` represents the compression of the file-stream inside the archive. // We don't compress the whole archive, so the encoding of the archive's blob is irrelevant // here. - if let Some(alg) = compression { - blob.content = decompress(blob.content.as_slice(), alg, max_size)?; - blob.compression = None; - } - Ok(blob) + blob.compression = compression; + Ok(blob.decompress()) } #[instrument] @@ -389,6 +506,38 @@ impl AsyncStorage { }) } + #[instrument] + pub(crate) async fn stream_from_archive( + &self, + archive_path: &str, + latest_build_id: Option, + path: &str, + ) -> Result { + let index_filename = self + .download_archive_index(archive_path, latest_build_id) + .await?; + + let info = { + let path = path.to_owned(); + spawn_blocking(move || archive_index::find_in_file(index_filename, &path)).await + }? 
+ .ok_or(PathNotFoundError)?; + + let blob = self + .get_range_stream(archive_path, info.range(), Some(info.compression())) + .await?; + assert_eq!(blob.compression, None); + + Ok(StreamingBlob { + path: format!("{archive_path}/{path}"), + mime: detect_mime(path), + date_updated: blob.date_updated, + content: blob.content, + content_length: blob.content_length, + compression: None, + }) + } + #[instrument(skip(self))] pub(crate) async fn store_all_in_archive( &self, diff --git a/src/storage/s3.rs b/src/storage/s3.rs index 8e94d4d0a..c3aa69eb2 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -1,4 +1,4 @@ -use super::{Blob, FileRange}; +use super::{Blob, FileRange, StreamingBlob}; use crate::{Config, InstanceMetrics}; use anyhow::{Context as _, Error}; use async_stream::try_stream; @@ -16,7 +16,7 @@ use futures_util::{ pin_mut, stream::{FuturesUnordered, Stream, StreamExt}, }; -use std::{io::Write, sync::Arc}; +use std::sync::Arc; use tracing::{error, warn}; const PUBLIC_ACCESS_TAG: &str = "static-cloudfront-access"; @@ -174,12 +174,11 @@ impl S3Backend { .map(|_| ()) } - pub(super) async fn get( + pub(super) async fn get_stream( &self, path: &str, - max_size: usize, range: Option, - ) -> Result { + ) -> Result { let res = self .client .get_object() @@ -190,19 +189,6 @@ impl S3Backend { .await .convert_errors()?; - let mut content = crate::utils::sized_buffer::SizedBuffer::new(max_size); - content.reserve( - res.content_length - .and_then(|length| length.try_into().ok()) - .unwrap_or(0), - ); - - let mut body = res.body; - - while let Some(data) = body.next().await.transpose()? { - content.write_all(data.as_ref())?; - } - let date_updated = res .last_modified // This is a bug from AWS, it should always have a modified date of when it was created if nothing else. 
@@ -210,9 +196,9 @@ impl S3Backend { .and_then(|dt| dt.to_chrono_utc().ok()) .unwrap_or_else(Utc::now); - let compression = res.content_encoding.and_then(|s| s.parse().ok()); + let compression = res.content_encoding.as_ref().and_then(|s| s.parse().ok()); - Ok(Blob { + Ok(StreamingBlob { path: path.into(), mime: res .content_type @@ -221,7 +207,11 @@ impl S3Backend { .parse() .unwrap_or(mime::APPLICATION_OCTET_STREAM), date_updated, - content: content.into_inner(), + content_length: res + .content_length + .and_then(|length| length.try_into().ok()) + .unwrap_or(0), + content: Box::new(res.body.into_async_read()), compression, }) } diff --git a/src/utils/html.rs b/src/utils/html.rs index b7bc32658..965b74f5c 100644 --- a/src/utils/html.rs +++ b/src/utils/html.rs @@ -1,8 +1,9 @@ -use crate::web::page::templates::{Body, Head, Vendored}; -use crate::web::rustdoc::RustdocPage; +use crate::web::{ + page::templates::{Body, Head, Vendored}, + rustdoc::RustdocPage, +}; use askama::Template; -use lol_html::element; -use lol_html::errors::RewritingError; +use lol_html::{element, errors::RewritingError}; /// Rewrite a rustdoc page to have the docs.rs topbar /// diff --git a/src/web/file.rs b/src/web/file.rs index bf3ea8768..cd04bb4df 100644 --- a/src/web/file.rs +++ b/src/web/file.rs @@ -4,10 +4,10 @@ use super::cache::CachePolicy; use crate::{ Config, error::Result, - storage::{AsyncStorage, Blob}, + storage::{AsyncStorage, Blob, StreamingBlob}, }; - use axum::{ + body::Body, extract::Extension, http::{ StatusCode, @@ -15,6 +15,7 @@ use axum::{ }, response::{IntoResponse, Response as AxumResponse}, }; +use tokio_util::io::ReaderStream; #[derive(Debug)] pub(crate) struct File(pub(crate) Blob); @@ -54,6 +55,37 @@ impl IntoResponse for File { } } +#[derive(Debug)] +pub(crate) struct StreamingFile(pub(crate) StreamingBlob); + +impl StreamingFile { + /// Gets file from database + pub(super) async fn from_path(storage: &AsyncStorage, path: &str) -> Result { + Ok(StreamingFile(storage.get_stream(path).await?)) + } +} + +impl IntoResponse for StreamingFile { + fn into_response(self) -> AxumResponse { + // Convert the AsyncBufRead into a Stream of Bytes + let stream = ReaderStream::new(self.0.content); + let body = Body::from_stream(stream); + ( + StatusCode::OK, + [ + (CONTENT_TYPE, self.0.mime.as_ref()), + ( + LAST_MODIFIED, + &self.0.date_updated.format("%a, %d %b %Y %T %Z").to_string(), + ), + ], + Extension(CachePolicy::ForeverInCdnAndBrowser), + body, + ) + .into_response() + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/web/rustdoc.rs b/src/web/rustdoc.rs index 1a2e98598..5ee9d6ddf 100644 --- a/src/web/rustdoc.rs +++ b/src/web/rustdoc.rs @@ -16,7 +16,7 @@ use crate::{ encode_url_path, error::{AxumNope, AxumResult, EscapedURI}, extractors::{DbConnection, Path}, - file::File, + file::{File, StreamingFile}, match_version, page::{ TemplateData, @@ -75,7 +75,6 @@ pub(crate) struct RustdocRedirectorParams { /// See also https://github.com/rust-lang/docs.rs/pull/1889 async fn try_serve_legacy_toolchain_asset( storage: Arc, - config: Arc, path: impl AsRef, ) -> AxumResult { let path = path.as_ref().to_owned(); @@ -87,18 +86,17 @@ async fn try_serve_legacy_toolchain_asset( // since new nightly versions will always put their // toolchain specific resources into the new folder, // which is reached via the new handler. - Ok(File::from_path(&storage, &path, &config) + Ok(StreamingFile::from_path(&storage, &path) .await .map(IntoResponse::into_response)?) 
} /// Handler called for `/:crate` and `/:crate/:version` URLs. Automatically redirects to the docs /// or crate details page based on whether the given crate version was successfully built. -#[instrument(skip(storage, config, conn))] +#[instrument(skip(storage, conn))] pub(crate) async fn rustdoc_redirector_handler( Path(params): Path, Extension(storage): Extension>, - Extension(config): Extension>, mut conn: DbConnection, Query(query_pairs): Query>, uri: Uri, @@ -129,7 +127,7 @@ pub(crate) async fn rustdoc_redirector_handler( .binary_search(&extension) .is_ok() { - return try_serve_legacy_toolchain_asset(storage, config, params.name) + return try_serve_legacy_toolchain_asset(storage, params.name) .instrument(info_span!("serve static asset")) .await; } @@ -183,7 +181,7 @@ pub(crate) async fn rustdoc_redirector_handler( let krate = CrateDetails::from_matched_release(&mut conn, matched_release).await?; match storage - .fetch_rustdoc_file( + .stream_rustdoc_file( &crate_name, &krate.version.to_string(), krate.latest_build_id, @@ -192,7 +190,7 @@ pub(crate) async fn rustdoc_redirector_handler( ) .await { - Ok(blob) => Ok(File(blob).into_response()), + Ok(blob) => Ok(StreamingFile(blob).into_response()), Err(err) => { if !matches!(err.downcast_ref(), Some(AxumNope::ResourceNotFound)) && !matches!( @@ -208,7 +206,7 @@ pub(crate) async fn rustdoc_redirector_handler( // docs that were affected by this bug. // https://github.com/rust-lang/docs.rs/issues/1979 if target.starts_with("search-") || target.starts_with("settings-") { - try_serve_legacy_toolchain_asset(storage, config, target).await + try_serve_legacy_toolchain_asset(storage, target).await } else { Err(err.into()) } @@ -999,11 +997,10 @@ pub(crate) async fn download_handler( pub(crate) async fn static_asset_handler( Path(path): Path, Extension(storage): Extension>, - Extension(config): Extension>, ) -> AxumResult { let storage_path = format!("{RUSTDOC_STATIC_STORAGE_PREFIX}{path}"); - Ok(File::from_path(&storage, &storage_path, &config).await?) + Ok(StreamingFile::from_path(&storage, &storage_path).await?) } #[cfg(test)]
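For reviewers who want the end-to-end picture outside the diff, below is a minimal, self-contained sketch (not part of the patch) of the streaming path this change wires up: an async reader coming out of storage is optionally wrapped in a streaming decompressor (async-compression's tokio ZstdDecoder, matching the new Cargo dependency) and then handed to axum via tokio_util::io::ReaderStream and Body::from_stream, the same adapters StreamingFile::into_response uses. DemoBlob, into_streaming_response, and the main harness are hypothetical names for illustration only; it assumes axum, tokio (rt + macros), tokio-util (io feature), and async-compression (tokio + zstd features).

use std::io;

use async_compression::tokio::bufread::ZstdDecoder;
use axum::{body::Body, http::header::CONTENT_TYPE, response::Response};
use tokio::io::{AsyncRead, BufReader};
use tokio_util::io::ReaderStream;

/// Hypothetical stand-in for the patch's `StreamingBlob`: the body is kept as
/// an async reader instead of a fully buffered `Vec<u8>`.
struct DemoBlob {
    mime: String,
    zstd_compressed: bool,
    content: Box<dyn AsyncRead + Unpin + Send>,
}

/// Build an HTTP response that forwards the blob chunk by chunk.
fn into_streaming_response(blob: DemoBlob) -> Response {
    // When the stored object is compressed, wrap the reader in a streaming
    // decoder; both branches stay `AsyncRead`, so nothing is buffered whole.
    let reader: Box<dyn AsyncRead + Unpin + Send> = if blob.zstd_compressed {
        Box::new(ZstdDecoder::new(BufReader::new(blob.content)))
    } else {
        blob.content
    };

    // `ReaderStream` adapts the reader into a `Stream` of `Bytes` chunks,
    // which axum forwards as the response body without collecting it.
    let body = Body::from_stream(ReaderStream::new(reader));

    Response::builder()
        .header(CONTENT_TYPE, blob.mime)
        .body(body)
        .expect("static response parts are valid")
}

#[tokio::main]
async fn main() {
    // In-memory stand-in for an S3 or database object, so the sketch runs
    // without external services.
    let blob = DemoBlob {
        mime: "text/plain".to_string(),
        zstd_compressed: false,
        content: Box::new(io::Cursor::new(b"hello from storage".to_vec())),
    };
    let response = into_streaming_response(blob);
    println!("status: {}", response.status());
}

The design point is visible here: the decompressor and the HTTP body both operate on AsyncRead, so a response never has to be materialized in full the way the old get/fetch_rustdoc_file path with its max_size buffer required.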