Skip to content

Commit 06a7f65

Browse files
committed
Improve export capabilities
- Add parameters `sort`, `direction`, `limit`, and `scalar`. - Improve documentation
1 parent 5c3c856 commit 06a7f65

File tree

5 files changed

+144
-12
lines changed

5 files changed

+144
-12
lines changed

CHANGES.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ in progress
2525
a single TTN Application. Thanks, @thiasB, @einsiedlerkrebs, and @ClemensGruber.
2626
- [core] Fix error when connecting to MQTT broker without authentication credentials
2727
- [docs] Refactor "decoders" section to "integrations", and improve index/overview page
28+
- [export] Improve export capabilities by adding parameters ``sort``, ``direction``,
29+
``limit``, and ``scalar``. Thanks, @ClemensGruber.
2830

2931

3032
.. _kotori-0.27.0:

doc/source/handbook/export/index.rst

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -160,13 +160,31 @@ This would yield the result with "weight" field omitted::
160160
2016-07-01 16:58:34.788767764,64.64,48.48
161161
2016-07-01 16:58:37.645754806,64.64,48.48
162162

163-
There's also the url parameter ``include`` which does things the other way round:
163+
Include fields
164+
==============
165+
There is also the url parameter ``include`` which does things the other way round:
164166
It will only export the named fields::
165167

166168
http GET $HTTP_URI/api/$MQTT_TOPIC/data.csv include=weight
167169

168170
Both parameters take a comma-separated list of field names.
169171

172+
Sort records
173+
============
174+
The URL parameter ``sort`` specifies a field name you would like to sort by, e.g.
175+
``sort=temperature``. The parameter ``direction`` can be used to control the sort
176+
oder, e.g. ``direction=desc`` for sorting in descending order. If you omit the
177+
``direction`` parameter, the sort order is ascending.
178+
179+
Limit result size
180+
=================
181+
The ``limit`` parameter controls the number of records to be returned.
182+
183+
Return scalar values
184+
====================
185+
In order to return a single scalar value, use the ``scalar`` parameter. It will
186+
omit any headers. For example, ``sort=time&direction=desc&scalar=humidity`` will
187+
return the most recent humidity value in the database.
170188

171189
Hierarchical data
172190
=================
@@ -182,8 +200,4 @@ Todo
182200

183201
Describe parameters::
184202

185-
include, exclude, pad
186-
interpolate=true
187-
&from=20160519
188-
sorted
189-
203+
pad, interpolate=true

kotori/io/protocol/target.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# -*- coding: utf-8 -*-
2-
# (c) 2016-2021 Andreas Motl <andreas@getkotori.org>
2+
# (c) 2016-2023 Andreas Motl <andreas@getkotori.org>
33
from pyramid.settings import asbool
44
from twisted.internet import threads
55
from twisted.web import http, server
@@ -108,6 +108,10 @@ def emit(self, uri, bucket):
108108

109109
# DataFrame manipulation
110110

111+
if 'sort' in bucket.tdata and bucket.tdata.sort:
112+
# http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.sort.html
113+
df.sort_values(by=read_list(bucket.tdata.sort), ascending=bucket.tdata.get("direction", "ascending").startswith("asc"), inplace=True)
114+
111115
# Drop some fields from DataFrame as requested
112116
if 'exclude' in bucket.tdata and bucket.tdata.exclude:
113117
drop_fields = read_list(bucket.tdata.exclude, empty_elements=False)
@@ -144,10 +148,14 @@ def emit(self, uri, bucket):
144148
# http://pandas.pydata.org/pandas-docs/stable/missing_data.html#interpolation
145149
df.interpolate(inplace=True)
146150

147-
if 'sorted' in bucket.tdata and asbool(bucket.tdata.sorted):
148-
# http://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.sort.html
149-
df.sort(axis='columns', inplace=True)
151+
# Only return specified number of records.
152+
if 'limit' in bucket.tdata and bucket.tdata.limit:
153+
df = df[:int(bucket.tdata.limit)]
150154

155+
if 'scalar' in bucket.tdata and bucket.tdata.scalar:
156+
bucket.request.setHeader('Content-Type', 'text/plain; charset=utf-8')
157+
value = df[bucket.tdata.scalar].values[0]
158+
return str(value)
151159

152160
# Compute http response from DataFrame, taking designated output format into account
153161
response = HttpDataFrameResponse(bucket, dataframe=df)

test/test_export.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,109 @@ def verify_export_general(channel_path, http_submit, http_fetch):
104104
assert b"cdn.datatables.net" in deferred.result
105105

106106

