3636
3737import numpy as np
3838
39- try :
40- from rosbags .typesys .types import sensor_msgs__msg__PointCloud2 as PointCloud2
41- from rosbags .typesys .types import sensor_msgs__msg__PointField as PointField
42- except ImportError as e :
43- raise ImportError ('rosbags library not installed, run "pip install -U rosbags"' ) from e
44-
45-
46- _DATATYPES = {}
47- _DATATYPES [PointField .INT8 ] = np .dtype (np .int8 )
48- _DATATYPES [PointField .UINT8 ] = np .dtype (np .uint8 )
49- _DATATYPES [PointField .INT16 ] = np .dtype (np .int16 )
50- _DATATYPES [PointField .UINT16 ] = np .dtype (np .uint16 )
51- _DATATYPES [PointField .INT32 ] = np .dtype (np .int32 )
52- _DATATYPES [PointField .UINT32 ] = np .dtype (np .uint32 )
53- _DATATYPES [PointField .FLOAT32 ] = np .dtype (np .float32 )
54- _DATATYPES [PointField .FLOAT64 ] = np .dtype (np .float64 )
39+ __TIMESTAMP_ATTRIBUTE_NAMES__ = ["time" , "timestamps" , "timestamp" , "t" ]
40+
41+ _DATATYPES = {
42+ "int8" : np .dtype (np .int8 ),
43+ "uint8" : np .dtype (np .uint8 ),
44+ "int16" : np .dtype (np .int16 ),
45+ "uint16" : np .dtype (np .uint16 ),
46+ "int32" : np .dtype (np .int32 ),
47+ "uint32" : np .dtype (np .uint32 ),
48+ "float32" : np .dtype (np .float32 ),
49+ "float64" : np .dtype (np .float64 ),
50+ }
5551
5652DUMMY_FIELD_PREFIX = "unnamed_field"
5753
5854
59- def read_point_cloud (msg : PointCloud2 ) -> Tuple [np .ndarray , np .ndarray ]:
55+ def read_point_cloud (msg ) -> Tuple [np .ndarray , np .ndarray | None ]:
6056 """
6157 Extract poitns and timestamps from a PointCloud2 message.
6258
@@ -88,7 +84,7 @@ def read_point_cloud(msg: PointCloud2) -> Tuple[np.ndarray, np.ndarray]:
8884
8985
9086def read_points (
91- cloud : PointCloud2 ,
87+ cloud ,
9288 field_names : Optional [List [str ]] = None ,
9389 uvs : Optional [Iterable ] = None ,
9490 reshape_organized_cloud : bool = False ,
@@ -137,7 +133,15 @@ def read_points(
137133 return points
138134
139135
140- def dtype_from_fields (fields : Iterable [PointField ], point_step : Optional [int ] = None ) -> np .dtype :
136+ def get_datatype_name (field ) -> str :
137+ for attr_name , attr_value in vars (field ).items ():
138+ key_lower = attr_name .lower ()
139+ if key_lower in _DATATYPES and attr_value == field .datatype :
140+ return key_lower
141+ raise ValueError (f"Unknown datatype code { field .datatype } for field { vars (field )} " )
142+
143+
144+ def dtype_from_fields (fields : Iterable , point_step : Optional [int ] = None ) -> np .dtype :
141145 """
142146 Convert a Iterable of sensor_msgs.msg.PointField messages to a np.dtype.
143147 :param fields: The point cloud fields.
@@ -152,7 +156,7 @@ def dtype_from_fields(fields: Iterable[PointField], point_step: Optional[int] =
152156 field_datatypes = []
153157 for i , field in enumerate (fields ):
154158 # Datatype as numpy datatype
155- datatype = _DATATYPES [field . datatype ]
159+ datatype = _DATATYPES [get_datatype_name ( field ) ]
156160 # Name field
157161 if field .name == "" :
158162 name = f"{ DUMMY_FIELD_PREFIX } _{ i } "
0 commit comments