Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions .github/workflows/ci_core.yml
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,36 @@ jobs:
rustup target add wasm32-unknown-unknown
cargo build --target wasm32-unknown-unknown --no-default-features --features="${FEATURES[*]}" --locked

# Build under WASI to ensure WASI compatibility (uses std::time instead of web_time)
# The job installs the nightly toolchain, adds the wasm32-wasip2 target to it, and
# builds only the library target with default features disabled, enabling just the
# features listed in FEATURES below.
build_under_wasi:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- name: Setup Rust toolchain
uses: ./.github/actions/setup
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build
working-directory: core
run: |
FEATURES=(
layers-capability-check
layers-chaos
layers-concurrent-limit
layers-immutable-index
layers-logging
layers-metrics
layers-mime-guess
layers-retry
layers-throttle
layers-timeout
services-dashmap
services-mini-moka
)
rustup toolchain install nightly
rustup target add wasm32-wasip2 --toolchain nightly
cargo +nightly build --lib --target wasm32-wasip2 --no-default-features --features="${FEATURES[*]}" --locked

unit:
runs-on: ubuntu-latest
steps:
Expand Down
1 change: 1 addition & 0 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ default = [
"layers-timeout",
]
executors-tokio = ["opendal-core/executors-tokio"]
http-client-reqwest = ["opendal-core/http-client-reqwest"]
internal-path-cache = ["opendal-core/internal-path-cache"]
internal-tokio-rt = ["opendal-core/internal-tokio-rt"]
layers-async-backtrace = ["dep:opendal-layer-async-backtrace"]
Expand Down
9 changes: 6 additions & 3 deletions core/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ all-features = true
[features]
default = ["reqwest-rustls-tls", "executors-tokio"]

# Enable default http client using reqwest.
http-client-reqwest = ["dep:reqwest"]

# Enable reqwest rustls tls support.
reqwest-rustls-tls = ["reqwest/rustls-tls"]
reqwest-rustls-tls = ["http-client-reqwest", "reqwest/rustls-tls"]

# Enable opendal's blocking support.
blocking = ["internal-tokio-rt"]
Expand Down Expand Up @@ -77,10 +80,10 @@ percent-encoding = "2"
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
reqwest = { version = "0.12.24", features = [
"stream",
], default-features = false }
], default-features = false, optional = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["macros", "io-util"] }
tokio = { workspace = true, features = ["macros", "io-util"], optional = true }
url = { workspace = true }
uuid = { workspace = true, features = ["serde", "v4"] }

Expand Down
188 changes: 42 additions & 146 deletions core/core/src/raw/http_util/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,40 +15,20 @@
// specific language governing permissions and limitations
// under the License.

use std::convert::Infallible;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::future;
use std::mem;
use std::ops::Deref;
use std::pin::Pin;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::task::Context;
use std::task::Poll;

use bytes::Bytes;
use futures::Future;
use futures::TryStreamExt;
use http::Request;
use http::Response;
use http_body::Frame;
use http_body::SizeHint;
use raw::oio::Read;

use super::HttpBody;
use super::parse_content_encoding;
use super::parse_content_length;
use crate::raw::oio::Read;
use crate::raw::*;
use crate::*;

/// Process-wide [`reqwest::Client`], constructed lazily on first access via [`LazyLock`].
///
/// Http client used across opendal for loading credentials.
/// This is merely a temporary solution because reqsign requires a reqwest client to be passed.
/// We will remove it after the next major version of reqsign, which will enable users to provide their own client.
// NOTE(review): `dead_code` is allowed presumably because not every feature
// combination references this static — confirm.
#[allow(dead_code)]
pub static GLOBAL_REQWEST_CLIENT: LazyLock<reqwest::Client> = LazyLock::new(reqwest::Client::new);

/// HttpFetcher is a type erased [`HttpFetch`].
///
/// Concretely it is an [`Arc`] around a [`HttpFetchDyn`] trait object, so clones
/// share the same underlying fetcher.
pub type HttpFetcher = Arc<dyn HttpFetchDyn>;

Expand All @@ -69,21 +49,60 @@ impl Debug for HttpClient {
}
}

#[cfg(feature = "http-client-reqwest")]
impl Default for HttpClient {
    /// Build a client whose fetcher is a clone of the shared
    /// `GLOBAL_REQWEST_CLIENT` from the `reqwest_impl` module.
    fn default() -> Self {
        let fetcher: HttpFetcher = Arc::new(super::reqwest_impl::GLOBAL_REQWEST_CLIENT.clone());
        Self { fetcher }
    }
}

