
Commit ed762e7

Merge pull request #22 from ATNoG/hotfix/data_insertion

Fixed and tested

2 parents: 2678885 + e4f817d

2 files changed: 69 additions, 8 deletions

src/models/processed_latency.py (2 additions, 1 deletion)

@@ -63,4 +63,5 @@ class ProcessedLatency(BaseModel):
 
     def to_dict(self) -> dict:
         """Convert to dictionary for ClickHouse insertion"""
-        return self.model_dump(mode='json')
+        # Use mode='python' to keep datetime objects as datetime, not strings
+        return self.model_dump(mode='python')
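For context on this change: in Pydantic v2, model_dump(mode='json') serializes datetime fields to ISO-8601 strings, while mode='python' keeps them as native datetime objects, which the ClickHouse client expects for DateTime columns. A minimal sketch of the difference (the ToyRecord model is hypothetical, for illustration only):

from datetime import datetime, timezone
from pydantic import BaseModel

class ToyRecord(BaseModel):  # hypothetical model, for illustration only
    window_start_time: datetime

r = ToyRecord(window_start_time=datetime(2024, 1, 1, tzinfo=timezone.utc))

# mode='json' turns the datetime into an ISO string -- what the old code sent:
print(r.model_dump(mode='json'))    # {'window_start_time': '2024-01-01T00:00:00Z'}

# mode='python' keeps a datetime object -- what the fixed to_dict() returns:
print(r.model_dump(mode='python'))  # {'window_start_time': datetime.datetime(2024, 1, 1, 0, 0, tzinfo=...)}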

src/services/clickhouse.py (67 additions, 7 deletions)

@@ -44,14 +44,19 @@ def transform_processor_output(data: dict) -> dict:
     window_end_dt = datetime.fromtimestamp(window_end, tz=timezone.utc)
     window_duration = window_end - window_start
 
+    # Skip records with no samples
+    sample_count = data.get("sample_count", 0)
+    if sample_count == 0:
+        raise ValueError("Cannot store record with sample_count = 0")
+
     # Build flat structure
     transformed = {
         "window_start_time": window_start_dt,
         "window_end_time": window_end_dt,
         "window_duration_seconds": float(window_duration),
         "cell_index": data.get("cell_index"),
-        "network": data.get("network"),
-        "sample_count": data.get("sample_count"),
+        "network": data.get("network") or "Unknown",
+        "sample_count": sample_count,
         "primary_bandwidth": data.get("primary_bandwidth"),
         "ul_bandwidth": data.get("ul_bandwidth"),
     }
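This guard turns empty aggregation windows into a ValueError, and the write paths below recognize it by matching on the message text. A minimal sketch of the same skip pattern using a dedicated exception type instead of string matching (all names here are illustrative, not from the repo):

class EmptyWindowError(ValueError):
    """Hypothetical marker for aggregation windows with sample_count == 0."""

def transform(data: dict) -> dict:
    # Same guard as transform_processor_output, minus the flattening
    if data.get("sample_count", 0) == 0:
        raise EmptyWindowError("window has no samples")
    return data

def keep_valid(batch: list[dict]) -> list[dict]:
    kept = []
    for d in batch:
        try:
            kept.append(transform(d))
        except EmptyWindowError:
            continue  # silently drop empty windows; other errors still propagate
    return kept

assert keep_valid([{"sample_count": 0}, {"sample_count": 3}]) == [{"sample_count": 3}]

A subclass keeps the skip signal unambiguous even if the error message wording changes later.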
@@ -98,29 +103,84 @@ def write_data(self, data: dict) -> None:
             # Transform nested processor format to flat storage format
             transformed = transform_processor_output(data)
             processed = ProcessedLatency(**transformed)
-            record = processed.to_dict()
+            record_dict = processed.to_dict()
+
+            # Convert dict to list in column order for ClickHouse
+            record_row = [
+                record_dict['window_start_time'],
+                record_dict['window_end_time'],
+                record_dict['window_duration_seconds'],
+                record_dict['cell_index'],
+                record_dict['network'],
+                record_dict['rsrp_mean'], record_dict['rsrp_max'], record_dict['rsrp_min'], record_dict['rsrp_std'],
+                record_dict['sinr_mean'], record_dict['sinr_max'], record_dict['sinr_min'], record_dict['sinr_std'],
+                record_dict['rsrq_mean'], record_dict['rsrq_max'], record_dict['rsrq_min'], record_dict['rsrq_std'],
+                record_dict['latency_mean'], record_dict['latency_max'], record_dict['latency_min'], record_dict['latency_std'],
+                record_dict['cqi_mean'], record_dict['cqi_max'], record_dict['cqi_min'], record_dict['cqi_std'],
+                record_dict['primary_bandwidth'],
+                record_dict['ul_bandwidth'],
+                record_dict['sample_count']
+            ]
 
             # Use async_insert for better performance
             self.client.insert(
                 'analytics.processed_latency',
-                [record],
+                [record_row],
                 settings={'async_insert': 1, 'wait_for_async_insert': 0}
             )
+        except ValueError as e:
+            # Skip records with no samples (sample_count = 0)
+            if "sample_count = 0" in str(e):
+                return
+            raise Exception(f"Failed to write to ClickHouse: {e}")
         except Exception as e:
             raise Exception(f"Failed to write to ClickHouse: {e}")
 
     def write_batch(self, data_list: list[dict]) -> None:
         """Write multiple processed latency records to ClickHouse"""
         try:
             # Transform each record from nested processor format to flat storage format
-            transformed_list = [transform_processor_output(d) for d in data_list]
+            # Skip records with sample_count = 0
+            transformed_list = []
+            for d in data_list:
+                try:
+                    transformed = transform_processor_output(d)
+                    transformed_list.append(transformed)
+                except ValueError as e:
+                    if "sample_count = 0" not in str(e):
+                        raise
+
+            # Skip batch if no valid records
+            if not transformed_list:
+                return
+
             processed_list = [ProcessedLatency(**d) for d in transformed_list]
-            records = [p.to_dict() for p in processed_list]
+
+            # Convert each dict to list in column order for ClickHouse
+            record_rows = []
+            for p in processed_list:
+                record_dict = p.to_dict()
+                record_row = [
+                    record_dict['window_start_time'],
+                    record_dict['window_end_time'],
+                    record_dict['window_duration_seconds'],
+                    record_dict['cell_index'],
+                    record_dict['network'],
+                    record_dict['rsrp_mean'], record_dict['rsrp_max'], record_dict['rsrp_min'], record_dict['rsrp_std'],
+                    record_dict['sinr_mean'], record_dict['sinr_max'], record_dict['sinr_min'], record_dict['sinr_std'],
+                    record_dict['rsrq_mean'], record_dict['rsrq_max'], record_dict['rsrq_min'], record_dict['rsrq_std'],
+                    record_dict['latency_mean'], record_dict['latency_max'], record_dict['latency_min'], record_dict['latency_std'],
+                    record_dict['cqi_mean'], record_dict['cqi_max'], record_dict['cqi_min'], record_dict['cqi_std'],
+                    record_dict['primary_bandwidth'],
+                    record_dict['ul_bandwidth'],
+                    record_dict['sample_count']
+                ]
+                record_rows.append(record_row)
 
             # Use async_insert for better performance
             self.client.insert(
                 'analytics.processed_latency',
-                records,
+                record_rows,
                 settings={'async_insert': 1, 'wait_for_async_insert': 0}
             )
         except Exception as e:
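Both writers now pass positional row lists, so the order inside record_row must match the column order of analytics.processed_latency exactly. As a hedged alternative sketch, clickhouse-connect's insert also accepts a column_names argument, which pins the dict-to-column mapping in one place (column names taken from the keys above; the connection setup is illustrative):

import clickhouse_connect

COLUMNS = [
    'window_start_time', 'window_end_time', 'window_duration_seconds',
    'cell_index', 'network',
    'rsrp_mean', 'rsrp_max', 'rsrp_min', 'rsrp_std',
    'sinr_mean', 'sinr_max', 'sinr_min', 'sinr_std',
    'rsrq_mean', 'rsrq_max', 'rsrq_min', 'rsrq_std',
    'latency_mean', 'latency_max', 'latency_min', 'latency_std',
    'cqi_mean', 'cqi_max', 'cqi_min', 'cqi_std',
    'primary_bandwidth', 'ul_bandwidth', 'sample_count',
]

def insert_rows(client, record_dicts: list[dict]) -> None:
    # Build each row from COLUMNS so the ordering lives in one place
    rows = [[d[c] for c in COLUMNS] for d in record_dicts]
    client.insert(
        'analytics.processed_latency',
        rows,
        column_names=COLUMNS,  # explicit mapping instead of relying on table order
        settings={'async_insert': 1, 'wait_for_async_insert': 0},
    )

# Usage (illustrative connection parameters):
# client = clickhouse_connect.get_client(host='localhost')
# insert_rows(client, [p.to_dict() for p in processed_list])

Note that 'wait_for_async_insert': 0 makes the call return before ClickHouse confirms the write, so insert errors surface asynchronously on the server; that is the throughput-versus-feedback trade behind the "better performance" comment.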
