Skip to content

Commit 917cfe4

Browse files
authored
Removed passing of EventBus to a child process worker to fix serialization issue (#4979)
1 parent 54be039 commit 917cfe4

File tree

9 files changed

+129
-135
lines changed

9 files changed

+129
-135
lines changed

application/backend/app/api/dependencies.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
ProjectService,
2424
ResourceNotFoundError,
2525
SinkService,
26-
SourceService,
26+
SourceUpdateService,
2727
SystemService,
2828
)
2929
from app.services.base_weights_service import BaseWeightsService
@@ -155,11 +155,11 @@ def get_sink_service(
155155
return SinkService(event_bus=event_bus, db_session=db)
156156

157157

158-
def get_source_service(
158+
def get_source_update_service(
159159
event_bus: Annotated[EventBus, Depends(get_event_bus)], db: Annotated[Session, Depends(get_db)]
160-
) -> SourceService:
161-
"""Provides a SourceService instance."""
162-
return SourceService(event_bus=event_bus, db_session=db)
160+
) -> SourceUpdateService:
161+
"""Provides a SourceUpdateService instance."""
162+
return SourceUpdateService(event_bus=event_bus, db_session=db)
163163

164164

165165
def get_pipeline_service(

application/backend/app/api/routers/sources.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,15 @@
1414
from fastapi.responses import FileResponse, Response
1515
from pydantic import ValidationError
1616

17-
from app.api.dependencies import get_source_id, get_source_service
17+
from app.api.dependencies import get_source_id, get_source_update_service
1818
from app.schemas import Source, SourceCreate
1919
from app.schemas.source import SourceCreateAdapter
2020
from app.services import (
2121
ResourceInUseError,
2222
ResourceNotFoundError,
2323
ResourceWithIdAlreadyExistsError,
2424
ResourceWithNameAlreadyExistsError,
25-
SourceService,
25+
SourceUpdateService,
2626
)
2727

2828
logger = logging.getLogger(__name__)
@@ -109,11 +109,11 @@ def create_source(
109109
source_create: Annotated[
110110
SourceCreate, Body(description=CREATE_SOURCE_BODY_DESCRIPTION, openapi_examples=CREATE_SOURCE_BODY_EXAMPLES)
111111
],
112-
source_service: Annotated[SourceService, Depends(get_source_service)],
112+
source_update_service: Annotated[SourceUpdateService, Depends(get_source_update_service)],
113113
) -> Source:
114114
"""Create and configure a new source"""
115115
try:
116-
return source_service.create(source_create)
116+
return source_update_service.create(source_create)
117117
except (ResourceWithNameAlreadyExistsError, ResourceWithIdAlreadyExistsError) as e:
118118
raise HTTPException(status_code=status.HTTP_409_CONFLICT, detail=str(e))
119119

@@ -125,10 +125,10 @@ def create_source(
125125
},
126126
)
127127
def list_sources(
128-
source_service: Annotated[SourceService, Depends(get_source_service)],
128+
source_update_service: Annotated[SourceUpdateService, Depends(get_source_update_service)],
129129
) -> list[Source]:
130130
"""List the available sources"""
131-
return source_service.list_all()
131+
return source_update_service.list_all()
132132

133133

134134
@router.get(
@@ -141,11 +141,11 @@ def list_sources(
141141
)
142142
def get_source(
143143
source_id: Annotated[UUID, Depends(get_source_id)],
144-
source_service: Annotated[SourceService, Depends(get_source_service)],
144+
source_update_service: Annotated[SourceUpdateService, Depends(get_source_update_service)],
145145
) -> Source:
146146
"""Get info about a source"""
147147
try:
148-
return source_service.get_by_id(source_id)
148+
return source_update_service.get_by_id(source_id)
149149
except ResourceNotFoundError as e:
150150
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
151151

@@ -168,14 +168,14 @@ def update_source(
168168
openapi_examples=UPDATE_SOURCE_BODY_EXAMPLES,
169169
),
170170
],
171-
source_service: Annotated[SourceService, Depends(get_source_service)],
171+
source_update_service: Annotated[SourceUpdateService, Depends(get_source_update_service)],
172172
) -> Source:
173173
"""Reconfigure an existing source"""
174174
if "source_type" in source_config:
175175
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="The 'source_type' field cannot be changed")
176176
try:
177-
source = source_service.get_by_id(source_id)
178-
return source_service.update(source, source_config)
177+
source = source_update_service.get_by_id(source_id)
178+
return source_update_service.update(source, source_config)
179179
except ResourceNotFoundError as e:
180180
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
181181
except ResourceWithNameAlreadyExistsError as e:
@@ -198,10 +198,10 @@ def update_source(
198198
)
199199
def export_source(
200200
source_id: Annotated[UUID, Depends(get_source_id)],
201-
source_service: Annotated[SourceService, Depends(get_source_service)],
201+
source_update_service: Annotated[SourceUpdateService, Depends(get_source_update_service)],
202202
) -> Response:
203203
"""Export a source to file"""
204-
source = source_service.get_by_id(source_id)
204+
source = source_update_service.get_by_id(source_id)
205205
if not source:
206206
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Source with ID {source_id} not found")
207207

