Skip to content

Commit f78e577

Browse files
authored
Merge pull request #28 from ATNoG/feature/query_raw_data
Feature/query raw data
2 parents f9680c7 + dcdc561 commit f78e577

File tree

10 files changed

+97
-9
lines changed

10 files changed

+97
-9
lines changed

main.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,9 @@
55
from fastapi import FastAPI
66

77
from src.routers.v1 import v1_router
8-
from src.routers.v1.latency import ClickHouse
8+
from src.routers.v1.latency_router import ClickHouse
9+
from src.routers.v1.raw_router import Influx
10+
911
from src.sink import KafkaSinkManager
1012

1113
KAFKA_HOST = os.getenv("KAFKA_HOST", "localhost")
@@ -15,6 +17,8 @@
1517

1618
@asynccontextmanager
1719
async def lifespan(app: FastAPI):
20+
21+
Influx.service.connect()
1822
ClickHouse.service.connect()
1923

2024
sink_manager = KafkaSinkManager(KAFKA_HOST, KAFKA_PORT)

src/models/raw.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
RAW:str = "raw"
77

8+
9+
810
class Raw(BaseModel):
911
# Temporal
1012
timestamp: datetime = Field(
@@ -49,3 +51,7 @@ def to_point(self) -> Point:
4951
else:
5052
point.field(key, value)
5153
return point
54+
55+
class RawResponse(BaseModel):
56+
data: list[Raw]
57+
has_next: bool

src/routers/v1/__init__.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from fastapi import APIRouter
2-
from src.routers.v1 import latency
2+
from src.routers.v1.latency_router import router as latencyR
3+
from src.routers.v1.raw_router import router as rawR
34

45
v1_router = APIRouter()
5-
6-
v1_router.include_router(latency.router, prefix="/processed", tags=["v1", "latency"])
6+
v1_router.include_router(latencyR, prefix="/processed", tags=["v1", "latency"])
7+
v1_router.include_router(rawR, prefix="/raw", tags=["v1", "latency"])

src/routers/v1/raw_router.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from fastapi import APIRouter, HTTPException, Query
2+
from src.configs.influx_conf import InfluxConf
3+
from src.services.influx import InfluxService
4+
from src.models.raw import RawResponse
5+
from datetime import datetime
6+
7+
8+
router = APIRouter()
9+
10+
class Influx():
11+
conf = InfluxConf()
12+
service = InfluxService()
13+
14+
15+
@router.get("/", response_model=RawResponse)
16+
def get_raw_data(
17+
start_time: int = Query(..., description="Start time (Unix timestamp in seconds)"),
18+
end_time: int = Query(..., description="End time (Unix timestamp in seconds)"),
19+
cell_index: int = Query(..., description="Cell index (required)"),
20+
21+
batch_number: int = Query(1, ge=1, description="Batch number"),
22+
):
23+
"""
24+
Query raw data entries between a given start and end time for a specific cell.
25+
26+
This endpoint returns raw entries directly from the database,
27+
limited to a maximum of 50 per request.
28+
"""
29+
try:
30+
query_params = {
31+
"start_time": start_time,
32+
"end_time": end_time,
33+
"cell_index": cell_index,
34+
"batch_number": batch_number,
35+
}
36+
37+
results,has_next = Influx.service.query_raw_data(**query_params)
38+
39+
for row in results:
40+
for k, v in row.items():
41+
if isinstance(v, datetime):
42+
row[k] = v.isoformat()
43+
44+
return {"data": results, "has_next": has_next}
45+
46+
except Exception as e:
47+
raise HTTPException(status_code=500, detail=f"Error querying raw data: {str(e)}")

src/services/clickhouse.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from src.configs.clickhouse_conf import ClickhouseConf
55
from src.models.processed_latency import ProcessedLatency
66
from src.services.db_service import DBService
7-
from src.services.query import QueryCH
7+
from src.services.clickhouse_query import QueryCH
88

99

1010
def transform_processor_output(data: dict) -> dict:
@@ -83,6 +83,7 @@ def __init__(self) -> None:
8383
self.conf = ClickhouseConf()
8484
self.client: Client = None
8585

86+
8687
def connect(self):
8788
self.client = clickhouse_connect.get_client(
8889
host = self.conf.host,

src/services/influx.py

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,11 @@
44
from src.configs.influx_conf import InfluxConf
55
from src.models.raw import Raw
66
from src.services.db_service import DBService
7+
from src.services.influx_query import QueryIF
78

89
class InfluxService(DBService):
910
def __init__(self) -> None:
1011
self.conf= InfluxConf()
11-
self.client: InfluxDBClient
12-
self.write_api: WriteApi
13-
self.query_api: QueryApi
1412

1513
def connect(self):
1614
"""Init client"""
@@ -28,6 +26,30 @@ def get_data(self, batch_number:int = 1 ,batch_size:int = 50) -> list[Raw]:
2826
#TODO: add query
2927
return []
3028

29+
def query_raw_data(self, start_time: int, end_time: int, cell_index: int, batch_number: int):
30+
_LIMIT = 50
31+
offset = (batch_number-1) * _LIMIT
32+
33+
query = QueryIF.between.format(
34+
bucket=self.conf.bucket,
35+
start_time=start_time,
36+
end_time=end_time,
37+
cell_index=cell_index,
38+
limit=_LIMIT+1,
39+
offset=offset
40+
)
41+
42+
result = self.query_api.query(query)
43+
44+
# Convert FluxTable to list of dicts
45+
rows = []
46+
for table in result:
47+
for record in table.records:
48+
rows.append(record.values)
49+
50+
return rows, len(rows)>_LIMIT
51+
52+
3153
def write_data(self, data: dict) -> None :
3254
"""Write a single record"""
3355
raw = Raw(**data)

src/services/influx_query.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
class QueryIF:
2+
between = """
3+
from(bucket: "{bucket}")
4+
|> range(start: {start_time}, stop: {end_time})
5+
|> filter(fn: (r) => r["cell_index"] == "{cell_index}")
6+
|> limit(n: {limit}, offset: {offset})
7+
"""

tests/test_latency_endpoint.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
@pytest.fixture
1414
def mock_clickhouse_service():
1515
"""Mock ClickHouseService for testing."""
16-
with patch('src.routers.v1.latency.ClickHouse') as mock:
16+
with patch('src.routers.v1.latency_router.ClickHouse') as mock:
1717
service_mock = MagicMock()
1818
mock.service = service_mock
1919
yield service_mock

0 commit comments

Comments
 (0)