Skip to content

Commit ad4dbea

Browse files
authored
Merge pull request #15 from ATNoG/hotfix/kafkabridge_start
run kafkabridge on another thread
2 parents 3e5191c + b158b47 commit ad4dbea

File tree

2 files changed

+33
-21
lines changed

2 files changed

+33
-21
lines changed

main.py

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,46 @@
11
import os
2+
import asyncio
3+
from threading import Thread
24
from contextlib import asynccontextmanager
35
from fastapi import FastAPI
6+
47
from src.routers.v1 import v1_router
58
from src.routers.v1.latency import ClickHouse
69
from src.sink import KafkaSinkManager
7-
import asyncio
810

911
KAFKA_HOST = os.getenv("KAFKA_HOST", "localhost")
1012
KAFKA_PORT = os.getenv("KAFKA_PORT", "9092")
11-
KAFKA_TOPICS = ["raw-data","processed-data"]
13+
KAFKA_TOPICS = ["raw-data", "processed-data"]
14+
1215

1316
@asynccontextmanager
1417
async def lifespan(app: FastAPI):
15-
# Connect to databases
1618
ClickHouse.service.connect()
1719

18-
# Initialize and start Kafka sink manager
1920
sink_manager = KafkaSinkManager(KAFKA_HOST, KAFKA_PORT)
2021

21-
try:
22-
await asyncio.to_thread(sink_manager.start, *KAFKA_TOPICS)
23-
except Exception as e:
24-
print(f"Warning: Failed to start Kafka sink: {e}")
25-
print("API will continue running without Kafka sink")
22+
def kafka_worker():
23+
"""
24+
Kafka runs in its own thread with its own event loop.
25+
This prevents rdkafka from blocking FastAPI startup.
26+
"""
27+
try:
28+
asyncio.run(sink_manager.start(*KAFKA_TOPICS))
29+
print("Kafka consumer started successfully")
30+
except Exception as e:
31+
print(f"Kafka worker crashed: {e}")
32+
33+
kafka_thread = Thread(
34+
target=kafka_worker,
35+
daemon=True,
36+
name="kafka-sink-thread"
37+
)
38+
kafka_thread.start()
39+
40+
print(
41+
f"API started (Kafka connecting in background to "
42+
f"{KAFKA_HOST}:{KAFKA_PORT})"
43+
)
2644

2745
yield
2846

@@ -33,6 +51,7 @@ async def lifespan(app: FastAPI):
3351

3452
ClickHouse.service.client.close()
3553

54+
3655
app = FastAPI(lifespan=lifespan)
3756

3857
app.include_router(v1_router, prefix="/api/v1", tags=["v1"])
Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,44 @@
1-
-- Create processed_latency table for storing aggregated network metrics
2-
CREATE TABLE IF NOT EXISTS processed_latency (
3-
-- Window temporal information
1+
CREATE DATABASE IF NOT EXISTS analytics;
2+
3+
CREATE TABLE IF NOT EXISTS analytics.processed_latency
4+
(
45
window_start_time DateTime64(3),
56
window_end_time DateTime64(3),
67
window_duration_seconds Float64,
78

8-
-- Cell metadata
99
cell_index Int32,
1010
network String,
1111

12-
-- RSRP statistics (Signal power)
1312
rsrp_mean Nullable(Float64),
1413
rsrp_max Nullable(Float64),
1514
rsrp_min Nullable(Float64),
1615
rsrp_std Nullable(Float64),
1716

18-
-- SINR statistics (Interference + noise)
1917
sinr_mean Nullable(Float64),
2018
sinr_max Nullable(Float64),
2119
sinr_min Nullable(Float64),
2220
sinr_std Nullable(Float64),
2321

24-
-- RSRQ statistics (Signal quality)
2522
rsrq_mean Nullable(Float64),
2623
rsrq_max Nullable(Float64),
2724
rsrq_min Nullable(Float64),
2825
rsrq_std Nullable(Float64),
2926

30-
-- Latency statistics
3127
latency_mean Nullable(Float64),
3228
latency_max Nullable(Float64),
3329
latency_min Nullable(Float64),
3430
latency_std Nullable(Float64),
3531

36-
-- CQI statistics (Cell quality)
3732
cqi_mean Nullable(Float64),
3833
cqi_max Nullable(Float64),
3934
cqi_min Nullable(Float64),
4035
cqi_std Nullable(Float64),
4136

42-
-- Bandwidth information
4337
primary_bandwidth Nullable(Float64),
4438
ul_bandwidth Nullable(Float64),
4539

46-
-- Sample metadata (higher count = more confidence)
4740
sample_count Int32
4841
)
49-
ENGINE = MergeTree()
42+
ENGINE = MergeTree
5043
ORDER BY (cell_index, window_start_time)
5144
SETTINGS index_granularity = 8192;

0 commit comments

Comments
 (0)