25
25
//! let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
26
26
//! let client = InfluxDbClient::new("http://localhost:8086", "test");
27
27
//! let query = InfluxDbQuery::raw_read_query("SELECT temperature FROM /weather_[a-z]*$/ WHERE time > now() - 1m ORDER BY DESC");
28
- //! let _result = rt.block_on(client.json_query::<WeatherWithoutCityName, _ >(query))
28
+ //! let _result = rt.block_on(client.json_query::<WeatherWithoutCityName>(query))
29
29
//! .map(|it| {
30
30
//! it.map(|series_vec| {
31
31
//! series_vec
36
36
//! }).collect::<Vec<Weather>>()
37
37
//! })
38
38
//! });
39
+ //! ```
39
40
40
41
use crate :: client:: InfluxDbClient ;
41
42
@@ -49,7 +50,11 @@ use serde::Deserialize;
49
50
use serde_json;
50
51
51
52
use crate :: error:: InfluxDbError ;
52
- use crate :: query:: { InfluxDbQuery , QueryType } ;
53
+
54
+ use crate :: query:: read_query:: InfluxDbReadQuery ;
55
+ use crate :: query:: InfluxDbQuery ;
56
+
57
+ use url:: form_urlencoded;
53
58
54
59
#[ derive( Deserialize ) ]
55
60
#[ doc( hidden) ]
@@ -77,55 +82,41 @@ pub struct InfluxDbSeries<T> {
77
82
}
78
83
79
84
impl InfluxDbClient {
80
- pub fn json_query < T : ' static , Q > (
85
+ pub fn json_query < T : ' static > (
81
86
& self ,
82
- q : Q ,
87
+ q : InfluxDbReadQuery ,
83
88
) -> Box < dyn Future < Item = Option < Vec < InfluxDbSeries < T > > > , Error = InfluxDbError > >
84
89
where
85
- Q : InfluxDbQuery ,
86
90
T : DeserializeOwned ,
87
91
{
88
92
use futures:: future;
89
93
90
- let q_type = q. get_type ( ) ;
91
- let query = match q. build ( ) {
92
- Err ( err) => {
94
+ let query = q. build ( ) . unwrap ( ) ;
95
+
96
+ let client = {
97
+ let read_query = query. get ( ) ;
98
+ let encoded: String = form_urlencoded:: Serializer :: new ( String :: new ( ) )
99
+ . append_pair ( "db" , self . database_name ( ) )
100
+ . append_pair ( "q" , & read_query)
101
+ . finish ( ) ;
102
+ let http_query_string = format ! (
103
+ "{url}/query?{encoded}" ,
104
+ url = self . database_url( ) ,
105
+ encoded = encoded
106
+ ) ;
107
+
108
+ if read_query. contains ( "SELECT" ) || read_query. contains ( "SHOW" ) {
109
+ Client :: new ( ) . get ( http_query_string. as_str ( ) )
110
+ } else {
93
111
let error = InfluxDbError :: InvalidQueryError {
94
- error : format ! ( "{}" , err) ,
112
+ error : String :: from (
113
+ "Only SELECT and SHOW queries supported with JSON deserialization" ,
114
+ ) ,
95
115
} ;
96
116
return Box :: new (
97
117
future:: err :: < Option < Vec < InfluxDbSeries < T > > > , InfluxDbError > ( error) ,
98
118
) ;
99
119
}
100
- Ok ( query) => query,
101
- } ;
102
-
103
- let client = match q_type {
104
- QueryType :: ReadQuery => {
105
- let read_query = query. get ( ) ;
106
- let http_query_string = format ! (
107
- "{url}/query?db={db}&q={read_query}" ,
108
- url = self . database_url( ) ,
109
- db = self . database_name( ) ,
110
- read_query = read_query,
111
- ) ;
112
-
113
- if read_query. contains ( "SELECT" ) || read_query. contains ( "SHOW" ) {
114
- Client :: new ( ) . get ( http_query_string. as_str ( ) )
115
- } else {
116
- Client :: new ( ) . post ( http_query_string. as_str ( ) )
117
- }
118
- }
119
- QueryType :: WriteQuery => Client :: new ( )
120
- . post (
121
- format ! (
122
- "{url}/write?db={db}" ,
123
- url = self . database_url( ) ,
124
- db = self . database_name( ) ,
125
- )
126
- . as_str ( ) ,
127
- )
128
- . body ( query. get ( ) ) ,
129
120
} ;
130
121
131
122
Box :: new (
0 commit comments