Skip to content

Commit edd64a2

Browse files
authored
Finish Cosmos database client (#455)
* Do delete database operation * Move list_collections to pipeline * Add test for listing collection * Format recorded request/responses * Delete database in the test
1 parent 4655918 commit edd64a2

27 files changed

+473
-377
lines changed

sdk/cosmos/examples/database_01.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use azure_core::Context;
22
use azure_cosmos::prelude::*;
3+
use futures::stream::StreamExt;
34
use std::error::Error;
45

56
#[tokio::main]
@@ -17,7 +18,11 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
1718
let database_client = client.into_database_client("pollo");
1819
println!("database_name == {}", database_client.database_name());
1920

20-
let collections = database_client.list_collections().execute().await?;
21+
let collections =
22+
Box::pin(database_client.list_collections(Context::new(), ListCollectionsOptions::new()))
23+
.next()
24+
.await
25+
.unwrap()?;
2126
println!("collections == {:#?}", collections);
2227

2328
let collection_client = database_client.into_collection_client("cnt");

sdk/cosmos/src/clients/cosmos_client.rs

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,16 @@ impl CosmosClient {
307307
time,
308308
)
309309
};
310-
self.prepare_request_with_signature(uri_path, http_method, time, &auth)
310+
trace!("prepare_request::auth == {:?}", auth);
311+
let uri = format!("{}/{}", self.cloud_location.url(), uri_path);
312+
debug!("building request. uri: {}", uri);
313+
314+
RequestBuilder::new()
315+
.method(http_method)
316+
.uri(uri)
317+
.header(HEADER_DATE, time.to_string())
318+
.header(HEADER_VERSION, HeaderValue::from_static(AZURE_VERSION))
319+
.header(header::AUTHORIZATION, auth)
311320
}
312321

313322
/// Prepares' an `azure_core::Request`. This function will
@@ -332,28 +341,6 @@ impl CosmosClient {
332341
.into()
333342
}
334343

335-
fn prepare_request_with_signature(
336-
&self,
337-
uri_path: &str,
338-
http_method: http::Method,
339-
time_nonce: TimeNonce,
340-
signature: &str,
341-
) -> RequestBuilder {
342-
trace!("prepare_request::auth == {:?}", signature);
343-
let uri = format!("{}/{}", self.cloud_location.url(), uri_path);
344-
debug!(
345-
"cosmos::client::prepare_request_with_resource_signature::uri == {:?}",
346-
uri
347-
);
348-
349-
RequestBuilder::new()
350-
.method(http_method)
351-
.uri(uri)
352-
.header(HEADER_DATE, time_nonce.to_string())
353-
.header(HEADER_VERSION, HeaderValue::from_static(AZURE_VERSION))
354-
.header(header::AUTHORIZATION, signature)
355-
}
356-
357344
pub(crate) fn pipeline(&self) -> &Pipeline<CosmosContext> {
358345
&self.pipeline
359346
}

sdk/cosmos/src/clients/database_client.rs

Lines changed: 104 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,33 @@ use super::*;
22
use crate::authorization_policy::CosmosContext;
33
use crate::operations::*;
44
use crate::resources::ResourceType;
5-
use crate::{requests, ReadonlyString};
5+
use crate::ReadonlyString;
66
use azure_core::pipeline::Pipeline;
77
use azure_core::prelude::Continuation;
8-
use azure_core::{AddAsHeader, Context, HttpClient, PipelineContext};
8+
use azure_core::{AddAsHeader, Context, PipelineContext};
99
use futures::stream::unfold;
1010
use futures::Stream;
1111

