Merged
12 changes: 12 additions & 0 deletions .env.example
@@ -0,0 +1,12 @@
INFLUX_PORT=8086
INFLUX_ADMIN_USER=admin
INFLUX_ADMIN_PASSWORD=changeme
INFLUX_ORG=nwdaf
INFLUX_BUCKET=raw_data
INFLUX_TOKEN=your-super-secret-token

CLICKHOUSE_HTTP_PORT=8123
CLICKHOUSE_TCP_PORT=9000
CLICKHOUSE_DB=analytics
CLICKHOUSE_USER=default
CLICKHOUSE_PASSWORD=changeme
6 changes: 3 additions & 3 deletions .gitignore
@@ -5,7 +5,7 @@ __pycache__/

# C extensions
*.so

confs/*.yml
# Distribution / packaging
.Python
build/
@@ -182,9 +182,9 @@ cython_debug/
.abstra/

# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/

9 changes: 0 additions & 9 deletions Dockerfile.clickhouse-init

This file was deleted.

2 changes: 2 additions & 0 deletions confs/core_fields.yml.example
@@ -0,0 +1,2 @@
timestamp: datetime
cell_index: integer
12 changes: 12 additions & 0 deletions confs/extra_fields.yml.example
@@ -0,0 +1,12 @@
# Optional/allowed extra fields for raw data points

datarate: float
mean_latency: float
rsrp: float
sinr: float
rsrq: float
direction: string
network: string
cqi: float
primary_bandwidth: float
ul_bandwidth: float
2 changes: 2 additions & 0 deletions confs/tag_fields.yml.example
@@ -0,0 +1,2 @@
- cell_index
- network
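The three example files split a raw data point into required core fields, optional extra fields, and the subset stored as tags. As a hedged sketch of how a point could be checked against them: the field names and type names come from the example files, while the `validate_point` helper, the type mapping, excluding `timestamp` from the field set, and rejecting unknown fields are all assumptions, not the project's actual `Raw` class.

```python
from datetime import datetime
from typing import Any

# Values copied from the confs/*.yml.example files; the service would load them from YAML.
CORE_FIELDS = {"timestamp": "datetime", "cell_index": "integer"}
EXTRA_FIELDS = {
    "datarate": "float", "mean_latency": "float", "rsrp": "float",
    "sinr": "float", "rsrq": "float", "direction": "string",
    "network": "string", "cqi": "float",
    "primary_bandwidth": "float", "ul_bandwidth": "float",
}
TAG_FIELDS = ["cell_index", "network"]

# Assumed mapping from the YAML type names to Python types (ints accepted as floats).
TYPE_MAP: dict[str, tuple[type, ...]] = {
    "datetime": (datetime,),
    "integer": (int,),
    "float": (float, int),
    "string": (str,),
}


def validate_point(point: dict[str, Any]) -> tuple[dict[str, Any], dict[str, Any]]:
    """Hypothetical helper: validate a raw point and split it into (tags, fields)."""
    missing = [name for name in CORE_FIELDS if name not in point]
    if missing:
        raise ValueError(f"missing core fields: {missing}")
    schema = {**CORE_FIELDS, **EXTRA_FIELDS}
    for name, value in point.items():
        if name not in schema:
            raise ValueError(f"field not allowed: {name}")
        # bool is a subclass of int, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, TYPE_MAP[schema[name]]):
            raise TypeError(f"{name} should be {schema[name]}, got {type(value).__name__}")
    # Tags are the listed tag fields; the timestamp is kept out of the field set.
    tags = {k: v for k, v in point.items() if k in TAG_FIELDS}
    fields = {k: v for k, v in point.items() if k not in TAG_FIELDS and k != "timestamp"}
    return tags, fields


tags, fields = validate_point(
    {"timestamp": datetime(2026, 2, 20), "cell_index": 3, "network": "5G", "rsrp": -95.5}
)
print(tags)    # {'cell_index': 3, 'network': '5G'}
print(fields)  # {'rsrp': -95.5}
```

Keeping the type names as plain strings in YAML means the mapping to Python types lives in one place, which is the part most likely to differ from the real implementation.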
22 changes: 4 additions & 18 deletions docker-compose.yml
@@ -20,7 +20,9 @@ services:
restart: unless-stopped

clickhouse:
image: clickhouse/clickhouse-server:latest
build:
context: .
dockerfile: docker/Dockerfile.clickhouse
container_name: clickhouse
ports:
- "${CLICKHOUSE_HTTP_PORT}:${CLICKHOUSE_HTTP_PORT}"
@@ -41,29 +43,13 @@ services:
timeout: 3s
retries: 5

clickhouse-init:
build:
context: .
dockerfile: Dockerfile.clickhouse-init
container_name: clickhouse-init
depends_on:
clickhouse:
condition: service_healthy
environment:
- CLICKHOUSE_HOST=clickhouse
- CLICKHOUSE_USER=${CLICKHOUSE_USER}
- CLICKHOUSE_PASSWORD=${CLICKHOUSE_PASSWORD}
networks:
- nwdaf-network
restart: "no"

data-storage:
container_name: data-storage
ports:
- "8000:8000"
build:
context: .
dockerfile: Dockerfile
dockerfile: docker/Dockerfile
environment:
- INFLUX_URL=http://influxdb:8086
- INFLUX_TOKEN=${INFLUX_TOKEN}
1 change: 1 addition & 0 deletions Dockerfile → docker/Dockerfile
@@ -27,6 +27,7 @@ COPY requirements.txt ./
RUN uv pip install --system -r requirements.txt

COPY src/ ./src/
COPY confs/ ./confs/
COPY main.py .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
3 changes: 3 additions & 0 deletions docker/Dockerfile.clickhouse
@@ -0,0 +1,3 @@
FROM clickhouse/clickhouse-server:24.1.2.5

Copilot AI Feb 20, 2026

The SQL files are copied to /docker-entrypoint-initdb.d/, the standard ClickHouse initialization directory. However, nothing here documents the order in which multiple SQL files are executed. The "01_" prefix on the file name suggests an ordering, so it should be documented (or verified) that ClickHouse processes the files in alphanumeric order. If there are dependencies between SQL files, an unexpected order could cause initialization failures.

Suggested change
# NOTE: The ClickHouse Docker entrypoint processes files in /docker-entrypoint-initdb.d/
# in lexicographical (alphanumeric) order. SQL files in sql/ should be named with
# appropriate numeric prefixes (e.g., 01_, 02_) to ensure dependent scripts run in sequence.

COPY sql/ /docker-entrypoint-initdb.d/
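To see why the numeric-prefix convention in the suggested comment matters, the following sketch assumes, as that comment states, that the init scripts run in lexicographic order. It only looks at `.sql` files, and the helper names and file names are hypothetical. Python's `sorted` compares code points, whereas a shell glob may collate by locale, so treat it as an approximation: zero-padded prefixes keep lexicographic and numeric order aligned, and unpadded ones do not.

```python
import re


def execution_order(filenames: list[str]) -> list[str]:
    """Approximate the entrypoint's order: plain lexicographic sort of the .sql files."""
    return sorted(name for name in filenames if name.endswith(".sql"))


def prefixes_are_consistent(filenames: list[str]) -> bool:
    """True if the lexicographic order matches the numeric order of the NN_ prefixes."""
    numbers = []
    for name in execution_order(filenames):
        match = re.match(r"(\d+)_", name)
        if match is None:
            return False  # an unprefixed file has no intended position
        numbers.append(int(match.group(1)))
    return numbers == sorted(numbers)


# Hypothetical file names, for illustration only.
padded = ["02_views.sql", "10_grants.sql", "01_create_tables.sql"]
unpadded = ["2_views.sql", "10_grants.sql", "1_create_tables.sql"]
print(execution_order(padded))  # ['01_create_tables.sql', '02_views.sql', '10_grants.sql']
print(prefixes_are_consistent(padded), prefixes_are_consistent(unpadded))  # True False
```

With unpadded prefixes, `10_grants.sql` sorts before `1_create_tables.sql`, so a script that depends on the tables would run first.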
34 changes: 0 additions & 34 deletions init-clickhouse.sh

This file was deleted.

20 changes: 7 additions & 13 deletions main.py
@@ -1,12 +1,13 @@
import os
import asyncio
from threading import Thread
import os
from contextlib import asynccontextmanager
from threading import Thread

from fastapi import FastAPI

from src.configs import load_all
from src.routers.v1 import v1_router
from src.services.databases import ClickHouse, Influx

from src.sink import KafkaSinkManager

KAFKA_HOST = os.getenv("KAFKA_HOST", "localhost")
@@ -19,7 +20,7 @@ async def lifespan(app: FastAPI):
# Initialize database connections (singleton, handles connection internally)
Influx.service # Access triggers lazy initialization
ClickHouse.service # Access triggers lazy initialization

load_all()
Comment on lines 21 to +23
Copilot AI Feb 20, 2026

The schema configuration is loaded after the database services are initialized. This can cause runtime errors: the Raw class, instantiated in InfluxService.write_data (line 67) and write_batch (line 73), calls SchemaConf.get_core_fields() and SchemaConf.get_extra_fields(), which return empty dictionaries if load_all() has not been called yet.

The database services are initialized on lines 21-22 (property access triggers lazy initialization), but load_all() is only called on line 23. As a result, any attempt to write data to InfluxDB before SchemaConf is loaded will fail.
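One way to make this ordering problem fail loudly, instead of silently producing an empty schema, is to have the schema getters refuse to run before loading and to load configuration before anything else at startup. The sketch below is hypothetical: `SchemaConfSketch` and `startup` are stand-ins rather than the project's `SchemaConf` or `lifespan`, and the field values are hard-coded from `confs/core_fields.yml.example`.

```python
from collections.abc import Callable
from typing import ClassVar


class SchemaConfSketch:
    """Hypothetical stand-in for SchemaConf whose getters fail fast before load()."""

    _loaded: ClassVar[bool] = False
    _core_fields: ClassVar[dict[str, str]] = {}

    @classmethod
    def load(cls) -> None:
        # The real class reads confs/core_fields.yml; the values are hard-coded here.
        cls._core_fields = {"timestamp": "datetime", "cell_index": "integer"}
        cls._loaded = True

    @classmethod
    def get_core_fields(cls) -> dict[str, str]:
        if not cls._loaded:
            # Raise instead of silently returning an empty schema.
            raise RuntimeError("load the schema config before using it")
        return dict(cls._core_fields)


def startup(init_services: Callable[[], None]) -> None:
    """Load configuration first, then initialize anything that reads it."""
    SchemaConfSketch.load()
    init_services()
```

Calling `get_core_fields()` before `load()` now raises `RuntimeError` instead of returning `{}`, so an ordering mistake surfaces at the first use rather than as a write against an empty schema.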

sink_manager = KafkaSinkManager(KAFKA_HOST, KAFKA_PORT)

def kafka_worker():
@@ -33,17 +34,10 @@ def kafka_worker():
except Exception as e:
print(f"Kafka worker crashed: {e}")

kafka_thread = Thread(
target=kafka_worker,
daemon=True,
name="kafka-sink-thread"
)
kafka_thread = Thread(target=kafka_worker, daemon=True, name="kafka-sink-thread")
kafka_thread.start()

print(
f"API started (Kafka connecting in background to "
f"{KAFKA_HOST}:{KAFKA_PORT})"
)
print(f"API started (Kafka connecting in background to {KAFKA_HOST}:{KAFKA_PORT})")

yield

@@ -1,13 +1,10 @@
CREATE DATABASE IF NOT EXISTS analytics;

CREATE TABLE IF NOT EXISTS analytics.processed_latency
CREATE TABLE IF NOT EXISTS analytics.processed
(
window_start_time DateTime64(3),
window_end_time DateTime64(3),
window_duration_seconds Float64,

cell_index Int32,
network String,

network Nullable(String),
data_type Nullable(String), -- e.g., 'latency', 'anomaly', etc.

rsrp_mean Nullable(Float64),
rsrp_max Nullable(Float64),
@@ -37,7 +34,10 @@ CREATE TABLE IF NOT EXISTS analytics.processed_latency
primary_bandwidth Nullable(Float64),
ul_bandwidth Nullable(Float64),

sample_count Int32
sample_count Int32,
window_start_time DateTime64(3),
window_end_time DateTime64(3),
window_duration_seconds Float64
)
ENGINE = MergeTree
ORDER BY (cell_index, window_start_time)
13 changes: 13 additions & 0 deletions src/configs/__init__.py
@@ -0,0 +1,13 @@
from .clickhouse_conf import ClickhouseConf
from .conf import Conf
from .influx_conf import InfluxConf
from .schema_conf import SchemaConf

__all__ = ["ClickhouseConf", "InfluxConf", "SchemaConf"]


def load_all() -> None:
"""Load all configs"""
config: Conf
for config in [ClickhouseConf, InfluxConf, SchemaConf]:
config.load()
13 changes: 7 additions & 6 deletions src/configs/clickhouse_conf.py
@@ -5,10 +5,11 @@

logger = logging.getLogger("Config")


class ClickhouseConf(Conf):
host: str
port: int
user: str
host: str
port: int
user: str
password: str

_instance = None
@@ -26,9 +27,9 @@ def __init__(self):
@classmethod
def load_env(cls, file: str = ".env") -> None:

cls.host = os.getenv("CLICKHOUSE_HOST", "localhost")
cls.port = int(os.getenv("CLICKHOUSE_PORT", "9000"))
cls.user = os.getenv("CLICKHOUSE_USER", "default")
cls.host = os.getenv("CLICKHOUSE_HOST", "localhost")
cls.port = int(os.getenv("CLICKHOUSE_PORT", "9000"))
cls.user = os.getenv("CLICKHOUSE_USER", "default")
cls.password = os.getenv("CLICKHOUSE_PASSWORD", "")

cls._loaded = True
8 changes: 6 additions & 2 deletions src/configs/conf.py
@@ -1,15 +1,19 @@
class Conf:
@classmethod
def load_env(cls,file:str=".env")->None:
def load_env(cls, file: str = ".env") -> None:
"""
Load configuration from env
"""
raise NotImplementedError

@classmethod
def get(cls)->dict:
def get(cls) -> dict:
"""
Get a dict with the loaded confs.
If not loaded yet, then load in the function
"""
raise NotImplementedError

@classmethod
def load(cls) -> None:
Copilot AI Feb 20, 2026

The new load() method in the base Conf class calls load_env(), while SchemaConf overrides load() to call load_yml() instead. This works, but it introduces an inconsistency: the other configs read environment variables, whereas SchemaConf reads YAML files. This mixed approach could be confusing. Consider documenting the design decision, or creating separate base classes for env-based and file-based configs.

Suggested change
def load(cls) -> None:
def load(cls) -> None:
"""
Load configuration for this Conf subclass.
By default, this delegates to ``load_env`` so that env-based
configuration classes only need to implement ``load_env``.
Subclasses that load configuration from other sources (for example,
YAML files) are expected to override this method and provide their
own loading logic instead of, or in addition to, calling
``load_env``.
"""

cls.load_env()
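The split the review comment describes (env-based configs implement only `load_env`, file-based ones override `load`) can be sketched as follows. `Conf` here is a simplified copy of the base class above; `EnvConfSketch`, `FileConfSketch`, the `DEMO_HOST` variable, and this `load_all` signature are hypothetical. The file-based loader parses simple `name: type` lines with the standard library instead of a YAML parser, purely to keep the example self-contained.

```python
import os
import tempfile
from pathlib import Path


class Conf:
    _loaded: bool = False

    @classmethod
    def load_env(cls, file: str = ".env") -> None:
        raise NotImplementedError

    @classmethod
    def load(cls) -> None:
        # Default: env-based subclasses only implement load_env().
        cls.load_env()


class EnvConfSketch(Conf):
    """Hypothetical env-based config, in the style of ClickhouseConf."""

    host: str

    @classmethod
    def load_env(cls, file: str = ".env") -> None:
        cls.host = os.getenv("DEMO_HOST", "localhost")
        cls._loaded = True


class FileConfSketch(Conf):
    """Hypothetical file-based config: overrides load() instead of load_env()."""

    path: Path = Path("confs/core_fields.yml")
    fields: dict[str, str] = {}

    @classmethod
    def load(cls) -> None:
        # Minimal "name: type" parser standing in for a real YAML loader.
        cls.fields = {}
        for raw_line in cls.path.read_text().splitlines():
            line = raw_line.split("#", 1)[0].strip()
            if line:
                name, _, type_name = line.partition(":")
                cls.fields[name.strip()] = type_name.strip()
        cls._loaded = True


def load_all(configs: list[type[Conf]]) -> None:
    """Each config decides for itself where its values come from."""
    for config in configs:
        config.load()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        FileConfSketch.path = Path(tmp) / "core_fields.yml"
        FileConfSketch.path.write_text("timestamp: datetime\ncell_index: integer\n")
        load_all([EnvConfSketch, FileConfSketch])
    print(EnvConfSketch.host, FileConfSketch.fields)
```

Because `load_all` only ever calls `load()`, the caller does not need to know which source a config uses; the trade-off the comment points out is that this contract lives in convention rather than in separate base classes.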