From 0413c7533961727e1e8836e98916e2f5f64c6016 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Mon, 10 Feb 2025 15:30:18 -0500 Subject: [PATCH 1/2] Thread through timeouts for Azure --- Cargo.lock | 2 ++ src/persist/Cargo.toml | 1 + src/persist/src/azure.rs | 54 ++++++++++++++++++++++++++++++++--- src/persist/src/cfg.rs | 1 + src/workspace-hack/Cargo.toml | 4 +-- 5 files changed, 56 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e57532a56f10..46f0944b8a05e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6195,6 +6195,7 @@ dependencies = [ "prost", "prost-build", "rand 0.8.5", + "reqwest 0.12.4", "serde", "serde_json", "sha2", @@ -9149,6 +9150,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "sync_wrapper 0.1.2", + "system-configuration", "tokio", "tokio-native-tls", "tokio-util", diff --git a/src/persist/Cargo.toml b/src/persist/Cargo.toml index dc745e01acd50..f39b11179288c 100644 --- a/src/persist/Cargo.toml +++ b/src/persist/Cargo.toml @@ -57,6 +57,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] } proptest-derive = { version = "0.5.1", features = ["boxed_union"] } prost = { version = "0.13.4", features = ["no-recursion-limit"] } rand = { version = "0.8.5", features = ["small_rng"] } +reqwest = "0.12.4" serde = { version = "1.0.152", features = ["derive"] } timely = "0.17.1" tokio = { version = "1.38.0", default-features = false, features = ["fs", "macros", "sync", "rt", "rt-multi-thread"] } diff --git a/src/persist/src/azure.rs b/src/persist/src/azure.rs index 1bf2c931fb683..f6d0f9f844a34 100644 --- a/src/persist/src/azure.rs +++ b/src/persist/src/azure.rs @@ -9,12 +9,9 @@ //! An Azure Blob Storage implementation of [Blob] storage. 
-use std::fmt::Debug; -use std::sync::Arc; - use anyhow::{anyhow, Context}; use async_trait::async_trait; -use azure_core::StatusCode; +use azure_core::{ExponentialRetryOptions, RetryOptions, StatusCode, TransportOptions}; use azure_identity::create_default_credential; use azure_storage::{prelude::*, CloudLocation, EMULATOR_ACCOUNT}; use azure_storage_blobs::blob::operations::GetBlobResponse; @@ -22,6 +19,9 @@ use azure_storage_blobs::prelude::*; use bytes::Bytes; use futures_util::stream::FuturesOrdered; use futures_util::StreamExt; +use std::fmt::{Debug, Formatter}; +use std::sync::Arc; +use std::time::Duration; use tracing::{info, warn}; use url::Url; use uuid::Uuid; @@ -32,6 +32,7 @@ use mz_ore::cast::CastFrom; use mz_ore::lgbytes::{LgBytes, MetricsRegion}; use mz_ore::metrics::MetricsRegistry; +use crate::cfg::BlobKnobs; use crate::error::Error; use crate::location::{Blob, BlobMetadata, Determinate, ExternalError}; use crate::metrics::S3BlobMetrics; @@ -64,6 +65,7 @@ impl AzureBlobConfig { prefix: String, metrics: S3BlobMetrics, url: Url, + knobs: Box, cfg: Arc, ) -> Result { let client = if account == EMULATOR_ACCOUNT { @@ -75,6 +77,21 @@ impl AzureBlobConfig { }, StorageCredentials::emulator(), ) + .transport({ + // Azure uses reqwest / hyper internally, but we specify a client explicitly to + // plumb through our timeouts. + TransportOptions::new(Arc::new( + reqwest::ClientBuilder::new() + .timeout(knobs.operation_attempt_timeout()) + .read_timeout(knobs.read_timeout()) + .connect_timeout(knobs.connect_timeout()) + .build() + .expect("valid config for azure HTTP client"), + )) + }) + .retry(RetryOptions::exponential( + ExponentialRetryOptions::default().max_total_elapsed(knobs.operation_timeout()), + )) .blob_service_client() .container_client(container) } else { @@ -120,6 +137,34 @@ impl AzureBlobConfig { /// Returns a new [AzureBlobConfig] for use in unit tests. 
pub fn new_for_test() -> Result, Error> { + struct TestBlobKnobs; + impl Debug for TestBlobKnobs { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TestBlobKnobs").finish_non_exhaustive() + } + } + impl BlobKnobs for TestBlobKnobs { + fn operation_timeout(&self) -> Duration { + Duration::from_secs(30) + } + + fn operation_attempt_timeout(&self) -> Duration { + Duration::from_secs(10) + } + + fn connect_timeout(&self) -> Duration { + Duration::from_secs(5) + } + + fn read_timeout(&self) -> Duration { + Duration::from_secs(5) + } + + fn is_cc_active(&self) -> bool { + false + } + } + let container_name = match std::env::var(Self::EXTERNAL_TESTS_AZURE_CONTAINER) { Ok(container) => container, Err(_) => { @@ -139,6 +184,7 @@ impl AzureBlobConfig { prefix, metrics, Url::parse(&format!("http://localhost:40111/{}", container_name)).expect("valid url"), + Box::new(TestBlobKnobs), Arc::new(ConfigSet::default()), )?; diff --git a/src/persist/src/cfg.rs b/src/persist/src/cfg.rs index 94adab685dfae..3851659bd1e5b 100644 --- a/src/persist/src/cfg.rs +++ b/src/persist/src/cfg.rs @@ -177,6 +177,7 @@ impl BlobConfig { "".to_string(), metrics, url.clone().into_redacted(), + knobs, cfg, )?)) } else { diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index df5cfd3c02bcf..cff33837c8ac5 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -109,7 +109,7 @@ rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka.git", feat regex = { version = "1.10.5" } regex-automata = { version = "0.4.7", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa", "perf", "unicode"] } regex-syntax = { version = "0.8.3" } -reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12.4", default-features = false, features = ["blocking", "charset", "default-tls", "http2", "json", "stream"] } +reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12.4", features = ["blocking", 
"json", "stream"] } reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11.24", features = ["blocking", "json", "multipart", "native-tls-vendored", "stream"] } schemars = { version = "0.8.11", features = ["uuid1"] } scopeguard = { version = "1.1.0" } @@ -244,7 +244,7 @@ rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka.git", feat regex = { version = "1.10.5" } regex-automata = { version = "0.4.7", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa", "perf", "unicode"] } regex-syntax = { version = "0.8.3" } -reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12.4", default-features = false, features = ["blocking", "charset", "default-tls", "http2", "json", "stream"] } +reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12.4", features = ["blocking", "json", "stream"] } reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11.24", features = ["blocking", "json", "multipart", "native-tls-vendored", "stream"] } schemars = { version = "0.8.11", features = ["uuid1"] } scopeguard = { version = "1.1.0" } From da9f1560781f534711ce4289f61d5b06f96a2c6c Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Mon, 10 Feb 2025 16:43:46 -0500 Subject: [PATCH 2/2] PersistConfig is integrated with LD! 
--- src/persist-client/src/cfg.rs | 37 ++++++++++++++++++++++++++++++----- 1 file changed, 32 insertions(+), 5 deletions(-) diff --git a/src/persist-client/src/cfg.rs b/src/persist-client/src/cfg.rs index 9f0c285141d28..62f8c04bfec34 100644 --- a/src/persist-client/src/cfg.rs +++ b/src/persist-client/src/cfg.rs @@ -326,6 +326,10 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(&crate::batch::STRUCTURED_KEY_LOWER_LEN) .add(&crate::batch::MAX_RUN_LEN) .add(&crate::batch::MAX_RUNS) + .add(&BLOB_OPERATION_TIMEOUT) + .add(&BLOB_OPERATION_ATTEMPT_TIMEOUT) + .add(&BLOB_CONNECT_TIMEOUT) + .add(&BLOB_READ_TIMEOUT) .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL_STAGGER) .add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL) .add(&crate::cfg::CRDB_CONNECT_TIMEOUT) @@ -591,22 +595,45 @@ impl RetryParameters { } } -// TODO: Replace with dynamic values when PersistConfig is integrated with LD +pub(crate) const BLOB_OPERATION_TIMEOUT: Config<Duration> = Config::new( + "persist_blob_operation_timeout", + Duration::from_secs(180), + "Maximum time allowed for a network call, including retry attempts.", +); + +pub(crate) const BLOB_OPERATION_ATTEMPT_TIMEOUT: Config<Duration> = Config::new( + "persist_blob_operation_attempt_timeout", + Duration::from_secs(90), + "Maximum time allowed for a single network call.", +); + +pub(crate) const BLOB_CONNECT_TIMEOUT: Config<Duration> = Config::new( + "persist_blob_connect_timeout", + Duration::from_secs(7), + "Maximum time to wait for a socket connection to be made.", +); + +pub(crate) const BLOB_READ_TIMEOUT: Config<Duration> = Config::new( + "persist_blob_read_timeout", + Duration::from_secs(10), + "Maximum time to wait to read the first byte of a response, including connection time.", +); + impl BlobKnobs for PersistConfig { fn operation_timeout(&self) -> Duration { - Duration::from_secs(180) + BLOB_OPERATION_TIMEOUT.get(self) } fn operation_attempt_timeout(&self) -> Duration { - Duration::from_secs(90) + BLOB_OPERATION_ATTEMPT_TIMEOUT.get(self) } fn connect_timeout(&self) 
-> Duration { - Duration::from_secs(7) + BLOB_CONNECT_TIMEOUT.get(self) } fn read_timeout(&self) -> Duration { - Duration::from_secs(10) + BLOB_READ_TIMEOUT.get(self) } fn is_cc_active(&self) -> bool {