Skip to content

Commit 1fde992

Browse files
committed
Refactor integration for post processing
1 parent 807f845 commit 1fde992

File tree

6 files changed

+301
-494
lines changed

6 files changed

+301
-494
lines changed

crates/common/src/html_processor.rs

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,84 @@
22
//!
33
//! This module provides a StreamProcessor implementation for HTML content.
44
use std::cell::Cell;
5+
use std::io;
56
use std::rc::Rc;
7+
use std::sync::Arc;
68

79
use lol_html::{element, html_content::ContentType, text, Settings as RewriterSettings};
810

911
use crate::integrations::{
10-
AttributeRewriteOutcome, IntegrationAttributeContext, IntegrationRegistry,
11-
IntegrationScriptContext, ScriptRewriteAction,
12+
AttributeRewriteOutcome, IntegrationAttributeContext, IntegrationHtmlContext,
13+
IntegrationHtmlPostProcessor, IntegrationRegistry, IntegrationScriptContext,
14+
ScriptRewriteAction,
1215
};
1316
use crate::settings::Settings;
1417
use crate::streaming_processor::{HtmlRewriterAdapter, StreamProcessor};
1518
use crate::tsjs;
1619

20+
struct HtmlWithPostProcessing {
21+
inner: HtmlRewriterAdapter,
22+
post_processors: Vec<Arc<dyn IntegrationHtmlPostProcessor>>,
23+
origin_host: String,
24+
request_host: String,
25+
request_scheme: String,
26+
}
27+
28+
impl StreamProcessor for HtmlWithPostProcessing {
29+
fn process_chunk(&mut self, chunk: &[u8], is_last: bool) -> Result<Vec<u8>, io::Error> {
30+
let output = self.inner.process_chunk(chunk, is_last)?;
31+
if !is_last || output.is_empty() || self.post_processors.is_empty() {
32+
return Ok(output);
33+
}
34+
35+
let Ok(output_str) = std::str::from_utf8(&output) else {
36+
return Ok(output);
37+
};
38+
39+
let ctx = IntegrationHtmlContext {
40+
request_host: &self.request_host,
41+
request_scheme: &self.request_scheme,
42+
origin_host: &self.origin_host,
43+
};
44+
45+
// Preflight to avoid allocating a `String` unless at least one post-processor wants to run.
46+
if !self
47+
.post_processors
48+
.iter()
49+
.any(|p| p.should_process(output_str, &ctx))
50+
{
51+
return Ok(output);
52+
}
53+
54+
let mut html = String::from_utf8(output).map_err(|e| {
55+
io::Error::other(format!(
56+
"HTML post-processing expected valid UTF-8 output: {e}"
57+
))
58+
})?;
59+
60+
let mut changed = false;
61+
for processor in &self.post_processors {
62+
if processor.should_process(&html, &ctx) {
63+
changed |= processor.post_process(&mut html, &ctx);
64+
}
65+
}
66+
67+
if changed {
68+
log::info!(
69+
"HTML post-processing complete: origin_host={}, output_len={}",
70+
self.origin_host,
71+
html.len()
72+
);
73+
}
74+
75+
Ok(html.into_bytes())
76+
}
77+
78+
fn reset(&mut self) {
79+
self.inner.reset();
80+
}
81+
}
82+
1783
/// Configuration for HTML processing
1884
#[derive(Clone)]
1985
pub struct HtmlProcessorConfig {
@@ -43,6 +109,8 @@ impl HtmlProcessorConfig {
43109

44110
/// Create an HTML processor with URL replacement and optional Prebid injection
45111
pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcessor {
112+
let post_processors = config.integrations.html_post_processors();
113+
46114
// Simplified URL patterns structure - stores only core data and generates variants on-demand
47115
struct UrlPatterns {
48116
origin_host: String,
@@ -343,7 +411,13 @@ pub fn create_html_processor(config: HtmlProcessorConfig) -> impl StreamProcesso
343411
..RewriterSettings::default()
344412
};
345413

346-
HtmlRewriterAdapter::new(rewriter_settings)
414+
HtmlWithPostProcessing {
415+
inner: HtmlRewriterAdapter::new(rewriter_settings),
416+
post_processors,
417+
origin_host: config.origin_host,
418+
request_host: config.request_host,
419+
request_scheme: config.request_scheme,
420+
}
347421
}
348422

349423
#[cfg(test)]

crates/common/src/integrations/nextjs.rs

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -95,43 +95,40 @@ impl IntegrationHtmlPostProcessor for NextJsHtmlPostProcessor {
9595
NEXTJS_INTEGRATION_ID
9696
}
9797

98-
fn post_process(&self, html: &str, ctx: &IntegrationHtmlContext<'_>) -> String {
99-
log::info!(
100-
"NextJs post-processor called: enabled={}, rewrite_attributes={:?}, html_len={}, origin={}, proxy={}://{}",
101-
self.config.enabled,
102-
self.config.rewrite_attributes,
103-
html.len(),
104-
ctx.origin_host,
105-
ctx.request_scheme,
106-
ctx.request_host
107-
);
108-
98+
fn should_process(&self, html: &str, ctx: &IntegrationHtmlContext<'_>) -> bool {
10999
if !self.config.enabled || self.config.rewrite_attributes.is_empty() {
110-
log::info!("NextJs post-processor skipped (disabled or no attributes)");
111-
return html.to_string();
100+
return false;
101+
}
102+
103+
// Only Next.js App Router pages will contain `__next_f` pushes.
104+
// Also require an origin host hit to avoid running on already-rewritten pages.
105+
html.contains("__next_f.push") && html.contains(ctx.origin_host)
106+
}
107+
108+
fn post_process(&self, html: &mut String, ctx: &IntegrationHtmlContext<'_>) -> bool {
109+
if !self.should_process(html, ctx) {
110+
return false;
112111
}
113112

114-
// Count origin URLs before
115113
let origin_before = html.matches(ctx.origin_host).count();
116114
log::info!(
117-
"NextJs post-processor: {} origin URLs before rewrite",
118-
origin_before
115+
"NextJs post-processor running: html_len={}, origin_matches={}, origin={}, proxy={}://{}",
116+
html.len(),
117+
origin_before,
118+
ctx.origin_host,
119+
ctx.request_scheme,
120+
ctx.request_host
119121
);
120122

121123
let result =
122124
post_process_rsc_html(html, ctx.origin_host, ctx.request_host, ctx.request_scheme);
123125

124-
// Count after
125-
let origin_after = result.matches(ctx.origin_host).count();
126-
let proxy_after = result.matches(ctx.request_host).count();
127-
log::info!(
128-
"NextJs post-processor complete: input_len={}, output_len={}, origin_remaining={}, proxy_urls={}",
129-
html.len(),
130-
result.len(),
131-
origin_after,
132-
proxy_after
133-
);
134-
result
126+
if result == *html {
127+
return false;
128+
}
129+
130+
*html = result;
131+
true
135132
}
136133
}
137134

@@ -291,14 +288,14 @@ impl IntegrationScriptRewriter for NextJsScriptRewriter {
291288
match self.mode {
292289
NextJsRewriteMode::Structured => self.rewrite_structured(content, ctx),
293290
NextJsRewriteMode::Streamed => {
294-
// RSC push scripts (self.__next_f.push) are handled by the post-processor
291+
// RSC push scripts (__next_f.push) are handled by the post-processor
295292
// because T-chunks can span multiple scripts and require combined processing.
296293
// Only handle non-RSC scripts here.
297-
if content.contains("self.__next_f.push") {
294+
if content.contains("__next_f.push") {
298295
return ScriptRewriteAction::keep();
299296
}
300297
// For other __next_f scripts (like initialization), use simple URL rewriting
301-
if content.contains("self.__next_f") {
298+
if content.contains("__next_f") {
302299
return self.rewrite_streamed(content, ctx);
303300
}
304301
ScriptRewriteAction::keep()

crates/common/src/integrations/registry.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -264,10 +264,19 @@ pub trait IntegrationHtmlPostProcessor: Send + Sync {
264264
/// Identifier for logging/diagnostics.
265265
fn integration_id(&self) -> &'static str;
266266

267+
/// Fast preflight check to decide whether post-processing should run for this document.
268+
///
269+
/// Implementations should keep this cheap (e.g., a substring check) because it may run on
270+
/// every HTML response when the integration is enabled.
271+
fn should_process(&self, html: &str, ctx: &IntegrationHtmlContext<'_>) -> bool {
272+
let _ = (html, ctx);
273+
true
274+
}
275+
267276
/// Post-process complete HTML content.
268277
/// This is called after streaming HTML processing with the complete HTML.
269-
/// Return the modified HTML or the original if no changes needed.
270-
fn post_process(&self, html: &str, ctx: &IntegrationHtmlContext<'_>) -> String;
278+
/// Implementations should mutate `html` in-place and return `true` when changes were made.
279+
fn post_process(&self, html: &mut String, ctx: &IntegrationHtmlContext<'_>) -> bool;
271280
}
272281

273282
/// Registration payload returned by integration builders.

crates/common/src/publisher.rs

Lines changed: 2 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1,75 +1,20 @@
11
use error_stack::{Report, ResultExt};
22
use fastly::http::{header, StatusCode};
33
use fastly::{Body, Request, Response};
4-
use std::io::Write;
54

65
use crate::backend::ensure_backend_from_url;
76
use crate::http_util::serve_static_with_etag;
87

98
use crate::constants::{HEADER_SYNTHETIC_TRUSTED_SERVER, HEADER_X_COMPRESS_HINT};
109
use crate::cookies::create_synthetic_cookie;
1110
use crate::error::TrustedServerError;
12-
use crate::integrations::{IntegrationHtmlContext, IntegrationRegistry};
11+
use crate::integrations::IntegrationRegistry;
1312
use crate::rsc_flight::RscFlightUrlRewriter;
1413
use crate::settings::Settings;
1514
use crate::streaming_processor::{Compression, PipelineConfig, StreamProcessor, StreamingPipeline};
1615
use crate::streaming_replacer::create_url_replacer;
1716
use crate::synthetic::get_or_generate_synthetic_id;
1817

19-
/// Compress data using the specified compression algorithm
20-
fn compress_data(
21-
data: &[u8],
22-
compression: Compression,
23-
) -> Result<Vec<u8>, Report<TrustedServerError>> {
24-
match compression {
25-
Compression::None => Ok(data.to_vec()),
26-
Compression::Gzip => {
27-
use flate2::write::GzEncoder;
28-
use flate2::Compression as GzCompression;
29-
let mut encoder = GzEncoder::new(Vec::new(), GzCompression::default());
30-
encoder
31-
.write_all(data)
32-
.change_context(TrustedServerError::Proxy {
33-
message: "Failed to gzip compress data".to_string(),
34-
})?;
35-
encoder.finish().change_context(TrustedServerError::Proxy {
36-
message: "Failed to finish gzip compression".to_string(),
37-
})
38-
}
39-
Compression::Deflate => {
40-
use flate2::write::ZlibEncoder;
41-
use flate2::Compression as ZlibCompression;
42-
let mut encoder = ZlibEncoder::new(Vec::new(), ZlibCompression::default());
43-
encoder
44-
.write_all(data)
45-
.change_context(TrustedServerError::Proxy {
46-
message: "Failed to deflate compress data".to_string(),
47-
})?;
48-
encoder.finish().change_context(TrustedServerError::Proxy {
49-
message: "Failed to finish deflate compression".to_string(),
50-
})
51-
}
52-
Compression::Brotli => {
53-
use brotli::enc::writer::CompressorWriter;
54-
use brotli::enc::BrotliEncoderParams;
55-
let params = BrotliEncoderParams {
56-
quality: 4, // Balance speed and compression
57-
..Default::default()
58-
};
59-
let mut output = Vec::new();
60-
{
61-
let mut writer = CompressorWriter::with_params(&mut output, 4096, &params);
62-
writer
63-
.write_all(data)
64-
.change_context(TrustedServerError::Proxy {
65-
message: "Failed to brotli compress data".to_string(),
66-
})?;
67-
}
68-
Ok(output)
69-
}
70-
}
71-
}
72-
7318
/// Detects the request scheme (HTTP or HTTPS) using Fastly SDK methods and headers.
7419
///
7520
/// Tries multiple methods in order of reliability:
@@ -199,67 +144,14 @@ fn process_response_streaming(
199144
params.integration_registry,
200145
)?;
201146

202-
// Check if we have post-processors that need uncompressed HTML
203-
let post_processors = params.integration_registry.html_post_processors();
204-
let needs_post_processing = !post_processors.is_empty();
205-
206-
// If we have post-processors, output uncompressed HTML so they can work with it,
207-
// then compress only once at the end. This avoids double decompression/compression.
208-
let output_compression = if needs_post_processing {
209-
Compression::None
210-
} else {
211-
compression
212-
};
213-
214147
let config = PipelineConfig {
215148
input_compression: compression,
216-
output_compression,
149+
output_compression: compression,
217150
chunk_size: 8192,
218151
};
219152

220153
let mut pipeline = StreamingPipeline::new(config, processor);
221154
pipeline.process(body, &mut output)?;
222-
223-
// Post-process HTML through registered integration post-processors.
224-
// This handles cross-script T-chunks for RSC and other integration-specific
225-
// processing that requires the complete HTML document.
226-
log::info!(
227-
"HTML post-processors: count={}, output_len={}, needs_post_processing={}",
228-
post_processors.len(),
229-
output.len(),
230-
needs_post_processing
231-
);
232-
if needs_post_processing {
233-
// Output is already uncompressed, convert to string for post-processing
234-
if let Ok(html) = std::str::from_utf8(&output) {
235-
log::info!(
236-
"NextJs post-processor called with {} bytes of HTML",
237-
html.len()
238-
);
239-
let ctx = IntegrationHtmlContext {
240-
request_host: params.request_host,
241-
request_scheme: params.request_scheme,
242-
origin_host: params.origin_host,
243-
};
244-
let mut processed = html.to_string();
245-
for processor in post_processors {
246-
processed = processor.post_process(&processed, &ctx);
247-
}
248-
249-
// Now compress if original content was compressed
250-
if compression != Compression::None {
251-
output = compress_data(processed.as_bytes(), compression)?;
252-
} else {
253-
output = processed.into_bytes();
254-
}
255-
} else {
256-
log::warn!("HTML post-processing skipped: content is not valid UTF-8");
257-
// If not valid UTF-8, recompress the output as-is
258-
if compression != Compression::None {
259-
output = compress_data(&output, compression)?;
260-
}
261-
}
262-
}
263155
} else if is_rsc_flight {
264156
// RSC Flight responses are length-prefixed (T rows). A naive string replacement will
265157
// corrupt the stream by changing byte lengths without updating the prefixes.

0 commit comments

Comments
 (0)