Skip to content

Commit ec9cc59

Browse files
authored
add "create path" operation (files only for now) to Azure Storage data_lake (using Pipeline architecture) (#358)
* Ignore IDEA files. * Very rough draft. * Fix compiler error: align types of pipeline and pipeline_context. * Fix compiler error: align types of pipeline and pipeline_context. * Fix compiler error around request builder. * Match create_collection implementation. * Fix error conversion. * Fix data lake example. * Format code. * Fix typo. * Test path create. Failing... * Add missing content length header. * Clean up comment. * Remove commented code. Add timestamp to file system name. Rename variables. Fix newlines. Don't panic when create path fails. * WIP * Fail e2e test if create path did not work, while still deleting file system. * Use actual bearer token in auth header. * Use actual bearer token. * Code format. * Remove unneeded bearer token reference from DataLakeClient. * Print debug info. * Send proper auth header. * Code format. * Code format. * Adopt new way to do options. * Remove TODO comment. * No need to set body to empty byte array. * Remove comment. * Fix env var names. * Improve create_path response headers. * Ignore batch files. * Add assertions to e2e test. * Remove unused. * Format code. Fix bug. * Consolidate two examples into one. Align example with e2e test. * Add step to e2e test and example that fails to create file that already exists. * Add mock testing framework support code. But can't switch e2e test until all operations are using pipeline architecture. * Introduce actual DataLakeContext.
1 parent 495e085 commit ec9cc59

File tree

14 files changed

+518
-69
lines changed

14 files changed

+518
-69
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,6 @@ sample/
1616
__azurite*.json
1717
__blobstorage__
1818
__queuestorage__
19+
.idea/
20+
*.iml
21+
*.bat
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use crate::AddAsHeader;
2+
use http::request::Builder;
3+
4+
#[derive(Debug, Clone, Copy)]
5+
pub struct ContentLength(i32);
6+
7+
impl ContentLength {
8+
/// Create a new `ContentLength`
9+
pub fn new(count: i32) -> Self {
10+
Self(count)
11+
}
12+
}
13+
14+
impl AddAsHeader for ContentLength {
15+
fn add_as_header(&self, builder: Builder) -> Builder {
16+
if self.0 <= 0 {
17+
builder.header(http::header::CONTENT_LENGTH, -1)
18+
} else {
19+
builder.header(http::header::CONTENT_LENGTH, self.0)
20+
}
21+
}
22+
23+
fn add_as_header2(
24+
&self,
25+
request: &mut crate::Request,
26+
) -> Result<(), crate::errors::HTTPHeaderError> {
27+
if self.0 >= 0 {
28+
let (header_name, header_value) = (http::header::CONTENT_LENGTH, self.0);
29+
request
30+
.headers_mut()
31+
.append(header_name, http::HeaderValue::from(header_value));
32+
};
33+
34+
Ok(())
35+
}
36+
}

sdk/core/src/request_options/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod client_request_id;
44
mod content_disposition;
55
mod content_encoding;
66
mod content_language;
7+
mod content_length;
78
mod content_range;
89
mod content_type;
910
mod continuation;
@@ -35,6 +36,7 @@ pub use client_request_id::ClientRequestId;
3536
pub use content_disposition::ContentDisposition;
3637
pub use content_encoding::ContentEncoding;
3738
pub use content_language::ContentLanguage;
39+
pub use content_length::ContentLength;
3840
pub use content_range::ContentRange;
3941
pub use content_type::ContentType;
4042
pub use continuation::Continuation;

sdk/storage/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ categories = ["api-bindings"]
1313
edition = "2018"
1414

1515
[dependencies]
16+
async-trait = "0.1"
1617
azure_core = { path = "../core", version = "0.1.0" }
1718
ring = "0.16"
1819
base64 = "0.13"
@@ -41,6 +42,7 @@ reqwest = "0.11"
4142
[features]
4243
default = ["account", "blob", "queue", "table", "data_lake"]
4344
test_e2e = ["account", "blob", "queue", "table", "data_lake"]
45+
mock_transport_framework = [ "azure_core/mock_transport_framework"]
4446
test_integration = ["account", "blob", "queue", "table", "data_lake"]
4547
account = []
4648
azurite_workaround = []

sdk/storage/examples/data_lake_00.rs

Lines changed: 88 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,68 +1,127 @@
1+
use azure_core::prelude::IfMatchCondition;
12
use azure_core::prelude::*;
3+
use azure_identity::token_credentials::DefaultCredential;
4+
use azure_identity::token_credentials::TokenCredential;
25
use azure_storage::core::prelude::*;
36
use azure_storage::data_lake::prelude::*;
7+
use chrono::Utc;
48
use futures::stream::StreamExt;
59
use std::error::Error;
610
use std::num::NonZeroU32;
711

812
#[tokio::main]
913
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
10-
// First we retrieve the account name and master key from environment variables.
11-
let account = std::env::var("ADSL_STORAGE_ACCOUNT")
12-
.expect("Set env variable ADSL_STORAGE_ACCOUNT first!");
13-
let master_key = std::env::var("ADSL_STORAGE_MASTER_KEY")
14-
.expect("Set env variable ADSL_STORAGE_MASTER_KEY first!");
14+
let account = std::env::var("ADLSGEN2_STORAGE_ACCOUNT")
15+
.expect("Set env variable ADLSGEN2_STORAGE_ACCOUNT first!");
16+
let master_key = std::env::var("ADLSGEN2_STORAGE_MASTER_KEY")
17+
.expect("Set env variable ADLSGEN2_STORAGE_MASTER_KEY first!");
1518

16-
let file_system_name = std::env::args()
17-
.nth(1)
18-
.expect("please specify the file system name as first parameter");
19+
let now = Utc::now();
20+
let file_system_name = format!("azurerustsdk-datalake-example-{}", now.timestamp());
1921

2022
let http_client = new_http_client();
2123

2224
let storage_account_client =
2325
StorageAccountClient::new_access_key(http_client.clone(), &account, &master_key);
2426

27+
let resource_id = "https://storage.azure.com/";
28+
println!("getting bearer token for '{}'...", resource_id);
29+
let bearer_token = DefaultCredential::default().get_token(resource_id).await?;
30+
println!("token expires on {}", bearer_token.expires_on);
31+
println!();
32+
2533
let data_lake = storage_account_client
2634
.as_storage_client()
27-
.as_data_lake_client(account)?;
35+
.as_data_lake_client(account, bearer_token.token.secret().to_owned())?;
36+
37+
let file_system = data_lake.as_file_system_client(&file_system_name)?;
2838

29-
let file_system = data_lake.as_file_system_client(file_system_name)?;
39+
let mut fs_properties = Properties::new();
40+
fs_properties.insert("AddedVia", "Azure SDK for Rust");
3041

31-
// let's add some metadata. We call them "properties" to be consistent with the REST API definition from
32-
// [https://docs.microsoft.com/rest/api/storageservices/datalakestoragegen2/filesystem/create#request-headers](https://docs.microsoft.com/rest/api/storageservices/datalakestoragegen2/filesystem/create#request-headers)
33-
let mut properties = Properties::new();
34-
properties.insert("AddedVia", "Azure SDK for Rust");
35-
properties.insert("CreatedAt", chrono::Utc::now().to_string());
36-
let response = file_system
42+
println!("creating file system '{}'...", &file_system_name);
43+
let create_fs_response = file_system
3744
.create()
38-
.properties(&properties)
45+
.properties(&fs_properties)
3946
.execute()
4047
.await?;
41-
println!("repsonse == {:?}", response);
48+
println!("create file system response == {:?}", create_fs_response);
49+
println!();
4250

51+
println!("listing file systems...");
4352
let mut stream = Box::pin(
4453
data_lake
4554
.list()
4655
.max_results(NonZeroU32::new(3).unwrap())
4756
.stream(),
4857
);
49-
50-
while let Some(response) = stream.next().await {
51-
println!("response == {:?}\n\n", response);
58+
while let Some(list_fs_response) = stream.next().await {
59+
println!("list file system response == {:?}", list_fs_response);
60+
println!();
5261
}
5362

54-
properties.insert("ModifiedBy", "Iota");
55-
let response = file_system
56-
.set_properties(Some(&properties))
63+
println!("getting file system properties...");
64+
let get_fs_props_response = file_system.get_properties().execute().await?;
65+
println!(
66+
"get file system properties response == {:?}",
67+
get_fs_props_response
68+
);
69+
println!();
70+
71+
let file_name = "example-file.txt";
72+
73+
println!("creating path '{}'...", file_name);
74+
let create_path_response = file_system
75+
.create_path(Context::default(), file_name, CreatePathOptions::default())
76+
.await?;
77+
println!("create path response == {:?}", create_path_response);
78+
println!();
79+
80+
println!("creating path '{}' (overwrite)...", file_name);
81+
let create_path_response = file_system
82+
.create_path(Context::default(), file_name, CreatePathOptions::default())
83+
.await?;
84+
println!("create path response == {:?}", create_path_response);
85+
println!();
86+
87+
println!("creating path '{}' (do not overwrite)...", file_name);
88+
let do_not_overwrite =
89+
CreatePathOptions::new().if_match_condition(IfMatchCondition::NotMatch("*"));
90+
let create_path_result = file_system
91+
.create_path(Context::default(), file_name, do_not_overwrite)
92+
.await;
93+
println!(
94+
"create path result (should fail) == {:?}",
95+
create_path_result
96+
);
97+
println!();
98+
99+
println!("setting file system properties...");
100+
fs_properties.insert("ModifiedBy", "Iota");
101+
let set_fs_props_response = file_system
102+
.set_properties(Some(&fs_properties))
57103
.execute()
58104
.await?;
59-
println!("response == {:?}\n\n", response);
105+
println!(
106+
"set file system properties response == {:?}",
107+
set_fs_props_response
108+
);
109+
println!();
110+
111+
println!("getting file system properties...");
112+
let get_fs_props_response = file_system.get_properties().execute().await?;
113+
println!(
114+
"get file system properties response == {:?}",
115+
get_fs_props_response
116+
);
117+
println!();
60118

61-
let response = file_system.get_properties().execute().await?;
62-
println!("response == {:?}\n\n", response);
119+
println!("deleting file system...");
120+
let delete_fs_response = file_system.delete().execute().await?;
121+
println!("delete file system response == {:?}", delete_fs_response);
122+
println!();
63123

64-
let response = file_system.delete().execute().await?;
65-
println!("response == {:?}\n\n", response);
124+
println!("data lake example done.");
66125

67126
Ok(())
68127
}

sdk/storage/src/core/errors.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ pub enum Error {
6767
CRC64Not8BytesLong(usize),
6868
#[error("At least one of these headers must be present: {0:?}")]
6969
HeadersNotFound(Vec<String>),
70+
#[error("error writing the header value: {0}")]
71+
InvalidHeaderValue(#[from] azure_core::HTTPHeaderError),
72+
}
73+
74+
impl From<azure_core::HttpError> for Error {
75+
fn from(error: azure_core::HttpError) -> Self {
76+
Self::CoreError(azure_core::Error::HttpError(error))
77+
}
7078
}
7179

7280
#[non_exhaustive]
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
use azure_core::{PipelineContext, Policy, PolicyResult, Request, Response};
2+
use http::header::AUTHORIZATION;
3+
use http::HeaderValue;
4+
use std::sync::Arc;
5+
6+
#[derive(Debug, Clone, PartialEq, Eq)]
7+
pub struct AuthorizationPolicy {
8+
bearer_token: String,
9+
}
10+
11+
impl AuthorizationPolicy {
12+
pub(crate) fn new(bearer_token: String) -> Self {
13+
Self { bearer_token }
14+
}
15+
}
16+
17+
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
18+
pub(crate) struct DataLakeContext {}
19+
20+
#[async_trait::async_trait]
21+
impl Policy<DataLakeContext> for AuthorizationPolicy {
22+
async fn send(
23+
&self,
24+
ctx: &mut PipelineContext<DataLakeContext>,
25+
request: &mut Request,
26+
next: &[Arc<dyn Policy<DataLakeContext>>],
27+
) -> PolicyResult<Response> {
28+
let auth_header_value = format!("Bearer {}", &self.bearer_token);
29+
30+
request
31+
.headers_mut()
32+
.append(AUTHORIZATION, HeaderValue::from_str(&auth_header_value)?);
33+
34+
// now next[0] is safe (will not panic) because we checked
35+
// at the beginning of the function.
36+
next[0].send(ctx, request, &next[1..]).await
37+
}
38+
}

0 commit comments

Comments
 (0)