Skip to content

Commit 0fe84d7

Browse files
authored
streamdiffusion: Report internal errors updating parameters (#796)
* live/pipelines: Reorder function definitions in iface Initialize first small change keep going * [POC] streamdiffusion: Report errors through logs * Simplify error log schema * Simplify logging schema even further * Clean-up some error messages * Improve _report_error output
1 parent 9fb8027 commit 0fe84d7

File tree

3 files changed

+45
-32
lines changed

3 files changed

+45
-32
lines changed

runner/app/live/pipelines/interface.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,19 @@ def __init__(self):
1818
"""Initialize pipeline with optional parameters."""
1919
pass
2020

21+
@abstractmethod
22+
async def initialize(self, **params):
23+
"""Initialize the pipeline with parameters and warm up the processing.
24+
25+
This method sets up the initial pipeline state and performs warmup operations.
26+
Must maintain valid state on success or restore previous state on failure.
27+
Starts the pipeline loops in comfystream.
28+
29+
Args:
30+
**params: Implementation-specific parameters
31+
"""
32+
pass
33+
2134
@abstractmethod
2235
async def put_video_frame(self, frame: VideoFrame, request_id: str):
2336
"""Put a frame into the pipeline.
@@ -36,19 +49,6 @@ async def get_processed_video_frame(self) -> VideoOutput:
3649
"""
3750
pass
3851

39-
@abstractmethod
40-
async def initialize(self, **params):
41-
"""Initialize the pipeline with parameters and warm up the processing.
42-
43-
This method sets up the initial pipeline state and performs warmup operations.
44-
Must maintain valid state on success or restore previous state on failure.
45-
Starts the pipeline loops in comfystream.
46-
47-
Args:
48-
**params: Implementation-specific parameters
49-
"""
50-
pass
51-
5252
@abstractmethod
5353
async def update_params(self, **params):
5454
"""Update pipeline parameters.

runner/app/live/pipelines/streamdiffusion.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,18 @@ async def update_params(self, **params):
106106
if await self._update_params_dynamic(new_params):
107107
return
108108
except Exception as e:
109-
logging.error(f"Error updating parameters dynamically: {e}")
109+
logging.error(
110+
f"[update_params] Error updating params dynamically, reloading pipeline: {e}",
111+
extra={"report_error": True},
112+
exc_info=True,
113+
)
110114

111115
logging.info(f"Resetting pipeline for params change")
112116

113117
try:
114118
await self._overlay_renderer.prewarm(new_params.width, new_params.height)
115119
except Exception:
116-
logging.debug("Failed to prewarm loading overlay caches", exc_info=True)
120+
logging.warning("Failed to prewarm loading overlay caches", exc_info=True)
117121

118122
async with self._pipeline_lock:
119123
# Clear the pipeline while loading the new one. The loading overlay will be shown while this is happening.
@@ -124,14 +128,18 @@ async def update_params(self, **params):
124128
new_pipe: Optional[StreamDiffusionWrapper] = None
125129
try:
126130
new_pipe = await asyncio.to_thread(load_streamdiffusion_sync, new_params)
127-
except Exception:
128-
logging.error(f"Error resetting pipeline, reloading with previous params", exc_info=True)
131+
except Exception as e:
132+
logging.error(
133+
f"[update_params] Error reloading pipeline, falling back to previous params: {e}",
134+
extra={"report_error": True},
135+
exc_info=True,
136+
)
129137
try:
130138
new_params = prev_params or StreamDiffusionParams()
131139
new_pipe = await asyncio.to_thread(load_streamdiffusion_sync, new_params)
132-
except Exception:
133-
logging.exception("Failed to reload pipeline with fallback params", stack_info=True)
134-
raise
140+
except Exception as e:
141+
# No need to log here as we have to bubble up the error to the caller.
142+
raise RuntimeError(f"Failed to reload pipeline with previous params: {e}") from e
135143

136144
async with self._pipeline_lock:
137145
self.pipe = new_pipe

runner/app/live/streamer/process.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ def process_loop(self):
142142
try:
143143
asyncio.run(self._run_pipeline_loops())
144144
except Exception as e:
145-
self._report_error(f"Error in process run method: {e}")
145+
self._report_error("Error in process run method", e)
146146

147147

148148
def _handle_logging_params(self, params: dict) -> bool:
@@ -169,8 +169,7 @@ async def _initialize_pipeline(self):
169169
await pipeline.initialize(**params)
170170
return pipeline
171171
except Exception as e:
172-
self._report_error(f"Error loading pipeline: {e}")
173-
logging.exception(e)
172+
self._report_error("Error loading pipeline", e)
174173
if not params:
175174
# Already tried loading with default params
176175
raise
@@ -182,7 +181,7 @@ async def _initialize_pipeline(self):
182181
await pipeline.initialize()
183182
return pipeline
184183
except Exception as e:
185-
self._report_error(f"Error loading pipeline with default params: {e}")
184+
self._report_error("Error loading pipeline with default params", e)
186185
raise
187186

188187
async def _run_pipeline_loops(self):
@@ -201,7 +200,7 @@ async def wait_for_stop():
201200
try:
202201
await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
203202
except Exception as e:
204-
self._report_error(f"Error in pipeline loops: {e}")
203+
self._report_error("Error in pipeline loops", e)
205204
finally:
206205
for task in tasks:
207206
task.cancel()
@@ -223,8 +222,7 @@ async def _input_loop(self, pipeline: Pipeline):
223222
# Timeout ensures the non-daemon threads from to_thread can exit if task is cancelled
224223
continue
225224
except Exception as e:
226-
self._report_error(f"Error processing input frame: {e}")
227-
logging.exception(e)
225+
self._report_error("Error processing input frame", e)
228226

229227
async def _output_loop(self, pipeline: Pipeline):
230228
while not self.is_done():
@@ -235,7 +233,7 @@ async def _output_loop(self, pipeline: Pipeline):
235233
output.log_timestamps["post_process_frame"] = time.time()
236234
self._try_queue_put(self.output_queue, output)
237235
except Exception as e:
238-
self._report_error(f"Error processing output frame: {e}")
236+
self._report_error("Error processing output frame", e)
239237

240238
async def _param_update_loop(self, pipeline: Pipeline):
241239
while not self.is_done():
@@ -250,7 +248,7 @@ async def _param_update_loop(self, pipeline: Pipeline):
250248
with log_timing(f"PipelineProcess: Pipeline update parameters with params_hash={params_hash}"):
251249
await pipeline.update_params(**params)
252250
except Exception as e:
253-
self._report_error(f"Error updating params: {e}")
251+
self._report_error("Error updating params", e)
254252

255253
async def _get_latest_params(self, timeout: float) -> dict | None:
256254
"""
@@ -278,12 +276,14 @@ async def _get_latest_params(self, timeout: float) -> dict | None:
278276

279277
return params
280278

281-
def _report_error(self, error_msg: str):
279+
def _report_error(self, msg: str, error: Exception | None = None, silent = False):
280+
if not silent:
281+
logging.error(msg, exc_info=error)
282+
282283
error_event = {
283-
"message": error_msg,
284+
"message": f"{msg}: {error}" if error else msg,
284285
"timestamp": time.time()
285286
}
286-
logging.error(error_msg)
287287
self._try_queue_put(self.error_queue, error_event)
288288

289289
async def _cleanup_pipeline(self, pipeline):
@@ -356,6 +356,11 @@ def __init__(self, process: PipelineProcess):
356356
def emit(self, record):
357357
msg = self.format(record)
358358
self.process._try_queue_put(self.process.log_queue, msg)
359+
try:
360+
if getattr(record, "report_error", False):
361+
self.process._report_error(record.getMessage(), silent=True)
362+
except Exception as e:
363+
logging.error(f"Error reporting error: {e}")
359364

360365
# Function to clear the queue
361366
def clear_queue(queue):

0 commit comments

Comments
 (0)