|
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | 14 |
|
| 15 | +import sys |
| 16 | + |
| 17 | +from rcl_interfaces.msg import Parameter |
15 | 18 | from rcl_interfaces.msg import ParameterType |
16 | 19 | from rcl_interfaces.msg import ParameterValue |
17 | 20 | from rcl_interfaces.srv import DescribeParameters |
18 | 21 | from rcl_interfaces.srv import GetParameters |
19 | 22 | from rcl_interfaces.srv import ListParameters |
20 | 23 | from rcl_interfaces.srv import SetParameters |
21 | 24 | import rclpy |
| 25 | +from rclpy.parameter import PARAMETER_SEPARATOR_STRING |
22 | 26 | from ros2cli.node.direct import DirectNode |
| 27 | + |
23 | 28 | import yaml |
24 | 29 |
|
25 | 30 |
|
@@ -92,6 +97,65 @@ def get_parameter_value(*, string_value): |
92 | 97 | return value |
93 | 98 |
|
94 | 99 |
|
| 100 | +def parse_parameter_dict(*, namespace, parameter_dict): |
| 101 | + parameters = [] |
| 102 | + for param_name, param_value in parameter_dict.items(): |
| 103 | + full_param_name = namespace + param_name |
| 104 | + # Unroll nested parameters |
| 105 | + if type(param_value) == dict: |
| 106 | + parameters += parse_parameter_dict( |
| 107 | + namespace=full_param_name + PARAMETER_SEPARATOR_STRING, |
| 108 | + parameter_dict=param_value) |
| 109 | + else: |
| 110 | + parameter = Parameter() |
| 111 | + parameter.name = full_param_name |
| 112 | + parameter.value = get_parameter_value(string_value=str(param_value)) |
| 113 | + parameters.append(parameter) |
| 114 | + return parameters |
| 115 | + |
| 116 | + |
| 117 | +def load_parameter_dict(*, node, node_name, parameter_dict): |
| 118 | + |
| 119 | + parameters = parse_parameter_dict(namespace='', parameter_dict=parameter_dict) |
| 120 | + response = call_set_parameters( |
| 121 | + node=node, node_name=node_name, parameters=parameters) |
| 122 | + |
| 123 | + # output response |
| 124 | + assert len(response.results) == len(parameters) |
| 125 | + for i in range(0, len(response.results)): |
| 126 | + result = response.results[i] |
| 127 | + param_name = parameters[i].name |
| 128 | + if result.successful: |
| 129 | + msg = 'Set parameter {} successful'.format(param_name) |
| 130 | + if result.reason: |
| 131 | + msg += ': ' + result.reason |
| 132 | + print(msg) |
| 133 | + else: |
| 134 | + msg = 'Set parameter {} failed'.format(param_name) |
| 135 | + if result.reason: |
| 136 | + msg += ': ' + result.reason |
| 137 | + print(msg, file=sys.stderr) |
| 138 | + |
| 139 | + |
| 140 | +def load_parameter_file(*, node, node_name, parameter_file): |
| 141 | + # Remove leading slash and namespaces |
| 142 | + internal_node_name = node_name.split('/')[-1] |
| 143 | + with open(parameter_file, 'r') as f: |
| 144 | + param_file = yaml.safe_load(f) |
| 145 | + if internal_node_name not in param_file: |
| 146 | + raise RuntimeError('Param file does not contain parameters for {}, ' |
| 147 | + ' only for namespaces: {}' .format(internal_node_name, |
| 148 | + param_file.keys())) |
| 149 | + |
| 150 | + value = param_file[internal_node_name] |
| 151 | + if type(value) != dict or 'ros__parameters' not in value: |
| 152 | + raise RuntimeError('Invalid structure of parameter file in namespace {}' |
| 153 | + 'expected same format as provided by ros2 param dump' |
| 154 | + .format(internal_node_name)) |
| 155 | + load_parameter_dict(node=node, node_name=node_name, |
| 156 | + parameter_dict=value['ros__parameters']) |
| 157 | + |
| 158 | + |
95 | 159 | def call_describe_parameters(*, node, node_name, parameter_names=None): |
96 | 160 | # create client |
97 | 161 | client = node.create_client( |
|
0 commit comments