// `Default` implementation compiled only when the `http-client-reqwest`
// feature is disabled.
// NOTE(review): two `fetcher` initializers appear below — presumably the removed
// (`GLOBAL_REQWEST_CLIENT`) and added (`StubHttpClient`) sides of a diff hunk;
// confirm which one the final file keeps.
#[cfg(not(feature = "http-client-reqwest"))]
impl Default for HttpClient {
fn default() -> Self {
Self {
fetcher: Arc::new(GLOBAL_REQWEST_CLIENT.clone()),
fetcher: Arc::new(StubHttpClient),
}
}
}

/// A stub HTTP client that returns an error when used.
/// This is used when no HTTP client implementation is available.
#[cfg(not(feature = "http-client-reqwest"))]
struct StubHttpClient;

#[cfg(not(feature = "http-client-reqwest"))]
impl HttpFetch for StubHttpClient {
async fn fetch(&self, _req: Request<Buffer>) -> Result<Response<HttpBody>> {
Err(Error::new(
ErrorKind::ConfigInvalid,
"No HTTP client available. Enable the 'http-client-reqwest' feature or provide a custom HTTP client via HttpClient::with().",
))
}
}

impl HttpClient {
/// Create a new http client.
///
/// The returned client is exactly [`HttpClient::default`]; with the
/// `http-client-reqwest` feature enabled this constructor always yields `Ok`.
#[cfg(feature = "http-client-reqwest")]
pub fn new() -> Result<Self> {
    let client = Self::default();
    Ok(client)
}

/// Construct `Self` with given [`reqwest::Client`]
/// Create a new http client in async context.
///
/// Returns an error when no HTTP client implementation is available.
/// Either enable the `http-client-reqwest` feature or use [`HttpClient::with`]
/// to provide a custom client.
#[cfg(not(feature = "http-client-reqwest"))]
pub fn new() -> Result<Self> {
Err(Error::new(
ErrorKind::ConfigInvalid,
"No HTTP client available. Enable the 'http-client-reqwest' feature or provide a custom HTTP client via HttpClient::with().",
))
}

/// Construct `Self` with given client that implements [`HttpFetch`]
pub fn with(client: impl HttpFetch) -> Self {
let fetcher = Arc::new(client);
Self { fetcher }
Expand Down Expand Up @@ -140,126 +159,3 @@ impl<T: HttpFetchDyn + ?Sized> HttpFetch for Arc<T> {
self.deref().fetch_dyn(req).await
}
}

/// [`HttpFetch`] implementation backed by [`reqwest::Client`].
///
/// Translates an `http::Request<Buffer>` into a reqwest request, sends it, and
/// converts the reqwest response back into an `http::Response<HttpBody>` whose
/// body is streamed lazily.
impl HttpFetch for reqwest::Client {
/// Send `req` and return the response with a streaming body.
///
/// # Errors
///
/// Returns `ErrorKind::Unexpected` when the request URI cannot be parsed as a
/// `reqwest::Url`, or when sending the request fails. Errors raised while
/// parsing the response's content-encoding / content-length headers are
/// propagated as-is. Send and body-read failures are flagged temporary
/// according to `is_temporary_error`.
async fn fetch(&self, req: Request<Buffer>) -> Result<Response<HttpBody>> {
// Uri stores all string-like data in `Bytes`, which means
// the clone here is cheap.
let uri = req.uri().clone();
// Remember whether this is a HEAD request before `req` is consumed below.
let is_head = req.method() == http::Method::HEAD;

let (parts, body) = req.into_parts();

// reqwest needs its own `Url` type, so re-parse the stringified `http::Uri`.
let url = reqwest::Url::from_str(&uri.to_string()).map_err(|err| {
Error::new(ErrorKind::Unexpected, "request url is invalid")
.with_operation("http_util::Client::send::fetch")
.with_context("url", uri.to_string())
.set_source(err)
})?;

let mut req_builder = self.request(parts.method, url).headers(parts.headers);

// The client under wasm doesn't support setting the version.
#[cfg(not(target_arch = "wasm32"))]
{
req_builder = req_builder.version(parts.version);
}

// Don't set body if body is empty.
if !body.is_empty() {
// Non-wasm targets: hand the buffer to reqwest through the
// `HttpBufferBody` adapter via `reqwest::Body::wrap`.
#[cfg(not(target_arch = "wasm32"))]
{
req_builder = req_builder.body(reqwest::Body::wrap(HttpBufferBody(body)))
}
// wasm32 targets: convert the buffer with `to_bytes()` and pass it as a
// single `Bytes` body.
#[cfg(target_arch = "wasm32")]
{
req_builder = req_builder.body(reqwest::Body::from(body.to_bytes()))
}
}

let mut resp = req_builder.send().await.map_err(|err| {
Error::new(ErrorKind::Unexpected, "send http request")
.with_operation("http_util::Client::send")
.with_context("url", uri.to_string())
.with_temporary(is_temporary_error(&err))
.set_source(err)
})?;

// Get content length from header so that we can check it.
//
// - If the request method is HEAD, we will ignore content length.
// - If response contains content_encoding, we should omit its content length.
//
// This must happen before the headers are swapped out of `resp` below.
let content_length = if is_head || parse_content_encoding(resp.headers())?.is_some() {
None
} else {
parse_content_length(resp.headers())?
};

let mut hr = Response::builder()
.status(resp.status())
// Insert uri into response extension so that we can fetch
// it later.
.extension(uri.clone());

// The response builder under wasm doesn't support setting the version.
#[cfg(not(target_arch = "wasm32"))]
{
hr = hr.version(resp.version());
}

// Swap headers directly instead of copying the entire map.
mem::swap(hr.headers_mut().unwrap(), resp.headers_mut());

// Wrap the byte stream: empty chunks are filtered out, each chunk becomes a
// `Buffer`, and stream errors are mapped into opendal errors carrying the url.
// `bytes_stream()` takes `resp` by value, so this has to come last.
let bs = HttpBody::new(
resp.bytes_stream()
.try_filter(|v| future::ready(!v.is_empty()))
.map_ok(Buffer::from)
.map_err(move |err| {
Error::new(ErrorKind::Unexpected, "read data from http response")
.with_operation("http_util::Client::send")
.with_context("url", uri.to_string())
.with_temporary(is_temporary_error(&err))
.set_source(err)
}),
content_length,
);

let resp = hr.body(bs).expect("response must build succeed");
Ok(resp)
}
}

/// Report whether a reqwest error should be flagged as temporary.
///
/// Returns `true` for request errors, request/response body errors, and
/// response-body decode errors (for example, a connection reset); `false`
/// for everything else.
#[inline]
fn is_temporary_error(err: &reqwest::Error) -> bool {
    // error sending request
    if err.is_request() {
        return true;
    }
    // request or response body error
    if err.is_body() {
        return true;
    }
    // error decoding response body, for example, connection reset.
    err.is_decode()
}

/// Adapter that exposes a [`Buffer`] as an [`http_body::Body`].
struct HttpBufferBody(Buffer);

impl http_body::Body for HttpBufferBody {
    type Data = Bytes;
    type Error = Infallible;

