|
11 | 11 | import xml.etree.ElementTree as ET |
12 | 12 |
|
13 | 13 | import attr |
| 14 | +import yaml |
14 | 15 |
|
| 16 | +from .rosparam import load_from_yaml_string as load_rosparam_from_string |
15 | 17 | from .config import ROSConfig, NodeConfig, Parameter |
16 | 18 | from .context import LaunchContext |
17 | 19 | from ..substitution import resolve as resolve_args |
|
27 | 29 | _TAG_TO_LOADER = {} |
28 | 30 |
|
29 | 31 |
|
| 32 | +def _read_contents(tag: ET.Element) -> str: |
| 33 | + """Reads the text contents of an XML element.""" |
| 34 | + # FIXME add support for CDATA -- possibly via lxml or xml.dom? |
| 35 | + return ''.join(t.text for t in tag if t.text) |
| 36 | + |
| 37 | + |
30 | 38 | def _parse_bool(attr: str, val: str) -> bool: |
31 | 39 | """Parses a boolean value from an XML attribute.""" |
32 | 40 | val = val.lower() |
@@ -170,6 +178,65 @@ def _load_param_tag(self, |
170 | 178 |
|
171 | 179 | return ctx, cfg |
172 | 180 |
|
| 181 | + @tag('rosparam', ['command', 'ns', 'file', 'param', 'subst_value']) |
| 182 | + def _load_rosparam_tag(self, |
| 183 | + ctx: LaunchContext, |
| 184 | + cfg: ROSConfig, |
| 185 | + tag: ET.Element |
| 186 | + ) -> Tuple[LaunchContext, ROSConfig]: |
| 187 | + filename = self._read_optional(tag, 'file', ctx) |
| 188 | + subst_value = self._read_optional_bool(tag, 'subst_value', ctx, False) |
| 189 | + ns = self._read_optional(tag, 'ns', ctx) or '' |
| 190 | + param = self._read_optional(tag, 'param', ctx) or '' |
| 191 | + param = namespace_join(ns, param) |
| 192 | + full_param = namespace_join(ctx.namespace, param) |
| 193 | + value = _read_contents(tag) |
| 194 | + |
| 195 | + cmd: str = self._read_optional(tag, 'command', ctx) or 'load' |
| 196 | + if cmd not in ('load', 'delete', 'dump'): |
| 197 | + m = f"<rosparam> unsupported 'command': {cmd}" |
| 198 | + raise FailedToParseLaunchFile(m) |
| 199 | + |
| 200 | + if cmd == 'load' and not filename: |
| 201 | + m = "<rosparam> load command requires 'filename' attribute" |
| 202 | + raise FailedToParseLaunchFile(m) |
| 203 | + |
| 204 | + if cmd == 'load': |
| 205 | + assert filename is not None # mypy can't work this out |
| 206 | + if not self.__files.isfile(filename): |
| 207 | + m = f"<rosparam> file does not exist: {filename}" |
| 208 | + raise FailedToParseLaunchFile(m) |
| 209 | + |
| 210 | + if cmd == 'delete' and filename is not None: |
| 211 | + m = "<rosparam> command:delete does not support filename" |
| 212 | + raise FailedToParseLaunchFile(m) |
| 213 | + |
| 214 | + # handle load command |
| 215 | + if cmd == 'load': |
| 216 | + assert filename is not None # mypy can't work this out |
| 217 | + yml_text = self.__files.read(filename) |
| 218 | + if subst_value: |
| 219 | + yml_text = self._resolve_args(yml_text, ctx) |
| 220 | + logger.debug("parsing rosparam YAML:\n%s", yml_text) |
| 221 | + data = load_rosparam_from_string(yml_text) |
| 222 | + logger.debug("rosparam values: %s", data) |
| 223 | + if not isinstance(data, dict) and not param: |
| 224 | + m = "<rosparam> requires 'param' for non-dictionary values" |
| 225 | + raise FailedToParseLaunchFile(m) |
| 226 | + cfg = cfg.with_param(full_param, data) |
| 227 | + |
| 228 | + # handle dump command |
| 229 | + if cmd == 'dump': |
| 230 | + m = "'dump' command is currently not supported in <rosparam>" |
| 231 | + raise NotImplementedError(m) |
| 232 | + |
| 233 | + # handle delete command |
| 234 | + if cmd == 'delete': |
| 235 | + m = "'delete' command is currently not supported in <rosparam>" |
| 236 | + raise NotImplementedError(m) |
| 237 | + |
| 238 | + return ctx, cfg |
| 239 | + |
173 | 240 | @tag('remap', ['from', 'to']) |
174 | 241 | def _load_remap_tag(self, |
175 | 242 | ctx: LaunchContext, |
|
0 commit comments