Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 32 additions & 5 deletions src/persist-client/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,10 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&crate::batch::STRUCTURED_KEY_LOWER_LEN)
.add(&crate::batch::MAX_RUN_LEN)
.add(&crate::batch::MAX_RUNS)
.add(&BLOB_OPERATION_TIMEOUT)
.add(&BLOB_OPERATION_ATTEMPT_TIMEOUT)
.add(&BLOB_CONNECT_TIMEOUT)
.add(&BLOB_READ_TIMEOUT)
.add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL_STAGGER)
.add(&crate::cfg::CONSENSUS_CONNECTION_POOL_TTL)
.add(&crate::cfg::CRDB_CONNECT_TIMEOUT)
Expand Down Expand Up @@ -591,22 +595,45 @@ impl RetryParameters {
}
}

// TODO: Replace with dynamic values when PersistConfig is integrated with LD
pub(crate) const BLOB_OPERATION_TIMEOUT: Config<Duration> = Config::new(
"persist_blob_operation_timeout",
Duration::from_secs(180),
"Maximum time allowed for a network call, including retry attempts.",
);

pub(crate) const BLOB_OPERATION_ATTEMPT_TIMEOUT: Config<Duration> = Config::new(
"persist_blob_operation_attempt_timeout",
Duration::from_secs(90),
"Maximum time allowed for a single network call.",
);

pub(crate) const BLOB_CONNECT_TIMEOUT: Config<Duration> = Config::new(
"persist_blob_connect_timeout",
Duration::from_secs(7),
"Maximum time to wait for a socket connection to be made.",
);

pub(crate) const BLOB_READ_TIMEOUT: Config<Duration> = Config::new(
"persist_blob_read_timeout",
Duration::from_secs(10),
"Maximum time to wait to read the first byte of a response, including connection time.",
);

