diff --git a/Cargo.lock b/Cargo.lock index b83621bf8..76d00c686 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2010,7 +2010,6 @@ dependencies = [ "derive_builder", "derive_more 2.0.1", "docsrs-metadata", - "fastly-api", "flate2", "fn-error-context", "font-awesome-as-a-crate", @@ -2227,19 +2226,6 @@ dependencies = [ "serde", ] -[[package]] -name = "fastly-api" -version = "12.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6aa65a56cdc77f4d8a1c6e8ff46e752823b7bbc7ed2b708e2fb488e892e8f9f" -dependencies = [ - "reqwest", - "serde", - "serde_derive", - "serde_json", - "url", -] - [[package]] name = "fastrand" version = "2.3.0" @@ -6753,7 +6739,6 @@ dependencies = [ "js-sys", "log", "mime", - "mime_guess", "native-tls", "percent-encoding", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index e2db3d3a5..3cabb69ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -110,7 +110,6 @@ chrono = { version = "0.4.11", default-features = false, features = ["clock", "s # Transitive dependencies we don't use directly but need to have specific versions of thread_local = "1.1.3" constant_time_eq = "0.4.2" -fastly-api = "12.0.0" md5 = "0.8.0" [dev-dependencies] diff --git a/src/cdn/fastly.rs b/src/cdn/fastly.rs index dfb7f6667..0e0f10e8b 100644 --- a/src/cdn/fastly.rs +++ b/src/cdn/fastly.rs @@ -1,18 +1,60 @@ use crate::{ + APP_USER_AGENT, cdn::CdnMetrics, config::Config, - web::headers::{SurrogateKey, SurrogateKeys}, + web::headers::{SURROGATE_KEY, SurrogateKey, SurrogateKeys}, }; -use anyhow::{Result, bail}; -use chrono::{DateTime, Utc}; -use fastly_api::apis::{ - configuration::{ApiKey, Configuration}, - purge_api::{BulkPurgeTagParams, bulk_purge_tag}, +use anyhow::{Result, anyhow, bail}; +use chrono::{DateTime, TimeZone as _, Utc}; +use http::{ + HeaderMap, HeaderName, HeaderValue, + header::{ACCEPT, USER_AGENT}, }; use itertools::Itertools as _; use opentelemetry::KeyValue; +use std::sync::OnceLock; use tracing::error; +const FASTLY_KEY: HeaderName = HeaderName::from_static("fastly-key"); + +// https://www.fastly.com/documentation/reference/api/#rate-limiting +const FASTLY_RATELIMIT_REMAINING: HeaderName = + HeaderName::from_static("fastly-ratelimit-remaining"); +const FASTLY_RATELIMIT_RESET: HeaderName = HeaderName::from_static("fastyly-ratelimit-reset"); + +static CLIENT: OnceLock> = OnceLock::new(); + +fn fastly_client(api_token: impl AsRef) -> anyhow::Result<&'static reqwest::Client> { + CLIENT + .get_or_init(|| -> Result<_> { + let mut headers = HeaderMap::new(); + headers.insert(USER_AGENT, HeaderValue::from_static(APP_USER_AGENT)); + headers.insert(ACCEPT, HeaderValue::from_static("application/json")); + headers.insert(FASTLY_KEY, HeaderValue::from_str(api_token.as_ref())?); + + Ok(reqwest::Client::builder() + .default_headers(headers) + .build()?) + }) + .as_ref() + .map_err(|err| anyhow!("reqwest Client init failed: {}", err)) +} + +fn fetch_rate_limit_state(headers: &HeaderMap) -> (Option, Option>) { + // https://www.fastly.com/documentation/reference/api/#rate-limiting + ( + headers + .get(FASTLY_RATELIMIT_REMAINING) + .and_then(|hv| hv.to_str().ok()) + .and_then(|s| s.parse().ok()), + headers + .get(FASTLY_RATELIMIT_RESET) + .and_then(|hv| hv.to_str().ok()) + .and_then(|s| s.parse::().ok()) + .and_then(|ts| Utc.timestamp_opt(ts, 0).single()), + ) +} + /// Purge the given surrogate keys from all configured fastly services. /// /// Accepts any number of surrogate keys, and splits them into appropriately sized @@ -29,26 +71,40 @@ where bail!("Fastly API token not configured"); }; - let mut cfg = Configuration { - api_key: Some(ApiKey { - prefix: None, - key: api_token.to_owned(), - }), - ..Default::default() - }; + let client = fastly_client(api_token)?; + + let record_rate_limit_metrics = + |limit_remaining: Option, limit_reset: Option>| { + if let Some(limit_remaining) = limit_remaining { + metrics + .fastly_rate_limit_remaining + .record(limit_remaining, &[]); + } + + if let Some(limit_reset) = limit_reset { + metrics + .fastly_time_until_rate_limit_reset + .record((limit_reset - Utc::now()).num_seconds() as u64, &[]); + } + }; // the `bulk_purge_tag` supports up to 256 surrogate keys in its list, // but I believe we also have to respect the length limits for the full - // surrogate key header. + // surrogate key header we send in this purge request. + // see https://www.fastly.com/documentation/reference/api/purging/ for encoded_surrogate_keys in keys.into_iter().batching(|it| { const MAX_SURROGATE_KEYS_IN_BATCH_PURGE: usize = 256; // SurrogateKeys::from_iter::until_full only consumes as many elements as will fit into // the header. // The rest is up to the next `batching` iteration. - Some(SurrogateKeys::from_iter_until_full( - it.take(MAX_SURROGATE_KEYS_IN_BATCH_PURGE), - )) + let keys = SurrogateKeys::from_iter_until_full(it.take(MAX_SURROGATE_KEYS_IN_BATCH_PURGE)); + + if keys.key_count() > 0 { + Some(keys) + } else { + None + } }) { for sid in config .fastly_service_sid_web @@ -59,51 +115,258 @@ where // We can then see if we need retries or escalation to full purges. let kv = [KeyValue::new("service_sid", sid.clone())]; - match bulk_purge_tag( - &mut cfg, - BulkPurgeTagParams { - service_id: sid.to_owned(), - // TODO: investigate how they could help & test - // soft purge. But later, after the initial migration. - fastly_soft_purge: None, - surrogate_key: Some(encoded_surrogate_keys.to_string()), - ..Default::default() - }, - ) - .await + + // https://www.fastly.com/documentation/reference/api/purging/ + // TODO: investigate how they could help & test + // soft purge. But later, after the initial migration. + match client + .post( + config + .fastly_api_host + .join(&format!("/service/{}/purge", sid))?, + ) + .header(&SURROGATE_KEY, encoded_surrogate_keys.to_string()) + .send() + .await { - Ok(_) => { + Ok(response) if response.status().is_success() => { metrics.fastly_batch_purges_with_surrogate.add(1, &kv); metrics .fastly_purge_surrogate_keys .add(encoded_surrogate_keys.key_count() as u64, &kv); + + let (limit_remaining, limit_reset) = fetch_rate_limit_state(response.headers()); + record_rate_limit_metrics(limit_remaining, limit_reset); + } + Ok(error_response) => { + metrics.fastly_batch_purge_errors.add(1, &kv); + + let (limit_remaining, limit_reset) = + fetch_rate_limit_state(error_response.headers()); + record_rate_limit_metrics(limit_remaining, limit_reset); + + let limit_reset = limit_reset.map(|dt| dt.to_rfc3339()); + + let status = error_response.status(); + let content = error_response.text().await.unwrap_or_default(); + error!( + sid, + %status, + content, + %encoded_surrogate_keys, + rate_limit_remaining=limit_remaining, + rate_limit_reset=limit_reset, + "Failed to purge Fastly surrogate keys for service" + ); } Err(err) => { + // connection errors or similar, where we don't have a response metrics.fastly_batch_purge_errors.add(1, &kv); - let rate_limit_reset = - DateTime::::from_timestamp(cfg.rate_limit_reset as i64, 0) - .map(|dt| dt.to_rfc3339()); error!( sid, ?err, %encoded_surrogate_keys, - rate_limit_remaining=cfg.rate_limit_remaining, - rate_limit_reset, "Failed to purge Fastly surrogate keys for service" ); } - } + }; } } - metrics - .fastly_rate_limit_remaining - .record(cfg.rate_limit_remaining, &[]); - metrics.fastly_time_until_rate_limit_reset.record( - cfg.rate_limit_reset - .saturating_sub(Utc::now().timestamp() as u64), - &[], - ); - Ok(()) } + +#[cfg(test)] +mod tests { + use super::*; + use crate::test::{TestEnvironment, setup_test_meter_provider}; + use chrono::TimeZone; + use std::str::FromStr as _; + + #[test] + fn test_read_rate_limit() { + // https://www.fastly.com/documentation/reference/api/#rate-limiting + let mut hm = HeaderMap::new(); + hm.insert(FASTLY_RATELIMIT_REMAINING, HeaderValue::from_static("999")); + hm.insert( + FASTLY_RATELIMIT_RESET, + HeaderValue::from_static("1452032384"), + ); + + let (remaining, reset) = fetch_rate_limit_state(&hm); + assert_eq!(remaining, Some(999)); + assert_eq!( + reset, + Some(Utc.timestamp_opt(1452032384, 0).single().unwrap()) + ); + } + + #[tokio::test] + async fn test_purge() -> Result<()> { + let mut fastly_api = mockito::Server::new_async().await; + + let config = TestEnvironment::base_config() + .fastly_api_host(fastly_api.url().parse().unwrap()) + .fastly_api_token(Some("test-token".into())) + .fastly_service_sid_web(Some("test-sid-1".into())) + .build()?; + + let m = fastly_api + .mock("POST", "/service/test-sid-1/purge") + .match_header(FASTLY_KEY, "test-token") + .match_header(&SURROGATE_KEY, "crate-foo crate-bar") + .with_status(200) + .create_async() + .await; + + let (_exporter, meter_provider) = setup_test_meter_provider(); + let metrics = CdnMetrics::new(&meter_provider); + + purge_surrogate_keys( + &config, + &metrics, + vec![ + SurrogateKey::from_str("crate-foo").unwrap(), + SurrogateKey::from_str("crate-bar").unwrap(), + ], + ) + .await?; + + m.assert_async().await; + + Ok(()) + } + + #[tokio::test] + async fn test_purge_both() -> Result<()> { + let mut fastly_api = mockito::Server::new_async().await; + + let config = TestEnvironment::base_config() + .fastly_api_host(fastly_api.url().parse().unwrap()) + .fastly_api_token(Some("test-token".into())) + .fastly_service_sid_web(Some("test-sid-1".into())) + .fastly_service_sid_static(Some("test-sid-2".into())) + .build()?; + + let m1 = fastly_api + .mock("POST", "/service/test-sid-1/purge") + .match_header(FASTLY_KEY, "test-token") + .match_header(&SURROGATE_KEY, "crate-foo crate-bar") + .with_status(200) + .create_async() + .await; + + let m2 = fastly_api + .mock("POST", "/service/test-sid-2/purge") + .match_header(FASTLY_KEY, "test-token") + .match_header(&SURROGATE_KEY, "crate-foo crate-bar") + .with_status(200) + .create_async() + .await; + + let (_exporter, meter_provider) = setup_test_meter_provider(); + let metrics = CdnMetrics::new(&meter_provider); + + purge_surrogate_keys( + &config, + &metrics, + vec![ + SurrogateKey::from_str("crate-foo").unwrap(), + SurrogateKey::from_str("crate-bar").unwrap(), + ], + ) + .await?; + + m1.assert_async().await; + m2.assert_async().await; + + Ok(()) + } + + #[tokio::test] + async fn test_purge_err_doesnt_err() -> Result<()> { + let mut fastly_api = mockito::Server::new_async().await; + + let config = TestEnvironment::base_config() + .fastly_api_host(fastly_api.url().parse().unwrap()) + .fastly_api_token(Some("test-token".into())) + .fastly_service_sid_web(Some("test-sid-1".into())) + .build()?; + + let m = fastly_api + .mock("POST", "/service/test-sid-1/purge") + .match_header(FASTLY_KEY, "test-token") + .match_header(&SURROGATE_KEY, "crate-foo crate-bar") + .with_status(500) + .create_async() + .await; + + let (_exporter, meter_provider) = setup_test_meter_provider(); + let metrics = CdnMetrics::new(&meter_provider); + + assert!( + purge_surrogate_keys( + &config, + &metrics, + vec![ + SurrogateKey::from_str("crate-foo").unwrap(), + SurrogateKey::from_str("crate-bar").unwrap(), + ], + ) + .await + .is_ok() + ); + + m.assert_async().await; + + Ok(()) + } + + #[tokio::test] + async fn test_purge_split_requests() -> Result<()> { + let mut fastly_api = mockito::Server::new_async().await; + + let config = TestEnvironment::base_config() + .fastly_api_host(fastly_api.url().parse().unwrap()) + .fastly_api_token(Some("test-token".into())) + .fastly_service_sid_web(Some("test-sid-1".into())) + .build()?; + + let m = fastly_api + .mock("POST", "/service/test-sid-1/purge") + .match_header(FASTLY_KEY, "test-token") + .match_request(|request| { + let [surrogate_keys] = request.header(&SURROGATE_KEY)[..] else { + panic!("expected one SURROGATE_KEY header"); + }; + let surrogate_keys: SurrogateKeys = + surrogate_keys.to_str().unwrap().parse().unwrap(); + + assert!( + // first request + surrogate_keys.key_count() == 256 || + // second request + surrogate_keys.key_count() == 94 + ); + + true + }) + .expect(2) // 300 keys below + .with_status(200) + .create_async() + .await; + + let (_exporter, meter_provider) = setup_test_meter_provider(); + let metrics = CdnMetrics::new(&meter_provider); + + let keys: Vec<_> = (0..350) + .map(|n| SurrogateKey::from_str(&format!("crate-foo-{n}")).unwrap()) + .collect(); + + purge_surrogate_keys(&config, &metrics, keys).await?; + + m.assert_async().await; + + Ok(()) + } +} diff --git a/src/config.rs b/src/config.rs index 4cf784f7d..160534a52 100644 --- a/src/config.rs +++ b/src/config.rs @@ -125,6 +125,9 @@ pub struct Config { /// same for the `static.docs.rs` distribution pub cloudfront_distribution_id_static: Option, + /// Fastly API host, typically only overwritten for testing + pub fastly_api_host: Url, + /// Fastly API token for purging the services below. pub fastly_api_token: Option, @@ -233,6 +236,10 @@ impl Config { .cdn_max_queued_age(Duration::from_secs(env("DOCSRS_CDN_MAX_QUEUED_AGE", 3600)?)) .cloudfront_distribution_id_web(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_WEB")?) .cloudfront_distribution_id_static(maybe_env("CLOUDFRONT_DISTRIBUTION_ID_STATIC")?) + .fastly_api_host(env( + "DOCSRS_FASTLY_API_HOST", + "https://api.fastly.com".parse().unwrap(), + )?) .fastly_api_token(maybe_env("DOCSRS_FASTLY_API_TOKEN")?) .fastly_service_sid_web(maybe_env("DOCSRS_FASTLY_SERVICE_SID_WEB")?) .fastly_service_sid_static(maybe_env("DOCSRS_FASTLY_SERVICE_SID_STATIC")?) diff --git a/src/test/mod.rs b/src/test/mod.rs index 44419a4bc..730dd5392 100644 --- a/src/test/mod.rs +++ b/src/test/mod.rs @@ -2,7 +2,10 @@ mod fakes; pub(crate) mod headers; mod test_metrics; -pub(crate) use self::fakes::{FakeBuild, fake_release_that_failed_before_build}; +pub(crate) use self::{ + fakes::{FakeBuild, fake_release_that_failed_before_build}, + test_metrics::setup_test_meter_provider, +}; use crate::{ AsyncBuildQueue, BuildQueue, Config, Context, InstanceMetrics, cdn::{CdnMetrics, cloudfront::CdnBackend}, @@ -26,7 +29,7 @@ use fn_error_context::context; use futures_util::stream::TryStreamExt; use http::header::CACHE_CONTROL; use http_body_util::BodyExt; -use opentelemetry_sdk::metrics::{InMemoryMetricExporter, PeriodicReader}; +use opentelemetry_sdk::metrics::InMemoryMetricExporter; use serde::de::DeserializeOwned; use sqlx::Connection as _; use std::{fs, future::Future, panic, rc::Rc, str::FromStr, sync::Arc}; @@ -377,12 +380,7 @@ impl TestEnvironment { // create index directory fs::create_dir_all(config.registry_index_path.clone())?; - let metric_exporter = InMemoryMetricExporter::default(); - let meter_provider: AnyMeterProvider = Arc::new( - opentelemetry_sdk::metrics::SdkMeterProvider::builder() - .with_reader(PeriodicReader::builder(metric_exporter.clone()).build()) - .build(), - ); + let (metric_exporter, meter_provider) = setup_test_meter_provider(); let instance_metrics = Arc::new(InstanceMetrics::new()?); let test_db = TestDatabase::new(&config, instance_metrics.clone(), &meter_provider) diff --git a/src/test/test_metrics.rs b/src/test/test_metrics.rs index 02f52cee6..be5663aba 100644 --- a/src/test/test_metrics.rs +++ b/src/test/test_metrics.rs @@ -1,9 +1,31 @@ +use std::sync::Arc; + use anyhow::{Result, anyhow}; use derive_more::Deref; -use opentelemetry_sdk::metrics::data::{ - AggregatedMetrics, HistogramDataPoint, Metric, MetricData, ResourceMetrics, SumDataPoint, +use opentelemetry_sdk::metrics::{ + InMemoryMetricExporter, PeriodicReader, + data::{ + AggregatedMetrics, HistogramDataPoint, Metric, MetricData, ResourceMetrics, SumDataPoint, + }, }; +use crate::metrics::otel::AnyMeterProvider; + +/// set up a standalone InMemoryMetricExporter and MeterProvider for testing purposes. +/// For when you want to collect metrics, and then inspect what was collected. +pub(crate) fn setup_test_meter_provider() -> (InMemoryMetricExporter, AnyMeterProvider) { + let metric_exporter = InMemoryMetricExporter::default(); + + ( + metric_exporter.clone(), + Arc::new( + opentelemetry_sdk::metrics::SdkMeterProvider::builder() + .with_reader(PeriodicReader::builder(metric_exporter.clone()).build()) + .build(), + ), + ) +} + /// small wrapper around the collected result of the InMemoryMetricExporter. /// For convenience in tests. #[derive(Debug)] diff --git a/src/web/headers/surrogate_key.rs b/src/web/headers/surrogate_key.rs index cf1ff64f9..e1e70871d 100644 --- a/src/web/headers/surrogate_key.rs +++ b/src/web/headers/surrogate_key.rs @@ -163,6 +163,19 @@ impl SurrogateKeys { } } +#[cfg(test)] +impl FromStr for SurrogateKeys { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + let keys = s + .split(' ') + .map(SurrogateKey::from_str) + .collect::, _>>()?; + Ok(SurrogateKeys(keys)) + } +} + #[cfg(test)] mod tests { use super::*;