|
202 | 202 | _PIPELINE_HOOK_PROCESS_ELEMENT_POST = PIPELINE_HOOK_PROCESS_ELEMENT_POST+"0" |
203 | 203 | PIPELINE_HOOK_PROCESS_FRAME = "pipeline.process_frame:" |
204 | 204 | _PIPELINE_HOOK_PROCESS_FRAME = PIPELINE_HOOK_PROCESS_FRAME+"0" |
| 205 | +PIPELINE_HOOK_PROCESS_FRAME_COMPLETE = "pipeline.process_frame_complete:" |
| 206 | +_PIPELINE_HOOK_PROCESS_FRAME_COMPLETE = PIPELINE_HOOK_PROCESS_FRAME_COMPLETE+"0" |
| 207 | +PIPELINE_HOOK_DESTROY_STREAM = "pipeline.destroy_stream:" |
| 208 | +_PIPELINE_HOOK_DESTROY_STREAM = PIPELINE_HOOK_DESTROY_STREAM+"0" |
205 | 209 |
|
206 | 210 | _GRACE_TIME = 60 # seconds |
207 | 211 | _LOGGER = aiko.logger(__name__) |
@@ -697,6 +701,8 @@ def __init__(self, context): |
697 | 701 | self.add_hook(_PIPELINE_HOOK_PROCESS_ELEMENT) |
698 | 702 | self.add_hook(_PIPELINE_HOOK_PROCESS_ELEMENT_POST) |
699 | 703 | self.add_hook(_PIPELINE_HOOK_PROCESS_FRAME) |
| 704 | + self.add_hook(_PIPELINE_HOOK_PROCESS_FRAME_COMPLETE) |
| 705 | + self.add_hook(_PIPELINE_HOOK_DESTROY_STREAM) |
700 | 706 |
|
701 | 707 | self.pipeline_graph = self._create_pipeline_graph(context.definition) |
702 | 708 | self.share["element_count"] = self.pipeline_graph.element_count |
@@ -1094,13 +1100,15 @@ def destroy_stream(self, stream_id, |
1094 | 1100 | stream.state = StreamState.ERROR |
1095 | 1101 |
|
1096 | 1102 | # Notify listeners that the stream has stopped |
1097 | | - stop_state = stream.state |
1098 | | - if stop_state >= StreamState.RUN: |
1099 | | - stop_state = StreamState.STOP |
| 1103 | + if stream.state >= StreamState.RUN: |
| 1104 | + stream.state = StreamState.STOP |
| 1105 | + self.run_hook(_PIPELINE_HOOK_DESTROY_STREAM, |
| 1106 | + lambda: {"stream": stream, |
| 1107 | + "diagnostic": diagnostic}) |
1100 | 1108 | stream_info = { |
1101 | 1109 | "stream_id": stream.stream_id, |
1102 | 1110 | "frame_id": stream.frame_id, |
1103 | | - "state": stop_state} |
| 1111 | + "state": stream.state} |
1104 | 1112 | if stream.queue_response: |
1105 | 1113 | stream.queue_response.put((stream_info, diagnostic)) |
1106 | 1114 | if stream.topic_response: |
@@ -1393,6 +1401,10 @@ def _process_frame_common(self, stream_dict, frame_data_in, new_frame) \ |
1393 | 1401 | "stream_id": stream.stream_id, |
1394 | 1402 | "frame_id": stream.frame_id, |
1395 | 1403 | "state": stream.state} |
| 1404 | + self.run_hook(_PIPELINE_HOOK_PROCESS_FRAME_COMPLETE, |
| 1405 | + lambda: { |
| 1406 | + "stream": stream, |
| 1407 | + "frame_data_out": frame_data_out}) |
1396 | 1408 | if stream.queue_response: |
1397 | 1409 | stream.queue_response.put((stream_info, frame_data_out)) |
1398 | 1410 | elif stream.topic_response: |
|
0 commit comments