-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathknx_log_utils.py
More file actions
248 lines (202 loc) · 8.83 KB
/
knx_log_utils.py
File metadata and controls
248 lines (202 loc) · 8.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Helper functions for parsing and caching KNX log files.
Imported by knx-lens.py.
"""
import csv
import re
import logging
from datetime import datetime, time as datetime_time
from typing import Dict, List, Any, Optional, Tuple
# ============================================================================
# CONSTANTS
# ============================================================================
# Pre-compiled regex for performance: matches a KNX group address "x/y/z"
GA_PATTERN = re.compile(r'\d+/\d+/\d+')
# Log Format Detection
LOG_FORMAT_PIPE_SEPARATED = 'pipe_separated'
LOG_FORMAT_CSV = 'csv'
PIPE_SEPARATOR = ' | '
CSV_DELIMITER = ';'
# Number of leading lines sampled when sniffing the log format
MIN_LOG_FORMAT_CHECK_LINES = 20
# Timestamp Parsing (wall-clock part only, e.g. "14:03:59")
TIMESTAMP_TIME_FORMAT = "%H:%M:%S"
def detect_log_format(first_lines: List[str]) -> Optional[str]:
    """Detect log file format (pipe-separated or CSV).

    Args:
        first_lines: First N lines from log file

    Returns:
        Format string ('pipe_separated', 'csv') or None if unrecognized
    """
    for raw_line in first_lines:
        line = raw_line.strip()
        # Skip blanks and "="-prefixed banner/separator lines.
        if not line or line.startswith("="):
            continue
        # Pipe format: at least 5 fields and a group address (x/y/z) in the
        # 4th column. Split once instead of twice per candidate line.
        parts = line.split('|')
        if PIPE_SEPARATOR in line and len(parts) > 4 and GA_PATTERN.search(parts[3]):
            return LOG_FORMAT_PIPE_SEPARATED
        # CSV check comes second so a pipe line containing ';' is not
        # misclassified.
        if CSV_DELIMITER in line:
            return LOG_FORMAT_CSV
    return None
def _parse_lines_internal(
    lines: List[str],
    project_data: Dict,
    log_format: str,
    time_filter_start: Optional[datetime_time] = None,
    time_filter_end: Optional[datetime_time] = None
) -> Tuple[List[Dict[str, Any]], List[Dict[str, str]]]:
    """Internal parsing engine for log lines.

    Parses log lines based on format and extracts timestamp, physical
    address (PA), group address (GA) and payload. Supports optional
    inclusive time filtering on the wall-clock part of the timestamp.

    Args:
        lines: Log lines to parse
        project_data: Loaded project data (wrapped or unwrapped)
        log_format: Format type ('pipe_separated' or 'csv')
        time_filter_start: Optional start time filter (inclusive)
        time_filter_end: Optional end time filter (inclusive)

    Returns:
        Tuple of (payload_items, cached_items) lists of dicts
    """
    new_payload_items = []
    new_cached_items = []
    # Unwrap an optional {"project": ...} container so device/GA name
    # lookups resolve instead of falling back to "N/A".
    if "project" in project_data:
        actual_data = project_data["project"]
    else:
        actual_data = project_data
    devices_dict = actual_data.get("devices", {})
    ga_dict = actual_data.get("group_addresses", {})
    # Explicit None checks: a midnight filter (time(0, 0)) must still be
    # treated as an active filter, so never rely on truthiness here.
    has_time_filter = time_filter_start is not None or time_filter_end is not None
    for line in lines:
        clean_line = line.strip()
        if not clean_line or clean_line.startswith("="): continue
        try:
            timestamp, pa, ga, payload = None, "N/A", None, None
            if log_format == LOG_FORMAT_PIPE_SEPARATED:
                # Column layout: 0=timestamp, 1=PA, 3=GA, 5=payload
                parts = [p.strip() for p in clean_line.split('|')]
                if len(parts) > 3:
                    timestamp = parts[0]
                    ga = parts[3]
                    pa = parts[1] if len(parts) > 1 else "N/A"
                    payload = parts[5] if len(parts) > 5 else None
            elif log_format == LOG_FORMAT_CSV:
                # Column layout: 0=timestamp, 1=PA, 4=GA, 6=payload.
                # Use the shared CSV_DELIMITER constant so this stays in
                # sync with detect_log_format().
                row = next(csv.reader([clean_line], delimiter=CSV_DELIMITER))
                if len(row) > 4:
                    timestamp = row[0]
                    ga = row[4]
                    pa = row[1] if len(row) > 1 else "N/A"
                    payload = row[6] if len(row) > 6 else None
            if timestamp and ga and GA_PATTERN.match(ga):
                if has_time_filter:
                    try:
                        # Timestamp is assumed "date HH:MM:SS[.frac]"; keep
                        # only the wall-clock part for comparison.
                        time_str = timestamp.split(' ')[1].split('.')[0]
                        log_time = datetime.strptime(time_str, TIMESTAMP_TIME_FORMAT).time()
                        if time_filter_start is not None and log_time < time_filter_start:
                            continue
                        if time_filter_end is not None and log_time > time_filter_end:
                            continue
                    except (ValueError, IndexError):
                        logging.debug(f"Konnte Timestamp für Zeitfilter nicht parsen: {timestamp}")
                        continue
                payload_str = payload if payload is not None else "N/A"
                # Only lines carrying a real payload enter the history.
                if payload is not None:
                    new_payload_items.append({
                        "ga": ga,
                        "timestamp": timestamp,
                        "payload": payload_str
                    })
                pa_name = devices_dict.get(pa, {}).get("name", "N/A")
                ga_name = ga_dict.get(ga, {}).get("name", "N/A")
                # Pre-built search string: filtering later needs only one
                # substring test per cached entry.
                search_string = (
                    f"{timestamp} "
                    f"{pa} "
                    f"{pa_name} "
                    f"{ga} "
                    f"{ga_name} "
                    f"{payload_str}"
                )
                new_cached_items.append({
                    "timestamp": timestamp,
                    "pa": pa,
                    "pa_name": pa_name,
                    "ga": ga,
                    "ga_name": ga_name,
                    "payload": payload_str,
                    "search_string": search_string
                })
        except (IndexError, StopIteration, csv.Error) as e:
            logging.debug(f"Konnte Log-Zeile nicht parsen: '{clean_line}' - Fehler: {e}")
            continue
    return new_payload_items, new_cached_items
def parse_and_cache_log_data(
    lines: List[str],
    project_data: Dict,
    time_filter_start: Optional[datetime_time] = None,
    time_filter_end: Optional[datetime_time] = None
) -> Tuple[Dict[str, List[Dict[str, str]]], List[Dict[str, str]]]:
    """Parse a complete log file and build the payload history plus cache.

    Args:
        lines: All log lines from file
        project_data: Loaded project data
        time_filter_start: Optional start time filter
        time_filter_end: Optional end time filter

    Returns:
        Tuple of (payload_history dict keyed by GA, cached_log_data list)
    """
    payload_history: Dict[str, List[Dict[str, str]]] = {}
    # Sniff the format from the first meaningful content lines only.
    probe_lines = [
        ln for ln in lines[:MIN_LOG_FORMAT_CHECK_LINES]
        if ln.strip() and not ln.strip().startswith("=")
    ]
    log_format = detect_log_format(probe_lines)
    if not log_format:
        logging.warning("Konnte Log-Format beim Parsen für Cache nicht bestimmen.")
        return payload_history, []
    payload_items, cached_log_data = _parse_lines_internal(
        lines, project_data, log_format, time_filter_start, time_filter_end
    )
    # Group payload entries per GA, then sort each GA's history by time.
    for entry in payload_items:
        payload_history.setdefault(entry["ga"], []).append(
            {'timestamp': entry["timestamp"], 'payload': entry["payload"]}
        )
    for history in payload_history.values():
        history.sort(key=lambda item: item['timestamp'])
    return payload_history, cached_log_data
def append_new_log_lines(
    lines: List[str],
    project_data: Dict,
    payload_history: Dict[str, List[Dict[str, str]]],
    cached_log_data: List[Dict[str, str]],
    time_filter_start: Optional[datetime_time] = None,
    time_filter_end: Optional[datetime_time] = None
) -> List[Dict[str, str]]:
    """Incrementally parse new log lines into the existing caches.

    Both `payload_history` and `cached_log_data` are modified in place;
    new entries are appended without re-sorting existing histories.

    Args:
        lines: New log lines to append
        project_data: Loaded project data
        payload_history: Existing payload history (modified in place)
        cached_log_data: Existing cache (modified in place)
        time_filter_start: Optional start time filter
        time_filter_end: Optional end time filter

    Returns:
        List of newly added cache items
    """
    log_format = detect_log_format(lines[:MIN_LOG_FORMAT_CHECK_LINES])
    if not log_format and cached_log_data:
        # Fallback: reconstruct a representative pipe-format line from the
        # first cached entry so the detector can recover the format.
        first_entry = cached_log_data[0]
        simulated_line = f"{first_entry['timestamp']} | {first_entry['pa']} | | {first_entry['ga']} | | {first_entry['payload']}"
        log_format = detect_log_format([simulated_line])
    if not log_format:
        logging.warning("Konnte Log-Format für Delta-Update nicht bestimmen.")
        return []
    payload_items, fresh_cache_items = _parse_lines_internal(
        lines, project_data, log_format, time_filter_start, time_filter_end
    )
    cached_log_data.extend(fresh_cache_items)
    for entry in payload_items:
        payload_history.setdefault(entry["ga"], []).append(
            {'timestamp': entry["timestamp"], 'payload': entry["payload"]}
        )
    return fresh_cache_items