Skip to content

Commit 1beaef5

Browse files
Merge pull request #16 from ATNoG/hotfix/sinkmanager_cancel_error
Finally fixed ig
2 parents ad4dbea + 7f365db commit 1beaef5

File tree

5 files changed

+87
-4
lines changed

5 files changed

+87
-4
lines changed

src/services/clickhouse.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,34 @@ def get_data(self, batch_number: int = 1, batch_size: int = 50) -> list:
2323
return []
2424

2525
def write_data(self, data: dict) -> None:
    """Write a single processed latency record to ClickHouse.

    Args:
        data: Mapping of field names to values accepted by the
            ``ProcessedLatency`` model constructor.

    Raises:
        Exception: If model validation or the ClickHouse insert fails.
            The original error is chained as the cause.
    """
    try:
        # Validate/normalize the payload through the model before inserting.
        processed = ProcessedLatency(**data)
        record = processed.to_dict()

        # async_insert batches writes server-side; wait_for_async_insert=0
        # returns immediately without waiting for the data to be flushed.
        self.client.insert(
            'analytics.processed_latency',
            [record],
            settings={'async_insert': 1, 'wait_for_async_insert': 0}
        )
    except Exception as e:
        # Chain the original exception so the root cause is preserved
        # in tracebacks instead of being flattened into a message string.
        raise Exception(f"Failed to write to ClickHouse: {e}") from e
2739

2840
def write_batch(self, data_list: list[dict]) -> None:
    """Write multiple processed latency records to ClickHouse.

    Args:
        data_list: List of mappings, each accepted by the
            ``ProcessedLatency`` model constructor.

    Raises:
        Exception: If model validation or the ClickHouse insert fails.
            The original error is chained as the cause.
    """
    # Nothing to insert; skip the round-trip to the server entirely.
    if not data_list:
        return
    try:
        # Validate/normalize every payload through the model first so a
        # malformed record fails before any data is sent.
        processed_list = [ProcessedLatency(**d) for d in data_list]
        records = [p.to_dict() for p in processed_list]

        # async_insert batches writes server-side; wait_for_async_insert=0
        # returns immediately without waiting for the data to be flushed.
        self.client.insert(
            'analytics.processed_latency',
            records,
            settings={'async_insert': 1, 'wait_for_async_insert': 0}
        )
    except Exception as e:
        # Chain the original exception so the root cause is preserved.
        raise Exception(f"Failed to batch write to ClickHouse: {e}") from e
3054

3155

3256
def query_processed_latency(self,

src/services/influx.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from influxdb_client.client.influxdb_client import InfluxDBClient
2-
from influxdb_client.client.write_api import SYNCHRONOUS, WriteApi
2+
from influxdb_client.client.write_api import ASYNCHRONOUS, WriteApi
33
from influxdb_client.client.query_api import QueryApi
44
from src.configs.influx_conf import InfluxConf
55
from src.models.raw import Raw
@@ -21,7 +21,7 @@ def connect(self):
2121
)
2222

2323
# get iteration apis
24-
self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
24+
self.write_api = self.client.write_api(write_options=ASYNCHRONOUS)
2525
self.query_api = self.client.query_api()
2626

2727
def get_data(self, batch_number:int = 1 ,batch_size:int = 50) -> list[Raw]:

src/sink.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ async def start(self, *topics):
6060
self.bridge.add_n_topics(topics, bind=self.route_message)
6161

6262
await self.bridge.start_consumer()
63+
64+
# Keep the event loop alive - wait for the consumer task to complete
65+
# This prevents asyncio.run() from exiting and cancelling the consumer task
66+
if self.bridge._consumer_task:
67+
await self.bridge._consumer_task
6368

6469
async def stop(self):
6570
if self.bridge is not None:

tests/conftest.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ def __init__(self, *topics, hostname=None, port=None):
1212
self.port = port
1313
self._closed = False
1414
self._callbacks = {}
15+
self._consumer_task = None # Added for consumer task tracking
1516

1617
def add_n_topics(self, topics, bind=None):
1718
"""Mock add_n_topics method."""

tests/test_clickhouse_service.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,59 @@ def test_connect(self, mock_clickhouse_client):
3939

4040
assert service.client == mock_clickhouse_client
4141

42+
def test_write_data(self, clickhouse_service, mock_clickhouse_client):
    """Test writing a single ProcessedLatency record to ClickHouse."""
    # Fully-populated payload covering every ProcessedLatency field.
    test_data = {
        'window_start_time': datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc),
        'window_end_time': datetime(2024, 1, 1, 12, 5, 0, tzinfo=timezone.utc),
        'window_duration_seconds': 300.0,
        'cell_index': 1,
        'network': '5G',
        'rsrp_mean': -80.0,
        'rsrp_max': -70.0,
        'rsrp_min': -90.0,
        'rsrp_std': 5.0,
        'sinr_mean': 15.0,
        'sinr_max': 20.0,
        'sinr_min': 10.0,
        'sinr_std': 3.0,
        'rsrq_mean': -10.0,
        'rsrq_max': -8.0,
        'rsrq_min': -12.0,
        'rsrq_std': 1.5,
        'latency_mean': 20.0,
        'latency_max': 30.0,
        'latency_min': 10.0,
        'latency_std': 5.0,
        'cqi_mean': 12.0,
        'cqi_max': 15.0,
        'cqi_min': 10.0,
        'cqi_std': 2.0,
        'primary_bandwidth': 100.0,
        'ul_bandwidth': 50.0,
        'sample_count': 100
    }

    clickhouse_service.write_data(test_data)

    # Exactly one insert must have been issued against the client.
    mock_clickhouse_client.insert.assert_called_once()
    args, kwargs = mock_clickhouse_client.insert.call_args

    # Destination table.
    assert args[0] == 'analytics.processed_latency'

    # Payload shape: a list holding the single serialized record.
    rows = args[1]
    assert isinstance(rows, list)
    assert len(rows) == 1

    # Fire-and-forget async insert settings.
    settings = kwargs['settings']
    assert settings['async_insert'] == 1
    assert settings['wait_for_async_insert'] == 0
93+
94+
4295
def test_query_processed_latency_success(self, clickhouse_service, mock_clickhouse_client):
4396
"""Test successful query of processed latency data."""
4497
# Mock query result

0 commit comments

Comments
 (0)