Skip to content

Commit d10a017

Browse files
Merge pull request #19 from ATNoG/hotfix/processed_data_format
Add function to format data
2 parents 1beaef5 + 5812484 commit d10a017

File tree

2 files changed

+244
-38
lines changed

2 files changed

+244
-38
lines changed

src/services/clickhouse.py

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,81 @@
1-
from datetime import datetime
1+
from datetime import datetime, timezone
22
import clickhouse_connect
33
from clickhouse_connect.driver.client import Client
44
from src.configs.clickhouse_conf import ClickhouseConf
55
from src.models.processed_latency import ProcessedLatency
66
from src.services.db_service import DBService
77
from src.services.query import QueryCH
88

9+
10+
def transform_processor_output(data: dict) -> dict:
    """Flatten the processor's nested aggregation output into the flat storage schema.

    The processor emits one record per analysis window, e.g.::

        {
            "type": "latency",
            "cell_index": 123,
            "window_start": 1733684400,   # Unix timestamp, seconds
            "window_end": 1733684410,
            "rsrp": {"mean": -85.5, "max": -80, "min": -90, "std": 2.5, "samples": 100},
            "mean_latency": {...},
            ...
        }

    and storage expects a flat record::

        {
            "window_start_time": datetime,
            "window_end_time": datetime,
            "window_duration_seconds": 10.0,
            "cell_index": 123,
            "rsrp_mean": -85.5,
            "rsrp_max": -80,
            ...
        }

    Metadata fields absent from *data* are stored as None; metric groups that
    are absent (or not dicts) are simply omitted from the result.

    Raises:
        ValueError: if either window boundary is missing.
    """
    start_ts = data.get("window_start")
    end_ts = data.get("window_end")
    if start_ts is None or end_ts is None:
        raise ValueError("Missing window_start or window_end in processor output")

    # Window boundaries arrive as Unix seconds; store them as aware UTC datetimes.
    flat = {
        "window_start_time": datetime.fromtimestamp(start_ts, tz=timezone.utc),
        "window_end_time": datetime.fromtimestamp(end_ts, tz=timezone.utc),
        "window_duration_seconds": float(end_ts - start_ts),
    }

    # Pass-through metadata: copied as-is, missing keys become None.
    for field in ("cell_index", "network", "sample_count",
                  "primary_bandwidth", "ul_bandwidth"):
        flat[field] = data.get(field)

    # Flatten each nested metric group into <prefix>_<stat> columns.
    # Note the rename: processor says "mean_latency", storage says "latency".
    metric_prefixes = (
        ("rsrp", "rsrp"),
        ("sinr", "sinr"),
        ("rsrq", "rsrq"),
        ("mean_latency", "latency"),
        ("cqi", "cqi"),
    )
    for source_key, prefix in metric_prefixes:
        stats = data.get(source_key)
        if isinstance(stats, dict) and stats:
            for stat in ("mean", "max", "min", "std"):
                flat[f"{prefix}_{stat}"] = stats.get(stat)

    return flat
