Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ lnurl-auth = ["dep:bitcoin", "dep:url", "dep:serde", "dep:serde_json", "reqwest/

[dependencies]
prost = "0.11.6"
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls"] }
reqwest = { version = "0.12.23", default-features = false, features = ["rustls-tls"] }
tokio = { version = "1", default-features = false, features = ["time"] }
rand = "0.8.5"
async-trait = "0.1.77"
Expand All @@ -34,7 +34,7 @@ chacha20-poly1305 = "0.1.2"

[target.'cfg(genproto)'.build-dependencies]
prost-build = { version = "0.11.3" }
reqwest = { version = "0.12", default-features = false, features = ["rustls-tls", "blocking"] }
reqwest = { version = "0.12.23", default-features = false, features = ["rustls-tls", "blocking"] }

[dev-dependencies]
mockito = "0.28.0"
Expand Down
121 changes: 59 additions & 62 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,48 +11,39 @@ use crate::types::{
DeleteObjectRequest, DeleteObjectResponse, GetObjectRequest, GetObjectResponse,
ListKeyVersionsRequest, ListKeyVersionsResponse, PutObjectRequest, PutObjectResponse,
};
use crate::util::retry::{retry, RetryPolicy};

const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
const DEFAULT_RETRIES: u32 = 2;

/// Thin-client to access a hosted instance of Versioned Storage Service (VSS).
/// The provided [`VssClient`] API is minimalistic and is congruent to the VSS server-side API.
#[derive(Clone)]
pub struct VssClient<R>
where
R: RetryPolicy<E = VssError>,
{
pub struct VssClient {
base_url: String,
client: Client,
retry_policy: R,
header_provider: Arc<dyn VssHeaderProvider>,
}

impl<R: RetryPolicy<E = VssError>> VssClient<R> {
impl VssClient {
/// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint.
pub fn new(base_url: String, retry_policy: R) -> Self {
let client = Client::new();
Self::from_client(base_url, client, retry_policy)
pub fn new(base_url: String) -> Result<Self, VssError> {
let client = build_client(&base_url)?;
Ok(Self::from_client(base_url, client))
}

/// Constructs a [`VssClient`] from a given [`reqwest::Client`], using `base_url` as the VSS server endpoint.
pub fn from_client(base_url: String, client: Client, retry_policy: R) -> Self {
Self {
base_url,
client,
retry_policy,
header_provider: Arc::new(FixedHeaders::new(HashMap::new())),
}
pub fn from_client(base_url: String, client: Client) -> Self {
Self { base_url, client, header_provider: Arc::new(FixedHeaders::new(HashMap::new())) }
}

/// Constructs a [`VssClient`] using `base_url` as the VSS server endpoint.
///
/// HTTP headers will be provided by the given `header_provider`.
pub fn new_with_headers(
base_url: String, retry_policy: R, header_provider: Arc<dyn VssHeaderProvider>,
) -> Self {
let client = Client::new();
Self { base_url, client, retry_policy, header_provider }
base_url: String, header_provider: Arc<dyn VssHeaderProvider>,
) -> Result<Self, VssError> {
let client = build_client(&base_url)?;
Ok(Self { base_url, client, header_provider })
}

/// Returns the underlying base URL.
Expand All @@ -66,22 +57,17 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> {
pub async fn get_object(
&self, request: &GetObjectRequest,
) -> Result<GetObjectResponse, VssError> {
retry(
|| async {
let url = format!("{}/getObject", self.base_url);
self.post_request(request, &url).await.and_then(|response: GetObjectResponse| {
if response.value.is_none() {
Err(VssError::InternalServerError(
"VSS Server API Violation, expected value in GetObjectResponse but found none".to_string(),
))
} else {
Ok(response)
}
})
},
&self.retry_policy,
)
.await
let url = format!("{}/getObject", self.base_url);
self.post_request(request, &url).await.and_then(|response: GetObjectResponse| {
if response.value.is_none() {
Err(VssError::InternalServerError(
"VSS Server API Violation, expected value in GetObjectResponse but found none"
.to_string(),
))
} else {
Ok(response)
}
})
}

/// Writes multiple [`PutObjectRequest::transaction_items`] as part of a single transaction.
Expand All @@ -91,14 +77,8 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> {
pub async fn put_object(
&self, request: &PutObjectRequest,
) -> Result<PutObjectResponse, VssError> {
retry(
|| async {
let url = format!("{}/putObjects", self.base_url);
self.post_request(request, &url).await
},
&self.retry_policy,
)
.await
let url = format!("{}/putObjects", self.base_url);
self.post_request(request, &url).await
}

/// Deletes the given `key` and `value` in `request`.
Expand All @@ -107,14 +87,8 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> {
pub async fn delete_object(
&self, request: &DeleteObjectRequest,
) -> Result<DeleteObjectResponse, VssError> {
retry(
|| async {
let url = format!("{}/deleteObject", self.base_url);
self.post_request(request, &url).await
},
&self.retry_policy,
)
.await
let url = format!("{}/deleteObject", self.base_url);
self.post_request(request, &url).await
}

/// Lists keys and their corresponding version for a given [`ListKeyVersionsRequest::store_id`].
Expand All @@ -123,14 +97,8 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> {
pub async fn list_key_versions(
&self, request: &ListKeyVersionsRequest,
) -> Result<ListKeyVersionsResponse, VssError> {
retry(
|| async {
let url = format!("{}/listKeyVersions", self.base_url);
self.post_request(request, &url).await
},
&self.retry_policy,
)
.await
let url = format!("{}/listKeyVersions", self.base_url);
self.post_request(request, &url).await
}

async fn post_request<Rq: Message, Rs: Message + Default>(
Expand Down Expand Up @@ -162,3 +130,32 @@ impl<R: RetryPolicy<E = VssError>> VssClient<R> {
}
}
}

fn build_client(base_url: &str) -> Result<Client, VssError> {
let url = reqwest::Url::parse(base_url).map_err(|_| VssError::InvalidUrlError)?;
let host_str = url.host_str().ok_or(VssError::InvalidUrlError)?.to_string();
// TODO: Once the response `payload` is available in `classify_fn`, we should filter out any
// error types for which retrying doesn't make sense (i.e., `NoSuchKeyError`,
// `InvalidRequestError`,`ConflictError`).
let retry = reqwest::retry::for_host(host_str)
.max_retries_per_request(DEFAULT_RETRIES)
.classify_fn(|req_rep| match req_rep.status() {
// VSS uses INTERNAL_SERVER_ERROR when sending back error repsonses. These are
// currently still covered by our `RetryPolicy`, so we tell `reqwest` not to retry them.
Some(reqwest::StatusCode::INTERNAL_SERVER_ERROR) => req_rep.success(),
Some(reqwest::StatusCode::BAD_REQUEST) => req_rep.success(),
Some(reqwest::StatusCode::UNAUTHORIZED) => req_rep.success(),
Some(reqwest::StatusCode::NOT_FOUND) => req_rep.success(),
Some(reqwest::StatusCode::CONFLICT) => req_rep.success(),
Some(reqwest::StatusCode::OK) => req_rep.success(),
_ => req_rep.retryable(),
});
let client = Client::builder()
.timeout(DEFAULT_TIMEOUT)
.connect_timeout(DEFAULT_TIMEOUT)
.read_timeout(DEFAULT_TIMEOUT)
.retry(retry)
.build()
.unwrap();
Ok(client)
}
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub enum VssError {
/// There is an unknown error, it could be a client-side bug, unrecognized error-code, network error
/// or something else.
InternalError(String),

/// The provided URL is invalid.
InvalidUrlError,
}

impl VssError {
Expand Down Expand Up @@ -67,6 +70,9 @@ impl Display for VssError {
VssError::InternalError(message) => {
write!(f, "InternalError: {}", message)
},
VssError::InvalidUrlError => {
write!(f, "The provided URL is invalid")
},
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
#![deny(rustdoc::private_intra_doc_links)]
#![deny(missing_docs)]

// Crate re-exports
pub use reqwest;

/// Implements a thin-client ([`client::VssClient`]) to access a hosted instance of Versioned Storage Service (VSS).
pub mod client;

Expand Down
3 changes: 0 additions & 3 deletions src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
/// [`StorableBuilder`]: storable_builder::StorableBuilder
pub mod storable_builder;

/// Contains retry utilities.
pub mod retry;

/// Contains [`KeyObfuscator`] utility.
///
/// [`KeyObfuscator`]: key_obfuscator::KeyObfuscator
Expand Down
Loading