Skip to content

stream rustdoc html content from S3, use streaming rewriter, stream to client #2872

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 0 additions & 51 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,40 +253,6 @@ impl AsyncStorage {
}
}

/// Load a single rustdoc file from our blob storage.
///
/// * `name` - the crate name
/// * `version` - the crate version
/// * `latest_build_id` - the id of the most recent build. Only forwarded to the archive
///   lookup, where it is used purely to invalidate the local archive index cache; without
///   it we wouldn't know that the locally cached file is stale after a rebuild.
/// * `path` - the wanted path inside the documentation.
/// * `archive_storage` - if `true`, we assume there is a remote ZIP archive plus an index
///   and fetch the requested path from inside that archive. Otherwise the file is fetched
///   directly from `rustdoc/{name}/{version}/{path}`.
#[instrument]
pub(crate) async fn fetch_rustdoc_file(
    &self,
    name: &str,
    version: &str,
    latest_build_id: Option<BuildId>,
    path: &str,
    archive_storage: bool,
) -> Result<Blob> {
    trace!("fetch rustdoc file");

    if !archive_storage {
        // Plain storage: prefix the path with `rustdoc`, the crate name and the version.
        let remote_path = format!("rustdoc/{name}/{version}/{path}");
        let blob = self.get(&remote_path, self.max_file_size_for(path)).await?;
        return Ok(blob);
    }

    // Archive storage: the file lives inside the per-release rustdoc archive.
    let archive_path = rustdoc_archive_path(name, version);
    let blob = self
        .get_from_archive(
            &archive_path,
            latest_build_id,
            path,
            self.max_file_size_for(path),
        )
        .await?;
    Ok(blob)
}

/// Fetch a rustdoc file from our blob storage.
/// * `name` - the crate name
/// * `version` - the crate version
Expand Down Expand Up @@ -840,23 +806,6 @@ impl Storage {
.block_on(self.inner.set_public_access(path, public))
}

/// Blocking variant of the async `fetch_rustdoc_file`.
///
/// Builds the future on `self.inner` with the arguments passed through unchanged and
/// drives it to completion on `self.runtime`.
pub(crate) fn fetch_rustdoc_file(
    &self,
    name: &str,
    version: &str,
    latest_build_id: Option<BuildId>,
    path: &str,
    archive_storage: bool,
) -> Result<Blob> {
    let fetch = self
        .inner
        .fetch_rustdoc_file(name, version, latest_build_id, path, archive_storage);
    self.runtime.block_on(fetch)
}

