@@ -5,34 +5,122 @@ use crate::ResourceQuota;
5
5
use azure_core:: collect_pinned_stream;
6
6
use azure_core:: headers:: { continuation_token_from_headers_optional, session_token_from_headers} ;
7
7
use azure_core:: prelude:: * ;
8
- use azure_core:: Request as HttpRequest ;
9
8
use azure_core:: Response as HttpResponse ;
10
9
use chrono:: { DateTime , Utc } ;
10
+ use futures:: stream:: unfold;
11
+ use futures:: Stream ;
11
12
13
+ /// Macro for short cutting a stream on error
14
+ macro_rules! r#try {
15
+ ( $expr: expr $( , ) ?) => {
16
+ match $expr {
17
+ Result :: Ok ( val) => val,
18
+ Result :: Err ( err) => {
19
+ return Some ( ( Err ( err. into( ) ) , State :: Done ) ) ;
20
+ }
21
+ }
22
+ } ;
23
+ }
24
+
25
+ /// Stream state
26
+ #[ derive( Debug , Clone , PartialEq ) ]
27
+ enum State {
28
+ Init ,
29
+ Continuation ( String ) ,
30
+ Done ,
31
+ }
12
32
#[ derive( Debug , Clone ) ]
13
- pub struct ListCollectionsOptions {
33
+ pub struct ListCollectionsBuilder {
34
+ client : DatabaseClient ,
14
35
consistency_level : Option < ConsistencyLevel > ,
15
36
max_item_count : MaxItemCount ,
37
+ context : Context ,
16
38
}
17
39
18
- impl ListCollectionsOptions {
19
- pub fn new ( ) -> Self {
40
+ impl ListCollectionsBuilder {
41
+ pub ( crate ) fn new ( client : DatabaseClient ) -> Self {
20
42
Self {
43
+ client,
21
44
max_item_count : MaxItemCount :: new ( -1 ) ,
22
45
consistency_level : None ,
46
+ context : Context :: new ( ) ,
23
47
}
24
48
}
25
49
26
50
setters ! {
27
51
consistency_level: ConsistencyLevel => Some ( consistency_level) ,
28
52
max_item_count: i32 => MaxItemCount :: new( max_item_count) ,
53
+ context: Context => context,
29
54
}
30
55
31
- pub fn decorate_request ( & self , request : & mut HttpRequest ) -> crate :: Result < ( ) > {
32
- azure_core:: headers:: add_optional_header2 ( & self . consistency_level , request) ?;
33
- azure_core:: headers:: add_mandatory_header2 ( & self . max_item_count , request) ?;
56
+ pub fn into_stream (
57
+ self ,
58
+ ) -> impl Stream < Item = crate :: Result < ListCollectionsResponse > > + ' static {
59
+ unfold ( State :: Init , move |state : State | {
60
+ let this = self . clone ( ) ;
61
+ let ctx = self . context . clone ( ) ;
62
+ async move {
63
+ let response = match state {
64
+ State :: Init => {
65
+ let mut request = this. client . cosmos_client ( ) . prepare_request_pipeline (
66
+ & format ! ( "dbs/{}/colls" , this. client. database_name( ) ) ,
67
+ http:: Method :: GET ,
68
+ ) ;
69
+
70
+ r#try ! ( azure_core:: headers:: add_optional_header2(
71
+ & this. consistency_level,
72
+ & mut request,
73
+ ) ) ;
74
+ r#try ! ( azure_core:: headers:: add_mandatory_header2(
75
+ & this. max_item_count,
76
+ & mut request,
77
+ ) ) ;
78
+ let response = r#try ! (
79
+ this. client
80
+ . pipeline( )
81
+ . send( ctx. clone( ) . insert( ResourceType :: Collections ) , & mut request)
82
+ . await
83
+ ) ;
84
+ ListCollectionsResponse :: try_from ( response) . await
85
+ }
86
+ State :: Continuation ( continuation_token) => {
87
+ let continuation = Continuation :: new ( continuation_token. as_str ( ) ) ;
88
+ let mut request = this. client . cosmos_client ( ) . prepare_request_pipeline (
89
+ & format ! ( "dbs/{}/colls" , this. client. database_name( ) ) ,
90
+ http:: Method :: GET ,
91
+ ) ;
92
+
93
+ r#try ! ( azure_core:: headers:: add_optional_header2(
94
+ & this. consistency_level,
95
+ & mut request,
96
+ ) ) ;
97
+ r#try ! ( azure_core:: headers:: add_mandatory_header2(
98
+ & this. max_item_count,
99
+ & mut request,
100
+ ) ) ;
101
+ r#try ! ( continuation. add_as_header2( & mut request) ) ;
102
+ let response = r#try ! (
103
+ this. client
104
+ . pipeline( )
105
+ . send( ctx. clone( ) . insert( ResourceType :: Collections ) , & mut request)
106
+ . await
107
+ ) ;
108
+ ListCollectionsResponse :: try_from ( response) . await
109
+ }
110
+ State :: Done => return None ,
111
+ } ;
34
112
35
- Ok ( ( ) )
113
+ let response = r#try ! ( response) ;
114
+
115
+ let next_state = response
116
+ . continuation_token
117
+ . clone ( )
118
+ . map ( State :: Continuation )
119
+ . unwrap_or ( State :: Done ) ;
120
+
121
+ Some ( ( Ok ( response) , next_state) )
122
+ }
123
+ } )
36
124
}
37
125
}
38
126
@@ -90,3 +178,16 @@ impl ListCollectionsResponse {
90
178
} )
91
179
}
92
180
}
181
+
182
+ /// The future returned by calling `into_future` on the builder.
183
+ pub type ListCollections =
184
+ futures:: future:: BoxFuture < ' static , crate :: Result < ListCollectionsResponse > > ;
185
+
186
+ #[ cfg( feature = "into_future" ) ]
187
+ impl std:: future:: IntoFuture for ListCollectionsBuilder {
188
+ type Future = ListCollections ;
189
+ type Output = <ListCollections as std:: future:: Future >:: Output ;
190
+ fn into_future ( self ) -> Self :: Future {
191
+ Self :: into_future ( self )
192
+ }
193
+ }
0 commit comments