Skip to content

Commit d21b60f

Browse files
warrkanjpggvilaca
andauthored
Refactoring: Event Bus & services decoupling (#4956)
Co-authored-by: Joao Vilaca <[email protected]>
1 parent 517f1d1 commit d21b60f

38 files changed

+1059
-1056
lines changed

application/backend/app/api/dependencies.py

Lines changed: 39 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,21 @@
1212
from app.core.jobs.control_plane import JobQueue
1313
from app.db import get_db_session
1414
from app.scheduler import Scheduler
15+
from app.schemas import ProjectView
1516
from app.services import (
16-
ActivePipelineService,
1717
ConfigurationService,
1818
DatasetService,
1919
MetricsService,
2020
ModelService,
21+
PipelineMetricsService,
2122
PipelineService,
2223
ProjectService,
24+
ResourceNotFoundError,
2325
SystemService,
2426
)
2527
from app.services.base_weights_service import BaseWeightsService
2628
from app.services.data_collect import DataCollector
29+
from app.services.event.event_bus import EventBus
2730
from app.services.label_service import LabelService
2831
from app.webrtc.manager import WebRTCManager
2932

@@ -128,9 +131,9 @@ def get_data_dir(request: Request) -> Path:
128131
return request.app.state.settings.data_dir
129132

130133

131-
def get_active_pipeline_service(request: Request) -> ActivePipelineService:
132-
"""Provides an ActivePipelineService instance for managing the active pipeline."""
133-
return request.app.state.active_pipeline_service
134+
def get_event_bus(request: Request) -> EventBus:
135+
"""Provides an EventBus instance."""
136+
return request.app.state.event_bus
134137

135138

136139
def get_data_collector(request: Request) -> DataCollector:
@@ -144,32 +147,29 @@ def get_metrics_service(scheduler: Annotated[Scheduler, Depends(get_scheduler)])
144147

145148

146149
def get_configuration_service(
147-
active_pipeline_service: Annotated[ActivePipelineService, Depends(get_active_pipeline_service)],
148-
scheduler: Annotated[Scheduler, Depends(get_scheduler)],
150+
event_bus: Annotated[EventBus, Depends(get_event_bus)],
149151
db: Annotated[Session, Depends(get_db)],
150152
) -> ConfigurationService:
151-
"""Provides a ConfigurationService instance with the active pipeline service and config changed condition."""
152-
return ConfigurationService(
153-
active_pipeline_service=active_pipeline_service,
154-
db_session=db,
155-
config_changed_condition=scheduler.mp_config_changed_condition,
156-
)
153+
"""Provides a ConfigurationService instance."""
154+
return ConfigurationService(event_bus=event_bus, db_session=db)
157155

158156

159157
def get_pipeline_service(
160-
active_pipeline_service: Annotated[ActivePipelineService, Depends(get_active_pipeline_service)],
161-
data_collector: Annotated[DataCollector, Depends(get_data_collector)],
162-
metrics_service: Annotated[MetricsService, Depends(get_metrics_service)],
163-
scheduler: Annotated[Scheduler, Depends(get_scheduler)],
158+
event_bus: Annotated[EventBus, Depends(get_event_bus)],
164159
db: Annotated[Session, Depends(get_db)],
165160
) -> PipelineService:
166-
"""Provides a PipelineService instance with the active pipeline service and config changed condition."""
167-
return PipelineService(
168-
active_pipeline_service=active_pipeline_service,
169-
data_collector=data_collector,
161+
"""Provides a PipelineService instance ."""
162+
return PipelineService(event_bus=event_bus, db_session=db)
163+
164+
165+
def get_pipeline_metrics_service(
166+
pipeline_service: Annotated[PipelineService, Depends(get_pipeline_service)],
167+
metrics_service: Annotated[MetricsService, Depends(get_metrics_service)],
168+
) -> PipelineMetricsService:
169+
"""Provides a PipelineMetricsService instance."""
170+
return PipelineMetricsService(
171+
pipeline_service=pipeline_service,
170172
metrics_service=metrics_service,
171-
config_changed_condition=scheduler.mp_config_changed_condition,
172-
db_session=db,
173173
)
174174

175175

@@ -198,22 +198,31 @@ def get_label_service(db: Annotated[Session, Depends(get_db)]) -> LabelService:
198198
def get_project_service(
199199
data_dir: Annotated[Path, Depends(get_data_dir)],
200200
db: Annotated[Session, Depends(get_db)],
201+
pipeline_service: Annotated[PipelineService, Depends(get_pipeline_service)],
201202
label_service: Annotated[LabelService, Depends(get_label_service)],
202203
) -> ProjectService:
203204
"""Provides a ProjectService instance for managing projects."""
204-
return ProjectService(data_dir=data_dir, db_session=db, label_service=label_service)
205+
return ProjectService(
206+
data_dir=data_dir, db_session=db, label_service=label_service, pipeline_service=pipeline_service
207+
)
205208

206209

207210
def get_dataset_service(
208-
data_dir: Annotated[Path, Depends(get_data_dir)],
209-
db: Annotated[Session, Depends(get_db)],
210-
project_service: Annotated[ProjectService, Depends(get_project_service)],
211-
label_service: Annotated[LabelService, Depends(get_label_service)],
211+
data_dir: Annotated[Path, Depends(get_data_dir)], db: Annotated[Session, Depends(get_db)]
212212
) -> DatasetService:
213213
"""Provides a DatasetService instance."""
214-
return DatasetService(
215-
data_dir=data_dir, db_session=db, project_service=project_service, label_service=label_service
216-
)
214+
return DatasetService(data_dir=data_dir, db_session=db)
215+
216+
217+
def get_project(
218+
project_id: Annotated[UUID, Depends(get_project_id)],
219+
project_service: Annotated[ProjectService, Depends(get_project_service)],
220+
) -> ProjectView:
221+
"""Provides a ProjectView instance for request scoped project."""
222+
try:
223+
return project_service.get_project_by_id(project_id)
224+
except ResourceNotFoundError as e:
225+
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
217226

218227

219228
def get_base_weights_service(data_dir: Annotated[Path, Depends(get_data_dir)]) -> BaseWeightsService:

application/backend/app/api/endpoints/datasets.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@
88
from fastapi.openapi.models import Example
99
from starlette.responses import FileResponse
1010

11-
from app.api.dependencies import get_dataset_item_id, get_dataset_service, get_file_name_and_extension, get_project_id
11+
from app.api.dependencies import get_dataset_item_id, get_dataset_service, get_file_name_and_extension, get_project
1212
from app.core.models import Pagination
13-
from app.schemas import DatasetItem, DatasetItemsWithPagination
13+
from app.schemas import DatasetItem, DatasetItemsWithPagination, ProjectView
1414
from app.schemas.dataset_item import (
1515
DatasetItemAnnotation,
1616
DatasetItemAnnotationsWithSource,
@@ -74,7 +74,7 @@
7474
},
7575
)
7676
def add_dataset_item(
77-
project_id: Annotated[UUID, Depends(get_project_id)],
77+
project: Annotated[ProjectView, Depends(get_project)],
7878
dataset_service: Annotated[DatasetService, Depends(get_dataset_service)],
7979
file_name_and_extension: Annotated[tuple[str, str], Depends(get_file_name_and_extension)],
8080
file: Annotated[UploadFile, File()],
@@ -83,7 +83,7 @@ def add_dataset_item(
8383
name, format = file_name_and_extension
8484
try:
8585
return dataset_service.create_dataset_item(
86-
project_id=project_id,
86+
project=project,
8787
data=file.file,
8888
name=name,
8989
format=format,
@@ -100,7 +100,7 @@ def add_dataset_item(
100100
},
101101
)
102102
def list_dataset_items(
103-
project_id: Annotated[UUID, Depends(get_project_id)],
103+
project: Annotated[ProjectView, Depends(get_project)],
104104
dataset_service: Annotated[DatasetService, Depends(get_dataset_service)],
105105
limit: Annotated[int, Query(ge=1, le=MAX_DATASET_ITEMS_NUMBER_RETURNED)] = DEFAULT_DATASET_ITEMS_NUMBER_RETURNED,
106106
offset: Annotated[int, Query(ge=0)] = 0,
@@ -112,9 +112,9 @@ def list_dataset_items(
112112
raise HTTPException(
113113
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail="Start date must be before end date."
114114
)
115-
total = dataset_service.count_dataset_items(project_id=project_id, start_date=start_date, end_date=end_date)
115+
total = dataset_service.count_dataset_items(project=project, start_date=start_date, end_date=end_date)
116116
dataset_items = dataset_service.list_dataset_items(
117-
project_id=project_id, limit=limit, offset=offset, start_date=start_date, end_date=end_date
117+
project=project, limit=limit, offset=offset, start_date=start_date, end_date=end_date
118118
)
119119
return DatasetItemsWithPagination(
120120
items=dataset_items,
@@ -136,13 +136,13 @@ def list_dataset_items(
136136
},
137137
)
138138
def get_dataset_item(
139-
project_id: Annotated[UUID, Depends(get_project_id)],
139+
project: Annotated[ProjectView, Depends(get_project)],
140140
dataset_item_id: Annotated[UUID, Depends(get_dataset_item_id)],
141141
dataset_service: Annotated[DatasetService, Depends(get_dataset_service)],
142142
) -> DatasetItem:
143143
"""Get information about a specific dataset item"""
144144
try:
145-
return dataset_service.get_dataset_item_by_id(project_id=project_id, dataset_item_id=dataset_item_id)
145+
return dataset_service.get_dataset_item_by_id(project=project, dataset_item_id=dataset_item_id)
146146
except ResourceNotFoundError as e:
147147
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
148148

@@ -156,14 +156,14 @@ def get_dataset_item(
156156
},
157157
)
158158
def get_dataset_item_binary(
159-
project_id: Annotated[UUID, Depends(get_project_id)],
159+
project: Annotated[ProjectView, Depends(get_project)],
160160
dataset_item_id: Annotated[UUID, Depends(get_dataset_item_id)],
161161
dataset_service: Annotated[DatasetService, Depends(get_dataset_service)],
162162
) -> FileResponse:
163163
"""Get dataset item binary content"""
164164
try:
165165
binary_path = dataset_service.get_dataset_item_binary_path_by_id(
166-
project_id=project_id, dataset_item_id=dataset_item_id
166+
project=project, dataset_item_id=dataset_item_id
167167
)
168168
return FileResponse(path=binary_path)
169169
except ResourceNotFoundError as e:
@@ -179,14 +179,14 @@ def get_dataset_item_binary(
179179
},
180180
)
181181
def get_dataset_item_thumbnail(
182-
project_id: Annotated[UUID, Depends(get_project_id)],
182+
project: Annotated[ProjectView, Depends(get_project)],
183183
dataset_item_id: Annotated[UUID, Depends(get_dataset_item_id)],
184184
dataset_service: Annotated[DatasetService, Depends(get_dataset_service)],
185185
) -> FileResponse:
186186
"""Get dataset item thumbnail binary content"""
187187
try:
188188
thumbnail_path = dataset_service.get_dataset_item_thumbnail_path_by_id(
189-
project_id=project_id, dataset_item_id=dataset_item_id
189+
project=project, dataset_item_id=dataset_item_id
190190
)
191191
return FileResponse(path=thumbnail_path)
192192
except ResourceNotFoundError as e:
@@ -203,13 +203,13 @@ def get_dataset_item_thumbnail(
203203
},
204204
)
205205
def delete_dataset_item(
206-
project_id: Annotated[UUID, Depends(get_project_id)],
206+
project: Annotated[ProjectView, Depends(get_project)],
207207
dataset_item_id: Annotated[UUID, Depends(get_dataset_item_id)],
208208
dataset_service: Annotated[DatasetService, Depends(get_dataset_service)],
209209
) -> None:
210210
"""Delete an item from the dataset"""
211211
try:
212-
dataset_service.delete_dataset_item(project_id=project_id, dataset_item_id=dataset_item_id)
212+
dataset_service.delete_dataset_item(project=project, dataset_item_id=dataset_item_id)
213213
except ResourceNotFoundError as e:
214214
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
215215

@@ -224,7 +224,7 @@ def delete_dataset_item(
224224
},
225225
)
226226
def set_dataset_item_annotations(
227-
project_id: Annotated[UUID, Depends(get_project_id)],
227+
project: Annotated[ProjectView, Depends(get_project)],
228228
dataset_item_id: Annotated[UUID, Depends(get_dataset_item_id)],
229229
dataset_item_annotations: Annotated[
230230
SetDatasetItemAnnotations, Body(openapi_examples=SET_DATASET_ITEM_ANNOTATIONS_BODY_EXAMPLES)
@@ -234,7 +234,7 @@ def set_dataset_item_annotations(
234234
"""Set dataset item annotations"""
235235
try:
236236
return dataset_service.set_dataset_item_annotations(
237-
project_id=project_id, dataset_item_id=dataset_item_id, annotations=dataset_item_annotations.annotations
237+
project=project, dataset_item_id=dataset_item_id, annotations=dataset_item_annotations.annotations
238238
)
239239
except ResourceNotFoundError as e:
240240
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
@@ -254,13 +254,13 @@ def set_dataset_item_annotations(
254254
},
255255
)
256256
def get_dataset_item_annotations(
257-
project_id: Annotated[UUID, Depends(get_project_id)],
257+
project: Annotated[ProjectView, Depends(get_project)],
258258
dataset_item_id: Annotated[UUID, Depends(get_dataset_item_id)],
259259
dataset_service: Annotated[DatasetService, Depends(get_dataset_service)],
260260
) -> DatasetItemAnnotationsWithSource:
261261
"""Get the dataset item annotations"""
262262
try:
263-
return dataset_service.get_dataset_item_annotations(project_id=project_id, dataset_item_id=dataset_item_id)
263+
return dataset_service.get_dataset_item_annotations(project=project, dataset_item_id=dataset_item_id)
264264
except (ResourceNotFoundError, NotAnnotatedError) as e:
265265
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
266266

@@ -275,13 +275,13 @@ def get_dataset_item_annotations(
275275
},
276276
)
277277
def delete_dataset_item_annotation(
278-
project_id: Annotated[UUID, Depends(get_project_id)],
278+
project: Annotated[ProjectView, Depends(get_project)],
279279
dataset_item_id: Annotated[UUID, Depends(get_dataset_item_id)],
280280
dataset_service: Annotated[DatasetService, Depends(get_dataset_service)],
281281
) -> None:
282282
"""Delete dataset item annotations"""
283283
try:
284-
dataset_service.delete_dataset_item_annotations(project_id=project_id, dataset_item_id=dataset_item_id)
284+
dataset_service.delete_dataset_item_annotations(project=project, dataset_item_id=dataset_item_id)
285285
except ResourceNotFoundError as e:
286286
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
287287

@@ -298,15 +298,15 @@ def delete_dataset_item_annotation(
298298
},
299299
)
300300
def assign_dataset_item_subset(
301-
project_id: Annotated[UUID, Depends(get_project_id)],
301+
project: Annotated[ProjectView, Depends(get_project)],
302302
dataset_item_id: Annotated[UUID, Depends(get_dataset_item_id)],
303303
dataset_service: Annotated[DatasetService, Depends(get_dataset_service)],
304304
subset_config: Annotated[DatasetItemAssignSubset, Body()],
305305
) -> DatasetItem:
306306
"""Assign dataset item subset"""
307307
try:
308308
return dataset_service.assign_dataset_item_subset(
309-
project_id=project_id, dataset_item_id=dataset_item_id, subset=subset_config.subset
309+
project=project, dataset_item_id=dataset_item_id, subset=subset_config.subset
310310
)
311311
except ResourceNotFoundError as e:
312312
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))

