|
26 | 26 | NamedTuple, |
27 | 27 | Optional, |
28 | 28 | TYPE_CHECKING, |
| 29 | + Union, |
29 | 30 | ) |
30 | 31 |
|
31 | 32 | from galaxy import util |
|
38 | 39 |
|
39 | 40 | if TYPE_CHECKING: |
40 | 41 | from galaxy.job_metrics.instrumenters import InstrumentPlugin |
| 42 | + from galaxy.util import Element |
41 | 43 |
|
42 | 44 | log = logging.getLogger(__name__) |
43 | 45 |
|
@@ -123,21 +125,23 @@ def raw_to_dictifiable(raw_metric: RawMetric) -> DictifiableMetric: |
123 | 125 | metrics = map(raw_to_dictifiable, raw_metrics) |
124 | 126 | return [m for m in metrics if m.safety.value >= allowed_safety.value] |
125 | 127 |
|
126 | | - def set_destination_conf_file(self, destination_id, conf_file): |
| 128 | + def set_destination_conf_file(self, destination_id: str, conf_file: str) -> None: |
127 | 129 | instrumenter = JobInstrumenter.from_file(self.plugin_classes, conf_file) |
128 | 130 | self.set_destination_instrumenter(destination_id, instrumenter) |
129 | 131 |
|
130 | | - def set_destination_conf_element(self, destination_id, element): |
| 132 | + def set_destination_conf_element(self, destination_id: str, element: "Element") -> None: |
131 | 133 | plugin_source = plugin_config.PluginConfigSource("xml", element) |
132 | 134 | instrumenter = JobInstrumenter(self.plugin_classes, plugin_source) |
133 | 135 | self.set_destination_instrumenter(destination_id, instrumenter) |
134 | 136 |
|
135 | | - def set_destination_conf_dicts(self, destination_id, conf_dicts): |
| 137 | + def set_destination_conf_dicts(self, destination_id: str, conf_dicts: List[Dict[str, Any]]) -> None: |
136 | 138 | plugin_source = plugin_config.PluginConfigSource("dict", conf_dicts) |
137 | 139 | instrumenter = JobInstrumenter(self.plugin_classes, plugin_source) |
138 | 140 | self.set_destination_instrumenter(destination_id, instrumenter) |
139 | 141 |
|
140 | | - def set_destination_instrumenter(self, destination_id, job_instrumenter=None): |
| 142 | + def set_destination_instrumenter( |
| 143 | + self, destination_id: str, job_instrumenter: Union["JobInstrumenterI", None] = None |
| 144 | + ) -> None: |
141 | 145 | if job_instrumenter is None: |
142 | 146 | job_instrumenter = NULL_JOB_INSTRUMENTER |
143 | 147 | self.job_instrumenters[destination_id] = job_instrumenter |
|
0 commit comments