12+
/// Macro for short cutting a stream on error
13+
macro_rules! r#try {
14+
($expr:expr $(,)?) => {
15+
match $expr {
16+
Result::Ok(val) => val,
17+
Result::Err(err) => {
18+
return Some((Err(err.into()), State::Done));
19+
}
20+
}
21+
};
22+
}
23+
24+
/// Stream state
25+
#[derive(Debug, Clone, PartialEq)]
26+
enum State {
27+
Init,
28+
Continuation(String),
29+
Done,
30+
}
31+
1232
/// A client for Cosmos database resources.
1333
#[derive(Debug, Clone)]
1434
pub struct DatabaseClient {
@@ -59,14 +79,90 @@ impl DatabaseClient {
5979
Ok(GetDatabaseResponse::try_from(response).await?)
6080
}
6181

62-
/// List collections in the database
63-
pub fn list_collections(&self) -> requests::ListCollectionsBuilder<'_> {
64-
requests::ListCollectionsBuilder::new(self)
82+
/// Delete the database
83+
pub async fn delete_database(
84+
&self,
85+
ctx: Context,
86+
options: DeleteDatabaseOptions,
87+
) -> crate::Result<DeleteDatabaseResponse> {
88+
let mut request = self
89+
.cosmos_client()
90+
.prepare_request_pipeline(&format!("dbs/{}", self.database_name()), http::Method::GET);
91+
let mut pipeline_context = PipelineContext::new(ctx, ResourceType::Databases.into());
92+
93+
options.decorate_request(&mut request)?;
94+
let response = self
95+
.pipeline()
96+
.send(&mut pipeline_context, &mut request)
97+
.await?
98+
.validate(http::StatusCode::OK)
99+
.await?;
100+
101+
Ok(DeleteDatabaseResponse::try_from(response).await?)
65102
}
66103

67-
/// Delete the database
68-
pub fn delete_database(&self) -> requests::DeleteDatabaseBuilder<'_> {
69-
requests::DeleteDatabaseBuilder::new(self)
104+
/// List collections in the database
105+
pub fn list_collections(
106+
&self,
107+
ctx: Context,
108+
options: ListCollectionsOptions,
109+
) -> impl Stream<Item = crate::Result<ListCollectionsResponse>> + '_ {
110+
unfold(State::Init, move |state: State| {
111+
let this = self.clone();
112+
let ctx = ctx.clone();
113+
let options = options.clone();
114+
async move {
115+
let response = match state {
116+
State::Init => {
117+
let mut request = this.cosmos_client().prepare_request_pipeline(
118+
&format!("dbs/{}/colls", this.database_name()),
119+
http::Method::GET,
120+
);
121+
let mut pipeline_context =
122+
PipelineContext::new(ctx.clone(), ResourceType::Collections.into());
123+
124+
r#try!(options.decorate_request(&mut request));
125+
let response = r#try!(
126+
this.pipeline()
127+
.send(&mut pipeline_context, &mut request)
128+
.await
129+
);
130+
let response = r#try!(response.validate(http::StatusCode::OK).await);
131+
ListCollectionsResponse::try_from(response).await
132+
}
133+
State::Continuation(continuation_token) => {
134+
let continuation = Continuation::new(continuation_token.as_str());
135+
let mut request = this.cosmos_client().prepare_request_pipeline(
136+
&format!("dbs/{}/colls", self.database_name()),
137+
http::Method::GET,
138+
);
139+
let mut pipeline_context =
140+
PipelineContext::new(ctx.clone(), ResourceType::Collections.into());
141+
142+
r#try!(options.decorate_request(&mut request));
143+
r#try!(continuation.add_as_header2(&mut request));
144+
let response = r#try!(
145+
this.pipeline()
146+
.send(&mut pipeline_context, &mut request)
147+
.await
148+
);
149+
let response = r#try!(response.validate(http::StatusCode::OK).await);
150+
ListCollectionsResponse::try_from(response).await
151+
}
152+
State::Done => return None,
153+
};
154+
155+
let response = r#try!(response);
156+
157+
let next_state = response
158+
.continuation_token
159+
.clone()
160+
.map(State::Continuation)
161+
.unwrap_or(State::Done);
162+
163+
Some((Ok(response), next_state))
164+
}
165+
})
70166
}
71167

