Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions core/services/obs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ http = { workspace = true }
log = { workspace = true }
opendal-core = { path = "../../core", version = "0.55.0", default-features = false }
quick-xml = { workspace = true, features = ["serialize", "overlapped-lists"] }
reqsign = { workspace = true, features = [
"services-huaweicloud",
"reqwest_request",
] }
reqsign-core = "2.0.2"
reqsign-file-read-tokio = "2.0.2"
reqsign-http-send-reqwest = "2.0.1"
reqsign-huaweicloud-obs = "2.0.2"
serde = { workspace = true, features = ["derive"] }

[dev-dependencies]
Expand Down
42 changes: 23 additions & 19 deletions core/services/obs/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,17 @@ use http::Response;
use http::StatusCode;
use http::Uri;
use log::debug;
use reqsign::HuaweicloudObsConfig;
use reqsign::HuaweicloudObsCredentialLoader;
use reqsign::HuaweicloudObsSigner;
use opendal_core::raw::*;
use opendal_core::*;
use reqsign_core::Context;
use reqsign_core::OsEnv;
use reqsign_core::ProvideCredentialChain;
use reqsign_core::Signer;
use reqsign_file_read_tokio::TokioFileRead;
use reqsign_http_send_reqwest::ReqwestHttpSend;
use reqsign_huaweicloud_obs::EnvCredentialProvider;
use reqsign_huaweicloud_obs::RequestSigner;
use reqsign_huaweicloud_obs::StaticCredentialProvider;

use super::OBS_SCHEME;
use super::config::ObsConfig;
Expand All @@ -36,8 +44,6 @@ use super::error::parse_error;
use super::lister::ObsLister;
use super::writer::ObsWriter;
use super::writer::ObsWriters;
use opendal_core::raw::*;
use opendal_core::*;

/// Huawei-Cloud Object Storage Service (OBS) support
#[doc = include_str!("docs.md")]
Expand Down Expand Up @@ -169,28 +175,27 @@ impl Builder for ObsBuilder {
};
debug!("backend use endpoint {}", &endpoint);

let mut cfg = HuaweicloudObsConfig::default();
// Load cfg from env first.
cfg = cfg.from_env();
let ctx = Context::new()
.with_file_read(TokioFileRead)
.with_http_send(ReqwestHttpSend::new(GLOBAL_REQWEST_CLIENT.clone()))
.with_env(OsEnv);

if let Some(v) = self.config.access_key_id {
cfg.access_key_id = Some(v);
}
let mut provider = ProvideCredentialChain::new().push(EnvCredentialProvider::new());

if let Some(v) = self.config.secret_access_key {
cfg.secret_access_key = Some(v);
if let (Some(ak), Some(sk)) = (&self.config.access_key_id, &self.config.secret_access_key) {
let static_provider = StaticCredentialProvider::new(ak, sk);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, do we need to add security_token to ObsConfig?

provider = provider.push_front(static_provider);
}

let loader = HuaweicloudObsCredentialLoader::new(cfg);

// Set the bucket name in CanonicalizedResource.
// 1. If the bucket is bound to a user domain name, use the user domain name as the bucket name,
// for example, `/obs.ccc.com/object`. `obs.ccc.com` is the user domain name bound to the bucket.
// 2. If you do not access OBS using a user domain name, this field is in the format of `/bucket/object`.
//
// Please refer to this doc for more details:
// https://support.huaweicloud.com/intl/en-us/api-obs/obs_04_0010.html
let signer = HuaweicloudObsSigner::new(if is_obs_default { &bucket } else { &endpoint });
let request_signer = RequestSigner::new(if is_obs_default { &bucket } else { &endpoint });
let signer = Signer::new(ctx, provider, request_signer);

debug!("backend build finished");
Ok(ObsBackend {
Expand Down Expand Up @@ -252,7 +257,6 @@ impl Builder for ObsBuilder {
root,
endpoint: format!("{}://{}", &scheme, &endpoint),
signer,
loader,
}),
})
}
Expand Down Expand Up @@ -390,8 +394,8 @@ impl Access for ObsBackend {
"operation is not supported",
)),
};
let mut req = req?;
self.core.sign_query(&mut req, args.expire()).await?;
let req = req?;
let req = self.core.sign_query(req, args.expire()).await?;

// We don't need this request anymore, consume it directly.
let (parts, _) = req.into_parts();
Expand Down
93 changes: 38 additions & 55 deletions core/services/obs/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@ use http::header::CONTENT_LENGTH;
use http::header::CONTENT_TYPE;
use http::header::IF_MATCH;
use http::header::IF_NONE_MATCH;
use reqsign::HuaweicloudObsCredential;
use reqsign::HuaweicloudObsCredentialLoader;
use reqsign::HuaweicloudObsSigner;
use serde::Deserialize;
use serde::Serialize;

use opendal_core::raw::*;
use opendal_core::*;
use reqsign_core::Signer;
use reqsign_huaweicloud_obs::Credential;
use serde::Deserialize;
use serde::Serialize;