pub(crate) fn fetch_source_file(
&self,
name: &str,
Expand Down
286 changes: 195 additions & 91 deletions src/utils/html.rs
Original file line number Diff line number Diff line change
@@ -1,109 +1,213 @@
use crate::web::{
page::templates::{Body, Head, Vendored},
rustdoc::RustdocPage,
use crate::{
InstanceMetrics,
web::{
page::{
TemplateData,
templates::{Body, Head, Vendored},
},
rustdoc::RustdocPage,
},
};
use askama::Template;
use async_stream::stream;
use axum::body::Bytes;
use futures_util::{Stream, StreamExt as _};
use lol_html::{element, errors::RewritingError};
use std::sync::Arc;
use tokio::{io::AsyncRead, task::JoinHandle};
use tokio_util::io::ReaderStream;
use tracing::error;

/// Errors that can surface while rewriting a rustdoc HTML page.
#[derive(thiserror::Error, Debug)]
pub(crate) enum RustdocRewritingError {
/// An error reported by the `lol_html` rewriter itself; converted automatically
/// from `lol_html::errors::RewritingError` via `#[from]`.
#[error("HTML rewriter error: {0}")]
RewritingError(#[from] lol_html::errors::RewritingError),
/// Any other failure, carried as an `anyhow::Error`; converted automatically
/// via `#[from]`.
#[error("generic error while rewriting rustdoc HTML: {0}")]
Other(#[from] anyhow::Error),
}

/// Rewrite a rustdoc page to have the docs.rs topbar
///
/// Given a rustdoc HTML page and a context to serialize it with,
/// render the `rustdoc/` templates with the `html`.
/// The output is an HTML page which has not yet been UTF-8 validated.
/// In practice, the output should always be valid UTF-8.
pub(crate) fn rewrite_lol(
html: &[u8],
pub(crate) fn rewrite_rustdoc_html_stream<R>(
template_data: Arc<TemplateData>,
mut reader: R,
max_allowed_memory_usage: usize,
data: &RustdocPage,
) -> Result<Vec<u8>, RewritingError> {
use lol_html::html_content::{ContentType, Element};
use lol_html::{HtmlRewriter, MemorySettings, Settings};

let head_html = Head::new(data).render().unwrap();
let vendored_html = Vendored.render().unwrap();
let body_html = Body.render().unwrap();
let topbar_html = data.render().unwrap();

// Before: <body> ... rustdoc content ... </body>
// After:
// ```html
// <div id="rustdoc_body_wrapper" class="{{ rustdoc_body_class }}" tabindex="-1">
// ... rustdoc content ...
// </div>
// ```
let body_handler = |rustdoc_body_class: &mut Element| {
// Add the `rustdoc` classes to the html body
let mut tmp;
let klass = if let Some(classes) = rustdoc_body_class.get_attribute("class") {
tmp = classes;
tmp.push_str(" container-rustdoc");
&tmp
} else {
"container-rustdoc"
};
rustdoc_body_class.set_attribute("class", klass)?;
rustdoc_body_class.set_attribute("id", "rustdoc_body_wrapper")?;
rustdoc_body_class.set_attribute("tabindex", "-1")?;
// Change the `body` to a `div`
rustdoc_body_class.set_tag_name("div")?;
// Prepend the askama content
rustdoc_body_class.prepend(&body_html, ContentType::Html);
// Wrap the transformed body and topbar into a <body> element
rustdoc_body_class.before(r#"<body class="rustdoc-page">"#, ContentType::Html);
// Insert the topbar outside of the rustdoc div
rustdoc_body_class.before(&topbar_html, ContentType::Html);
// Finalize body with </body>
rustdoc_body_class.after("</body>", ContentType::Html);

Ok(())
};

let settings = Settings {
element_content_handlers: vec![
// Append `style.css` stylesheet after all head elements.
element!("head", |head: &mut Element| {
head.append(&head_html, ContentType::Html);
Ok(())
}),
element!("body", body_handler),
// Append `vendored.css` before `rustdoc.css`, so that the duplicate copy of
// `normalize.css` will be overridden by the later version.
//
// Later rustdoc has `#mainThemeStyle` that could be used, but pre-2018 docs
// don't have this:
//
// https://github.com/rust-lang/rust/commit/003b2bc1c65251ec2fc80b78ed91c43fb35402ec
//
// Pre-2018 rustdoc also didn't have the resource suffix, but docs.rs was using a fork
// that had implemented it already then, so we can assume the css files are
// `<some path>/rustdoc-<some suffix>.css` and use the `-` to distinguish from the
// `rustdoc.static` path.
element!(
"link[rel='stylesheet'][href*='rustdoc-']",
|rustdoc_css: &mut Element| {
rustdoc_css.before(&vendored_html, ContentType::Html);
data: Arc<RustdocPage>,
metrics: Arc<InstanceMetrics>,
) -> impl Stream<Item = Result<Bytes, RustdocRewritingError>>
where
R: AsyncRead + Unpin + 'static,
{
stream!({
let (input_sender, input_receiver) = std::sync::mpsc::channel::<Option<Vec<u8>>>();
let (result_sender, mut result_receiver) = tokio::sync::mpsc::unbounded_channel::<Bytes>();

let join_handle: JoinHandle<anyhow::Result<_>> = tokio::spawn(async move {
// we're using the rendering threadpool to limit CPU usage on the server, and to
// offload potentially CPU intensive stuff from the tokio runtime.
// Also this lets us limit the threadpool size and through that the CPU usage.
template_data
.render_in_threadpool(move || {
use lol_html::html_content::{ContentType, Element};
use lol_html::{HtmlRewriter, MemorySettings, Settings};

let head_html = Head::new(&data).render().unwrap();
let vendored_html = Vendored.render().unwrap();
let body_html = Body.render().unwrap();
let topbar_html = data.render().unwrap();

// Before: <body> ... rustdoc content ... </body>
// After:
// ```html
// <div id="rustdoc_body_wrapper" class="{{ rustdoc_body_class }}" tabindex="-1">
// ... rustdoc content ...
// </div>
// ```
let body_handler = |rustdoc_body_class: &mut Element| {
// Add the `rustdoc` classes to the html body
let mut tmp;
let klass = if let Some(classes) = rustdoc_body_class.get_attribute("class")
{
tmp = classes;
tmp.push_str(" container-rustdoc");
&tmp
} else {
"container-rustdoc"
};
rustdoc_body_class.set_attribute("class", klass)?;
rustdoc_body_class.set_attribute("id", "rustdoc_body_wrapper")?;
rustdoc_body_class.set_attribute("tabindex", "-1")?;
// Change the `body` to a `div`
rustdoc_body_class.set_tag_name("div")?;
// Prepend the askama content
rustdoc_body_class.prepend(&body_html, ContentType::Html);
// Wrap the transformed body and topbar into a <body> element
rustdoc_body_class
.before(r#"<body class="rustdoc-page">"#, ContentType::Html);
// Insert the topbar outside of the rustdoc div
rustdoc_body_class.before(&topbar_html, ContentType::Html);
// Finalize body with </body>
rustdoc_body_class.after("</body>", ContentType::Html);

Ok(())
};

let settings = Settings {
element_content_handlers: vec![
// Append `style.css` stylesheet after all head elements.
element!("head", |head: &mut Element| {
head.append(&head_html, ContentType::Html);
Ok(())
}),
element!("body", body_handler),
// Append `vendored.css` before `rustdoc.css`, so that the duplicate copy of
// `normalize.css` will be overridden by the later version.
//
// Later rustdoc has `#mainThemeStyle` that could be used, but pre-2018 docs
// don't have this:
//
// https://github.com/rust-lang/rust/commit/003b2bc1c65251ec2fc80b78ed91c43fb35402ec
//
// Pre-2018 rustdoc also didn't have the resource suffix, but docs.rs was using a fork
// that had implemented it already then, so we can assume the css files are
// `<some path>/rustdoc-<some suffix>.css` and use the `-` to distinguish from the
// `rustdoc.static` path.
element!(
"link[rel='stylesheet'][href*='rustdoc-']",
move |rustdoc_css: &mut Element| {
rustdoc_css.before(&vendored_html, ContentType::Html);
Ok(())
}
),
],
memory_settings: MemorySettings {
max_allowed_memory_usage,
..MemorySettings::default()
},
..Settings::default()
};

let mut rewriter = HtmlRewriter::new(settings, move |chunk: &[u8]| {
// Send each rewritten chunk back to the consuming stream as soon as it is produced.
// This can fail only when the receiver is dropped, in which case
// we exit this thread anyway.
let _ = result_sender.send(Bytes::from(chunk.to_vec()));
});
while let Some(chunk) = input_receiver.recv()? {
// receive data from the input receiver.
// `input_receiver` is a non-async one.
// Since we're in a normal background thread, we can use the blocking `.recv`
// here.
// We will get `None` when the reader is done reading,
// so that's our signal to exit this loop and call `rewriter.end()` below.
rewriter.write(&chunk)?;
}
// finalize everything. Will trigger the output sink (and through that,
// sending data to the `result_sender`).
rewriter.end()?;
Ok(())
}
),
],
memory_settings: MemorySettings {
max_allowed_memory_usage,
..MemorySettings::default()
},
..Settings::default()
};
})
.await?;
Ok(())
});

let mut reader_stream = ReaderStream::new(&mut reader);
while let Some(chunk) = reader_stream.next().await {
let chunk = chunk.map_err(|err| {
error!(?err, "error while reading from rustdoc HTML reader");
RustdocRewritingError::Other(err.into())
})?;

// The input and output are always strings, we just use `&[u8]` so we only have to validate once.
let mut buffer = Vec::new();
// TODO: Make the rewriter persistent?
let mut writer = HtmlRewriter::new(settings, |bytes: &[u8]| {
buffer.extend_from_slice(bytes);
});
if let Err(err) = input_sender.send(Some(chunk.to_vec())) {
error!(
?err,
"error when trying to send chunk to html rewriter thread"
);
yield Err(RustdocRewritingError::Other(err.into()));
break;
}

writer.write(html)?;
writer.end()?;
while let Ok(bytes) = result_receiver.try_recv() {
yield Ok(bytes);
}
}
// This signals the renderer thread to finalize & exit.
if let Err(err) = input_sender.send(None) {
error!(
?err,
"error when trying to send end signal to html rewriter thread"
);
yield Err(RustdocRewritingError::Other(err.into()));
}
while let Some(bytes) = result_receiver.recv().await {
yield Ok(bytes);
}

Ok(buffer)
join_handle.await.expect("Task panicked").map_err(|e| {
error!(
?e,
memory_limit = max_allowed_memory_usage,
"error while rewriting rustdoc HTML"
);
// our `render_in_threadpool` and so the async tokio task return an `anyhow::Result`.
// In most cases this will be an error from the `HtmlRewriter`, which we'll get as a
// `RewritingError` which we extract here again. The other cases remain an
// `anyhow::Error`.
match e.downcast::<RewritingError>() {
Ok(e) => {
if matches!(e, RewritingError::MemoryLimitExceeded(_)) {
metrics.html_rewrite_ooms.inc();
}
RustdocRewritingError::RewritingError(e)
}
Err(e) => RustdocRewritingError::Other(e),
}
})?;
})
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
pub(crate) use self::cargo_metadata::{CargoMetadata, Package as MetadataPackage};
pub(crate) use self::copy::copy_dir_all;
pub use self::daemon::{start_daemon, watch_registry};
pub(crate) use self::html::rewrite_lol;
pub(crate) use self::html::rewrite_rustdoc_html_stream;
pub use self::queue::{
get_crate_pattern_and_priority, get_crate_priority, list_crate_priorities,
remove_crate_priority, set_crate_priority,
Expand Down
Loading
Loading