Skip to content

Commit a96e8f5

Browse files
authored
fix(recording): prevent ghost recordings from permanently blocking new recordings (#511)
Recording 68 for Dhalucard became a ghost recording on March 2 after a PostgreSQL recovery mode outage prevented status updates. This ghost entry blocked all new recordings for 12+ days with DUPLICATE_BLOCK errors. Root causes fixed: - _handle_recording_completion: 'file not found' path never called remove_active_recording, leaving ghost entries in state manager - _handle_recording_error: DB failures in mark_recording_failed caused the entire method to bail before remove_active_recording - stop_recording: EventSub handler's 5s timeout caused CancelledError before remove_active_recording was reached Changes: - Move remove_active_recording to finally blocks in _handle_recording_completion, _handle_recording_error, and stop_recording so cleanup always runs - Add stale recording detection in start_recording: if an 'active' recording has no running process, clean it up instead of blocking - Trigger post-processing for stale recordings that have files on disk - Move post-processing trigger to finally block in stop_recording so it survives handler timeouts
1 parent 6c24c0e commit a96e8f5

File tree

1 file changed

+134
-69
lines changed

1 file changed

+134
-69
lines changed

app/services/recording/recording_lifecycle_manager.py

Lines changed: 134 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -130,11 +130,53 @@ async def start_recording(
130130
None,
131131
)
132132
if existing_recording:
133-
logger.warning(
134-
f"🎬 DUPLICATE_BLOCK: Cannot start recording for streamer {streamer_id} - "
135-
f"already has active recording {existing_recording}"
133+
# STALE RECORDING RECOVERY: Check if the "active" recording actually has a running process.
134+
# If not, it's a ghost recording (e.g., from a DB outage or handler timeout) - clean it up.
135+
is_process_running = await self._check_recording_process(
136+
existing_recording
136137
)
137-
return None
138+
if not is_process_running:
139+
stale_data = self.state_manager.get_active_recording(
140+
existing_recording
141+
)
142+
logger.warning(
143+
f"🧹 STALE_RECORDING_CLEANUP: Recording {existing_recording} for streamer {streamer_id} "
144+
f"has no running process. Cleaning up ghost recording to unblock new recording."
145+
)
146+
self.state_manager.remove_active_recording(existing_recording)
147+
try:
148+
await self.database_service.mark_recording_failed(
149+
existing_recording,
150+
"Stale recording cleaned up: no running process found",
151+
)
152+
except Exception as cleanup_err:
153+
logger.error(
154+
f"Failed to mark stale recording {existing_recording} as failed: {cleanup_err}"
155+
)
156+
app_cache.delete("active_recordings")
157+
158+
# Trigger post-processing for the stale recording if a file exists on disk
159+
stale_file = stale_data.get("file_path") if stale_data else None
160+
if stale_file and Path(stale_file).exists():
161+
logger.info(
162+
f"🔄 STALE_POST_PROCESSING: Triggering post-processing for stale recording "
163+
f"{existing_recording} (file exists: {stale_file})"
164+
)
165+
try:
166+
await self._schedule_post_processing_once(
167+
existing_recording
168+
)
169+
except Exception as pp_err:
170+
logger.error(
171+
f"Failed to trigger post-processing for stale recording {existing_recording}: {pp_err}"
172+
)
173+
# Continue to start the new recording
174+
else:
175+
logger.warning(
176+
f"🎬 DUPLICATE_BLOCK: Cannot start recording for streamer {streamer_id} - "
177+
f"already has active recording {existing_recording}"
178+
)
179+
return None
138180

139181
# Generate file path
140182
file_path = await self._generate_recording_path(streamer_id, stream_id)
@@ -208,6 +250,8 @@ async def start_recording(
208250

209251
async def stop_recording(self, recording_id: int, reason: str = "manual") -> bool:
210252
"""Stop an active recording"""
253+
success = False
254+
recording_data = None
211255
try:
212256
recording_data = self.state_manager.get_active_recording(recording_id)
213257
if not recording_data:
@@ -260,9 +304,15 @@ async def stop_recording(self, recording_id: int, reason: str = "manual") -> boo
260304
logger.debug(f"Could not log recording stop: {log_error}")
261305

262306
# Update status regardless of process stop success
263-
await self.database_service.update_recording_status(
264-
recording_id=recording_id, status="stopped" if success else "failed"
265-
)
307+
try:
308+
await self.database_service.update_recording_status(
309+
recording_id=recording_id,
310+
status="stopped" if success else "failed",
311+
)
312+
except Exception as db_err:
313+
logger.error(
314+
f"Failed to update recording {recording_id} status in DB: {db_err}"
315+
)
266316

267317
# Trigger database event for orphaned recovery
268318
try:
@@ -278,34 +328,44 @@ async def stop_recording(self, recording_id: int, reason: str = "manual") -> boo
278328
f"Could not trigger database event for orphaned recovery: {e}"
279329
)
280330

281-
# Remove from active recordings
282-
self.state_manager.remove_active_recording(recording_id)
283-
284-
# Invalidate active recordings cache
285-
app_cache.delete("active_recordings")
286-
287331
# Send WebSocket notification
288332
if self.websocket_service:
289-
await self.websocket_service.send_recording_status_update(
290-
recording_id=recording_id,
291-
status="stopped" if success else "failed",
292-
additional_data={"reason": reason},
293-
)
294-
295-
# Trigger post-processing for automatic stops (stream ended)
296-
# Even if process termination failed, streamlink may have finished correctly
297-
if reason == "automatic":
298-
logger.info(
299-
f"🎬 TRIGGERING_POST_PROCESSING: recording_id={recording_id}, success={success}"
300-
)
301-
# Schedule post-processing once (idempotent)
302-
await self._schedule_post_processing_once(recording_id)
333+
try:
334+
await self.websocket_service.send_recording_status_update(
335+
recording_id=recording_id,
336+
status="stopped" if success else "failed",
337+
additional_data={"reason": reason},
338+
)
339+
except Exception:
340+
pass
303341

304342
return success
305343

306344
except Exception as e:
307345
logger.error(f"Failed to stop recording {recording_id}: {e}")
308346
return False
347+
finally:
348+
# CRITICAL: Always remove from active recordings when we had recording_data,
349+
# even if the stop process timed out or DB operations failed.
350+
# Prevents ghost recordings that permanently block future recordings.
351+
if recording_data is not None:
352+
self.state_manager.remove_active_recording(recording_id)
353+
app_cache.delete("active_recordings")
354+
355+
# Trigger post-processing for automatic stops (stream ended).
356+
# This MUST be in the finally block so it runs even when the EventSub
357+
# handler's 5s timeout cancels this coroutine via CancelledError.
358+
# _schedule_post_processing_once is idempotent and uses create_task (fire-and-forget).
359+
if reason == "automatic":
360+
try:
361+
logger.info(
362+
f"🎬 TRIGGERING_POST_PROCESSING: recording_id={recording_id}"
363+
)
364+
await self._schedule_post_processing_once(recording_id)
365+
except Exception as pp_err:
366+
logger.error(
367+
f"Failed to trigger post-processing for recording {recording_id}: {pp_err}"
368+
)
309369

310370
async def force_start_recording(self, streamer_id: int) -> Optional[int]:
311371
"""Force start recording for a live streamer"""
@@ -611,9 +671,14 @@ async def _handle_recording_completion(self, recording_id: int) -> None:
611671
file_path = recording_data.get("file_path")
612672
if not file_path or not Path(file_path).exists():
613673
logger.error(f"Recording file not found: {file_path}")
614-
await self.database_service.mark_recording_failed(
615-
recording_id, "Recording file not found"
616-
)
674+
try:
675+
await self.database_service.mark_recording_failed(
676+
recording_id, "Recording file not found"
677+
)
678+
except Exception as db_err:
679+
logger.error(
680+
f"Failed to mark recording {recording_id} as failed in DB: {db_err}"
681+
)
617682
return
618683

619684
# Update database status
@@ -631,23 +696,33 @@ async def _handle_recording_completion(self, recording_id: int) -> None:
631696
additional_data={"stream_id": recording_data.get("stream_id")},
632697
)
633698

634-
# Remove from active recordings
635-
self.state_manager.remove_active_recording(recording_id)
699+
logger.info(f"Recording {recording_id} completed successfully")
700+
701+
# Safety net: if offline event path didn't schedule post-processing, do it here.
702+
# This avoids scenarios where EventSub misses the offline event.
703+
try:
704+
await self._schedule_post_processing_once(recording_id)
705+
except Exception as sched_err:
706+
logger.error(
707+
f"Failed to schedule post-processing from completion handler for recording {recording_id}: {sched_err}"
708+
)
636709

637-
# Complete external task in background queue - CRITICAL FIX for stuck recordings
710+
except Exception as e:
711+
logger.error(f"Error handling recording completion {recording_id}: {e}")
712+
finally:
713+
# CRITICAL: Always remove from active recordings and clean up external tasks,
714+
# even if DB operations fail. Prevents ghost recordings that block future recordings.
715+
self.state_manager.remove_active_recording(recording_id)
638716
try:
639717
if background_queue_service:
640718
task_id = f"recording_{recording_id}"
641719
background_queue_service.complete_external_task(
642720
task_id, success=True
643721
)
644-
645-
# Also remove from external tasks to prevent UI showing as stuck
646722
if hasattr(background_queue_service, "progress_tracker"):
647723
background_queue_service.progress_tracker.remove_external_task(
648724
task_id
649725
)
650-
651726
logger.info(
652727
f"✅ EXTERNAL_TASK_COMPLETED: recording_{recording_id} marked as completed and removed"
653728
)
@@ -656,20 +731,6 @@ async def _handle_recording_completion(self, recording_id: int) -> None:
656731
f"❌ EXTERNAL_TASK_ERROR: Failed to complete external task for recording {recording_id}: {e}"
657732
)
658733