107+
@pytest_twisted.inlineCallbacks
108+
@pytest.mark.http
109+
@pytest.mark.export
110+
def test_export_exclude_include(machinery, create_influxdb, reset_influxdb):
111+
"""
112+
Verify `exclude` and `include` transformation parameters of HTTP export API.
113+
"""
114+
115+
# Submit a single measurement, with timestamp.
116+
data = {
117+
'time': 1583810982,
118+
'temperature': 25.26,
119+
'humidity': 51.8,
120+
}
121+
yield threads.deferToThread(http_json_sensor, settings.channel_path_data, data)
122+
123+
# Wait for some time to process the message.
124+
yield sleep(PROCESS_DELAY_MQTT)
125+
126+
# Excluding fields.
127+
deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="json", params={
128+
"from": ts_from,
129+
"to": ts_to,
130+
"exclude": "time,humidity",
131+
})
132+
yield deferred
133+
result = json.loads(deferred.result)
134+
assert_equal(result, [{"temperature": 25.26}])
135+
136+
# Including fields.
137+
deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="json", params={
138+
"from": ts_from,
139+
"to": ts_to,
140+
"include": "temperature",
141+
})
142+
yield deferred
143+
result = json.loads(deferred.result)
144+
assert_equal(result, [{"time": "2020-03-10T03:29:42.000Z", "temperature": 25.26}])
145+
146+
147+
@pytest_twisted.inlineCallbacks
148+
@pytest.mark.http
149+
@pytest.mark.export
150+
def test_export_sorting_limit_scalar(machinery, create_influxdb, reset_influxdb):
151+
"""
152+
Verify `sort` and `direction` transformation parameters of HTTP export API.
153+
"""
154+
155+
# Submit two measurements, with timestamp.
156+
data = {
157+
'time': 1583810982,
158+
'temperature': 25.26,
159+
'humidity': 51.8,
160+
}
161+
yield threads.deferToThread(http_json_sensor, settings.channel_path_data, data)
162+
163+
data = {
164+
'time': 1583810993,
165+
'temperature': 32.26,
166+
'humidity': 64.8,
167+
}
168+
yield threads.deferToThread(http_json_sensor, settings.channel_path_data, data)
169+
170+
# Wait for some time to process the message.
171+
yield sleep(PROCESS_DELAY_MQTT)
172+
173+
# Sorting.
174+
deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="json", params={
175+
"from": ts_from,
176+
"to": ts_to,
177+
"exclude": "time,humidity",
178+
"sort": "temperature",
179+
"direction": "descending",
180+
})
181+
yield deferred
182+
result = json.loads(deferred.result)
183+
assert_equal(result, [{"temperature": 32.26}, {'temperature': 25.26}])
184+
185+
# Limit.
186+
deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="json", params={
187+
"from": ts_from,
188+
"to": ts_to,
189+
"exclude": "time,humidity",
190+
"sort": "temperature",
191+
"direction": "descending",
192+
"limit": 1,
193+
})
194+
yield deferred
195+
result = json.loads(deferred.result)
196+
assert_equal(result, [{"temperature": 32.26}])
197+
198+
# Scalar.
199+
deferred = threads.deferToThread(http_get_data, settings.channel_path_data, format="txt", params={
200+
"from": ts_from,
201+
"to": ts_to,
202+
"sort": "temperature",
203+
"direction": "descending",
204+
"scalar": "temperature",
205+
})
206+
yield deferred
207+
assert deferred.result == "32.26"
208+
209+
107210
@pytest_twisted.inlineCallbacks
108211
@pytest.mark.http
109212
@pytest.mark.export

test/util.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,11 +256,16 @@ def http_csv_sensor(topic, data):
256256
return requests.post(uri, data=body, headers={'Content-Type': 'text/csv'})
257257

258258

259-
def http_get_data(path: str = None, format='csv', ts_from=None, ts_to=None, port=24642):
259+
def http_get_data(path: str = None, format='csv', params=None, ts_from=None, ts_to=None, port=24642):
260260
path = path.lstrip("/")
261261
uri = f'http://localhost:{port}/api/{path}.{format}?from={ts_from}&to={ts_to}'
262262
logger.info('HTTP: Exporting data from {} using format "{}"'.format(uri, format))
263-
payload = requests.get(uri).content
263+
params = params or {}
264+
if ts_from:
265+
params["from"] = ts_from
266+
if ts_to:
267+
params["to"] = ts_to
268+
payload = requests.get(uri, params=params).content
264269
if format in ["csv", "txt", "json", "html"]:
265270
payload = payload.decode()
266271
return payload

0 commit comments

Comments
 (0)