|
| 1 | +from pathlib import Path |
| 2 | +from types import NoneType |
| 3 | +from typing import Any, List, Type |
| 4 | + |
| 5 | +from ament_index_python.packages import get_package_share_directory |
| 6 | + |
| 7 | +from rosidl_parser.parser import parse_idl_file |
| 8 | +import rosidl_parser.definition as idl_def |
| 9 | +import rosidl_adapter.parser as rosidl_parser |
| 10 | +from rosidl_adapter.msg import MSG_TYPE_TO_IDL |
| 11 | + |
| 12 | +IDL_TYPE_TO_MSG = {v: k for k, v in MSG_TYPE_TO_IDL.items()} |
| 13 | + |
| 14 | +BOUNDED_TYPES = [idl_def.BoundedSequence, idl_def.Array] |
| 15 | +BRACKET_TYPES = BOUNDED_TYPES + [idl_def.UnboundedSequence, idl_def.Array] |
| 16 | +STRING_BOUND_TYPES = [idl_def.BoundedString, idl_def.BoundedWString] |
| 17 | + |
| 18 | +UNSUPPORTED_ROSIDL_TYPES = { |
| 19 | + 'long double': 'float64' |
| 20 | +} |
| 21 | + |
| 22 | + |
| 23 | +def find_value(haystack: List, name: str) -> Any: |
| 24 | + for a in haystack: |
| 25 | + if a.name == name: |
| 26 | + return a.value |
| 27 | + |
| 28 | + return None |
| 29 | + |
| 30 | + |
| 31 | +def find_annotation_value(annotations: List, name: str, value_key="value") -> Any: |
| 32 | + for a in annotations: |
| 33 | + if a.name == name: |
| 34 | + return a.value[value_key] |
| 35 | + |
| 36 | + return None |
| 37 | + |
| 38 | + |
| 39 | +def resolve_typename(member_type: Type[idl_def.AbstractType]) -> str: |
| 40 | + if isinstance(member_type, idl_def.BasicType): |
| 41 | + return member_type.typename |
| 42 | + elif isinstance(member_type, idl_def.AbstractWString): |
| 43 | + return "wstring" |
| 44 | + elif isinstance(member_type, idl_def.AbstractString): |
| 45 | + return "string" |
| 46 | + elif isinstance(member_type, idl_def.NamedType): |
| 47 | + return member_type.name |
| 48 | + elif isinstance(member_type, idl_def.AbstractNestedType): |
| 49 | + return resolve_typename(member_type.value_type) |
| 50 | + else: |
| 51 | + return member_type.name |
| 52 | + |
| 53 | + |
| 54 | +def build_type_string(member_type: Type[idl_def.AbstractType], constants: List[idl_def.Constant]) -> str: |
| 55 | + type_string = resolve_typename(member_type) |
| 56 | + |
| 57 | + if type_string in IDL_TYPE_TO_MSG: |
| 58 | + type_string = IDL_TYPE_TO_MSG[type_string] |
| 59 | + elif type_string in UNSUPPORTED_ROSIDL_TYPES: |
| 60 | + print(f"WARNING: Unsupported type could result in in loss of precision '{type_string}' --> '{UNSUPPORTED_ROSIDL_TYPES[type_string]}'") |
| 61 | + type_string = UNSUPPORTED_ROSIDL_TYPES[type_string] |
| 62 | + |
| 63 | + has_string_bounds = any(isinstance(member_type, t) for t in STRING_BOUND_TYPES) |
| 64 | + string_upper_bound = member_type.maximum_size if has_string_bounds else None |
| 65 | + has_brackets = any(isinstance(member_type, t) for t in BRACKET_TYPES) |
| 66 | + |
| 67 | + if type_string == 'wchar': |
| 68 | + # hack to support wchar as a wstring with length 1 |
| 69 | + type_string = 'wstring' |
| 70 | + has_string_bounds = True |
| 71 | + string_upper_bound = 1 |
| 72 | + |
| 73 | + if has_string_bounds: |
| 74 | + if type(string_upper_bound) == str: |
| 75 | + # constant bounded type needs to be resolved to the value |
| 76 | + string_upper_bound = find_value(constants, string_upper_bound) |
| 77 | + assert string_upper_bound is not None |
| 78 | + |
| 79 | + type_string += f"{rosidl_parser.STRING_UPPER_BOUND_TOKEN}{string_upper_bound}" |
| 80 | + |
| 81 | + if has_brackets: |
| 82 | + bounds = '' |
| 83 | + if isinstance(member_type, idl_def.BoundedSequence): |
| 84 | + bounds = f"{rosidl_parser.ARRAY_UPPER_BOUND_TOKEN}{member_type.maximum_size}" |
| 85 | + if isinstance(member_type, idl_def.Array): |
| 86 | + bounds = f"{member_type.size}" |
| 87 | + type_string += f"[{bounds}]" |
| 88 | + |
| 89 | + return type_string |
| 90 | + |
| 91 | + |
| 92 | +def process_constants(constants: List[idl_def.Constant]) -> List[rosidl_parser.Constant]: |
| 93 | + out_constants = [] |
| 94 | + for c in constants: |
| 95 | + if isinstance(c.type, idl_def.BoundedString): |
| 96 | + typename = "string" |
| 97 | + elif isinstance(c.type, idl_def.BoundedWString): |
| 98 | + typename = "wstring" |
| 99 | + else: |
| 100 | + typename = IDL_TYPE_TO_MSG[c.type.typename] |
| 101 | + out_constants.append(rosidl_parser.Constant(typename, c.name, str(c.value))) |
| 102 | + return out_constants |
| 103 | + |
| 104 | + |
| 105 | +def process_members(members: List[idl_def.Member], constants: List[idl_def.Constant]) -> List[rosidl_parser.Field]: |
| 106 | + fields = [] |
| 107 | + for m in members: |
| 108 | + type_pkg_name = None |
| 109 | + type_string = build_type_string(m.type, constants) |
| 110 | + |
| 111 | + if isinstance(m.type, idl_def.NamespacedType): |
| 112 | + type_pkg_name = m.type.namespaces[0] |
| 113 | + |
| 114 | + field_type = rosidl_parser.Type(type_string, type_pkg_name) |
| 115 | + |
| 116 | + default_value_str = find_annotation_value(m.annotations, 'default') |
| 117 | + if type(default_value_str) not in [NoneType, str]: |
| 118 | + default_value_str = str(default_value_str) |
| 119 | + |
| 120 | + fields.append(rosidl_parser.Field(field_type, m.name, default_value_str)) |
| 121 | + |
| 122 | + return fields |
| 123 | + |
| 124 | + |
| 125 | +def parse_idl_message(pkg_name: str, msg_name: str, idl_msg: idl_def.Message) -> rosidl_parser.MessageSpecification: |
| 126 | + fields = process_members(idl_msg.structure.members, idl_msg.constants) |
| 127 | + constants = process_constants(idl_msg.constants) |
| 128 | + |
| 129 | + msg = rosidl_parser.MessageSpecification(pkg_name, msg_name, fields, constants) |
| 130 | + # TODO add comments |
| 131 | + # msg.annotations['comment'] = message_comments |
| 132 | + |
| 133 | + return msg |
| 134 | + |
| 135 | + |
| 136 | +def parse_idl_to_message_spec(pkg_name: str, interface_file_path: str) -> rosidl_parser.MessageSpecification: |
| 137 | + path = Path(interface_file_path) |
| 138 | + # share_dir = get_package_share_directory(pkg_name) |
| 139 | + share_dir = path.parent |
| 140 | + msg_name = path.stem |
| 141 | + |
| 142 | + idl_file = parse_idl_file(idl_def.IdlLocator(share_dir, path.relative_to(share_dir))) |
| 143 | + idl_msg = idl_file.content.get_elements_of_type(idl_def.Message)[0] |
| 144 | + return parse_idl_message(pkg_name, msg_name, idl_msg) |
| 145 | + |
| 146 | + |
| 147 | +def parse_idl_to_service_spec(pkg_name: str, interface_file_path: str) -> rosidl_parser.ServiceSpecification: |
| 148 | + path = Path(interface_file_path) |
| 149 | + share_dir = get_package_share_directory(pkg_name) |
| 150 | + srv_name = path.stem |
| 151 | + |
| 152 | + idl_file = parse_idl_file(idl_def.IdlLocator(share_dir, path.relative_to(share_dir))) |
| 153 | + idl_srv = idl_file.content.get_elements_of_type(idl_def.Service)[0] |
| 154 | + request_message = parse_idl_message(pkg_name, srv_name + rosidl_parser.SERVICE_REQUEST_MESSAGE_SUFFIX, idl_srv.request_message) |
| 155 | + response_message = parse_idl_message(pkg_name, srv_name + rosidl_parser.SERVICE_RESPONSE_MESSAGE_SUFFIX, idl_srv.response_message) |
| 156 | + |
| 157 | + return rosidl_parser.ServiceSpecification(pkg_name, srv_name, request_message, response_message) |
0 commit comments