diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 9867be7a..a92e756c 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -30,7 +30,6 @@ on: pull_request: jobs: - # Run cargo fmt for all crates lint: name: Lint (cargo fmt) @@ -60,6 +59,8 @@ jobs: cargo update -p tokio --precise 1.29.1 cargo update -p url --precise 2.5.0 cargo update -p once_cell --precise 1.20.3 + cargo update -p tracing-core --precise 0.1.33 + cargo update -p tracing-attributes --precise 0.1.28 - name: Check run: | # run `cargo msrv verify` to see problems diff --git a/Cargo.toml b/Cargo.toml index e0d2aa3f..1264eb79 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -51,7 +51,7 @@ http-body-util = { version = "0.1.2", optional = true } httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true } hyper = { version = "1.2", default-features = false, optional = true } md-5 = { version = "0.10.6", default-features = false, optional = true } -quick-xml = { version = "0.37.0", features = ["serialize", "overlapped-lists"], optional = true } +quick-xml = { version = "0.38.0", features = ["serialize", "overlapped-lists"], optional = true } rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"], optional = true } reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-native-roots", "http2"], optional = true } ring = { version = "0.17", default-features = false, features = ["std"], optional = true } diff --git a/README.md b/README.md index db9d9a33..b9cc2338 100644 --- a/README.md +++ b/README.md @@ -96,6 +96,6 @@ Planned Release Schedule | Approximate Date | Version | Notes | Ticket | |------------------|----------|--------------------------------|:-------------------------------------------------------------------| -| June 2025 | `0.12.2` | Minor, NO breaking API changes | [#307](https://github.com/apache/arrow-rs-object-store/issues/307) | -| June 2025 | `0.13.0` | Major, breaking API changes | [#367](https://github.com/apache/arrow-rs-object-store/issues/367) | +| July 2025 | `0.12.3` | Minor, NO breaking API changes | [#428](https://github.com/apache/arrow-rs-object-store/issues/428) | +| TBD | `0.13.0` | Major, breaking API changes | [#367](https://github.com/apache/arrow-rs-object-store/issues/367) | | TBD | `0.13.1` | Minor, NO breaking API changes | [#393](https://github.com/apache/arrow-rs-object-store/issues/393) | diff --git a/src/aws/builder.rs b/src/aws/builder.rs index 27742716..e8400ae3 100644 --- a/src/aws/builder.rs +++ b/src/aws/builder.rs @@ -1587,7 +1587,7 @@ mod tests { config_key ); } else { - panic!("{} not propagated as ClientConfigKey", key); + panic!("{key} not propagated as ClientConfigKey"); } } @@ -1606,7 +1606,7 @@ mod tests { let s3 = builder.build().expect("should build successfully"); let creds = &s3.client.config.credentials; - let debug_str = format!("{:?}", creds); + let debug_str = format!("{creds:?}"); assert!( debug_str.contains("EKSPodCredentialProvider"), "expected EKS provider but got: {debug_str}" diff --git a/src/aws/client.rs b/src/aws/client.rs index ce5a091f..22d55016 100644 --- a/src/aws/client.rs +++ b/src/aws/client.rs @@ -36,7 +36,7 @@ use crate::client::{GetOptionsExt, HttpClient, HttpError, HttpResponse}; use crate::list::{PaginatedListOptions, PaginatedListResult}; use crate::multipart::PartId; use crate::{ - Attribute, Attributes, ClientOptions, GetOptions, MultipartId, Path, PutMultipartOpts, + Attribute, Attributes, ClientOptions, GetOptions, MultipartId, Path, PutMultipartOptions, PutPayload, PutResult, Result, RetryConfig, TagSet, }; use async_trait::async_trait; @@ -375,7 +375,7 @@ impl Request<'_> { builder.header(CONTENT_TYPE, v.as_ref()) } Attribute::Metadata(k_suffix) => builder.header( - &format!("{}{}", USER_DEFINED_METADATA_HEADER_PREFIX, k_suffix), + &format!("{USER_DEFINED_METADATA_HEADER_PREFIX}{k_suffix}"), v.as_ref(), ), }; @@ -631,9 +631,9 @@ impl S3Client { pub(crate) async fn create_multipart( &self, location: &Path, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result { - let PutMultipartOpts { + let PutMultipartOptions { tags, attributes, extensions, diff --git a/src/aws/credential.rs b/src/aws/credential.rs index 60cc6a4b..7e2681d4 100644 --- a/src/aws/credential.rs +++ b/src/aws/credential.rs @@ -1284,7 +1284,7 @@ mod tests { token: Some("temp_token".to_string()), }; - let debug_output = format!("{:?}", cred); + let debug_output = format!("{cred:?}"); assert!(debug_output.contains("key_id: \"AKIAXXX\"")); assert!(debug_output.contains("secret_key: \"******\"")); diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 6a5b849a..4abf3748 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -45,7 +45,7 @@ use crate::signer::Signer; use crate::util::STRICT_ENCODE_SET; use crate::{ Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, - ObjectStore, Path, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, + ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, }; @@ -245,7 +245,7 @@ impl ObjectStore for AmazonS3 { async fn put_multipart_opts( &self, location: &Path, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result> { let upload_id = self.client.create_multipart(location, opts).await?; @@ -331,7 +331,7 @@ impl ObjectStore for AmazonS3 { Some(S3CopyIfNotExists::Multipart) => { let upload_id = self .client - .create_multipart(to, PutMultipartOpts::default()) + .create_multipart(to, PutMultipartOptions::default()) .await?; let res = async { @@ -460,7 +460,7 @@ impl MultipartUpload for S3MultiPartUpload { impl MultipartStore for AmazonS3 { async fn create_multipart(&self, path: &Path) -> Result { self.client - .create_multipart(path, PutMultipartOpts::default()) + .create_multipart(path, PutMultipartOptions::default()) .await } @@ -536,7 +536,7 @@ mod tests { let str = "test.bin"; let path = Path::parse(str).unwrap(); - let opts = PutMultipartOpts::default(); + let opts = PutMultipartOptions::default(); let mut upload = store.put_multipart_opts(&path, opts).await.unwrap(); upload @@ -567,7 +567,7 @@ mod tests { let str = "test.bin"; let path = Path::parse(str).unwrap(); - let opts = PutMultipartOpts::default(); + let opts = PutMultipartOptions::default(); let mut upload = store.put_multipart_opts(&path, opts).await.unwrap(); upload diff --git a/src/aws/precondition.rs b/src/aws/precondition.rs index d23887cf..2f11e4f9 100644 --- a/src/aws/precondition.rs +++ b/src/aws/precondition.rs @@ -76,7 +76,7 @@ pub enum S3CopyIfNotExists { impl std::fmt::Display for S3CopyIfNotExists { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Self::Header(k, v) => write!(f, "header: {}: {}", k, v), + Self::Header(k, v) => write!(f, "header: {k}: {v}"), Self::HeaderWithStatus(k, v, code) => { write!(f, "header-with-status: {k}: {v}: {}", code.as_u16()) } diff --git a/src/aws/resolve.rs b/src/aws/resolve.rs index db899ea9..66d1511c 100644 --- a/src/aws/resolve.rs +++ b/src/aws/resolve.rs @@ -49,7 +49,7 @@ impl From for crate::Error { pub async fn resolve_bucket_region(bucket: &str, client_options: &ClientOptions) -> Result { use reqwest::StatusCode; - let endpoint = format!("https://{}.s3.amazonaws.com", bucket); + let endpoint = format!("https://{bucket}.s3.amazonaws.com"); let client = client_options.client()?; diff --git a/src/azure/builder.rs b/src/azure/builder.rs index b4620654..182bdf04 100644 --- a/src/azure/builder.rs +++ b/src/azure/builder.rs @@ -658,8 +658,8 @@ impl MicrosoftAzureBuilder { }; match parsed.scheme() { - "az" | "adl" | "azure" => self.container_name = Some(validate(host)?), - "abfs" | "abfss" => { + "adl" | "azure" => self.container_name = Some(validate(host)?), + "az" | "abfs" | "abfss" => { // abfs(s) might refer to the fsspec convention abfs:/// // or the convention for the hadoop driver abfs[s]://@.dfs.core.windows.net/ if parsed.username().is_empty() { @@ -1103,6 +1103,14 @@ mod tests { assert_eq!(builder.container_name, Some("file_system".to_string())); assert!(!builder.use_fabric_endpoint.get().unwrap()); + let mut builder = MicrosoftAzureBuilder::new(); + builder + .parse_url("az://container@account.dfs.core.windows.net/path-part/file") + .unwrap(); + assert_eq!(builder.account_name, Some("account".to_string())); + assert_eq!(builder.container_name, Some("container".to_string())); + assert!(!builder.use_fabric_endpoint.get().unwrap()); + let mut builder = MicrosoftAzureBuilder::new(); builder .parse_url("abfss://file_system@account.dfs.fabric.microsoft.com/") @@ -1245,7 +1253,7 @@ mod tests { config_key ); } else { - panic!("{} not propagated as ClientConfigKey", key); + panic!("{key} not propagated as ClientConfigKey"); } } } diff --git a/src/azure/client.rs b/src/azure/client.rs index 428a99b7..c7440a07 100644 --- a/src/azure/client.rs +++ b/src/azure/client.rs @@ -29,7 +29,7 @@ use crate::multipart::PartId; use crate::util::{deserialize_rfc1123, GetRange}; use crate::{ Attribute, Attributes, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, - PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, RetryConfig, TagSet, + PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, RetryConfig, TagSet, }; use async_trait::async_trait; use base64::prelude::{BASE64_STANDARD, BASE64_STANDARD_NO_PAD}; @@ -243,7 +243,7 @@ impl PutRequest<'_> { builder.header(&MS_CONTENT_TYPE, v.as_ref()) } Attribute::Metadata(k_suffix) => builder.header( - &format!("{}{}", USER_DEFINED_METADATA_HEADER_PREFIX, k_suffix), + &format!("{USER_DEFINED_METADATA_HEADER_PREFIX}{k_suffix}"), v.as_ref(), ), }; @@ -350,7 +350,7 @@ fn serialize_part_delete_request( // Encode the subrequest request-line extend(dst, b"DELETE "); - extend(dst, format!("/{} ", relative_url).as_bytes()); + extend(dst, format!("/{relative_url} ").as_bytes()); extend(dst, b"HTTP/1.1"); extend(dst, b"\r\n"); @@ -597,9 +597,9 @@ impl AzureClient { &self, path: &Path, parts: Vec, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result { - let PutMultipartOpts { + let PutMultipartOptions { tags, attributes, extensions, @@ -716,7 +716,7 @@ impl AzureClient { .query(&[("restype", "container"), ("comp", "batch")]) .header( CONTENT_TYPE, - HeaderValue::from_str(format!("multipart/mixed; boundary={}", boundary).as_str()) + HeaderValue::from_str(format!("multipart/mixed; boundary={boundary}").as_str()) .unwrap(), ) .header(CONTENT_LENGTH, HeaderValue::from(body_bytes.len())) diff --git a/src/azure/credential.rs b/src/azure/credential.rs index 27f8776b..c34a8e3c 100644 --- a/src/azure/credential.rs +++ b/src/azure/credential.rs @@ -261,7 +261,7 @@ impl<'a> AzureAuthorizer<'a> { AzureCredential::BearerToken(token) => { request.headers_mut().append( AUTHORIZATION, - HeaderValue::from_str(format!("Bearer {}", token).as_str()).unwrap(), + HeaderValue::from_str(format!("Bearer {token}").as_str()).unwrap(), ); } AzureCredential::SASToken(query_pairs) => { diff --git a/src/azure/mod.rs b/src/azure/mod.rs index d686bac2..f65bf9f3 100644 --- a/src/azure/mod.rs +++ b/src/azure/mod.rs @@ -27,7 +27,7 @@ use crate::{ path::Path, signer::Signer, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, + PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, }; use async_trait::async_trait; use futures::stream::{BoxStream, StreamExt, TryStreamExt}; @@ -99,7 +99,7 @@ impl ObjectStore for MicrosoftAzure { async fn put_multipart_opts( &self, location: &Path, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result> { Ok(Box::new(AzureMultiPartUpload { part_idx: 0, @@ -221,7 +221,7 @@ impl Signer for MicrosoftAzure { struct AzureMultiPartUpload { part_idx: usize, state: Arc, - opts: PutMultipartOpts, + opts: PutMultipartOptions, } #[derive(Debug)] diff --git a/src/buffered.rs b/src/buffered.rs index a767cb65..f189c534 100644 --- a/src/buffered.rs +++ b/src/buffered.rs @@ -19,7 +19,7 @@ use crate::path::Path; use crate::{ - Attributes, ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayloadMut, TagSet, + Attributes, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayloadMut, TagSet, WriteMultipart, }; use bytes::Bytes; @@ -337,7 +337,7 @@ impl BufWriter { } else { let buffer = std::mem::take(b); let path = std::mem::take(path); - let opts = PutMultipartOpts { + let opts = PutMultipartOptions { attributes: self.attributes.take().unwrap_or_default(), tags: self.tags.take().unwrap_or_default(), extensions: self.extensions.take().unwrap_or_default(), @@ -397,7 +397,7 @@ impl AsyncWrite for BufWriter { if b.content_length().saturating_add(buf.len()) >= cap { let buffer = std::mem::take(b); let path = std::mem::take(path); - let opts = PutMultipartOpts { + let opts = PutMultipartOptions { attributes: self.attributes.take().unwrap_or_default(), tags: self.tags.take().unwrap_or_default(), extensions: self.extensions.take().unwrap_or_default(), diff --git a/src/chunked.rs b/src/chunked.rs index 2bb30b90..8af3b2c4 100644 --- a/src/chunked.rs +++ b/src/chunked.rs @@ -29,7 +29,7 @@ use futures::StreamExt; use crate::path::Path; use crate::{ GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOpts, PutOptions, PutResult, + PutMultipartOptions, PutOptions, PutResult, }; use crate::{PutPayload, Result}; @@ -78,7 +78,7 @@ impl ObjectStore for ChunkedStore { async fn put_multipart_opts( &self, location: &Path, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result> { self.inner.put_multipart_opts(location, opts).await } diff --git a/src/client/builder.rs b/src/client/builder.rs index aca36a05..257cb570 100644 --- a/src/client/builder.rs +++ b/src/client/builder.rs @@ -153,7 +153,7 @@ impl HttpRequestBuilder { #[cfg(feature = "gcp")] pub(crate) fn bearer_auth(mut self, token: &str) -> Self { - let value = HeaderValue::try_from(format!("Bearer {}", token)); + let value = HeaderValue::try_from(format!("Bearer {token}")); match (value, &mut self.request) { (Ok(mut v), Ok(r)) => { v.set_sensitive(true); diff --git a/src/client/retry.rs b/src/client/retry.rs index b6097d92..9eac2b1a 100644 --- a/src/client/retry.rs +++ b/src/client/retry.rs @@ -32,7 +32,7 @@ use tracing::info; use web_time::{Duration, Instant}; /// Retry request error -#[derive(Debug, thiserror::Error)] +#[derive(Debug)] pub struct RetryError(Box); /// Box error to avoid large error variant @@ -66,6 +66,12 @@ impl std::fmt::Display for RetryError { } } +impl std::error::Error for RetryError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(&self.0.inner) + } +} + /// Context of the retry loop /// /// Most use-cases should use [`RetryExt`] and [`RetryableRequestBuilder`], however, @@ -396,6 +402,8 @@ impl RetryableRequest { let status = r.status(); if ctx.exhausted() || !(status.is_server_error() + || status == StatusCode::TOO_MANY_REQUESTS + || status == StatusCode::REQUEST_TIMEOUT || (self.retry_on_conflict && status == StatusCode::CONFLICT)) { let source = match status.is_client_error() { @@ -504,6 +512,7 @@ mod tests { use hyper::header::LOCATION; use hyper::Response; use reqwest::{Client, Method, StatusCode}; + use std::error::Error; use std::time::Duration; #[test] @@ -586,6 +595,28 @@ mod tests { let r = do_request().await.unwrap(); assert_eq!(r.status(), StatusCode::OK); + // Should retry 429 Too Many Requests + mock.push( + Response::builder() + .status(StatusCode::TOO_MANY_REQUESTS) + .body(String::new()) + .unwrap(), + ); + + let r = do_request().await.unwrap(); + assert_eq!(r.status(), StatusCode::OK); + + // Should retry 408 Request Timeout + mock.push( + Response::builder() + .status(StatusCode::REQUEST_TIMEOUT) + .body(String::new()) + .unwrap(), + ); + + let r = do_request().await.unwrap(); + assert_eq!(r.status(), StatusCode::OK); + // Accepts 204 status code mock.push( Response::builder() @@ -657,11 +688,16 @@ mod tests { ); } - let e = do_request().await.unwrap_err().to_string(); + let e = do_request().await.unwrap_err(); assert!( - e.contains(" after 2 retries, max_retries: 2, retry_timeout: 1000s - Server returned non-2xx status code: 502 Bad Gateway"), + e.to_string().contains(" after 2 retries, max_retries: 2, retry_timeout: 1000s - Server returned non-2xx status code: 502 Bad Gateway"), "{e}" ); + // verify e.source() is available as well for users who need programmatic access + assert_eq!( + e.source().unwrap().to_string(), + "Server returned non-2xx status code: 502 Bad Gateway: ", + ); // Panic results in an incomplete message error in the client mock.push_fn::<_, String>(|_| panic!()); @@ -672,11 +708,16 @@ mod tests { for _ in 0..=retry.max_retries { mock.push_fn::<_, String>(|_| panic!()); } - let e = do_request().await.unwrap_err().to_string(); + let e = do_request().await.unwrap_err(); assert!( - e.contains("after 2 retries, max_retries: 2, retry_timeout: 1000s - HTTP error: error sending request"), + e.to_string().contains("after 2 retries, max_retries: 2, retry_timeout: 1000s - HTTP error: error sending request"), "{e}" ); + // verify e.source() is available as well for users who need programmatic access + assert_eq!( + e.source().unwrap().to_string(), + "HTTP error: error sending request", + ); // Retries on client timeout mock.push_async_fn(|_| async move { diff --git a/src/gcp/builder.rs b/src/gcp/builder.rs index 87e28d5a..f7607eea 100644 --- a/src/gcp/builder.rs +++ b/src/gcp/builder.rs @@ -733,7 +733,7 @@ mod tests { config_key ); } else { - panic!("{} not propagated as ClientConfigKey", key); + panic!("{key} not propagated as ClientConfigKey"); } } } diff --git a/src/gcp/client.rs b/src/gcp/client.rs index 0378843c..a988cc45 100644 --- a/src/gcp/client.rs +++ b/src/gcp/client.rs @@ -32,7 +32,7 @@ use crate::multipart::PartId; use crate::path::Path; use crate::util::hex_encode; use crate::{ - Attribute, Attributes, ClientOptions, GetOptions, MultipartId, PutMode, PutMultipartOpts, + Attribute, Attributes, ClientOptions, GetOptions, MultipartId, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, RetryConfig, }; use async_trait::async_trait; @@ -202,7 +202,7 @@ impl Request<'_> { builder.header(CONTENT_TYPE, v.as_ref()) } Attribute::Metadata(k_suffix) => builder.header( - &format!("{}{}", USER_DEFINED_METADATA_HEADER_PREFIX, k_suffix), + &format!("{USER_DEFINED_METADATA_HEADER_PREFIX}{k_suffix}"), v.as_ref(), ), }; @@ -327,8 +327,7 @@ impl GoogleCloudStorageClient { }; let url = format!( - "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{}:signBlob", - client_email + "https://iamcredentials.googleapis.com/v1/projects/-/serviceAccounts/{client_email}:signBlob" ); let response = self @@ -444,9 +443,9 @@ impl GoogleCloudStorageClient { pub(crate) async fn multipart_initiate( &self, path: &Path, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result { - let PutMultipartOpts { + let PutMultipartOptions { // not supported by GCP tags: _, attributes, diff --git a/src/gcp/credential.rs b/src/gcp/credential.rs index 1d224fdd..1e067f53 100644 --- a/src/gcp/credential.rs +++ b/src/gcp/credential.rs @@ -89,6 +89,9 @@ pub enum Error { #[error("Error getting token response body: {}", source)] TokenResponseBody { source: HttpError }, + + #[error("Error reading pem file: {}", source)] + ReadPem { source: std::io::Error }, } impl From for crate::Error { @@ -130,11 +133,13 @@ impl ServiceAccountKey { let mut cursor = Cursor::new(encoded); let mut reader = BufReader::new(&mut cursor); - // Reading from string is infallible - match rustls_pemfile::read_one(&mut reader).unwrap() { - Some(Item::Pkcs8Key(key)) => Self::from_pkcs8(key.secret_pkcs8_der()), - Some(Item::Pkcs1Key(key)) => Self::from_der(key.secret_pkcs1_der()), - _ => Err(Error::MissingKey), + match rustls_pemfile::read_one(&mut reader) { + Ok(item) => match item { + Some(Item::Pkcs8Key(key)) => Self::from_pkcs8(key.secret_pkcs8_der()), + Some(Item::Pkcs1Key(key)) => Self::from_der(key.secret_pkcs1_der()), + _ => Err(Error::MissingKey), + }, + Err(e) => Err(Error::ReadPem { source: e }), } } @@ -769,7 +774,7 @@ impl GCSAuthorizer { let email = &self.credential.email; let date = self.date.unwrap_or_else(Utc::now); let scope = self.scope(date); - let credential_with_scope = format!("{}/{}", email, scope); + let credential_with_scope = format!("{email}/{scope}"); let mut headers = HeaderMap::new(); headers.insert("host", DEFAULT_GCS_SIGN_BLOB_HOST.parse().unwrap()); @@ -821,8 +826,7 @@ impl GCSAuthorizer { let (canonical_headers, signed_headers) = Self::canonicalize_headers(headers); format!( - "{}\n{}\n{}\n{}\n\n{}\n{}", - verb, path, query, canonical_headers, signed_headers, DEFAULT_GCS_PLAYLOAD_STRING + "{verb}\n{path}\n{query}\n{canonical_headers}\n\n{signed_headers}\n{DEFAULT_GCS_PLAYLOAD_STRING}" ) } diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs index dfd638a8..442b24fe 100644 --- a/src/gcp/mod.rs +++ b/src/gcp/mod.rs @@ -42,7 +42,7 @@ use crate::gcp::credential::GCSAuthorizer; use crate::signer::Signer; use crate::{ multipart::PartId, path::Path, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, - ObjectMeta, ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, + ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, }; use async_trait::async_trait; @@ -161,7 +161,7 @@ impl ObjectStore for GoogleCloudStorage { async fn put_multipart_opts( &self, location: &Path, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result> { let upload_id = self.client.multipart_initiate(location, opts).await?; @@ -213,7 +213,7 @@ impl ObjectStore for GoogleCloudStorage { impl MultipartStore for GoogleCloudStorage { async fn create_multipart(&self, path: &Path) -> Result { self.client - .multipart_initiate(path, PutMultipartOpts::default()) + .multipart_initiate(path, PutMultipartOptions::default()) .await } diff --git a/src/http/mod.rs b/src/http/mod.rs index 8b1f5053..8581f923 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -46,7 +46,8 @@ use crate::http::client::Client; use crate::path::Path; use crate::{ ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, RetryConfig, + ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, + RetryConfig, }; mod client; @@ -122,7 +123,7 @@ impl ObjectStore for HttpStore { async fn put_multipart_opts( &self, _location: &Path, - _opts: PutMultipartOpts, + _opts: PutMultipartOptions, ) -> Result> { Err(crate::Error::NotImplemented) } diff --git a/src/lib.rs b/src/lib.rs index 80e91d72..06edd33c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -614,7 +614,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { /// /// For more advanced multipart uploads see [`MultipartStore`](multipart::MultipartStore) async fn put_multipart(&self, location: &Path) -> Result> { - self.put_multipart_opts(location, PutMultipartOpts::default()) + self.put_multipart_opts(location, PutMultipartOptions::default()) .await } @@ -627,7 +627,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { async fn put_multipart_opts( &self, location: &Path, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result>; /// Return the bytes that are stored at the specified location. @@ -823,7 +823,7 @@ macro_rules! as_ref_impl { async fn put_multipart_opts( &self, location: &Path, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result> { self.as_ref().put_multipart_opts(location, opts).await } @@ -1234,9 +1234,14 @@ impl From for PutOptions { } } +// See . +#[doc(hidden)] +#[deprecated(note = "Use PutMultipartOptions", since = "0.13.0")] +pub type PutMultipartOpts = PutMultipartOptions; + /// Options for [`ObjectStore::put_multipart_opts`] #[derive(Debug, Clone, Default)] -pub struct PutMultipartOpts { +pub struct PutMultipartOptions { /// Provide a [`TagSet`] for this object /// /// Implementations that don't support object tagging should ignore this @@ -1254,7 +1259,7 @@ pub struct PutMultipartOpts { pub extensions: ::http::Extensions, } -impl PartialEq for PutMultipartOpts { +impl PartialEq for PutMultipartOptions { fn eq(&self, other: &Self) -> bool { let Self { tags, @@ -1270,9 +1275,9 @@ impl PartialEq for PutMultipartOpts { } } -impl Eq for PutMultipartOpts {} +impl Eq for PutMultipartOptions {} -impl From for PutMultipartOpts { +impl From for PutMultipartOptions { fn from(tags: TagSet) -> Self { Self { tags, @@ -1281,7 +1286,7 @@ impl From for PutMultipartOpts { } } -impl From for PutMultipartOpts { +impl From for PutMultipartOptions { fn from(attributes: Attributes) -> Self { Self { attributes, diff --git a/src/limit.rs b/src/limit.rs index 330a0da0..85714967 100644 --- a/src/limit.rs +++ b/src/limit.rs @@ -19,7 +19,7 @@ use crate::{ BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, Path, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, StreamExt, + ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, StreamExt, UploadPart, }; use async_trait::async_trait; @@ -96,7 +96,7 @@ impl ObjectStore for LimitStore { async fn put_multipart_opts( &self, location: &Path, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result> { let upload = self.inner.put_multipart_opts(location, opts).await?; Ok(Box::new(LimitUpload { diff --git a/src/local.rs b/src/local.rs index ccf6e34d..3404bc89 100644 --- a/src/local.rs +++ b/src/local.rs @@ -38,7 +38,8 @@ use crate::{ path::{absolute_path_to_url, Path}, util::InvalidGetRange, Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, + ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, + UploadPart, }; /// A specialized `Error` for filesystem object store-related errors @@ -388,7 +389,7 @@ impl ObjectStore for LocalFileSystem { async fn put_multipart_opts( &self, location: &Path, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result> { if !opts.attributes.is_empty() { return Err(crate::Error::NotImplemented); @@ -1497,7 +1498,7 @@ mod tests { let root_path = root.path(); for i in 0..5 { - let filename = format!("test{}.parquet", i); + let filename = format!("test{i}.parquet"); let file = root_path.join(filename); std::fs::write(file, "test").unwrap(); } diff --git a/src/memory.rs b/src/memory.rs index f03dbc6d..e15c2465 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -30,7 +30,7 @@ use crate::multipart::{MultipartStore, PartId}; use crate::util::InvalidGetRange; use crate::{ path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, - MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutResult, + MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutResult, Result, UpdateVersion, UploadPart, }; use crate::{GetOptions, PutPayload}; @@ -224,7 +224,7 @@ impl ObjectStore for InMemory { async fn put_multipart_opts( &self, location: &Path, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result> { Ok(Box::new(InMemoryUpload { location: location.clone(), diff --git a/src/parse.rs b/src/parse.rs index 41886094..b1f653c5 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -92,7 +92,7 @@ impl ObjectStoreScheme { /// assert_eq!(scheme, ObjectStoreScheme::Local); /// assert_eq!(path.as_ref(), "path/to/my/file"); /// - /// let url: Url = "https://blob.core.windows.net/path/to/my/file".parse().unwrap(); + /// let url: Url = "https://blob.core.windows.net/container/path/to/my/file".parse().unwrap(); /// let (scheme, path) = ObjectStoreScheme::parse(&url).unwrap(); /// assert_eq!(scheme, ObjectStoreScheme::MicrosoftAzure); /// assert_eq!(path.as_ref(), "path/to/my/file"); @@ -110,9 +110,8 @@ impl ObjectStoreScheme { ("memory", None) => (Self::Memory, url.path()), ("s3" | "s3a", Some(_)) => (Self::AmazonS3, url.path()), ("gs", Some(_)) => (Self::GoogleCloudStorage, url.path()), - ("az" | "adl" | "azure" | "abfs" | "abfss", Some(_)) => { - (Self::MicrosoftAzure, url.path()) - } + ("az", Some(_)) => (Self::MicrosoftAzure, strip_bucket().unwrap_or_default()), + ("adl" | "azure" | "abfs" | "abfss", Some(_)) => (Self::MicrosoftAzure, url.path()), ("http", Some(_)) => (Self::Http, url.path()), ("https", Some(host)) => { if host.ends_with("dfs.core.windows.net") @@ -120,7 +119,7 @@ impl ObjectStoreScheme { || host.ends_with("dfs.fabric.microsoft.com") || host.ends_with("blob.fabric.microsoft.com") { - (Self::MicrosoftAzure, url.path()) + (Self::MicrosoftAzure, strip_bucket().unwrap_or_default()) } else if host.ends_with("amazonaws.com") { match host.starts_with("s3") { true => (Self::AmazonS3, strip_bucket().unwrap_or_default()), @@ -220,6 +219,7 @@ where builder_opts!(crate::http::HttpBuilder, url, _options) } #[cfg(not(all( + feature = "fs", feature = "aws", feature = "azure", feature = "gcp", @@ -286,10 +286,26 @@ mod tests { "https://account.dfs.core.windows.net", (ObjectStoreScheme::MicrosoftAzure, ""), ), + ( + "https://account.dfs.core.windows.net/container/path", + (ObjectStoreScheme::MicrosoftAzure, "path"), + ), ( "https://account.blob.core.windows.net", (ObjectStoreScheme::MicrosoftAzure, ""), ), + ( + "https://account.blob.core.windows.net/container/path", + (ObjectStoreScheme::MicrosoftAzure, "path"), + ), + ( + "az://account/container", + (ObjectStoreScheme::MicrosoftAzure, ""), + ), + ( + "az://account/container/path", + (ObjectStoreScheme::MicrosoftAzure, "path"), + ), ( "gs://bucket/path", (ObjectStoreScheme::GoogleCloudStorage, "path"), @@ -335,7 +351,11 @@ mod tests { ), ( "https://account.dfs.fabric.microsoft.com/container", - (ObjectStoreScheme::MicrosoftAzure, "container"), + (ObjectStoreScheme::MicrosoftAzure, ""), + ), + ( + "https://account.dfs.fabric.microsoft.com/container/path", + (ObjectStoreScheme::MicrosoftAzure, "path"), ), ( "https://account.blob.fabric.microsoft.com/", @@ -343,7 +363,11 @@ mod tests { ), ( "https://account.blob.fabric.microsoft.com/container", - (ObjectStoreScheme::MicrosoftAzure, "container"), + (ObjectStoreScheme::MicrosoftAzure, ""), + ), + ( + "https://account.blob.fabric.microsoft.com/container/path", + (ObjectStoreScheme::MicrosoftAzure, "path"), ), ]; diff --git a/src/prefix.rs b/src/prefix.rs index c2802c1d..e5a917aa 100644 --- a/src/prefix.rs +++ b/src/prefix.rs @@ -22,8 +22,8 @@ use std::ops::Range; use crate::path::Path; use crate::{ - GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOpts, - PutOptions, PutPayload, PutResult, Result, + GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore, + PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, }; /// Store wrapper that applies a constant prefix to all paths handled by the store. @@ -121,7 +121,7 @@ impl ObjectStore for PrefixStore { async fn put_multipart_opts( &self, location: &Path, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result> { let full_path = self.full_path(location); self.inner.put_multipart_opts(&full_path, opts).await diff --git a/src/throttle.rs b/src/throttle.rs index efe29491..dec642a7 100644 --- a/src/throttle.rs +++ b/src/throttle.rs @@ -23,7 +23,7 @@ use std::{convert::TryInto, sync::Arc}; use crate::multipart::{MultipartStore, PartId}; use crate::{ path::Path, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta, - ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, }; use crate::{GetOptions, UploadPart}; use async_trait::async_trait; @@ -174,7 +174,7 @@ impl ObjectStore for ThrottledStore { async fn put_multipart_opts( &self, location: &Path, - opts: PutMultipartOpts, + opts: PutMultipartOptions, ) -> Result> { let upload = self.inner.put_multipart_opts(location, opts).await?; Ok(Box::new(ThrottledUpload { @@ -404,7 +404,9 @@ impl MultipartUpload for ThrottledUpload { #[cfg(test)] mod tests { use super::*; - use crate::{integration::*, memory::InMemory, GetResultPayload}; + #[cfg(target_os = "linux")] + use crate::GetResultPayload; + use crate::{integration::*, memory::InMemory}; use futures::TryStreamExt; use tokio::time::Duration; use tokio::time::Instant; diff --git a/tests/get_range_file.rs b/tests/get_range_file.rs index 6790c11e..d5ac8e39 100644 --- a/tests/get_range_file.rs +++ b/tests/get_range_file.rs @@ -49,7 +49,7 @@ impl ObjectStore for MyStore { async fn put_multipart_opts( &self, _location: &Path, - _opts: PutMultipartOpts, + _opts: PutMultipartOptions, ) -> Result> { todo!() }