1313// limitations under the License.
1414
1515use std:: collections:: HashMap ;
16+ use std:: sync:: Arc ;
1617use std:: time:: Duration ;
1718use std:: time:: Instant ;
1819
1920use reqwest:: header:: HeaderMap ;
2021use reqwest:: header:: HeaderValue ;
2122use reqwest:: Client ;
2223use reqwest:: ClientBuilder ;
23- use reqwest:: Response ;
2424use serde:: Deserialize ;
2525use sqllogictest:: DBOutput ;
2626use sqllogictest:: DefaultColumnType ;
2727
28- use crate :: error :: DSqlLogicTestError :: Databend ;
28+ use crate :: client :: global_cookie_store :: GlobalCookieStore ;
2929use crate :: error:: Result ;
3030use crate :: util:: parser_rows;
3131use crate :: util:: HttpSessionConf ;
3232
33- const SESSION_HEADER : & str = "X-DATABEND-SESSION" ;
34-
3533pub struct HttpClient {
3634 pub client : Client ,
3735 pub session_token : String ,
38- pub session_headers : HeaderMap ,
3936 pub debug : bool ,
4037 pub session : Option < HttpSessionConf > ,
4138 pub port : u16 ,
@@ -86,59 +83,28 @@ impl HttpClient {
8683 header. insert ( "Accept" , HeaderValue :: from_str ( "application/json" ) . unwrap ( ) ) ;
8784 header. insert (
8885 "X-DATABEND-CLIENT-CAPS" ,
89- HeaderValue :: from_str ( "session_header " ) . unwrap ( ) ,
86+ HeaderValue :: from_str ( "session_cookie " ) . unwrap ( ) ,
9087 ) ;
88+ let cookie_provider = GlobalCookieStore :: new ( ) ;
9189 let client = ClientBuilder :: new ( )
90+ . cookie_provider ( Arc :: new ( cookie_provider) )
9291 . default_headers ( header)
9392 // https://github.com/hyperium/hyper/issues/2136#issuecomment-589488526
9493 . http2_keep_alive_timeout ( Duration :: from_secs ( 15 ) )
9594 . pool_max_idle_per_host ( 0 )
9695 . build ( ) ?;
97- let mut session_headers = HeaderMap :: new ( ) ;
98- session_headers. insert ( SESSION_HEADER , HeaderValue :: from_str ( "" ) . unwrap ( ) ) ;
99- let mut res = Self {
100- client,
101- session_token : "" . to_string ( ) ,
102- session_headers,
103- session : None ,
104- debug : false ,
105- port,
106- } ;
107- res. login ( ) . await ?;
108- Ok ( res)
109- }
11096
111- async fn update_session_header ( & mut self , response : Response ) -> Result < Response > {
112- if let Some ( value) = response. headers ( ) . get ( SESSION_HEADER ) {
113- let session_header = value. to_str ( ) . unwrap ( ) . to_owned ( ) ;
114- if !session_header. is_empty ( ) {
115- self . session_headers
116- . insert ( SESSION_HEADER , value. to_owned ( ) ) ;
117- return Ok ( response) ;
118- }
119- }
120- let meta = format ! ( "response={response:?}" ) ;
121- let data = response. text ( ) . await . unwrap ( ) ;
122- Err ( Databend (
123- format ! ( "{} is empty, {meta}, {data}" , SESSION_HEADER , ) . into ( ) ,
124- ) )
125- }
97+ let url = format ! ( "http://127.0.0.1:{}/v1/session/login" , port) ;
12698
127- async fn login ( & mut self ) -> Result < ( ) > {
128- let url = format ! ( "http://127.0.0.1:{}/v1/session/login" , self . port) ;
129- let response = self
130- . client
99+ let session_token = client
131100 . post ( & url)
132- . headers ( self . session_headers . clone ( ) )
133101 . body ( "{}" )
134102 . basic_auth ( "root" , Some ( "" ) )
135103 . send ( )
136104 . await
137105 . inspect_err ( |e| {
138106 println ! ( "fail to send to {}: {:?}" , url, e) ;
139- } ) ?;
140- let response = self . update_session_header ( response) . await ?;
141- self . session_token = response
107+ } ) ?
142108 . json :: < LoginResponse > ( )
143109 . await
144110 . inspect_err ( |e| {
@@ -147,7 +113,14 @@ impl HttpClient {
147113 . tokens
148114 . unwrap ( )
149115 . session_token ;
150- Ok ( ( ) )
116+
117+ Ok ( Self {
118+ client,
119+ session_token,
120+ session : None ,
121+ debug : false ,
122+ port,
123+ } )
151124 }
152125
153126 pub async fn query ( & mut self , sql : & str ) -> Result < DBOutput < DefaultColumnType > > {
@@ -204,43 +177,43 @@ impl HttpClient {
204177 }
205178
206179 // Send request and get response by json format
207- async fn post_query ( & mut self , sql : & str , url : & str ) -> Result < QueryResponse > {
180+ async fn post_query ( & self , sql : & str , url : & str ) -> Result < QueryResponse > {
208181 let mut query = HashMap :: new ( ) ;
209182 query. insert ( "sql" , serde_json:: to_value ( sql) ?) ;
210183 if let Some ( session) = & self . session {
211184 query. insert ( "session" , serde_json:: to_value ( session) ?) ;
212185 }
213- let response = self
186+ Ok ( self
214187 . client
215188 . post ( url)
216- . headers ( self . session_headers . clone ( ) )
217189 . json ( & query)
218190 . bearer_auth ( & self . session_token )
219191 . send ( )
220192 . await
221193 . inspect_err ( |e| {
222194 println ! ( "fail to send to {}: {:?}" , url, e) ;
223- } ) ?;
224- let response = self . update_session_header ( response) . await ?;
225- Ok ( response. json :: < QueryResponse > ( ) . await . inspect_err ( |e| {
226- println ! ( "fail to decode json when call {}: {:?}" , url, e) ;
227- } ) ?)
195+ } ) ?
196+ . json :: < QueryResponse > ( )
197+ . await
198+ . inspect_err ( |e| {
199+ println ! ( "fail to decode json when call {}: {:?}" , url, e) ;
200+ } ) ?)
228201 }
229202
230- async fn poll_query_result ( & mut self , url : & str ) -> Result < QueryResponse > {
231- let response = self
203+ async fn poll_query_result ( & self , url : & str ) -> Result < QueryResponse > {
204+ Ok ( self
232205 . client
233206 . get ( url)
234207 . bearer_auth ( & self . session_token )
235- . headers ( self . session_headers . clone ( ) )
236208 . send ( )
237209 . await
238210 . inspect_err ( |e| {
239211 println ! ( "fail to send to {}: {:?}" , url, e) ;
240- } ) ?;
241- let response = self . update_session_header ( response) . await ?;
242- Ok ( response. json :: < QueryResponse > ( ) . await . inspect_err ( |e| {
243- println ! ( "fail to decode json when call {}: {:?}" , url, e) ;
244- } ) ?)
212+ } ) ?
213+ . json :: < QueryResponse > ( )
214+ . await
215+ . inspect_err ( |e| {
216+ println ! ( "fail to decode json when call {}: {:?}" , url, e) ;
217+ } ) ?)
245218 }
246219}
0 commit comments