|
1 | 1 | import logging |
2 | | - |
3 | 2 | from dataclasses import Field, dataclass, fields, is_dataclass |
4 | 3 | from typing import ( |
5 | 4 | Any, |
@@ -170,13 +169,72 @@ def __init__(self, visible=True): |
170 | 169 | self.visible = visible |
171 | 170 | self.app = Router() |
172 | 171 | self.url = "" # will be replaced during webserver init |
173 | | - |
174 | | - def cancel_dependents(self): |
| 172 | + self.cache = {"skip": {}, "deps": {}} |
| 173 | + self.listeners = {"startup": set(), "shutdown": set(), "pipeline_update": set()} |
| 174 | + self.lastPipeline = None |
| 175 | + |
| 176 | + def update_cache(self): |
| 177 | + if not self.lastPipeline == self.pipeline.nodes: |
| 178 | + self.cache = {"skip": {}, "deps": {}} |
| 179 | + self.lastPipeline = self.pipeline.nodes |
| 180 | + |
| 181 | + def get_skips(self, node): |
| 182 | + self.update_cache() |
| 183 | + skip = self.cache["skip"].get(node) |
| 184 | + if skip is None: |
| 185 | + skip = self.pipeline.get_dependents(node) |
| 186 | + self.cache["skip"][node] = skip |
| 187 | + return skip |
| 188 | + |
| 189 | + def get_output_deps(self, node, output): |
| 190 | + self.update_cache() |
| 191 | + |
| 192 | + if node not in self.cache["deps"]: |
| 193 | + self.cache["deps"][node] = {} |
| 194 | + |
| 195 | + deps = self.cache["deps"][node].get(output) |
| 196 | + |
| 197 | + if deps is None: |
| 198 | + deps = [] |
| 199 | + for i in self.pipeline.nodes.values(): |
| 200 | + for link in i.inputLinks.values(): |
| 201 | + if link.node is node and link.name == output: |
| 202 | + deps.append(i) |
| 203 | + self.cache["deps"][node][output] = deps |
| 204 | + |
| 205 | + return deps |
| 206 | + |
| 207 | + def cancel_node(self, node): |
175 | 208 | try: |
176 | | - self.pipeline.cancel_dependents(self.pipeline.current) |
| 209 | + # reset path cache if pipeline has changed |
| 210 | + skip = self.get_skips(node) |
| 211 | + self.pipeline.cancel_nodes(skip) |
177 | 212 | except: |
178 | 213 | raise ValueError("Pipeline not available! Cannot cancel dependents.") |
179 | 214 |
|
| 215 | + def cancel_current(self): |
| 216 | + self.cancel_node(self.pipeline.current) |
| 217 | + |
| 218 | + def cancel_output(self, output: str): |
| 219 | + node = self.pipeline.current |
| 220 | + deps = self.get_output_deps(node, output) |
| 221 | + for dep in deps: |
| 222 | + self.cancel_node(dep) |
| 223 | + |
| 224 | + def add_listener(self, event: str, function: callable): |
| 225 | + self.listeners[event].add(function) |
| 226 | + |
| 227 | + def remove_listener(self, event: str, function: callable): |
| 228 | + self.listeners[event].discard(function) |
| 229 | + |
| 230 | + def startup(self): |
| 231 | + for func in self.listeners["startup"]: |
| 232 | + func() |
| 233 | + |
| 234 | + def shutdown(self): |
| 235 | + for func in self.listeners["shutdown"]: |
| 236 | + func() |
| 237 | + |
180 | 238 |
|
181 | 239 | def ishook(hook): |
182 | 240 | return isinstance(hook, Hook) |
|
0 commit comments