Skip to content

Commit ae07a4a

Browse files
feat: add parameterized queries (#96)
* feat: add query params to model * feat: add new resource types * feat: add parameterized queries Co-authored-by: Jakub Bednar <[email protected]>
1 parent 33873c3 commit ae07a4a

File tree

8 files changed

+164
-21
lines changed

8 files changed

+164
-21
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
## 2.2.0 [unreleased]
22

3+
### Features
4+
1. [#96](https://github.com/influxdata/influxdb-client-ruby/pull/96): Add support for Parameterized Queries
5+
6+
### Documentation
7+
1. [#96](https://github.com/influxdata/influxdb-client-ruby/pull/96): Add Parameterized Queries example
8+
39
## 2.1.0 [2021-10-22]
410

511
### Features

README.md

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,33 @@ query_api.query_stream(query: query).each do |record|
152152
end
153153
```
154154

155+
#### Parameterized queries
156+
InfluxDB Cloud supports [Parameterized Queries](https://docs.influxdata.com/influxdb/cloud/query-data/parameterized-queries/)
157+
that let you dynamically change values in a query using the InfluxDB API. Parameterized queries make Flux queries more
158+
reusable and can also be used to help prevent injection attacks.
159+
160+
InfluxDB Cloud inserts the params object into the Flux query as a Flux record named `params`. Use dot or bracket
161+
notation to access parameters in the `params` record in your Flux query. Parameterized Flux queries support only `int`
162+
, `float`, and `string` data types. To convert the supported data types into
163+
other [Flux basic data types, use Flux type conversion functions](https://docs.influxdata.com/influxdb/cloud/query-data/parameterized-queries/#supported-parameter-data-types).
164+
165+
Parameterized query example:
166+
> :warning: Parameterized Queries are supported only in InfluxDB Cloud, currently there is no support in InfluxDB OSS.
167+
168+
```ruby
169+
client = InfluxDB2::Client.new('https://localhost:8086', 'my-token',
170+
bucket: 'my-bucket',
171+
org: 'my-org')
172+
173+
query = 'from(bucket: params.bucketParam) |> range(start: duration(v: params.startParam))'
174+
params = { 'bucketParam' => 'my-bucket', 'startParam' => '-1h' }
175+
176+
query_api = client.create_query_api
177+
result = query_api.query(query: query, params: params)
178+
179+
result[0].records.each { |record| puts "#{record.time} #{record.measurement}: #{record.field} #{record.value}" }
180+
```
181+
155182
### Writing data
156183
The [WriteApi](https://github.com/influxdata/influxdb-client-ruby/blob/master/lib/influxdb2/client/write_api.rb) supports synchronous and batching writes into InfluxDB 2.0. In default api uses synchronous write. To enable batching you can use WriteOption.
157184

apis/lib/influxdb2/apis/generated/models/resource.rb

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
=begin
2-
#Influx OSS API Service
2+
#InfluxDB OSS API Service
33
4-
#No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
4+
#The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint.
55
66
The version of the OpenAPI document: 2.0.0
77
@@ -76,10 +76,6 @@ def self.openapi_types
7676
# List of attributes with nullable: true
7777
def self.openapi_nullable
7878
Set.new([
79-
:'id',
80-
:'name',
81-
:'org_id',
82-
:'org'
8379
])
8480
end
8581

@@ -134,15 +130,15 @@ def list_invalid_properties
134130
# @return true if the model is valid
135131
def valid?
136132
return false if @type.nil?
137-
type_validator = EnumAttributeValidator.new('String', ["authorizations", "buckets", "dashboards", "orgs", "sources", "tasks", "telegrafs", "users", "variables", "scrapers", "secrets", "labels", "views", "documents", "notificationRules", "notificationEndpoints", "checks", "dbrp", "notebooks", "annotations"])
133+
type_validator = EnumAttributeValidator.new('String', ["authorizations", "buckets", "dashboards", "orgs", "sources", "tasks", "telegrafs", "users", "variables", "scrapers", "secrets", "labels", "views", "documents", "notificationRules", "notificationEndpoints", "checks", "dbrp", "notebooks", "annotations", "remotes", "replications"])
138134
return false unless type_validator.valid?(@type)
139135
true
140136
end
141137

142138
# Custom attribute writer method checking allowed values (enum).
143139
# @param [Object] type Object to be assigned
144140
def type=(type)
145-
validator = EnumAttributeValidator.new('String', ["authorizations", "buckets", "dashboards", "orgs", "sources", "tasks", "telegrafs", "users", "variables", "scrapers", "secrets", "labels", "views", "documents", "notificationRules", "notificationEndpoints", "checks", "dbrp", "notebooks", "annotations"])
141+
validator = EnumAttributeValidator.new('String', ["authorizations", "buckets", "dashboards", "orgs", "sources", "tasks", "telegrafs", "users", "variables", "scrapers", "secrets", "labels", "views", "documents", "notificationRules", "notificationEndpoints", "checks", "dbrp", "notebooks", "annotations", "remotes", "replications"])
146142
unless validator.valid?(type)
147143
fail ArgumentError, "invalid value for \"type\", must be one of #{validator.allowable_values}."
148144
end

examples/README.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# Examples
2+
3+
## Writes
4+
5+
6+
## Queries
7+
- [influxdb_18_example.rb](influxdb_18_example.rb) - How to connect to InfluxDB 1.8
8+
- [parameterized_query.rb](parameterized_query.rb) - How to use parameterized Flux queries
9+
10+
## Management API
11+
- [create_new_bucket.rb](create_new_bucket.rb) - How to create Buckets
12+
13+
## Others

examples/parameterized_query.rb

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
$LOAD_PATH.unshift File.expand_path('../lib', __dir__)
2+
require 'influxdb-client'
3+
4+
# warning: Parameterized Queries are supported only in InfluxDB Cloud, currently there is no support in InfluxDB OSS.
5+
6+
url = 'https://europe-west1-1.gcp.cloud2.influxdata.com'
7+
token = 'my-token'
8+
bucket = 'my-bucket'
9+
org = 'my-org'
10+
11+
client = InfluxDB2::Client.new(url,
12+
token,
13+
bucket: bucket,
14+
org: org,
15+
precision: InfluxDB2::WritePrecision::NANOSECOND)
16+
17+
puts '*** Write Points ***'
18+
19+
write_api = client.create_write_api
20+
point = InfluxDB2::Point.new(name: 'weather')
21+
.add_tag('location', 'Praque')
22+
.add_field('temperature', 21)
23+
puts point.to_line_protocol
24+
write_api.write(data: point)
25+
26+
puts '*** Query Points ***'
27+
28+
query_api = client.create_query_api
29+
query = 'from(bucket: params.bucketParam) |> range(start: duration(v: params.startParam))'
30+
params = { 'bucketParam' => 'my-bucket', 'startParam' => '-1h' }
31+
result = query_api.query(query: query, params: params)
32+
result[0].records.each { |record| puts "#{record.time} #{record.measurement}: #{record.field} #{record.value}" }
33+
34+
client.close!

lib/influxdb2/client/models/query.rb

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
=begin
2-
#Influx OSS API Service
2+
#InfluxDB OSS API Service
33
4-
#No description provided (generated by Openapi Generator https://github.com/openapitools/openapi-generator)
4+
#The InfluxDB v2 API provides a programmatic interface for all interactions with InfluxDB. Access the InfluxDB API using the `/api/v2/` endpoint.
55
66
The version of the OpenAPI document: 2.0.0
77
@@ -24,6 +24,9 @@ class Query
2424
# The type of query. Must be \"flux\".
2525
attr_reader :type
2626

27+
# Enumeration of key/value pairs that respresent parameters to be injected into query (can only specify either this field or extern and not both)
28+
attr_accessor :params
29+
2730
attr_accessor :dialect
2831

2932
# Specifies the time that should be reported as \"now\" in the query. Default is the server's now time.
@@ -57,6 +60,7 @@ def self.attribute_map
5760
:'extern' => :'extern',
5861
:'query' => :'query',
5962
:'type' => :'type',
63+
:'params' => :'params',
6064
:'dialect' => :'dialect',
6165
:'now' => :'now',
6266
}
@@ -68,6 +72,7 @@ def self.openapi_types
6872
:'extern' => :'File',
6973
:'query' => :'String',
7074
:'type' => :'String',
75+
:'params' => :'Hash<String, Object>',
7176
:'dialect' => :'Dialect',
7277
:'now' => :'Time'
7378
}
@@ -106,6 +111,12 @@ def initialize(attributes = {})
106111
self.type = attributes[:'type']
107112
end
108113

114+
if attributes.key?(:'params')
115+
if (value = attributes[:'params']).is_a?(Hash)
116+
self.params = value
117+
end
118+
end
119+
109120
if attributes.key?(:'dialect')
110121
self.dialect = attributes[:'dialect']
111122
end
@@ -153,6 +164,7 @@ def ==(o)
153164
extern == o.extern &&
154165
query == o.query &&
155166
type == o.type &&
167+
params == o.params &&
156168
dialect == o.dialect &&
157169
now == o.now
158170
end
@@ -166,7 +178,7 @@ def eql?(o)
166178
# Calculates hash code according to all attributes.
167179
# @return [Integer] Hash code
168180
def hash
169-
[extern, query, type, dialect, now].hash
181+
[extern, query, type, params, dialect, now].hash
170182
end
171183

172184
# Builds the object from hash

lib/influxdb2/client/query_api.rb

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,18 @@ def initialize(options:)
3636

3737
# @param [Object] query the flux query to execute. The data could be represent by [String], [Query]
3838
# @param [String] org specifies the source organization
39+
# @param [Enumerable] params represent key/value pairs parameters to be injected into query
3940
# @return [String] result of query
40-
def query_raw(query: nil, org: nil, dialect: DEFAULT_DIALECT)
41-
_post_query(query: query, org: org, dialect: dialect).read_body
41+
def query_raw(query: nil, org: nil, dialect: DEFAULT_DIALECT, params: nil)
42+
_post_query(query: query, org: org, dialect: dialect, params: params).read_body
4243
end
4344

4445
# @param [Object] query the flux query to execute. The data could be represent by [String], [Query]
4546
# @param [String] org specifies the source organization
47+
# @param [Enumerable] params represent key/value pairs parameters to be injected into query
4648
# @return [Array] list of FluxTables which are matched the query
47-
def query(query: nil, org: nil, dialect: DEFAULT_DIALECT)
48-
response = query_raw(query: query, org: org, dialect: dialect)
49+
def query(query: nil, org: nil, dialect: DEFAULT_DIALECT, params: nil)
50+
response = query_raw(query: query, org: org, dialect: dialect, params: params)
4951
parser = InfluxDB2::FluxCsvParser.new(response)
5052

5153
parser.parse
@@ -54,20 +56,21 @@ def query(query: nil, org: nil, dialect: DEFAULT_DIALECT)
5456

5557
# @param [Object] query the flux query to execute. The data could be represent by [String], [Query]
5658
# @param [String] org specifies the source organization
59+
# @param [Enumerable] params represent key/value pairs parameters to be injected into query
5760
# @return stream of Flux Records
58-
def query_stream(query: nil, org: nil, dialect: DEFAULT_DIALECT)
59-
response = _post_query(query: query, org: org, dialect: dialect)
61+
def query_stream(query: nil, org: nil, dialect: DEFAULT_DIALECT, params: nil)
62+
response = _post_query(query: query, org: org, dialect: dialect, params: params)
6063

6164
InfluxDB2::FluxCsvParser.new(response, stream: true)
6265
end
6366

6467
private
6568

66-
def _post_query(query: nil, org: nil, dialect: DEFAULT_DIALECT)
69+
def _post_query(query: nil, org: nil, dialect: DEFAULT_DIALECT, params: nil)
6770
org_param = org || @options[:org]
6871
_check('org', org_param)
6972

70-
payload = _generate_payload(query, dialect)
73+
payload = _generate_payload(query: query, dialect: dialect, params: params)
7174
return nil if payload.nil?
7275

7376
uri = _parse_uri('/api/v2/query')
@@ -76,16 +79,17 @@ def _post_query(query: nil, org: nil, dialect: DEFAULT_DIALECT)
7679
_post_json(payload.to_body.to_json, uri)
7780
end
7881

79-
def _generate_payload(query, dialect)
82+
def _generate_payload(query: nil, dialect: nil, params: nil)
8083
if query.nil?
8184
nil
8285
elsif query.is_a?(Query)
86+
query.params = params unless params.nil?
8387
query
8488
elsif query.is_a?(String)
8589
if query.empty?
8690
nil
8791
else
88-
Query.new(query: query, dialect: dialect, type: nil)
92+
Query.new(query: query, dialect: dialect, type: nil, params: params)
8993
end
9094
end
9195
end

test/influxdb/query_api_test.rb

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,27 @@ def test_query_raw
4949
assert_equal result, SUCCESS_DATA
5050
end
5151

52+
def test_parameterized_query_raw
53+
body = '{"query":"from(bucket: params.bucketParam) |> range(start: duration(v: params.startParam)) |> last()",' \
54+
'"params":{"bucketParam":"my-bucket","startParam":"1970-01-01T00:00:00.000000001Z"},' \
55+
'"dialect":{"header":true,"delimiter":",","annotations":["datatype","group","default"],' \
56+
'"commentPrefix":"#","dateTimeFormat":"RFC3339"}}'
57+
stub_request(:post, 'http://localhost:8086/api/v2/query?org=my-org')
58+
.with(body: body)
59+
.to_return(body: SUCCESS_DATA)
60+
61+
client = InfluxDB2::Client.new('http://localhost:8086', 'my-token',
62+
bucket: 'my-bucket',
63+
org: 'my-org',
64+
use_ssl: false)
65+
66+
query = 'from(bucket: params.bucketParam) |> range(start: duration(v: params.startParam)) |> last()'
67+
params = Hash['bucketParam' => 'my-bucket', 'startParam' => '1970-01-01T00:00:00.000000001Z']
68+
result = client.create_query_api.query_raw(query: query, params: params)
69+
70+
assert_equal result, SUCCESS_DATA
71+
end
72+
5273
def test_query
5374
stub_request(:post, 'http://localhost:8086/api/v2/query?org=my-org')
5475
.to_return(body: SUCCESS_DATA)
@@ -73,6 +94,36 @@ def test_query
7394
assert_equal 'free', record1.field
7495
end
7596

97+
def test_parameterized_query
98+
body = '{"query":"from(bucket: params.bucketParam) |> range(start: duration(v: params.startParam)) |> last()",' \
99+
'"params":{"bucketParam":"my-bucket","startParam":"1970-01-01T00:00:00.000000001Z"},' \
100+
'"dialect":{"header":true,"delimiter":",","annotations":["datatype","group","default"],' \
101+
'"commentPrefix":"#","dateTimeFormat":"RFC3339"}}'
102+
stub_request(:post, 'http://localhost:8086/api/v2/query?org=my-org')
103+
.with(body: body)
104+
.to_return(body: SUCCESS_DATA)
105+
106+
client = InfluxDB2::Client.new('http://localhost:8086', 'my-token',
107+
bucket: 'my-bucket',
108+
org: 'my-org',
109+
use_ssl: false)
110+
111+
query = 'from(bucket: params.bucketParam) |> range(start: duration(v: params.startParam)) |> last()'
112+
params = Hash['bucketParam' => 'my-bucket', 'startParam' => '1970-01-01T00:00:00.000000001Z']
113+
114+
result = client.create_query_api.query(query: query, params: params)
115+
116+
assert_equal 1, result.length
117+
assert_equal 4, result[0].records.length
118+
119+
record1 = result[0].records[0]
120+
121+
assert_equal Time.parse('1970-01-01T00:00:10Z').to_datetime.rfc3339(9), record1.time
122+
assert_equal 'mem', record1.measurement
123+
assert_equal 10, record1.value
124+
assert_equal 'free', record1.field
125+
end
126+
76127
def test_headers
77128
stub_request(:post, 'http://localhost:8086/api/v2/query?org=my-org')
78129
.to_return(body: SUCCESS_DATA)

0 commit comments

Comments
 (0)