659-
logger.info(f"Recording {recording_id} completed successfully")
660-
661-
# Safety net: if offline event path didn't schedule post-processing, do it here.
662-
# This avoids scenarios where EventSub misses the offline event.
663-
try:
664-
await self._schedule_post_processing_once(recording_id)
665-
except Exception as sched_err:
666-
logger.error(
667-
f"Failed to schedule post-processing from completion handler for recording {recording_id}: {sched_err}"
668-
)
669-
670-
except Exception as e:
671-
logger.error(f"Error handling recording completion {recording_id}: {e}")
672-
673734
async def _handle_recording_error(
674735
self, recording_id: int, error_message: str
675736
) -> None:
@@ -678,24 +739,40 @@ async def _handle_recording_error(
678739
logger.error(f"Recording {recording_id} error: {error_message}")
679740

680741
# Update database
681-
await self.database_service.mark_recording_failed(
682-
recording_id, error_message
683-
)
742+
try:
743+
await self.database_service.mark_recording_failed(
744+
recording_id, error_message
745+
)
746+
except Exception as db_err:
747+
logger.error(
748+
f"Failed to mark recording {recording_id} as failed in DB: {db_err}"
749+
)
684750

685-
# Mark external task as failed in background queue - CRITICAL FIX for stuck recordings
751+
# Send error notification
752+
if self.websocket_service:
753+
try:
754+
await self.websocket_service.send_recording_error(
755+
recording_id=recording_id, error_message=error_message
756+
)
757+
except Exception:
758+
pass
759+
760+
except Exception as e:
761+
logger.error(f"Error handling recording error {recording_id}: {e}")
762+
finally:
763+
# CRITICAL: Always remove from active recordings and clean up external tasks,
764+
# even if DB operations fail. Prevents ghost recordings that block future recordings.
765+
self.state_manager.remove_active_recording(recording_id)
686766
try:
687767
if background_queue_service:
688768
task_id = f"recording_{recording_id}"
689769
background_queue_service.complete_external_task(
690770
task_id, success=False
691771
)
692-
693-
# Also remove from external tasks to prevent UI showing as stuck
694772
if hasattr(background_queue_service, "progress_tracker"):
695773
background_queue_service.progress_tracker.remove_external_task(
696774
task_id
697775
)
698-
699776
logger.info(
700777
f"❌ EXTERNAL_TASK_FAILED: recording_{recording_id} marked as failed and removed"
701778
)
@@ -704,18 +781,6 @@ async def _handle_recording_error(
704781
f"❌ EXTERNAL_TASK_ERROR: Failed to mark external task as failed for recording {recording_id}: {e}"
705782
)
706783

707-
# Send error notification
708-
if self.websocket_service:
709-
await self.websocket_service.send_recording_error(
710-
recording_id=recording_id, error_message=error_message
711-
)
712-
713-
# Remove from active recordings
714-
self.state_manager.remove_active_recording(recording_id)
715-
716-
except Exception as e:
717-
logger.error(f"Error handling recording error {recording_id}: {e}")
718-
719784
async def _generate_recording_path(self, streamer_id: int, stream_id: int) -> str:
720785
"""Generate file path for recording"""
721786
try:

0 commit comments

Comments (0)