78+
979
class ClickHouseService(DBService):
1080
def __init__(self) -> None:
1181
self.conf = ClickhouseConf()
@@ -25,9 +95,11 @@ def get_data(self, batch_number: int = 1, batch_size: int = 50) -> list:
2595
def write_data(self, data: dict) -> None:
2696
"""Write a single processed latency record to ClickHouse"""
2797
try:
28-
processed = ProcessedLatency(**data)
98+
# Transform nested processor format to flat storage format
99+
transformed = transform_processor_output(data)
100+
processed = ProcessedLatency(**transformed)
29101
record = processed.to_dict()
30-
102+
31103
# Use async_insert for better performance
32104
self.client.insert(
33105
'analytics.processed_latency',
@@ -40,9 +112,11 @@ def write_data(self, data: dict) -> None:
40112
def write_batch(self, data_list: list[dict]) -> None:
41113
"""Write multiple processed latency records to ClickHouse"""
42114
try:
43-
processed_list = [ProcessedLatency(**d) for d in data_list]
115+
# Transform each record from nested processor format to flat storage format
116+
transformed_list = [transform_processor_output(d) for d in data_list]
117+
processed_list = [ProcessedLatency(**d) for d in transformed_list]
44118
records = [p.to_dict() for p in processed_list]
45-
119+
46120
# Use async_insert for better performance
47121
self.client.insert(
48122
'analytics.processed_latency',

tests/test_clickhouse_service.py

Lines changed: 165 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import pytest
22
from unittest.mock import MagicMock, patch
33
from datetime import datetime, timezone
4-
from src.services.clickhouse import ClickHouseService
4+
from src.services.clickhouse import ClickHouseService, transform_processor_output
55
from src.models.processed_latency import ProcessedLatency
66

77

@@ -22,6 +22,123 @@ def clickhouse_service(mock_clickhouse_client):
2222
return service
2323

2424

25+
class TestTransformProcessorOutput:
    """Tests for transform_processor_output function."""

    def test_transform_complete_data(self):
        """Test transformation of complete processor output."""
        # A fully-populated processor record: window boundaries as Unix
        # seconds plus every nested metric group the transform knows about.
        processor_output = {
            "type": "latency",
            "cell_index": 123,
            "network": "5G",
            "window_start": 1733684400,
            "window_end": 1733684410,
            "sample_count": 100,
            "primary_bandwidth": 100.0,
            "ul_bandwidth": 50.0,
            "rsrp": {
                "mean": -85.5,
                "max": -80.0,
                "min": -90.0,
                "std": 2.5,
                "samples": 100
            },
            "sinr": {
                "mean": 15.0,
                "max": 20.0,
                "min": 10.0,
                "std": 3.0,
                "samples": 100
            },
            "rsrq": {
                "mean": -10.0,
                "max": -8.0,
                "min": -12.0,
                "std": 1.5,
                "samples": 100
            },
            "mean_latency": {
                "mean": 20.0,
                "max": 30.0,
                "min": 10.0,
                "std": 5.0,
                "samples": 100
            },
            "cqi": {
                "mean": 12.0,
                "max": 15.0,
                "min": 10.0,
                "std": 2.0,
                "samples": 100
            }
        }

        result = transform_processor_output(processor_output)

        # Check timestamp conversion (Unix seconds -> aware UTC datetimes)
        assert result["window_start_time"] == datetime.fromtimestamp(1733684400, tz=timezone.utc)
        assert result["window_end_time"] == datetime.fromtimestamp(1733684410, tz=timezone.utc)
        assert result["window_duration_seconds"] == 10.0

        # Check metadata (copied through unchanged)
        assert result["cell_index"] == 123
        assert result["network"] == "5G"
        assert result["sample_count"] == 100
        assert result["primary_bandwidth"] == 100.0
        assert result["ul_bandwidth"] == 50.0

        # Check flattened RSRP
        assert result["rsrp_mean"] == -85.5
        assert result["rsrp_max"] == -80.0
        assert result["rsrp_min"] == -90.0
        assert result["rsrp_std"] == 2.5

        # Check flattened latency (mean_latency -> latency)
        assert result["latency_mean"] == 20.0
        assert result["latency_max"] == 30.0
        assert result["latency_min"] == 10.0
        assert result["latency_std"] == 5.0

    def test_transform_with_null_metrics(self):
        """Test transformation when some metrics are missing."""
        processor_output = {
            "cell_index": 123,
            "network": "5G",
            "window_start": 1733684400,
            "window_end": 1733684410,
            "sample_count": 50,
            "rsrp": {
                "mean": -85.5,
                "max": -80.0,
                "min": -90.0,
                "std": 2.5,
                "samples": 50
            }
            # Missing sinr, rsrq, mean_latency, cqi
        }

        result = transform_processor_output(processor_output)

        # Check that RSRP is present
        assert result["rsrp_mean"] == -85.5

        # Check that missing metrics result in None values
        # (either the key is absent or it maps to None — both acceptable)
        assert "sinr_mean" not in result or result["sinr_mean"] is None
        assert "latency_mean" not in result or result["latency_mean"] is None

    def test_transform_missing_window_times(self):
        """Test that transformation raises error when window times are missing."""
        processor_output = {
            "cell_index": 123,
            "network": "5G",
            "sample_count": 100
            # Missing window_start and window_end
        }

        # Window boundaries are mandatory: the transform must refuse the record.
        with pytest.raises(ValueError, match="Missing window_start or window_end"):
            transform_processor_output(processor_output)
140+
141+
25142
class TestClickHouseService:
26143
"""Tests for ClickHouseService class."""
27144

@@ -40,37 +157,52 @@ def test_connect(self, mock_clickhouse_client):
40157
assert service.client == mock_clickhouse_client
41158

42159
def test_write_data(self, clickhouse_service, mock_clickhouse_client):
43-
"""Test writing a single ProcessedLatency record to ClickHouse."""
44-
# Prepare test data
160+
"""Test writing a single ProcessedLatency record to ClickHouse using processor format."""
161+
# Prepare test data in processor format (nested structure)
45162
test_data = {
46-
'window_start_time': datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
47-
'window_end_time': datetime(2024, 1, 1, 12, 5, 0, tzinfo=timezone.utc),
48-
'window_duration_seconds': 300.0,
49-
'cell_index': 1,
50-
'network': '5G',
51-
'rsrp_mean': -80.0,
52-
'rsrp_max': -70.0,
53-
'rsrp_min': -90.0,
54-
'rsrp_std': 5.0,
55-
'sinr_mean': 15.0,
56-
'sinr_max': 20.0,
57-
'sinr_min': 10.0,
58-
'sinr_std': 3.0,
59-
'rsrq_mean': -10.0,
60-
'rsrq_max': -8.0,
61-
'rsrq_min': -12.0,
62-
'rsrq_std': 1.5,
63-
'latency_mean': 20.0,
64-
'latency_max': 30.0,
65-
'latency_min': 10.0,
66-
'latency_std': 5.0,
67-
'cqi_mean': 12.0,
68-
'cqi_max': 15.0,
69-
'cqi_min': 10.0,
70-
'cqi_std': 2.0,
71-
'primary_bandwidth': 100.0,
72-
'ul_bandwidth': 50.0,
73-
'sample_count': 100
163+
"type": "latency",
164+
"cell_index": 1,
165+
"network": "5G",
166+
"window_start": 1704110400, # 2024-01-01 12:00:00 UTC
167+
"window_end": 1704110700, # 2024-01-01 12:05:00 UTC
168+
"sample_count": 100,
169+
"primary_bandwidth": 100.0,
170+
"ul_bandwidth": 50.0,
171+
"rsrp": {
172+
"mean": -80.0,
173+
"max": -70.0,
174+
"min": -90.0,
175+
"std": 5.0,
176+
"samples": 100
177+
},
178+
"sinr": {
179+
"mean": 15.0,
180+
"max": 20.0,
181+
"min": 10.0,
182+
"std": 3.0,
183+
"samples": 100
184+
},
185+
"rsrq": {
186+
"mean": -10.0,
187+
"max": -8.0,
188+
"min": -12.0,
189+
"std": 1.5,
190+
"samples": 100
191+
},
192+
"mean_latency": {
193+
"mean": 20.0,
194+
"max": 30.0,
195+
"min": 10.0,
196+
"std": 5.0,
197+
"samples": 100
198+
},
199+
"cqi": {
200+
"mean": 12.0,
201+
"max": 15.0,
202+
"min": 10.0,
203+
"std": 2.0,
204+
"samples": 100
205+
}
74206
}
75207

76208
# Execute write
@@ -82,11 +214,11 @@ def test_write_data(self, clickhouse_service, mock_clickhouse_client):
82214

83215
# Check table name
84216
assert call_args[0][0] == 'analytics.processed_latency'
85-
217+
86218
# Check data format (should be a list with one dict)
87219
assert isinstance(call_args[0][1], list)
88220
assert len(call_args[0][1]) == 1
89-
221+
90222
# Check async insert settings
91223
assert call_args[1]['settings']['async_insert'] == 1
92224
assert call_args[1]['settings']['wait_for_async_insert'] == 0

0 commit comments

Comments
 (0)