impl BlobKnobs for PersistConfig {
fn operation_timeout(&self) -> Duration {
Duration::from_secs(180)
BLOB_OPERATION_TIMEOUT.get(self)
}

fn operation_attempt_timeout(&self) -> Duration {
Duration::from_secs(90)
BLOB_OPERATION_ATTEMPT_TIMEOUT.get(self)
}

fn connect_timeout(&self) -> Duration {
Duration::from_secs(7)
BLOB_CONNECT_TIMEOUT.get(self)
}

fn read_timeout(&self) -> Duration {
Duration::from_secs(10)
BLOB_READ_TIMEOUT.get(self)
}

fn is_cc_active(&self) -> bool {
Expand Down
1 change: 1 addition & 0 deletions src/persist/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ proptest = { version = "1.6.0", default-features = false, features = ["std"] }
proptest-derive = { version = "0.5.1", features = ["boxed_union"] }
prost = { version = "0.13.4", features = ["no-recursion-limit"] }
rand = { version = "0.8.5", features = ["small_rng"] }
reqwest = "0.12.4"
serde = { version = "1.0.152", features = ["derive"] }
timely = "0.17.1"
tokio = { version = "1.38.0", default-features = false, features = ["fs", "macros", "sync", "rt", "rt-multi-thread"] }
Expand Down
54 changes: 50 additions & 4 deletions src/persist/src/azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@

//! An Azure Blob Storage implementation of [Blob] storage.

use std::fmt::Debug;
use std::sync::Arc;

use anyhow::{anyhow, Context};
use async_trait::async_trait;
use azure_core::StatusCode;
use azure_core::{ExponentialRetryOptions, RetryOptions, StatusCode, TransportOptions};
use azure_identity::create_default_credential;
use azure_storage::{prelude::*, CloudLocation, EMULATOR_ACCOUNT};
use azure_storage_blobs::blob::operations::GetBlobResponse;
use azure_storage_blobs::prelude::*;
use bytes::Bytes;
use futures_util::stream::FuturesOrdered;
use futures_util::StreamExt;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
use std::time::Duration;
use tracing::{info, warn};
use url::Url;
use uuid::Uuid;
Expand All @@ -32,6 +32,7 @@ use mz_ore::cast::CastFrom;
use mz_ore::lgbytes::{LgBytes, MetricsRegion};
use mz_ore::metrics::MetricsRegistry;

use crate::cfg::BlobKnobs;
use crate::error::Error;
use crate::location::{Blob, BlobMetadata, Determinate, ExternalError};
use crate::metrics::S3BlobMetrics;
Expand Down Expand Up @@ -64,6 +65,7 @@ impl AzureBlobConfig {
prefix: String,
metrics: S3BlobMetrics,
url: Url,
knobs: Box<dyn BlobKnobs>,
cfg: Arc<ConfigSet>,
) -> Result<Self, Error> {
let client = if account == EMULATOR_ACCOUNT {
Expand All @@ -75,6 +77,21 @@ impl AzureBlobConfig {
},
StorageCredentials::emulator(),
)
.transport({
// Azure uses reqwest / hyper internally, but we specify a client explicitly to
// plumb through our timeouts.
TransportOptions::new(Arc::new(
reqwest::ClientBuilder::new()
.timeout(knobs.operation_attempt_timeout())
.read_timeout(knobs.read_timeout())
.connect_timeout(knobs.connect_timeout())
.build()
.expect("valid config for azure HTTP client"),
))
})
.retry(RetryOptions::exponential(
ExponentialRetryOptions::default().max_total_elapsed(knobs.operation_timeout()),
))
.blob_service_client()
.container_client(container)
} else {
Expand Down Expand Up @@ -120,6 +137,34 @@ impl AzureBlobConfig {

/// Returns a new [AzureBlobConfig] for use in unit tests.
pub fn new_for_test() -> Result<Option<Self>, Error> {
struct TestBlobKnobs;
impl Debug for TestBlobKnobs {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestBlobKnobs").finish_non_exhaustive()
}
}
impl BlobKnobs for TestBlobKnobs {
fn operation_timeout(&self) -> Duration {
Duration::from_secs(30)
}

fn operation_attempt_timeout(&self) -> Duration {
Duration::from_secs(10)
}

fn connect_timeout(&self) -> Duration {
Duration::from_secs(5)
}

fn read_timeout(&self) -> Duration {
Duration::from_secs(5)
}

fn is_cc_active(&self) -> bool {
false
}
}

let container_name = match std::env::var(Self::EXTERNAL_TESTS_AZURE_CONTAINER) {
Ok(container) => container,
Err(_) => {
Expand All @@ -139,6 +184,7 @@ impl AzureBlobConfig {
prefix,
metrics,
Url::parse(&format!("http://localhost:40111/{}", container_name)).expect("valid url"),
Box::new(TestBlobKnobs),
Arc::new(ConfigSet::default()),
)?;

Expand Down
1 change: 1 addition & 0 deletions src/persist/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ impl BlobConfig {
"".to_string(),
metrics,
url.clone().into_redacted(),
knobs,
cfg,
)?))
} else {
Expand Down
4 changes: 2 additions & 2 deletions src/workspace-hack/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka.git", feat
regex = { version = "1.10.5" }
regex-automata = { version = "0.4.7", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa", "perf", "unicode"] }
regex-syntax = { version = "0.8.3" }
reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12.4", default-features = false, features = ["blocking", "charset", "default-tls", "http2", "json", "stream"] }
reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12.4", features = ["blocking", "json", "stream"] }
reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11.24", features = ["blocking", "json", "multipart", "native-tls-vendored", "stream"] }
schemars = { version = "0.8.11", features = ["uuid1"] }
scopeguard = { version = "1.1.0" }
Expand Down Expand Up @@ -244,7 +244,7 @@ rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka.git", feat
regex = { version = "1.10.5" }
regex-automata = { version = "0.4.7", default-features = false, features = ["dfa-onepass", "hybrid", "meta", "nfa", "perf", "unicode"] }
regex-syntax = { version = "0.8.3" }
reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12.4", default-features = false, features = ["blocking", "charset", "default-tls", "http2", "json", "stream"] }
reqwest-5ef9efb8ec2df382 = { package = "reqwest", version = "0.12.4", features = ["blocking", "json", "stream"] }
reqwest-a6292c17cd707f01 = { package = "reqwest", version = "0.11.24", features = ["blocking", "json", "multipart", "native-tls-vendored", "stream"] }
schemars = { version = "0.8.11", features = ["uuid1"] }
scopeguard = { version = "1.1.0" }
Expand Down