|
14 | 14 |
|
15 | 15 | """Module for a description of a Parameter.""" |
16 | 16 |
|
| 17 | +import os |
| 18 | +from pathlib import Path |
| 19 | +from tempfile import NamedTemporaryFile |
17 | 20 | from typing import List |
18 | 21 | from typing import Optional |
19 | 22 | from typing import Text |
|
24 | 27 | from launch import LaunchContext |
25 | 28 | from launch import SomeSubstitutionsType |
26 | 29 | from launch import SomeSubstitutionsType_types_tuple |
| 30 | +from launch.frontend.parse_substitution import parse_substitution |
27 | 31 | from launch.substitution import Substitution |
| 32 | +from launch.substitutions import SubstitutionFailure |
28 | 33 | from launch.utilities import ensure_argument_type |
29 | 34 | from launch.utilities import normalize_to_list_of_substitutions |
30 | 35 | from launch.utilities import perform_substitutions |
31 | 36 | from launch.utilities.type_utils import AllowedTypesType |
32 | 37 | from launch.utilities.type_utils import normalize_typed_substitution |
33 | 38 | from launch.utilities.type_utils import perform_typed_substitution |
34 | 39 | from launch.utilities.type_utils import SomeValueType |
| 40 | +from launch.utilities.typing_file_path import FilePath |
| 41 | + |
| 42 | +import yaml |
35 | 43 |
|
36 | 44 | if TYPE_CHECKING: |
37 | 45 | from .parameters_type import EvaluatedParameterValue |
@@ -154,3 +162,99 @@ def evaluate(self, context: LaunchContext) -> Tuple[Text, 'EvaluatedParameterVal |
154 | 162 | self.__evaluated_parameter_name = name |
155 | 163 | self.__evaluated_parameter_rule = (name, value) |
156 | 164 | return (name, value) |
| 165 | + |
| 166 | + |
| 167 | +class ParameterFile: |
| 168 | + """Describes a ROS parameter file.""" |
| 169 | + |
| 170 | + def __init__( |
| 171 | + self, |
| 172 | + param_file: Union[FilePath, SomeSubstitutionsType], |
| 173 | + *, |
| 174 | + allow_substs: [bool, SomeSubstitutionsType] = False |
| 175 | + ) -> None: |
| 176 | + """ |
| 177 | + Construct a parameter file description. |
| 178 | +
|
| 179 | + :param param_file: Path to a parameter file. |
| 180 | + :param allow_subst: Allow substitutions in the parameter file. |
| 181 | + """ |
| 182 | + ensure_argument_type( |
| 183 | + param_file, |
| 184 | + SomeSubstitutionsType_types_tuple + (os.PathLike, bytes), |
| 185 | + 'param_file', |
| 186 | + 'ParameterFile()' |
| 187 | + ) |
| 188 | + ensure_argument_type( |
| 189 | + allow_substs, |
| 190 | + bool, |
| 191 | + 'allow_subst', |
| 192 | + 'ParameterFile()' |
| 193 | + ) |
| 194 | + self.__param_file: Union[List[Substitution], FilePath] = param_file |
| 195 | + if isinstance(param_file, SomeSubstitutionsType_types_tuple): |
| 196 | + self.__param_file = normalize_to_list_of_substitutions(param_file) |
| 197 | + self.__allow_substs = normalize_typed_substitution(allow_substs, data_type=bool) |
| 198 | + self.__evaluated_allow_substs: Optional[bool] = None |
| 199 | + self.__evaluated_param_file: Optional[Path] = None |
| 200 | + self.__created_tmp_file = False |
| 201 | + |
| 202 | + @property |
| 203 | + def param_file(self) -> Union[FilePath, List[Substitution]]: |
| 204 | + """Getter for parameter file.""" |
| 205 | + if self.__evaluated_param_file is not None: |
| 206 | + return self.__evaluated_param_file |
| 207 | + return self.__param_file |
| 208 | + |
| 209 | + @property |
| 210 | + def allow_substs(self) -> Union[bool, List[Substitution]]: |
| 211 | + """Getter for allow substitutions argument.""" |
| 212 | + if self.__evaluated_allow_substs is not None: |
| 213 | + return self.__evaluated_allow_substs |
| 214 | + return self.__allow_substs |
| 215 | + |
| 216 | + def __str__(self) -> Text: |
| 217 | + return ( |
| 218 | + 'launch_ros.description.ParameterFile' |
| 219 | + f'(param_file={self.param_file}, allow_substs={self.allow_substs})' |
| 220 | + ) |
| 221 | + |
| 222 | + def evaluate(self, context: LaunchContext) -> Path: |
| 223 | + """Evaluate and return a parameter file path.""" |
| 224 | + if self.__evaluated_param_file is not None: |
| 225 | + return self.__evaluated_param_file |
| 226 | + |
| 227 | + param_file = self.__param_file |
| 228 | + if isinstance(param_file, list): |
| 229 | + # list of substitutions |
| 230 | + param_file = perform_substitutions(context, self.__param_file) |
| 231 | + |
| 232 | + allow_substs = perform_typed_substitution(context, self.__allow_substs, data_type=bool) |
| 233 | + param_file_path: Path = Path(param_file) |
| 234 | + if allow_substs: |
| 235 | + with open(param_file_path, 'r') as f, NamedTemporaryFile( |
| 236 | + mode='w', prefix='launch_params_', delete=False |
| 237 | + ) as h: |
| 238 | + parsed = perform_substitutions(context, parse_substitution(f.read())) |
| 239 | + try: |
| 240 | + yaml.safe_load(parsed) |
| 241 | + except Exception: |
| 242 | + raise SubstitutionFailure( |
| 243 | + 'The substituted parameter file is not a valid yaml file') |
| 244 | + h.write(parsed) |
| 245 | + param_file_path = Path(h.name) |
| 246 | + self.__created_tmp_file = True |
| 247 | + self.__evaluated_param_file = param_file_path |
| 248 | + return param_file_path |
| 249 | + |
| 250 | + def cleanup(self): |
| 251 | + """Delete created temporary files.""" |
| 252 | + if self.__created_tmp_file and self.__evaluated_param_file is not None: |
| 253 | + try: |
| 254 | + os.unlink(self.__evaluated_param_file) |
| 255 | + except FileNotFoundError: |
| 256 | + pass |
| 257 | + self.__evaluated_param_file = None |
| 258 | + |
| 259 | + def __del__(self): |
| 260 | + self.cleanup() |
0 commit comments