Conversation
The remaining errors were not added by this PR.
Pull request overview
This PR implements a flexible, configuration-driven data storage system for NWDAF that replaces hardcoded Pydantic models with a dynamic schema system based on YAML configuration files. The changes enable the system to handle multiple analytics types without code modifications.
Changes:
- Introduced YAML-based schema configuration system (SchemaConf) that defines data fields, types, and InfluxDB tags
- Refactored Raw model from Pydantic to plain Python class with dynamic field validation
- Replaced ProcessedLatency Pydantic model with dict-based approach and generic ClickHouse table
- Updated API endpoints from `/processed/latency/` to `/processed/` to reflect the generic nature
- Simplified Docker setup by using ClickHouse's native initialization mechanism
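The core idea of the YAML-based schema system is mapping type names from configuration into Python types at load time. A minimal, dependency-free sketch of that mechanism (the real loader uses PyYAML and the `SchemaConf` class; the `parse_fields` helper and inline config text here are illustrative):

```python
# Illustrative sketch of config-driven field typing. The PR's real loader
# reads YAML files from confs/; here a tiny "name: type" format is parsed
# inline to keep the example self-contained.
TYPE_MAP = {"int": int, "float": float, "str": str, "bool": bool}

def parse_fields(text: str) -> dict[str, type]:
    """Parse 'field: typename' lines into a field-name -> Python-type mapping."""
    fields: dict[str, type] = {}
    for line in text.strip().splitlines():
        name, type_name = (part.strip() for part in line.split(":", 1))
        fields[name] = TYPE_MAP[type_name]
    return fields

core_fields = parse_fields("timestamp: int\ncell_index: int")
print(core_fields)  # {'timestamp': <class 'int'>, 'cell_index': <class 'int'>}
```

Once fields are held as a name-to-type mapping rather than a Pydantic model, new analytics types only require editing the YAML, not the code.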
Reviewed changes
Copilot reviewed 26 out of 27 changed files in this pull request and generated 24 comments.
| File | Description |
|---|---|
| src/configs/schema_conf.py | New configuration loader for YAML-based schema definitions with type parsing |
| src/configs/conf.py | Added generic load() method to base configuration class |
| src/configs/__init__.py | Added load_all() function to initialize all configurations |
| src/models/raw.py | Refactored from Pydantic to plain class with dynamic field validation against schema config |
| src/models/processed_latency.py | Deleted - replaced with dict-based approach |
| src/services/clickhouse.py | Dynamic data transformation that queries schema at runtime and supports flexible fields |
| src/services/clickhouse_query.py | Simplified query to use SELECT * for generic table |
| src/routers/v1/processed.py | New generic router replacing latency-specific endpoint |
| src/routers/v1/latency_router.py | Deleted - replaced by generic processed router |
| src/routers/v1/__init__.py | Updated router registration with generic "data" tags |
| src/routers/v1/raw_router.py | Code formatting improvements |
| sql/01_create_processed_table.sql | Renamed table to generic "processed" with nullable fields for all metric types |
| docker/Dockerfile | Added confs/ directory copy for runtime configuration |
| docker/Dockerfile.clickhouse | New Dockerfile using native ClickHouse initialization |
| docker-compose.yml | Removed separate init container, simplified ClickHouse setup |
| init-clickhouse.sh | Deleted - replaced by native ClickHouse init |
| Dockerfile.clickhouse-init | Deleted - no longer needed |
| confs/*.yml.example | Example configuration files for core fields, extra fields, and tags |
| .gitignore | Added confs/*.yml to ignore actual config files |
| .env.example | New example environment file |
| main.py | Added load_all() call to initialize schema configurations |
| tests/* | Updated tests to work with dict-based models and mock schema |
Comments suppressed due to low confidence (3)
sql/01_create_processed_table.sql:40 - The table schema has been changed to make critical fields like "network" and temporal fields nullable or moved to the end. The cell_index is at the top but window_start_time, window_end_time, and window_duration_seconds (which are part of the ORDER BY) are now at the bottom. While this doesn't affect functionality, it's unconventional to have ORDER BY columns at the end of the schema. Consider grouping related fields together for better readability.
sql/01_create_processed_table.sql:43 - The data_type field was added to distinguish between different analytics types (latency, anomaly, etc.) but it's not included in the ORDER BY clause. If queries frequently filter by data_type, this could lead to poor query performance. Consider adding data_type to the ORDER BY clause as: ORDER BY (cell_index, data_type, window_start_time) for better query performance when filtering by analytics type.
src/services/clickhouse_query.py:10 - The query uses SELECT * which will return all columns including the new data_type field. However, there's no filter for data_type in the WHERE clause. This means the query will return mixed analytics types (latency, anomaly, etc.) if they exist in the same table. Consider adding an optional data_type parameter to allow filtering by analytics type, or document that this query returns all types.
processed = """
SELECT
*
FROM analytics.processed
WHERE cell_index = {cell_index:Int32}
AND window_duration_seconds = {window_duration_seconds:Int32}
AND toUnixTimestamp(window_start_time) >= {start_time:Int64}
AND toUnixTimestamp(window_end_time) <= {end_time:Int64}
ORDER BY window_end_time DESC
src/routers/v1/processed.py
Outdated
```python
def get_processed_latency(
    start_time: int = Query(
        ..., description="Window start time (Unix timestamp in seconds)"
    ),
    end_time: int = Query(
        ..., description="Window end time (Unix timestamp in seconds)"
    ),
    cell_index: int = Query(..., description="Cell index (required)"),
    window_duration_seconds: int = Query(
        ..., description="Duration of the target windows"
    ),
    offset: int = Query(0, ge=0, description="Number of records to skip"),
    limit: int = Query(
        100, ge=1, le=1000, description="Maximum number of records to return"
    ),
):
    """
    Query processed data with various filters.

    Returns aggregated statistics over time windows including:
    - Signal quality metrics
    - Performance metrics
    - Network information
    - Statistical measures for each metric
    """
```
The endpoint function is still named "get_processed_latency" even though the endpoint is now generic and handles all types of processed data, not just latency. The docstring also still references "latency" specifically. This should be renamed to "get_processed_data" and the docstring updated to reflect the generic nature of the endpoint.
```diff
@@ -41,29 +43,13 @@ services:
       timeout: 3s
       retries: 5
```
The clickhouse-init service has been removed, which simplifies the architecture. However, ensure that the new approach using /docker-entrypoint-initdb.d/ in the ClickHouse image properly waits for ClickHouse to be ready before executing the SQL scripts. The removed init-clickhouse.sh had explicit health checks and retries. The standard ClickHouse entrypoint should handle this, but it's worth verifying in testing.
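If manual verification ever shows the native entrypoint racing ahead of the server, the kind of readiness probe the removed init-clickhouse.sh performed can be reproduced in a few lines. A sketch (the `/ping` URL, retry count, and delay are illustrative defaults, not values from the PR):

```python
# Hypothetical readiness check mirroring the retries the deleted
# init-clickhouse.sh used to perform: poll ClickHouse's HTTP /ping
# endpoint until it answers 200 or the retry budget runs out.
import time
import urllib.request

def wait_for_clickhouse(url: str = "http://localhost:8123/ping",
                        retries: int = 10, delay: float = 1.0) -> bool:
    for _ in range(retries):
        try:
            with urllib.request.urlopen(url, timeout=2) as resp:
                if resp.status == 200:
                    return True
        except OSError:
            pass  # server not up yet; retry after a short pause
        time.sleep(delay)
    return False
```

In practice the standard ClickHouse entrypoint should make this unnecessary, but having an explicit probe is useful when debugging startup ordering.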
```python
    from_attributes=True
)
# Do not change the following if statement if timestamp is not ensured to be present in data
# The current approach allows core features to fe configurable but influxdb always needs timestamp
```
There is a typo in the comment on line 27: "fe" should be "be". The comment reads "allows core features to fe configurable" but should read "allows core features to be configurable".
```diff
- # The current approach allows core features to fe configurable but influxdb always needs timestamp
+ # The current approach allows core features to be configurable but influxdb always needs timestamp
```
```diff
@@ -0,0 +1,3 @@
FROM clickhouse/clickhouse-server:latest
```
The SQL files are copied to /docker-entrypoint-initdb.d/ which is the standard ClickHouse initialization directory. However, there's no guarantee about the execution order of multiple SQL files. The file is named with a "01_" prefix suggesting ordering, but this should be documented or verified that ClickHouse processes files in alphanumeric order. If there are dependencies between SQL files, this could cause initialization failures.
```dockerfile
# NOTE: The ClickHouse Docker entrypoint processes files in /docker-entrypoint-initdb.d/
# in lexicographical (alphanumeric) order. SQL files in sql/ should be named with
# appropriate numeric prefixes (e.g., 01_, 02_) to ensure dependent scripts run in sequence.
```
```python
core_fields: dict[str, type] = {}
extra_fields: dict[str, type] = {}
tags: set[str] = set()
```
The SchemaConf class variables (core_fields, extra_fields, tags) are class-level and never cleared. In a long-running service, if the configuration is reloaded multiple times, this is fine. However, in test environments, this can cause test pollution if tests don't properly reset the state. The test file test_singleton_databases.py resets database singletons but doesn't reset SchemaConf state, which could lead to test interdependencies.
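A lightweight way to address the test-pollution risk is a reset helper invoked from test teardown. A sketch (this `SchemaConf` is a stand-in with the same class-level attributes; the reset helper is hypothetical):

```python
# Test-isolation sketch: a stand-in SchemaConf with the same class-level
# attributes as in the PR, plus a reset helper that a test teardown could
# call so one test's loaded config cannot leak into the next.
class SchemaConf:
    core_fields: dict[str, type] = {}
    extra_fields: dict[str, type] = {}
    tags: set[str] = set()

def reset_schema_conf() -> None:
    # Rebind fresh containers rather than mutating the shared ones.
    SchemaConf.core_fields = {}
    SchemaConf.extra_fields = {}
    SchemaConf.tags = set()

SchemaConf.core_fields["timestamp"] = int  # simulate a loaded config
reset_schema_conf()
assert SchemaConf.core_fields == {}
```

Wiring this into the same place where test_singleton_databases.py resets the database singletons would keep all global state resets in one spot.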
```python
column_names = list(transformed_list[0].keys())

values = [list(d.values()) for d in transformed_list]
```
The write_batch method assumes all items in transformed_list have the same keys and uses the keys from the first item for column_names. If different records in the batch have different fields (which is possible in a flexible schema system), this will cause data corruption or insertion errors. The values will be extracted in dictionary iteration order which may differ between items, causing values to be written to wrong columns.
```diff
- column_names = list(transformed_list[0].keys())
- values = [list(d.values()) for d in transformed_list]
+ # Build a unified, deterministic column list across all transformed records
+ all_columns: set[str] = set()
+ for item in transformed_list:
+     all_columns.update(item.keys())
+ column_names = sorted(all_columns)
+ # For each record, align values to the unified column order
+ values = [[item.get(col) for col in column_names] for item in transformed_list]
```
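To see why unifying the column list matters, consider two records with different field sets and different key orders (the records below are made up for illustration):

```python
# Demonstration: sorted, unified columns keep values aligned even when
# records carry different fields in different insertion orders.
transformed_list = [
    {"cell_index": 1, "latency_ms": 12.5},
    {"latency_ms": 9.1, "cell_index": 2, "anomaly_score": 0.7},
]

all_columns: set[str] = set()
for item in transformed_list:
    all_columns.update(item.keys())
column_names = sorted(all_columns)  # ['anomaly_score', 'cell_index', 'latency_ms']

# dict.get pads missing fields with None instead of shifting values
# into the wrong columns.
values = [[item.get(col) for col in column_names] for item in transformed_list]
# values == [[None, 1, 12.5], [0.7, 2, 9.1]]
```

With the original per-record `list(d.values())`, the second record's latency would have landed in the first column; here every value stays under its own column name, with `None` filling genuinely absent fields.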
```python
core_fields: dict[str, type] = {}
extra_fields: dict[str, type] = {}
tags: set[str] = set()
```
The class attributes core_fields, extra_fields, and tags are initialized with mutable defaults (empty dict and set). In Python, mutable default arguments are shared across all instances. While this might be intentional for a singleton-like pattern, it's safer to initialize these in load_yml() or use None as defaults and check for None before returning them. This prevents potential issues if the class is instantiated multiple times or if tests don't properly clean up state.
```python
# Should return False because sample_count is a required field
# The transform function will raise ValueError for missing required fields
assert result is False
```
The test expectation has changed from "assert result is True" to "assert result is False" with a comment explaining that sample_count is a required field. However, this assumption may not be accurate - the actual behavior depends on whether the transform function in the ClickHouse sink catches and handles the ValueError. The test should verify the actual exception or error handling behavior rather than assuming a False return value.
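Asserting on the exception itself, as the comment suggests, would pin down the contract regardless of how the sink wraps errors. A sketch (this `transform` is a stand-in for the real one; the field names mirror the test):

```python
# Stand-in transform that raises ValueError when required fields are missing,
# plus a test that verifies the exception rather than a boolean return value.
def transform(record: dict, required: set[str]) -> dict:
    missing = required - record.keys()
    if missing:
        raise ValueError(f"missing required fields: {sorted(missing)}")
    return record

try:
    transform({"cell_index": 3}, required={"cell_index", "sample_count"})
except ValueError as exc:
    # The error message names the offending field, so the test can be precise.
    assert "sample_count" in str(exc)
```

In pytest this would typically be written with `pytest.raises(ValueError, match="sample_count")`, which fails the test if no exception is raised at all.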
```diff
  v1_router = APIRouter()
- v1_router.include_router(latencyR, prefix="/processed", tags=["v1", "latency"])
  v1_router.include_router(rawR, prefix="/raw", tags=["v1", "latency"])
+ v1_router.include_router(latencyR, prefix="/processed", tags=["v1", "data"])
```
The tags in the router registration have changed from "latency" to "data" but the function is still named "get_processed_latency". This naming inconsistency could be confusing. Consider renaming the function to "get_processed_data" to match the generic nature of the endpoint and the tag name.
docker/Dockerfile.clickhouse
Outdated
```diff
@@ -0,0 +1,3 @@
FROM clickhouse/clickhouse-server:latest
```
The Dockerfile.clickhouse uses the mutable base image tag clickhouse/clickhouse-server:latest, which introduces a supply chain risk because future builds may automatically pull a compromised or incompatible image. An attacker who compromises the upstream image registry could gain code execution inside your ClickHouse container and access or exfiltrate analytics data. To reduce this risk, pin the base image to a specific, trusted version or immutable digest and update it deliberately as part of your release process.
Summary
This PR implements a config-driven, schema-agnostic data storage system for NWDAF that can handle multiple analytics types (latency, anomaly detection, etc.) without requiring code changes.
Key Changes
Configuration System
- New `confs/` directory for defining data fields (RAW):
  - `core_fields.yml` - Required fields (timestamp, cell_index)
  - `extra_fields.yml` - Optional metrics
  - `tag_fields.yml` - InfluxDB indexed tags
- `src/configs/schema_conf.py` - Config loader with type parsing (YAML strings → Python types)

Raw Data Pipeline (InfluxDB)
- Refactored `src/models/raw.py` to a plain class

Processed Data Pipeline (ClickHouse)
- Generic `analytics.processed` table with nullable columns for all metric types
- `column_names` parameter for dynamic inserts

API Changes
- `/processed/latency/` → `/processed/`
- Removed the `ProcessedLatency` model - endpoints now return plain dicts
- `/example` endpoint that generates from the actual schema

Docker Setup
- Dockerfiles moved to the `docker/` directory
- SQL init via `/docker-entrypoint-initdb.d` (removed the separate init container)

How it was tested
The whole PEI project was deployed locally, and the services were confirmed to integrate correctly with the data storage backends, indicating that these changes resolve the targeted problems without breaking existing functionality.