-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinflux_handler.py
More file actions
63 lines (53 loc) · 1.58 KB
/
influx_handler.py
File metadata and controls
63 lines (53 loc) · 1.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import influxdb
def initialize_influx(host='localhost', port=8086):
    """
    Connects to InfluxDB and makes sure the 'ups' database exists.

    Parameters:
        host: Host name passed to the InfluxDB client; defaults to 'localhost'.
        port: Port passed to the InfluxDB client; defaults to 8086.

    Returns:
        The influxdb.InfluxDBClient that was created, so the caller can
        reuse it for writing measurements.
    """
    influx_client = influxdb.InfluxDBClient(host=host, port=port)
    # get_list_database() is consumed as a sequence of dicts with a 'name' key.
    ups_exists = any(
        database['name'] == 'ups'
        for database in influx_client.get_list_database()
    )
    if not ups_exists:
        print("UPS database does not yet exist -- creating one now.")
        influx_client.create_database('ups')
    return influx_client
def send_data_to_influx(influx_client, json_data_array):
    """
    Writes a batch of points to the 'ups' database.

    The client is first switched to the 'ups' database, then the points in
    json_data_array are handed to its write_points() method. Whatever
    write_points() returns is passed straight back to the caller.
    """
    influx_client.switch_database('ups')
    return influx_client.write_points(json_data_array)
# Sample points for checking the InfluxDB/Grafana connection by hand.
# Taken from: https://www.influxdata.com/blog/getting-started-python-influxdb/
# The three points share measurement and tags; only the timestamp and the
# 'duration' field differ, so they are generated from those pairs.
# NOTE(review): the hour in these timestamps is a single digit ("T8:01"),
# which is not strict RFC 3339 -- confirm the client parses it as intended.
sample_input_json_body = [
    {
        "measurement": "brushEvents",
        "tags": {
            "user": "Carol",
            "brushId": "6c89f539-71c6-490d-a28d-6c5d84c0ee2f",
        },
        "time": timestamp,
        "fields": {
            "duration": duration,
        },
    }
    for timestamp, duration in (
        ("2018-03-28T8:01:00Z", 127),
        ("2018-03-29T8:04:00Z", 132),
        ("2018-03-30T8:02:00Z", 129),
    )
]