Skip to content

Commit 8eca129

Browse files
authored
Refactoring: Entities - Sinks (#4973)
1 parent 970f3e4 commit 8eca129

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1831
-1589
lines changed

application/backend/app/api/dependencies.py

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,19 @@
1111

1212
from app.core.jobs.control_plane import JobQueue
1313
from app.db import get_db_session
14+
from app.models import Sink
1415
from app.scheduler import Scheduler
1516
from app.schemas import ProjectView
1617
from app.services import (
17-
ConfigurationService,
1818
DatasetService,
1919
MetricsService,
2020
ModelService,
2121
PipelineMetricsService,
2222
PipelineService,
2323
ProjectService,
2424
ResourceNotFoundError,
25+
SinkService,
26+
SourceService,
2527
SystemService,
2628
)
2729
from app.services.base_weights_service import BaseWeightsService
@@ -146,12 +148,18 @@ def get_metrics_service(scheduler: Annotated[Scheduler, Depends(get_scheduler)])
146148
return MetricsService(scheduler.shm_metrics.name, scheduler.shm_metrics_lock)
147149

148150

149-
def get_configuration_service(
150-
event_bus: Annotated[EventBus, Depends(get_event_bus)],
151-
db: Annotated[Session, Depends(get_db)],
152-
) -> ConfigurationService:
153-
"""Provides a ConfigurationService instance."""
154-
return ConfigurationService(event_bus=event_bus, db_session=db)
151+
def get_sink_service(
152+
event_bus: Annotated[EventBus, Depends(get_event_bus)], db: Annotated[Session, Depends(get_db)]
153+
) -> SinkService:
154+
"""Provides a SinkService instance."""
155+
return SinkService(event_bus=event_bus, db_session=db)
156+
157+
158+
def get_source_service(
159+
event_bus: Annotated[EventBus, Depends(get_event_bus)], db: Annotated[Session, Depends(get_db)]
160+
) -> SourceService:
161+
"""Provides a SourceService instance."""
162+
return SourceService(event_bus=event_bus, db_session=db)
155163

156164

157165
def get_pipeline_service(
@@ -225,6 +233,17 @@ def get_project(
225233
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
226234

227235

236+
def get_sink(
237+
sink_id: Annotated[UUID, Depends(get_sink_id)],
238+
sink_service: Annotated[SinkService, Depends(get_sink_service)],
239+
) -> Sink:
240+
"""Provides a Sink instance for request scoped sink."""
241+
try:
242+
return sink_service.get_by_id(sink_id)
243+
except ResourceNotFoundError as e:
244+
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
245+
246+
228247
def get_base_weights_service(data_dir: Annotated[Path, Depends(get_data_dir)]) -> BaseWeightsService:
229248
"""Provides a BaseWeightsService instance for managing base weights."""
230249
return BaseWeightsService(data_dir)

application/backend/app/api/routers/sinks.py

Lines changed: 55 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import logging
77
from typing import Annotated
8-
from uuid import UUID
98

109
import yaml
1110
from fastapi import APIRouter, Body, Depends, File, UploadFile, status
@@ -14,15 +13,15 @@
1413
from fastapi.responses import FileResponse, Response
1514
from pydantic import ValidationError
1615

17-
from app.api.dependencies import get_configuration_service, get_sink_id
18-
from app.schemas import Sink, SinkCreate
19-
from app.schemas.sink import SinkCreateAdapter
16+
from app.api.dependencies import get_sink, get_sink_service
17+
from app.api.schemas.sink import SinkCreate, SinkCreateAdapter, SinkView, SinkViewAdapter
18+
from app.models import Sink
2019
from app.services import (
21-
ConfigurationService,
2220
ResourceInUseError,
2321
ResourceNotFoundError,
2422
ResourceWithIdAlreadyExistsError,
2523
ResourceWithNameAlreadyExistsError,
24+
SinkService,
2625
)
2726

2827
logger = logging.getLogger(__name__)
@@ -83,7 +82,7 @@
8382
@router.post(
8483
"",
8584
status_code=status.HTTP_201_CREATED,
86-
response_model=Sink,
85+
response_model=SinkView,
8786
responses={
8887
status.HTTP_201_CREATED: {"description": "Sink created"},
8988
status.HTTP_400_BAD_REQUEST: {"description": "Invalid sink ID"},
@@ -94,72 +93,84 @@ def create_sink(
9493
sink_create: Annotated[
9594
SinkCreate, Body(description=CREATE_SINK_BODY_DESCRIPTION, openapi_examples=CREATE_SINK_BODY_EXAMPLES)
9695
],
97-
configuration_service: Annotated[ConfigurationService, Depends(get_configuration_service)],
98-
) -> Sink:
96+
sink_service: Annotated[SinkService, Depends(get_sink_service)],
97+
) -> SinkView:
9998
"""Create and configure a new sink"""
10099
try:
101-
return configuration_service.create_sink(sink_create)
100+
sink = sink_service.create_sink(
101+
name=sink_create.name,
102+
sink_type=sink_create.sink_type,
103+
rate_limit=sink_create.rate_limit,
104+
config_data=sink_create.config_data,
105+
output_formats=sink_create.output_formats,
106+
sink_id=sink_create.id,
107+
)
108+
return SinkViewAdapter.validate_python(sink, from_attributes=True)
102109
except (ResourceWithNameAlreadyExistsError, ResourceWithIdAlreadyExistsError) as e:
103110
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
104111

105112

106113
@router.get(
107114
"",
108115
responses={
109-
status.HTTP_200_OK: {"description": "List of available sink configurations", "model": list[Sink]},
116+
status.HTTP_200_OK: {"description": "List of available sink configurations", "model": list[SinkView]},
110117
},
111118
)
112119
def list_sinks(
113-
configuration_service: Annotated[ConfigurationService, Depends(get_configuration_service)],
114-
) -> list[Sink]:
120+
sink_service: Annotated[SinkService, Depends(get_sink_service)],
121+
) -> list[SinkView]:
115122
"""List the available sinks"""
116-
return configuration_service.list_sinks()
123+
sinks = sink_service.list_all()
124+
return [SinkViewAdapter.validate_python(sink, from_attributes=True) for sink in sinks]
117125

118126

119127
@router.get(
120128
"/{sink_id}",
121129
responses={
122-
status.HTTP_200_OK: {"description": "Sink found", "model": Sink},
130+
status.HTTP_200_OK: {"description": "Sink found", "model": SinkView},
123131
status.HTTP_400_BAD_REQUEST: {"description": "Invalid sink ID"},
124132
status.HTTP_404_NOT_FOUND: {"description": "Sink not found"},
125133
},
126134
)
127-
def get_sink(
128-
sink_id: Annotated[UUID, Depends(get_sink_id)],
129-
configuration_service: Annotated[ConfigurationService, Depends(get_configuration_service)],
130-
) -> Sink:
135+
def get_sink_view(sink: Annotated[Sink, Depends(get_sink)]) -> SinkView:
131136
"""Get info about a sink"""
132-
try:
133-
return configuration_service.get_sink_by_id(sink_id)
134-
except ResourceNotFoundError as e:
135-
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
137+
return SinkViewAdapter.validate_python(sink, from_attributes=True)
136138

137139

138140
@router.patch(
139141
"/{sink_id}",
140142
responses={
141-
status.HTTP_200_OK: {"description": "Sink successfully updated", "model": Sink},
143+
status.HTTP_200_OK: {"description": "Sink successfully updated", "model": SinkView},
142144
status.HTTP_400_BAD_REQUEST: {"description": "Invalid sink ID or request body"},
143145
status.HTTP_404_NOT_FOUND: {"description": "Sink not found"},
144146
status.HTTP_409_CONFLICT: {"description": "Sink already exists"},
145147
},
146148
)
147149
def update_sink(
148-
sink_id: Annotated[UUID, Depends(get_sink_id)],
150+
sink: Annotated[Sink, Depends(get_sink)],
149151
sink_config: Annotated[
150152
dict,
151153
Body(
152154
description=UPDATE_SINK_BODY_DESCRIPTION,
153155
openapi_examples=UPDATE_SINK_BODY_EXAMPLES,
154156
),
155157
],
156-
configuration_service: Annotated[ConfigurationService, Depends(get_configuration_service)],
157-
) -> Sink:
158+
sink_service: Annotated[SinkService, Depends(get_sink_service)],
159+
) -> SinkView:
158160
"""Reconfigure an existing sink"""
159161
if "sink_type" in sink_config:
160162
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The 'sink_type' field cannot be changed")
161163
try:
162-
return configuration_service.update_sink(sink_id, sink_config)
164+
updated_sink: Sink = sink.model_copy(update=sink_config)
165+
updated_sink.config_data = updated_sink.config_data.model_copy(update=sink_config)
166+
sink = sink_service.update_sink(
167+
sink=sink,
168+
new_name=updated_sink.name,
169+
new_rate_limit=updated_sink.rate_limit,
170+
new_config_data=updated_sink.config_data,
171+
new_output_formats=updated_sink.output_formats,
172+
)
173+
return SinkViewAdapter.validate_python(sink, from_attributes=True)
163174
except ResourceNotFoundError as e:
164175
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
165176
except ResourceWithNameAlreadyExistsError as e:
@@ -180,47 +191,45 @@ def update_sink(
180191
status.HTTP_404_NOT_FOUND: {"description": "Sink not found"},
181192
},
182193
)
183-
def export_sink(
184-
sink_id: Annotated[UUID, Depends(get_sink_id)],
185-
configuration_service: Annotated[ConfigurationService, Depends(get_configuration_service)],
186-
) -> Response:
194+
def export_sink(sink: Annotated[Sink, Depends(get_sink)]) -> Response:
187195
"""Export a sink to file"""
188-
sink = configuration_service.get_sink_by_id(sink_id)
189-
if not sink:
190-
raise HTTPException(
191-
status_code=status.HTTP_404_NOT_FOUND,
192-
detail=f"Sink with ID {sink_id} not found",
193-
)
194-
195196
yaml_content = yaml.safe_dump(sink.model_dump(mode="json", exclude={"id"}))
196197

197198
return Response(
198199
content=yaml_content.encode("utf-8"),
199200
media_type="application/x-yaml",
200-
headers={"Content-Disposition": f"attachment; filename=sink_{sink_id}.yaml"},
201+
headers={"Content-Disposition": f"attachment; filename=sink_{sink.id}.yaml"},
201202
)
202203

203204

204205
@router.post(
205206
":import",
206207
status_code=status.HTTP_201_CREATED,
207208
responses={
208-
status.HTTP_201_CREATED: {"description": "Sink imported successfully", "model": Sink},
209+
status.HTTP_201_CREATED: {"description": "Sink imported successfully", "model": SinkView},
209210
status.HTTP_400_BAD_REQUEST: {"description": "Invalid YAML format"},
210211
status.HTTP_409_CONFLICT: {"description": "Sink already exists"},
211212
status.HTTP_422_UNPROCESSABLE_ENTITY: {"description": "Validation error(s)"},
212213
},
213214
)
214215
def import_sink(
215216
yaml_file: Annotated[UploadFile, File(description="YAML file containing the sink configuration")],
216-
configuration_service: Annotated[ConfigurationService, Depends(get_configuration_service)],
217-
) -> Sink:
217+
sink_service: Annotated[SinkService, Depends(get_sink_service)],
218+
) -> SinkView:
218219
"""Import a sink from file"""
219220
try:
220221
yaml_content = yaml_file.file.read()
221222
sink_data = yaml.safe_load(yaml_content)
222223
sink_create = SinkCreateAdapter.validate_python(sink_data)
223-
return configuration_service.create_sink(sink_create)
224+
sink = sink_service.create_sink(
225+
name=sink_create.name,
226+
sink_type=sink_create.sink_type,
227+
rate_limit=sink_create.rate_limit,
228+
config_data=sink_create.config_data,
229+
output_formats=sink_create.output_formats,
230+
sink_id=sink_create.id,
231+
)
232+
return SinkViewAdapter.validate_python(sink, from_attributes=True)
224233
except yaml.YAMLError as e:
225234
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid YAML format: {str(e)}")
226235
except (ResourceWithNameAlreadyExistsError, ResourceWithIdAlreadyExistsError) as e:
@@ -242,13 +251,11 @@ def import_sink(
242251
},
243252
)
244253
def delete_sink(
245-
sink_id: Annotated[UUID, Depends(get_sink_id)],
246-
configuration_service: Annotated[ConfigurationService, Depends(get_configuration_service)],
254+
sink: Annotated[Sink, Depends(get_sink)],
255+
sink_service: Annotated[SinkService, Depends(get_sink_service)],
247256
) -> None:
248257
"""Remove a sink"""
249258
try:
250-
configuration_service.delete_sink_by_id(sink_id)
251-
except ResourceNotFoundError as e:
252-
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
259+
sink_service.delete_sink(sink)
253260
except ResourceInUseError as e:
254261
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))

0 commit comments

Comments
 (0)