|
| 1 | +from posix import times |
| 2 | +from influxdb_client.client.flux_table import FluxRecord, FluxTable |
1 | 3 | from influxdb_client.client.influxdb_client import InfluxDBClient |
2 | 4 | from influxdb_client.client.write_api import ASYNCHRONOUS |
3 | 5 | from src.configs.influx_conf import InfluxConf |
4 | 6 | from src.models.raw import Raw, RAW_MEASUREMENT |
5 | 7 | from src.services.influx_query import QueryIF |
| 8 | +import logging |
| 9 | + |
| 10 | +logger = logging.getLogger(__name__) |
6 | 11 |
|
7 | 12 | class InfluxService: |
8 | 13 | def __init__(self) -> None: |
@@ -32,15 +37,28 @@ def query_raw_data(self, start_time: int, end_time: int, cell_index: int, batch_ |
32 | 37 | limit=_LIMIT+1, |
33 | 38 | offset=offset |
34 | 39 | ) |
35 | | - |
36 | | - result = self.query_api.query(query) |
37 | | - |
| 40 | + print(query) |
| 41 | + result:list[FluxTable] = self.query_api.query(query) |
| 42 | + print(result) |
38 | 43 | # Convert FluxTable to list of dicts |
39 | | - rows = [] |
| 44 | + rows = {} |
| 45 | + table:FluxTable |
| 46 | + record:FluxRecord |
40 | 47 | for table in result: |
41 | 48 | for record in table.records: |
42 | | - rows.append(record.values) |
43 | | - |
| 49 | + timestamp = record.get_time() |
| 50 | + if timestamp not in rows: |
| 51 | + rows[timestamp] = {k: v for k, v in record.values.items() if not k.startswith('_')} |
| 52 | + rows[timestamp]["timestamp"] = timestamp |
| 53 | + for col in record.values.keys(): |
| 54 | + if col.startswith('_'): |
| 55 | + rows[timestamp][col] = None |
| 56 | + |
| 57 | + field = record.get_field() |
| 58 | + value = record.get_value() |
| 59 | + rows[timestamp][field] = value |
| 60 | + |
| 61 | + rows = list(rows.values()) |
44 | 62 | return rows, len(rows)>_LIMIT |
45 | 63 |
|
46 | 64 |
|
|
0 commit comments