diff --git a/crates/common/src/integrations/testlight.rs b/crates/common/src/integrations/testlight.rs index 419bd8f..7fddd0a 100644 --- a/crates/common/src/integrations/testlight.rs +++ b/crates/common/src/integrations/testlight.rs @@ -2,19 +2,19 @@ use std::sync::Arc; use async_trait::async_trait; use error_stack::{Report, ResultExt}; -use fastly::http::{header, Method}; +use fastly::http::{header, HeaderValue}; use fastly::{Request, Response}; use serde::{Deserialize, Serialize}; use serde_json::{Map, Value}; use validator::Validate; -use crate::backend::ensure_backend_from_url; use crate::constants::{HEADER_SYNTHETIC_FRESH, HEADER_SYNTHETIC_TRUSTED_SERVER}; use crate::error::TrustedServerError; use crate::integrations::{ AttributeRewriteAction, IntegrationAttributeContext, IntegrationAttributeRewriter, IntegrationEndpoint, IntegrationProxy, IntegrationRegistration, }; +use crate::proxy::{proxy_request, ProxyRequestConfig}; use crate::settings::{IntegrationConfig as IntegrationConfigTrait, Settings}; use crate::synthetic::{generate_synthetic_id, get_or_generate_synthetic_id}; use crate::tsjs; @@ -143,20 +143,20 @@ impl IntegrationProxy for TestlightIntegration { payload.user.id = Some(synthetic_id.clone()); - let mut upstream = Request::new(Method::POST, self.config.endpoint.clone()); - upstream.set_header(header::CONTENT_TYPE, "application/json"); - upstream - .set_body_json(&payload) + let payload_bytes = serde_json::to_vec(&payload) .change_context(Self::error("Failed to serialize request body"))?; - if let Some(user_agent) = req.get_header(header::USER_AGENT) { - upstream.set_header(header::USER_AGENT, user_agent); - } + let mut proxy_config = ProxyRequestConfig::new(&self.config.endpoint); + proxy_config.forward_synthetic_id = false; + proxy_config.body = Some(payload_bytes); + proxy_config.stream_passthrough = true; + proxy_config.headers.push(( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + )); - let backend = 
ensure_backend_from_url(&self.config.endpoint) - .change_context(Self::error("Failed to determine backend"))?; - let mut response = upstream - .send(backend) + let mut response = proxy_request(settings, req, proxy_config) + .await .change_context(Self::error("Failed to contact upstream integration"))?; // Attempt to parse response into structured form for logging/future transforms. diff --git a/crates/common/src/proxy.rs b/crates/common/src/proxy.rs index ac26694..64b72bc 100644 --- a/crates/common/src/proxy.rs +++ b/crates/common/src/proxy.rs @@ -24,6 +24,59 @@ struct ProxySignResp { base: String, } +/// Configuration for outbound proxying from integration routes. +#[derive(Debug, Clone)] +pub struct ProxyRequestConfig<'a> { + /// Target URL to proxy to (must be http/https). + pub target_url: &'a str, + /// Whether redirects should be followed automatically. + pub follow_redirects: bool, + /// Whether to append the caller's synthetic ID as a query param. + pub forward_synthetic_id: bool, + /// Optional body to send to the origin. + pub body: Option<Vec<u8>>, + /// Additional headers to forward to the origin. + pub headers: Vec<(header::HeaderName, HeaderValue)>, + /// When true, stream the origin response without HTML/CSS rewrites. + pub stream_passthrough: bool, +} + +impl<'a> ProxyRequestConfig<'a> { + /// Build a proxy configuration that follows redirects and forwards the synthetic ID. + #[must_use] + pub fn new(target_url: &'a str) -> Self { + Self { + target_url, + follow_redirects: true, + forward_synthetic_id: true, + body: None, + headers: Vec::new(), + stream_passthrough: false, + } + } + + /// Attach a request body to the proxied request. + #[must_use] + pub fn with_body(mut self, body: Vec<u8>) -> Self { + self.body = Some(body); + self + } + + /// Append an additional header to the proxied request.
+ #[must_use] + pub fn with_header(mut self, name: header::HeaderName, value: HeaderValue) -> Self { + self.headers.push((name, value)); + self + } + + /// Enable streaming passthrough (no HTML/CSS rewrites). + #[must_use] + pub fn with_streaming(mut self) -> Self { + self.stream_passthrough = true; + self + } +} + /// Copy a curated set of request headers to a proxied request. fn copy_proxy_forward_headers(src: &Request, dst: &mut Request) { for header_name in [ @@ -160,34 +213,131 @@ fn finalize_proxied_response( beresp } -/// Unified proxy endpoint for resources referenced by ad creatives. +fn finalize_proxied_response_streaming( + req: &Request, + target_url: &str, + mut beresp: Response, +) -> Response { + let status_code = beresp.get_status().as_u16(); + let ct_raw = beresp + .get_header(header::CONTENT_TYPE) + .and_then(|h| h.to_str().ok()) + .unwrap_or("") + .to_string(); + let cl_raw = beresp + .get_header(header::CONTENT_LENGTH) + .and_then(|h| h.to_str().ok()) + .unwrap_or("-"); + let accept_raw = req + .get_header(HEADER_ACCEPT) + .and_then(|h| h.to_str().ok()) + .unwrap_or("-"); + + let ct_for_log: &str = if ct_raw.is_empty() { "-" } else { &ct_raw }; + log::info!( + "proxy(stream): origin response status={} ct={} cl={} accept={} url={}", + status_code, + ct_for_log, + cl_raw, + accept_raw, + target_url + ); + + let ct = ct_raw.to_ascii_lowercase(); + + let req_accept_images = req + .get_header(HEADER_ACCEPT) + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_ascii_lowercase().contains("image/")) + .unwrap_or(false); + + if ct.starts_with("image/") || req_accept_images { + if beresp.get_header(header::CONTENT_TYPE).is_none() { + beresp.set_header(header::CONTENT_TYPE, "image/*"); + } + + let mut is_pixel = false; + if let Some(cl) = beresp + .get_header(header::CONTENT_LENGTH) + .and_then(|h| h.to_str().ok()) + .and_then(|s| s.parse::<u64>().ok()) + { + if cl <= 256 { + is_pixel = true; + } + } + + if !is_pixel { + let lower = target_url.to_ascii_lowercase();
+ if lower.contains("/pixel") + || lower.ends_with("/p.gif") + || lower.contains("1x1") + || lower.contains("/track") + { + is_pixel = true; + } + } + + if is_pixel { + log::info!( + "proxy(stream): likely pixel image fetched: {} ct={}", + target_url, + ct + ); + } + } + + beresp +} + +/// Proxy a request to a clear target URL while reusing creative rewrite logic. /// -/// Accepts: -/// - `u`: Base64 URL-safe (no padding) encoded URL of the third-party resource. +/// This forwards a curated header set, follows redirects when enabled, and can append +/// the caller's synthetic ID as a `synthetic_id` query parameter to the target URL. +/// Optional bodies/headers can be supplied via [`ProxyRequestConfig`]. /// -/// Behavior: -/// - Proxies the decoded URL via a dynamic backend derived from scheme/host/port. -/// - If the response `Content-Type` contains `text/html`, rewrites the HTML creative -/// (img/srcset/iframe to first-party) before returning `text/html; charset=utf-8`. -/// - If the response is an image or the request `Accept` indicates images, ensures a -/// generic `image/*` content type if origin omitted it, and logs likely 1×1 pixels -/// using simple size/URL heuristics. No special response (still proxied). -pub async fn handle_first_party_proxy( +/// # Errors +/// +/// - [`TrustedServerError::Proxy`] if the target URL is invalid, uses an unsupported +/// scheme, lacks a host, or the upstream fetch fails +pub async fn proxy_request( settings: &Settings, req: Request, + config: ProxyRequestConfig<'_>, ) -> Result<Response, Report<TrustedServerError>> { - // Parse, reconstruct, and validate the signed target URL - let SignedTarget { target_url, ..
} = - reconstruct_and_validate_signed_target(settings, req.get_url_str())?; - - // Validate URL - let Ok(mut target_url_parsed) = url::Url::parse(&target_url) else { - return Err(Report::new(TrustedServerError::Proxy { + let ProxyRequestConfig { + target_url, + follow_redirects, + forward_synthetic_id, + body, + headers, + stream_passthrough, + } = config; + + let mut target_url_parsed = url::Url::parse(target_url).map_err(|_| { + Report::new(TrustedServerError::Proxy { message: "invalid url".to_string(), - })); - }; + }) + })?; - let synthetic_id_param = match get_synthetic_id(&req) { + if forward_synthetic_id { + append_synthetic_id(&req, &mut target_url_parsed); + } + + proxy_with_redirects( + settings, + &req, + target_url_parsed, + follow_redirects, + body.as_deref(), + &headers, + stream_passthrough, + ) + .await +} + +fn append_synthetic_id(req: &Request, target_url_parsed: &mut url::Url) { + let synthetic_id_param = match get_synthetic_id(req) { Ok(id) => id, Err(e) => { log::warn!( @@ -224,8 +374,19 @@ pub async fn handle_first_party_proxy( } else { log::debug!("proxy: no synthetic_id to forward to origin"); } +} +async fn proxy_with_redirects( + settings: &Settings, + req: &Request, + target_url_parsed: url::Url, + follow_redirects: bool, + body: Option<&[u8]>, + headers: &[(header::HeaderName, HeaderValue)], + stream_passthrough: bool, +) -> Result<Response, Report<TrustedServerError>> { const MAX_REDIRECTS: usize = 4; + let mut current_url = target_url_parsed.to_string(); let mut current_method: Method = req.get_method().clone(); @@ -253,7 +414,14 @@ pub async fn handle_first_party_proxy( let backend_name = crate::backend::ensure_origin_backend(&scheme, host, parsed_url.port())?; let mut proxy_req = Request::new(current_method.clone(), &current_url); - copy_proxy_forward_headers(&req, &mut proxy_req); + copy_proxy_forward_headers(req, &mut proxy_req); + if let Some(body_bytes) = body { + proxy_req.set_body(body_bytes.to_vec()); + } + + for (name, value) in headers { +
proxy_req.set_header(name.clone(), value.clone()); + } let beresp = proxy_req .send(&backend_name) @@ -261,6 +429,14 @@ pub async fn handle_first_party_proxy( message: "Failed to proxy".to_string(), })?; + if !follow_redirects { + return Ok(if stream_passthrough { + finalize_proxied_response_streaming(req, &current_url, beresp) + } else { + finalize_proxied_response(settings, req, &current_url, beresp) + }); + } + let status = beresp.get_status(); let is_redirect = matches!( status, @@ -272,12 +448,11 @@ pub async fn handle_first_party_proxy( ); if !is_redirect { - return Ok(finalize_proxied_response( - settings, - &req, - &current_url, - beresp, - )); + return Ok(if stream_passthrough { + finalize_proxied_response_streaming(req, &current_url, beresp) + } else { + finalize_proxied_response(settings, req, &current_url, beresp) + }); } let Some(location) = beresp @@ -285,12 +460,11 @@ pub async fn handle_first_party_proxy( .and_then(|h| h.to_str().ok()) .filter(|value| !value.is_empty()) else { - return Ok(finalize_proxied_response( - settings, - &req, - &current_url, - beresp, - )); + return Ok(if stream_passthrough { + finalize_proxied_response_streaming(req, &current_url, beresp) + } else { + finalize_proxied_response(settings, req, &current_url, beresp) + }); }; if redirect_attempt == MAX_REDIRECTS { @@ -300,7 +474,7 @@ pub async fn handle_first_party_proxy( ); return Ok(finalize_proxied_response( settings, - &req, + req, &current_url, beresp, )); @@ -316,12 +490,11 @@ pub async fn handle_first_party_proxy( let next_scheme = next_url.scheme().to_ascii_lowercase(); if next_scheme != "http" && next_scheme != "https" { - return Ok(finalize_proxied_response( - settings, - &req, - &current_url, - beresp, - )); + return Ok(if stream_passthrough { + finalize_proxied_response_streaming(req, &current_url, beresp) + } else { + finalize_proxied_response(settings, req, &current_url, beresp) + }); } log::info!( @@ -342,6 +515,41 @@ pub async fn handle_first_party_proxy( })) } +/// Unified proxy endpoint for resources referenced by ad creatives.
+/// +/// Accepts: +/// - `u`: Base64 URL-safe (no padding) encoded URL of the third-party resource. +/// +/// Behavior: +/// - Proxies the decoded URL via a dynamic backend derived from scheme/host/port. +/// - If the response `Content-Type` contains `text/html`, rewrites the HTML creative +/// (img/srcset/iframe to first-party) before returning `text/html; charset=utf-8`. +/// - If the response is an image or the request `Accept` indicates images, ensures a +/// generic `image/*` content type if origin omitted it, and logs likely 1×1 pixels +/// using simple size/URL heuristics. No special response (still proxied). +pub async fn handle_first_party_proxy( + settings: &Settings, + req: Request, +) -> Result<Response, Report<TrustedServerError>> { + // Parse, reconstruct, and validate the signed target URL + let SignedTarget { target_url, .. } = + reconstruct_and_validate_signed_target(settings, req.get_url_str())?; + + proxy_request( + settings, + req, + ProxyRequestConfig { + target_url: &target_url, + follow_redirects: true, + forward_synthetic_id: true, + body: None, + headers: Vec::new(), + stream_passthrough: false, + }, + ) + .await +} + /// First-party click redirect endpoint.
/// /// Accepts the same parameters as the proxy scheme, but instead of proxying the @@ -803,7 +1011,7 @@ mod tests { use super::{ copy_proxy_forward_headers, handle_first_party_click, handle_first_party_proxy, handle_first_party_proxy_rebuild, handle_first_party_proxy_sign, - reconstruct_and_validate_signed_target, + reconstruct_and_validate_signed_target, ProxyRequestConfig, }; use crate::error::{IntoHttpResponse, TrustedServerError}; use crate::test_support::tests::create_test_settings; @@ -815,7 +1023,7 @@ mod tests { creative, }; use error_stack::Report; - use fastly::http::{header, Method, StatusCode}; + use fastly::http::{header, HeaderValue, Method, StatusCode}; use fastly::{Request, Response}; #[tokio::test] @@ -878,6 +1086,30 @@ mod tests { assert_eq!(err.current_context().status_code(), StatusCode::BAD_GATEWAY); } + #[test] + fn proxy_request_config_supports_streaming_and_headers() { + let cfg = ProxyRequestConfig::new("https://example.com/asset") + .with_body(vec![1, 2, 3]) + .with_header( + header::CONTENT_TYPE, + HeaderValue::from_static("application/octet-stream"), + ) + .with_streaming(); + + assert_eq!(cfg.target_url, "https://example.com/asset"); + assert!(cfg.follow_redirects, "should follow redirects by default"); + assert!( + cfg.forward_synthetic_id, + "should forward synthetic id by default" + ); + assert_eq!(cfg.body.as_deref(), Some(&[1, 2, 3][..])); + assert_eq!(cfg.headers.len(), 1, "should include custom header"); + assert!( + cfg.stream_passthrough, + "should enable streaming passthrough" + ); + } + #[tokio::test] async fn reconstruct_rejects_expired_tsexp() { use std::time::{Duration, SystemTime, UNIX_EPOCH}; diff --git a/docs/integration_guide.md b/docs/integration_guide.md index 1291806..514af0e 100644 --- a/docs/integration_guide.md +++ b/docs/integration_guide.md @@ -135,6 +135,47 @@ already injects Trusted Server logging, headers, and error handling; the handler needs to deserialize the request, call the upstream endpoint, and 
stamp integration-specific headers. +#### Proxying upstream requests + +Use the shared helper in `crates/common/src/proxy.rs` to forward requests so you automatically get +the same header copying, redirect handling, HTML/CSS rewrite behavior, and synthetic ID handling the +first‑party proxy uses: + +```rust +use crate::proxy::{proxy_request, ProxyRequestConfig}; +use fastly::http::{header, HeaderValue}; + +let payload = serde_json::to_vec(&my_body)?; +let response = proxy_request( + settings, + req, + ProxyRequestConfig::new(&self.config.endpoint) + .with_body(payload) + .with_header(header::CONTENT_TYPE, HeaderValue::from_static("application/json")) + .with_streaming(), // stream passthrough; disable if you need HTML rewrites +) +.await?; +``` + +Set `forward_synthetic_id` to `false` if the upstream should not receive the caller’s synthetic ID +(`Testlight` does this), and disable `follow_redirects` if you need to surface redirects directly to +the caller. + +**Streaming passthrough example** + +```rust +let response = proxy_request( + settings, + req, + ProxyRequestConfig::new("https://example.com/pixel") + .with_streaming() // no HTML/CSS rewrites; preserves origin compression +).await?; +``` + +Use streaming when the upstream response is binary or large and you do not need creative rewrites. +Keep the default (non-streaming) mode when you want HTML/CSS content rewritten through the existing +creative pipeline. + ### 5. Implement HTML rewrite hooks (optional) If the integration needs to rewrite script/link tags or inject HTML, implement