@@ -226,14 +226,14 @@ def export_source(
226226
)
227227
def import_source(
228228
yaml_file: Annotated[UploadFile, File(description="YAML file containing the source configuration")],
229-
source_service: Annotated[SourceService, Depends(get_source_service)],
229+
source_update_service: Annotated[SourceUpdateService, Depends(get_source_update_service)],
230230
) -> Source:
231231
"""Import a source from file"""
232232
try:
233233
yaml_content = yaml_file.file.read()
234234
source_data = yaml.safe_load(yaml_content)
235235
source_create = SourceCreateAdapter.validate_python(source_data)
236-
return source_service.create(source_create)
236+
return source_update_service.create(source_create)
237237
except yaml.YAMLError as e:
238238
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid YAML format: {str(e)}")
239239
except (ResourceWithNameAlreadyExistsError, ResourceWithIdAlreadyExistsError) as e:
@@ -256,11 +256,11 @@ def import_source(
256256
)
257257
def delete_source(
258258
source_id: Annotated[UUID, Depends(get_source_id)],
259-
source_service: Annotated[SourceService, Depends(get_source_service)],
259+
source_update_service: Annotated[SourceUpdateService, Depends(get_source_update_service)],
260260
) -> None:
261261
"""Remove a source"""
262262
try:
263-
source_service.delete_by_id(source_id)
263+
source_update_service.delete_by_id(source_id)
264264
except ResourceNotFoundError as e:
265265
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=str(e))
266266
except ResourceInUseError as e:

application/backend/app/scheduler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def start_workers(self) -> None:
5454

5555
# Create and start processes
5656
stream_loader_proc = StreamLoader(
57-
self._event_bus, self.frame_queue, self.mp_stop_event, self._event_bus.source_changed_condition
57+
self.frame_queue, self.mp_stop_event, self._event_bus.source_changed_condition
5858
)
5959

6060
inference_server_proc = InferenceWorker(

application/backend/app/services/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from .pipeline_service import PipelineService
1919
from .project_service import ProjectService
2020
from .sink_service import SinkService
21-
from .source_service import SourceService
21+
from .source_service import SourceService, SourceUpdateService
2222
from .system_service import SystemService
2323
from .video_stream_service import VideoStreamService
2424

@@ -39,6 +39,7 @@
3939
"ResourceWithNameAlreadyExistsError",
4040
"SinkService",
4141
"SourceService",
42+
"SourceUpdateService",
4243
"SystemService",
4344
"VideoStreamService",
4445
]

application/backend/app/services/source_service.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121

2222

2323
class SourceService(GenericPersistenceService[Source, SourceRepository]):
24-
def __init__(self, event_bus: EventBus, db_session: Session):
25-
self._event_bus: EventBus = event_bus
24+
def __init__(self, db_session: Session):
2625
super().__init__(ServiceConfig(SourceRepository, SourceMapper, ResourceType.SOURCE), db_session)
2726

2827
def get_by_id(self, item_id: UUID) -> Source:
@@ -38,6 +37,21 @@ def create(self, item: Source) -> Source:
3837
except UniqueConstraintIntegrityError:
3938
raise ResourceWithNameAlreadyExistsError(ResourceType.SOURCE, item.name)
4039

40+
def get_active_source(self) -> Source | None:
41+
with self._get_repo() as repo:
42+
item_db = repo.get_active_source()
43+
return self.config.mapper_class.to_schema(item_db) if item_db else None
44+
45+
@parent_process_only
46+
def delete_by_id(self, item_id: UUID) -> None:
47+
super().delete_by_id(item_id)
48+
49+
50+
class SourceUpdateService(SourceService):
51+
def __init__(self, event_bus: EventBus, db_session: Session):
52+
self._event_bus: EventBus = event_bus
53+
super().__init__(db_session)
54+
4155
@parent_process_only
4256
def update(self, source: Source, partial_config: dict) -> Source:
4357
try:
@@ -48,12 +62,3 @@ def update(self, source: Source, partial_config: dict) -> Source:
4862
return updated
4963
except UniqueConstraintIntegrityError:
5064
raise ResourceWithNameAlreadyExistsError(ResourceType.SOURCE, partial_config["name"])
51-
52-
def get_active_source(self) -> Source | None:
53-
with self._get_repo() as repo:
54-
item_db = repo.get_active_source()
55-
return self.config.mapper_class.to_schema(item_db) if item_db else None
56-
57-
@parent_process_only
58-
def delete_by_id(self, item_id: UUID) -> None:
59-
super().delete_by_id(item_id)

application/backend/app/workers/stream_loading.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
from app.db import get_db_session
1212
from app.schemas import DisconnectedSourceConfig, Source, SourceType
1313
from app.services import SourceService, VideoStreamService
14-
from app.services.event.event_bus import EventBus
1514
from app.stream.stream_data import StreamData
1615
from app.stream.video_stream import VideoStream
1716
from app.workers.base import BaseProcessWorker
@@ -26,13 +25,11 @@ class StreamLoader(BaseProcessWorker):
2625

2726
def __init__(
2827
self,
29-
event_bus: EventBus,
3028
frame_queue: mp.Queue,
3129
stop_event: EventClass,
3230
source_changed_condition: Condition | None,
3331
) -> None:
3432
super().__init__(stop_event=stop_event, queues_to_cancel=[frame_queue])
35-
self._event_bus = event_bus
3633
self._frame_queue = frame_queue
3734
self._source_changed_condition = source_changed_condition
3835

@@ -41,7 +38,7 @@ def __init__(
4138

4239
def _load_source(self) -> None:
4340
with get_db_session() as db:
44-
source = SourceService(event_bus=self._event_bus, db_session=db).get_active_source()
41+
source = SourceService(db_session=db).get_active_source()
4542
self._source = source if source is not None else DisconnectedSourceConfig()
4643
logger.info(f"Active source set to {self._source}. Process: %s", mp.current_process().name)
4744
self._reset_stream()

0 commit comments

Comments
 (0)