application/backend/app/api/endpoints/pipelines.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@
1212
from fastapi.openapi.models import Example
1313
from pydantic import ValidationError
1414

15-
from app.api.dependencies import get_pipeline_service, get_project_id
15+
from app.api.dependencies import get_pipeline_metrics_service, get_pipeline_service, get_project_id
1616
from app.schemas.metrics import PipelineMetrics
1717
from app.schemas.pipeline import DataCollectionPolicyAdapter, PipelineStatus, PipelineView
18-
from app.services import PipelineService, ResourceNotFoundError
18+
from app.services import PipelineMetricsService, PipelineService, ResourceNotFoundError
1919

2020
logger = logging.getLogger(__name__)
2121
router = APIRouter(prefix="/api/projects/{project_id}/pipeline", tags=["Pipelines"])
@@ -176,7 +176,7 @@ def disable_pipeline(
176176
)
177177
def get_project_metrics(
178178
project_id: Annotated[UUID, Depends(get_project_id)],
179-
pipeline_service: Annotated[PipelineService, Depends(get_pipeline_service)],
179+
pipeline_metrics_service: Annotated[PipelineMetricsService, Depends(get_pipeline_metrics_service)],
180180
time_window: int = 60,
181181
) -> PipelineMetrics:
182182
"""
@@ -191,7 +191,7 @@ def get_project_metrics(
191191
)
192192

193193
try:
194-
return pipeline_service.get_pipeline_metrics(project_id, time_window)
194+
return pipeline_metrics_service.get_pipeline_metrics(project_id, time_window)
195195
except ResourceNotFoundError as e:
196196
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
197197
except ValueError as e:

application/backend/app/db/schema.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ class ProjectDB(BaseID):
3434
task_type: Mapped[str] = mapped_column(String(50), nullable=False)
3535
exclusive_labels: Mapped[bool] = mapped_column(Boolean, default=False)
3636

37-
pipeline = relationship("PipelineDB", back_populates="project", uselist=False)
3837
model_revisions = relationship("ModelRevisionDB", back_populates="project")
3938

4039

@@ -48,7 +47,6 @@ class PipelineDB(Base):
4847
is_running: Mapped[bool] = mapped_column(Boolean, default=False)
4948
data_collection_policies: Mapped[list] = mapped_column(JSON, nullable=False, default=list)
5049

51-
project = relationship("ProjectDB", back_populates="pipeline")
5250
sink = relationship("SinkDB", uselist=False)
5351
source = relationship("SourceDB", uselist=False)
5452
model_revision = relationship("ModelRevisionDB", uselist=False)

0 commit comments

Comments
 (0)