72168
/// Create a collection
@@ -99,24 +195,6 @@ impl DatabaseClient {
99195
ctx: Context,
100196
options: ListUsersOptions,
101197
) -> impl Stream<Item = crate::Result<ListUsersResponse>> + '_ {
102-
macro_rules! r#try {
103-
($expr:expr $(,)?) => {
104-
match $expr {
105-
Result::Ok(val) => val,
106-
Result::Err(err) => {
107-
return Some((Err(err.into()), State::Done));
108-
}
109-
}
110-
};
111-
}
112-
113-
#[derive(Debug, Clone, PartialEq)]
114-
enum State {
115-
Init,
116-
Continuation(String),
117-
Done,
118-
}
119-
120198
unfold(State::Init, move |state: State| {
121199
let this = self.clone();
122200
let ctx = ctx.clone();
@@ -188,21 +266,6 @@ impl DatabaseClient {
188266
UserClient::new(self, user_name)
189267
}
190268

191-
pub(crate) fn prepare_request_with_database_name(
192-
&self,
193-
method: http::Method,
194-
) -> http::request::Builder {
195-
self.cosmos_client().prepare_request(
196-
&format!("dbs/{}", self.database_name()),
197-
method,
198-
ResourceType::Databases,
199-
)
200-
}
201-
202-
pub(crate) fn http_client(&self) -> &dyn HttpClient {
203-
self.cosmos_client().http_client()
204-
}
205-
206269
fn pipeline(&self) -> &Pipeline<CosmosContext> {
207270
self.cosmos_client.pipeline()
208271
}

sdk/cosmos/src/responses/delete_database_response.rs renamed to sdk/cosmos/src/operations/delete_database.rs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,31 @@
11
use crate::headers::from_headers::*;
2+
use crate::prelude::*;
23
use crate::ResourceQuota;
34
use azure_core::headers::session_token_from_headers;
4-
use http::response::Response;
5+
use azure_core::Request as HttpRequest;
6+
use azure_core::Response as HttpResponse;
7+
8+
#[derive(Debug, Clone)]
9+
pub struct DeleteDatabaseOptions {
10+
consistency_level: Option<ConsistencyLevel>,
11+
}
12+
13+
impl DeleteDatabaseOptions {
14+
pub fn new() -> Self {
15+
Self {
16+
consistency_level: None,
17+
}
18+
}
19+
20+
setters! {
21+
consistency_level: ConsistencyLevel => Some(consistency_level),
22+
}
23+
24+
pub fn decorate_request(&self, request: &mut HttpRequest) -> crate::Result<()> {
25+
azure_core::headers::add_optional_header2(&self.consistency_level, request)?;
26+
Ok(())
27+
}
28+
}
529

630
#[derive(Debug, Clone)]
731
pub struct DeleteDatabaseResponse {
@@ -12,10 +36,8 @@ pub struct DeleteDatabaseResponse {
1236
pub resource_usage: Vec<ResourceQuota>,
1337
}
1438

15-
impl std::convert::TryFrom<Response<bytes::Bytes>> for DeleteDatabaseResponse {
16-
type Error = crate::Error;
17-
18-
fn try_from(response: Response<bytes::Bytes>) -> Result<Self, Self::Error> {
39+
impl DeleteDatabaseResponse {
40+
pub async fn try_from(response: HttpResponse) -> crate::Result<Self> {
1941
let headers = response.headers();
2042

2143
let charge = request_charge_from_headers(headers)?;
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use crate::headers::from_headers::*;
2+
use crate::prelude::*;
3+
use crate::resources::Collection;
4+
use crate::ResourceQuota;
5+
use azure_core::collect_pinned_stream;
6+
use azure_core::headers::{continuation_token_from_headers_optional, session_token_from_headers};
7+
use azure_core::prelude::*;
8+
use azure_core::Request as HttpRequest;
9+
use azure_core::Response as HttpResponse;
10+
use chrono::{DateTime, Utc};
11+
12+
#[derive(Debug, Clone)]
13+
pub struct ListCollectionsOptions {
14+
consistency_level: Option<ConsistencyLevel>,
15+
max_item_count: MaxItemCount,
16+
}
17+
18+
impl ListCollectionsOptions {
19+
pub fn new() -> Self {
20+
Self {
21+
max_item_count: MaxItemCount::new(-1),
22+
consistency_level: None,
23+
}
24+
}
25+
26+
setters! {
27+
consistency_level: ConsistencyLevel => Some(consistency_level),
28+
max_item_count: i32 => MaxItemCount::new(max_item_count),
29+
}
30+
31+
pub fn decorate_request(&self, request: &mut HttpRequest) -> crate::Result<()> {
32+
azure_core::headers::add_optional_header2(&self.consistency_level, request)?;
33+
azure_core::headers::add_mandatory_header2(&self.max_item_count, request)?;
34+
35+
Ok(())
36+
}
37+
}
38+
39+
#[derive(Debug, Clone, PartialEq)]
40+
pub struct ListCollectionsResponse {
41+
pub rid: String,
42+
pub collections: Vec<Collection>,
43+
pub count: u32,
44+
pub last_state_change: DateTime<Utc>,
45+
pub resource_quota: Vec<ResourceQuota>,
46+
pub resource_usage: Vec<ResourceQuota>,
47+
pub schema_version: String,
48+
pub alt_content_path: String,
49+
pub content_path: String,
50+
pub charge: f64,
51+
pub service_version: String,
52+
pub activity_id: uuid::Uuid,
53+
pub session_token: String,
54+
pub gateway_version: String,
55+
pub continuation_token: Option<String>,
56+
}
57+
58+
impl ListCollectionsResponse {
59+
pub async fn try_from(response: HttpResponse) -> crate::Result<Self> {
60+
let (_status_code, headers, pinned_stream) = response.deconstruct();
61+
let body = collect_pinned_stream(pinned_stream).await?;
62+
63+
#[derive(Deserialize, Debug)]
64+
pub struct Response {
65+
_rid: String,
66+
#[serde(rename = "DocumentCollections")]
67+
pub collections: Vec<Collection>,
68+
#[serde(rename = "_count")]
69+
pub count: u32,
70+
}
71+
72+
let response: Response = serde_json::from_slice(&*body)?;
73+
74+
Ok(Self {
75+
rid: response._rid,
76+
collections: response.collections,
77+
count: response.count,
78+
last_state_change: last_state_change_from_headers(&headers)?,
79+
resource_quota: resource_quota_from_headers(&headers)?,
80+
resource_usage: resource_usage_from_headers(&headers)?,
81+
schema_version: schema_version_from_headers(&headers)?.to_owned(),
82+
alt_content_path: alt_content_path_from_headers(&headers)?.to_owned(),
83+
content_path: content_path_from_headers(&headers)?.to_owned(),
84+
charge: request_charge_from_headers(&headers)?,
85+
service_version: service_version_from_headers(&headers)?.to_owned(),
86+
activity_id: activity_id_from_headers(&headers)?,
87+
session_token: session_token_from_headers(&headers)?,
88+
gateway_version: gateway_version_from_headers(&headers)?.to_owned(),
89+
continuation_token: continuation_token_from_headers_optional(&headers)?,
90+
})
91+
}
92+
}

0 commit comments

Comments
 (0)