    /// Yield the next chunk of the buffer as a data frame, or `None` once the
    /// buffer has no more chunks. Never returns `Pending` or an error.
    fn poll_frame(
        mut self: Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
        let next_frame = self.0.next().map(|chunk| Ok(Frame::data(chunk)));
        Poll::Ready(next_frame)
    }

    /// The stream is finished as soon as the wrapped buffer is empty.
    fn is_end_stream(&self) -> bool {
        self.0.is_empty()
    }

    /// The remaining size is known exactly: the current length of the buffer.
    fn size_hint(&self) -> SizeHint {
        let remaining = self.0.len() as u64;
        SizeHint::with_exact(remaining)
    }
}
8 changes: 5 additions & 3 deletions core/core/src/raw/http_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
//! it easier to develop services and layers outside opendal.

mod client;
/// temporary client used by several features
#[allow(unused_imports)]
pub use client::GLOBAL_REQWEST_CLIENT;
pub use client::HttpClient;
pub use client::HttpFetch;
pub use client::HttpFetcher;

#[cfg(feature = "http-client-reqwest")]
pub(crate) mod reqwest_impl;
#[cfg(feature = "http-client-reqwest")]
pub use self::reqwest_impl::GLOBAL_REQWEST_CLIENT;

mod body;
pub use body::HttpBody;

Expand Down
Loading
Loading