Skip to content

Commit 5c1e000

Browse files
Add endpoint to query known cells
1 parent 05c2dcb commit 5c1e000

File tree

11 files changed

+75
-57
lines changed

11 files changed

+75
-57
lines changed

main.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@
55
from fastapi import FastAPI
66

77
from src.routers.v1 import v1_router
8-
from src.routers.v1.latency_router import ClickHouse
9-
from src.routers.v1.raw_router import Influx
8+
from src.services.databases import ClickHouse, Influx
109

1110
from src.sink import KafkaSinkManager
1211

src/models/raw.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from influxdb_client.client.write.point import Point
44
from typing import Optional
55

6-
RAW:str = "raw"
6+
RAW_MEASUREMENT:str = "raw"
77

88

99

@@ -38,10 +38,10 @@ class Raw(BaseModel):
3838

3939
def to_point(self) -> Point:
4040
"""Convert to InfluxDB Point for writing"""
41-
point = Point(RAW).time(self.timestamp)
41+
point = Point(RAW_MEASUREMENT).time(self.timestamp)
4242

4343
# Tags (indexed, used for filtering/grouping)
44-
tags = ["cell_index", "network"]
44+
tags = {"cell_index", "network"}
4545

4646
# Add tags and fields
4747
for key, value in self.model_dump(exclude={"timestamp"}).items():

src/routers/v1/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
from fastapi import APIRouter
22
from src.routers.v1.latency_router import router as latencyR
33
from src.routers.v1.raw_router import router as rawR
4+
from src.routers.v1.cells_router import router as cellR
45

56
v1_router = APIRouter()
67
v1_router.include_router(latencyR, prefix="/processed", tags=["v1", "latency"])
78
v1_router.include_router(rawR, prefix="/raw", tags=["v1", "latency"])
9+
v1_router.include_router(cellR, prefix="/cell", tags=["v1", "cell"])

src/routers/v1/cells_router.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
"""
2+
Endpoints for query data
3+
"""
4+
from fastapi import APIRouter, HTTPException
5+
from src.services.databases import Influx
6+
7+
router = APIRouter()
8+
9+
@router.get("/", response_model=list[int])
10+
def get_processed_latency():
11+
"""
12+
Return a list of know cell indexes
13+
"""
14+
try:
15+
results = Influx.service.get_known_cells()
16+
return results
17+
18+
except Exception as e:
19+
raise HTTPException(status_code=500, detail=f"Error querying processed latency: {str(e)}")

src/routers/v1/latency_router.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,11 @@
22
Endpoints for query data
33
"""
44
from fastapi import APIRouter, HTTPException, Query
5-
from src.configs.clickhouse_conf import ClickhouseConf
6-
from src.services.clickhouse import ClickHouseService
5+
from src.services.databases import ClickHouse
76
from src.models.processed_latency import ProcessedLatency
87

98
router = APIRouter()
109

11-
class ClickHouse():
12-
conf = ClickhouseConf()
13-
service = ClickHouseService()
14-
1510

1611
@router.get("/latency/example", response_model=list[ProcessedLatency])
1712
def get_latency_example():

src/routers/v1/raw_router.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
11
from fastapi import APIRouter, HTTPException, Query
2-
from src.configs.influx_conf import InfluxConf
3-
from src.services.influx import InfluxService
42
from src.models.raw import RawResponse
53
from datetime import datetime
6-
4+
from src.services.databases import Influx
75

86
router = APIRouter()
97

10-
class Influx():
11-
conf = InfluxConf()
12-
service = InfluxService()
138

149

1510
@router.get("/", response_model=RawResponse)

src/services/clickhouse.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
from clickhouse_connect.driver.client import Client
44
from src.configs.clickhouse_conf import ClickhouseConf
55
from src.models.processed_latency import ProcessedLatency
6-
from src.services.db_service import DBService
76
from src.services.clickhouse_query import QueryCH
87

98

@@ -78,7 +77,7 @@ def transform_processor_output(data: dict) -> dict:
7877

7978
return transformed
8079

81-
class ClickHouseService(DBService):
80+
class ClickHouseService:
8281
def __init__(self) -> None:
8382
self.conf = ClickhouseConf()
8483
self.client: Client = None
@@ -92,9 +91,6 @@ def connect(self):
9291
password = self.conf.password,
9392
)
9493

95-
def get_data(self, batch_number: int = 1, batch_size: int = 50) -> list:
96-
return []
97-
9894
def write_data(self, data: dict) -> None:
9995
"""Write a single processed latency record to ClickHouse"""
10096
try:

src/services/databases.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from src.configs.clickhouse_conf import ClickhouseConf
2+
from src.services.clickhouse import ClickHouseService
3+
from src.configs.influx_conf import InfluxConf
4+
from src.services.influx import InfluxService
5+
6+
7+
class ClickHouse:
8+
conf = ClickhouseConf()
9+
service = ClickHouseService()
10+
11+
12+
class Influx:
13+
conf = InfluxConf()
14+
service = InfluxService()

src/services/db_service.py

Lines changed: 0 additions & 26 deletions
This file was deleted.

src/services/influx.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,10 @@
11
from influxdb_client.client.influxdb_client import InfluxDBClient
2-
from influxdb_client.client.write_api import ASYNCHRONOUS, WriteApi
3-
from influxdb_client.client.query_api import QueryApi
2+
from influxdb_client.client.write_api import ASYNCHRONOUS
43
from src.configs.influx_conf import InfluxConf
5-
from src.models.raw import Raw
6-
from src.services.db_service import DBService
4+
from src.models.raw import Raw, RAW_MEASUREMENT
75
from src.services.influx_query import QueryIF
86

9-
class InfluxService(DBService):
7+
class InfluxService:
108
def __init__(self) -> None:
119
self.conf= InfluxConf()
1210

@@ -22,10 +20,6 @@ def connect(self):
2220
self.write_api = self.client.write_api(write_options=ASYNCHRONOUS)
2321
self.query_api = self.client.query_api()
2422

25-
def get_data(self, batch_number:int = 1 ,batch_size:int = 50) -> list[Raw]:
26-
#TODO: add query
27-
return []
28-
2923
def query_raw_data(self, start_time: int, end_time: int, cell_index: int, batch_number: int):
3024
_LIMIT = 50
3125
offset = (batch_number-1) * _LIMIT
@@ -61,3 +55,23 @@ def write_batch(self, data_list:list[dict]) -> None :
6155
raw_list = [Raw(**d) for d in data_list]
6256
points = [r.to_point() for r in raw_list]
6357
self.write_api.write(bucket=self.conf.bucket, org = self.conf.org ,record = points)
58+
59+
def get_known_cells(self) -> list[int]:
60+
"""Returns a list of known cell indexes"""
61+
62+
query = QueryIF.get_known_cells.format(
63+
bucket=self.conf.bucket,
64+
measurement=RAW_MEASUREMENT
65+
)
66+
67+
tables = self.query_api.query(query)
68+
69+
cells = []
70+
for table in tables:
71+
for record in table.records:
72+
try:
73+
cells.append(int(record["_value"]))
74+
except (TypeError, ValueError):
75+
pass
76+
77+
return set(cells)

0 commit comments

Comments
 (0)