pub mod constants {
pub const X_OBS_META_PREFIX: &str = "x-obs-meta-";
Expand All @@ -47,8 +45,7 @@ pub struct ObsCore {
pub root: String,
pub endpoint: String,

pub signer: HuaweicloudObsSigner,
pub loader: HuaweicloudObsCredentialLoader,
pub signer: Signer<Credential>,
}

impl Debug for ObsCore {
Expand All @@ -62,40 +59,26 @@ impl Debug for ObsCore {
}

impl ObsCore {
async fn load_credential(&self) -> Result<Option<HuaweicloudObsCredential>> {
let cred = self
.loader
.load()
.await
.map_err(new_request_credential_error)?;

if let Some(cred) = cred {
Ok(Some(cred))
} else {
Ok(None)
}
}
pub async fn sign<T>(&self, req: Request<T>) -> Result<Request<T>> {
let (mut parts, body) = req.into_parts();

pub async fn sign<T>(&self, req: &mut Request<T>) -> Result<()> {
let cred = if let Some(cred) = self.load_credential().await? {
cred
} else {
return Ok(());
};
self.signer
.sign(&mut parts, None)
.await
.map_err(|e| new_request_sign_error(e.into()))?;

self.signer.sign(req, &cred).map_err(new_request_sign_error)
Ok(Request::from_parts(parts, body))
}

pub async fn sign_query<T>(&self, req: &mut Request<T>, duration: Duration) -> Result<()> {
let cred = if let Some(cred) = self.load_credential().await? {
cred
} else {
return Ok(());
};
pub async fn sign_query<T>(&self, req: Request<T>, duration: Duration) -> Result<Request<T>> {
let (mut parts, body) = req.into_parts();

self.signer
.sign_query(req, duration, &cred)
.map_err(new_request_sign_error)
.sign(&mut parts, Some(duration))
.await
.map_err(|e| new_request_sign_error(e.into()))?;

Ok(Request::from_parts(parts, body))
}

#[inline]
Expand All @@ -111,9 +94,9 @@ impl ObsCore {
range: BytesRange,
args: &OpRead,
) -> Result<Response<HttpBody>> {
let mut req = self.obs_get_object_request(path, range, args)?;
let req = self.obs_get_object_request(path, range, args)?;

self.sign(&mut req).await?;
let req = self.sign(req).await?;

self.info.http_client().fetch(req).await
}
Expand Down Expand Up @@ -190,9 +173,9 @@ impl ObsCore {
}

pub async fn obs_head_object(&self, path: &str, args: &OpStat) -> Result<Response<Buffer>> {
let mut req = self.obs_head_object_request(path, args)?;
let req = self.obs_head_object_request(path, args)?;

self.sign(&mut req).await?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand Down Expand Up @@ -230,12 +213,12 @@ impl ObsCore {

let req = Request::delete(&url);

let mut req = req
let req = req
.extension(Operation::Delete)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand Down Expand Up @@ -287,13 +270,13 @@ impl ObsCore {
let source = format!("/{}/{}", self.bucket, percent_encode_path(&source));
let url = format!("{}/{}", self.endpoint, percent_encode_path(&target));

let mut req = Request::put(&url)
let req = Request::put(&url)
.extension(Operation::Copy)
.header("x-obs-copy-source", &source)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand Down Expand Up @@ -321,12 +304,12 @@ impl ObsCore {
url = url.push("marker", next_marker);
}

let mut req = Request::get(url.finish())
let req = Request::get(url.finish())
.extension(Operation::List)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand All @@ -344,12 +327,12 @@ impl ObsCore {
req = req.header(CONTENT_TYPE, mime)
}

let mut req = req
let req = req
.extension(Operation::Write)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand Down Expand Up @@ -377,13 +360,13 @@ impl ObsCore {
req = req.header(CONTENT_LENGTH, size);
}

let mut req = req
let req = req
.extension(Operation::Write)
// Set body
.body(body)
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = self.sign(req).await?;

self.send(req).await
}
Expand Down Expand Up @@ -413,12 +396,12 @@ impl ObsCore {
// Set content-type to `application/xml` to avoid mixed with form post.
let req = req.header(CONTENT_TYPE, "application/xml");

let mut req = req
let req = req
.extension(Operation::Write)
.body(Buffer::from(Bytes::from(content)))
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = self.sign(req).await?;
self.send(req).await
}

Expand All @@ -437,12 +420,12 @@ impl ObsCore {
percent_encode_path(upload_id)
);

let mut req = Request::delete(&url)
let req = Request::delete(&url)
.extension(Operation::Write)
.body(Buffer::new())
.map_err(new_request_build_error)?;

self.sign(&mut req).await?;
let req = self.sign(req).await?;
self.send(req).await
}
}
Expand Down Expand Up @@ -500,7 +483,7 @@ pub struct CompleteMultipartUploadRequestPart {

/// Output of `CompleteMultipartUpload` operation
#[derive(Debug, Default, Deserialize)]
#[serde[default, rename_all = "PascalCase"]]
#[serde(default, rename_all = "PascalCase")]
pub struct CompleteMultipartUploadResult {
pub location: String,
pub bucket: String,
Expand Down
Loading
Loading