-
Notifications
You must be signed in to change notification settings - Fork 599
Expand file tree
/
Copy pathimage.py
More file actions
490 lines (395 loc) · 21.6 KB
/
image.py
File metadata and controls
490 lines (395 loc) · 21.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""For clients to use the image service."""
import collections
import os
import warnings
import numpy as np
from bosdyn.api import image_pb2, image_service_pb2_grpc
from bosdyn.client.common import (BaseClient, common_header_errors, custom_params_error,
error_factory, error_pair, handle_common_header_errors)
from bosdyn.client.exceptions import ResponseError, UnsetStatusError
class ImageResponseError(ResponseError):
"""General class of errors for Image service."""
class UnknownImageSourceError(ImageResponseError):
"""System cannot find the requested image source name."""
class SourceDataError(ImageResponseError):
"""System cannot generate the ImageSource at this time."""
class ImageDataError(ImageResponseError):
"""System cannot generate image data for the ImageCapture at this time."""
class UnsupportedImageFormatRequestedError(ImageResponseError):
"""The image service cannot return data in the requested format."""
class UnsupportedPixelFormatRequestedError(ImageResponseError):
"""The image service cannot return data in the requested pixel format."""
class UnsupportedResizeRatioRequestedError(ImageResponseError):
"""The image service cannot return data with the requested resize ratio."""
_STATUS_TO_ERROR = collections.defaultdict(lambda: (ResponseError, None))
_STATUS_TO_ERROR.update({
image_pb2.ImageResponse.STATUS_OK: (None, None),
image_pb2.ImageResponse.STATUS_UNKNOWN_CAMERA:
error_pair(UnknownImageSourceError),
image_pb2.ImageResponse.STATUS_SOURCE_DATA_ERROR:
error_pair(SourceDataError),
image_pb2.ImageResponse.STATUS_IMAGE_DATA_ERROR:
error_pair(ImageDataError),
image_pb2.ImageResponse.STATUS_UNSUPPORTED_IMAGE_FORMAT_REQUESTED:
error_pair(UnsupportedImageFormatRequestedError),
image_pb2.ImageResponse.STATUS_UNSUPPORTED_PIXEL_FORMAT_REQUESTED:
error_pair(UnsupportedPixelFormatRequestedError),
image_pb2.ImageResponse.STATUS_UNSUPPORTED_RESIZE_RATIO_REQUESTED:
error_pair(UnsupportedResizeRatioRequestedError),
image_pb2.ImageResponse.STATUS_UNKNOWN:
error_pair(UnsetStatusError),
})
@handle_common_header_errors
def _error_from_response(response):
"""Return a custom exception based on the first invalid image response, None if no error."""
for image_response in response.image_responses:
result = custom_params_error(image_response, total_response=response)
if result is not None:
return result
result = error_factory(response, image_response.status,
status_to_string=image_pb2.ImageResponse.Status.Name,
status_to_error=_STATUS_TO_ERROR)
if result is not None:
# The exception is using the image_response. Replace it with the full response.
result.response = response
return result
return None
class ImageClient(BaseClient):
"""Client for the image service."""
default_service_name = 'image'
service_type = 'bosdyn.api.ImageService'
def __init__(self):
super(ImageClient, self).__init__(image_service_pb2_grpc.ImageServiceStub)
def list_image_sources(self, **kwargs):
""" Obtain the list of ImageSources.
Returns:
A list of the different image sources as strings.
Raises:
RpcError: Problem communicating with the robot.
"""
req = self._get_list_image_source_request()
return self.call(self._stub.ListImageSources, req, _list_image_sources_value,
common_header_errors, copy_request=False, **kwargs)
def list_image_sources_async(self, **kwargs):
"""Async version of list_image_sources()"""
req = self._get_list_image_source_request()
return self.call_async(self._stub.ListImageSources, req, _list_image_sources_value,
common_header_errors, copy_request=False, **kwargs)
def get_image_from_sources(self, image_sources, **kwargs):
"""Obtain images from sources using default parameters.
Args:
image_sources (list of strings): The different image sources to request images from.
Returns:
A list of image responses for each of the requested sources.
Raises:
RpcError: Problem communicating with the robot.
UnknownImageSourceError: Provided image source was invalid or not found
image.SourceDataError: Failed to fill out ImageSource. All other fields are not filled
UnsetStatusError: An internal ImageService issue has happened
ImageDataError: Problem with the image data. Only ImageSource is filled
"""
return self.get_image([build_image_request(source) for source in image_sources], **kwargs)
def get_image_from_sources_async(self, image_sources, **kwargs):
"""Obtain images from sources using default parameters."""
return self.get_image_async([build_image_request(source) for source in image_sources],
**kwargs)
def get_image(self, image_requests, **kwargs):
"""Obtain the set of images from the robot.
Args:
image_requests (list of ImageRequest): A list of the ImageRequest protobuf messages which
specify which images to collect.
Returns:
A list of image responses for each of the requested sources.
Raises:
RpcError: Problem communicating with the robot.
UnknownImageSourceError: Provided image source was invalid or not found
image.SourceDataError: Failed to fill out ImageSource. All other fields are not filled
UnsetStatusError: An internal ImageService issue has happened
ImageDataError: Problem with the image data. Only ImageSource is filled
"""
req = self._get_image_request(image_requests)
return self.call(self._stub.GetImage, req, _get_image_value, _error_from_response,
copy_request=False, **kwargs)
def get_image_async(self, image_requests, **kwargs):
"""Async version of get_image()"""
req = self._get_image_request(image_requests)
return self.call_async(self._stub.GetImage, req, _get_image_value, _error_from_response,
copy_request=False, **kwargs)
@staticmethod
def _get_image_request(image_requests):
return image_pb2.GetImageRequest(image_requests=image_requests)
@staticmethod
def _get_list_image_source_request():
return image_pb2.ListImageSourcesRequest()
def build_image_request(image_source_name, quality_percent=75, image_format=None, pixel_format=None,
resize_ratio=None, fallback_formats=None):
"""Helper function which builds an ImageRequest from an image source name.
By default the robot will choose an appropriate format when no image format
is provided. For example, it will choose JPEG for visual images, or RAW for
depth images. Clients can provide an image_format for other cases.
Args:
image_source_name (string): The image source to query.
quality_percent (int): The image quality from [0,100] (percent-value).
image_format (image_pb2.Image.Format): The type of format for the image
data, such as JPEG, RAW, or RLE.
pixel_format (image_pb2.Image.PixelFormat) The pixel format of the image.
resize_ratio (double): Resize ratio for image dimensions.
fallback_formats (image_pb2.Image.PixelFormat) Fallback pixel formats to use
if the pixel_format is invalid.
Returns:
The ImageRequest protobuf message for the given parameters.
"""
return image_pb2.ImageRequest(image_source_name=image_source_name,
quality_percent=quality_percent, image_format=image_format,
pixel_format=pixel_format, fallback_formats=fallback_formats,
resize_ratio=resize_ratio)
def _list_image_sources_value(response):
return response.image_sources
def _get_image_value(response):
return response.image_responses
def pixel_format_to_numpy_type(pixel_format):
if pixel_format == image_pb2.Image.PIXEL_FORMAT_GREYSCALE_U8:
dtype = np.uint8
elif pixel_format == image_pb2.Image.PIXEL_FORMAT_RGB_U8:
dtype = np.uint8
elif pixel_format == image_pb2.Image.PIXEL_FORMAT_RGBA_U8:
dtype = np.uint8
elif pixel_format == image_pb2.Image.PIXEL_FORMAT_DEPTH_U16:
dtype = np.uint16
elif pixel_format == image_pb2.Image.PIXEL_FORMAT_GREYSCALE_U16:
dtype = ">u2" # Big endian
else:
raise Exception('Image PixelFormat type {} not supported'.format(pixel_format))
return dtype
def write_pgm_or_ppm(image_response, filename="", filepath=".", include_pixel_format=False):
"""Write raw data from image_response to a PGM file.
Args:
image_response (image_pb2.ImageResponse): The ImageResponse proto to parse.
filename (string): Name of the output file, if None is passed, then "image-{SOURCENAME}.pgm" is used.
filepath(string): The directory to save the image.
include_pixel_format(bool): append the pixel format to the image name when generating
a filename ("image-{SOURCENAME}-{PIXELFORMAT}.pgm").
"""
# Determine the data type to decode the image.
remote_dtype = pixel_format_to_numpy_type(image_response.shot.image.pixel_format)
if image_response.shot.image.pixel_format in (image_pb2.Image.PIXEL_FORMAT_DEPTH_U16,
image_pb2.Image.PIXEL_FORMAT_GREYSCALE_U16):
local_dtype = np.uint16
max_val = np.iinfo(np.uint16).max
else:
local_dtype = np.uint8
max_val = np.iinfo(np.uint8).max
num_channels = 1
pgm_header_number = 'P5'
file_extension = ".pgm"
# Determine the pixel format to get the number of channels the data comes in.
if image_response.shot.image.pixel_format == image_pb2.Image.PIXEL_FORMAT_RGB_U8:
num_channels = 3
pgm_header_number = 'P6'
file_extension = ".ppm"
elif image_response.shot.image.pixel_format == image_pb2.Image.PIXEL_FORMAT_RGBA_U8:
print("PGM/PPM format does not support RGBA encodings.")
return
elif image_response.shot.image.pixel_format in (image_pb2.Image.PIXEL_FORMAT_GREYSCALE_U8,
image_pb2.Image.PIXEL_FORMAT_DEPTH_U16,
image_pb2.Image.PIXEL_FORMAT_GREYSCALE_U16):
num_channels = 1
else:
print("Unsupported pixel format for PGM/PPM: %s." %
image_pb2.Image.PixelFormat.Name(image_response.shot.image.pixel_format))
return
img = np.frombuffer(image_response.shot.image.data, dtype=remote_dtype)
height = image_response.shot.image.rows
width = image_response.shot.image.cols
try:
img = img.astype(local_dtype).reshape((height, width, num_channels))
except ValueError as err:
print(
"Cannot convert raw image into expected shape (rows %d, cols %d, color channels %d)." %
(height, width, num_channels))
print(err)
return
if not filename:
if include_pixel_format:
filename = "image-{}-{}{}".format(
image_response.source.name,
image_pb2.Image.PixelFormat.Name(image_response.shot.image.pixel_format),
file_extension)
else:
filename = "image-{}{}".format(image_response.source.name, file_extension)
filename = os.path.join(filepath, filename)
try:
fd_out = open(filename, 'w')
except IOError as err:
print("Cannot open file %s. Exception thrown: %s" % (filename, err))
return
pgm_header = pgm_header_number + ' ' + str(width) + ' ' + str(height) + ' ' + str(
max_val) + '\n'
fd_out.write(pgm_header)
img.tofile(fd_out)
print('Saved matrix with pixel values from camera "%s" to file "%s".' %
(image_response.source.name, filename))
def write_image_data(image_response, filename="", filepath=".", include_pixel_format=False):
"""Write image data from image_response to a file.
Args:
image_response (image_pb2.ImageResponse): The ImageResponse proto to parse.
filename (string): Name of the output file (including the file extension), if None is
passed, then "image-{SOURCENAME}.jpg" is used.
filepath(string): The directory to save the image.
include_pixel_format(bool): append the pixel format to the image name when generating
a filename ("image-{SOURCENAME}-{PIXELFORMAT}.jpg").
"""
if not filename:
if include_pixel_format:
filename = 'image-{}-{}.jpg'.format(
image_response.source.name,
image_pb2.Image.PixelFormat.Name(image_response.shot.image.pixel_format))
else:
filename = 'image-{}.jpg'.format(image_response.source.name)
filename = os.path.join(filepath, filename)
try:
with open(filename, 'wb') as outfile:
outfile.write(image_response.shot.image.data)
print('Saved "{}" to "{}".'.format(image_response.source.name, filename))
except IOError as err:
print('Failed to save "{}".'.format(image_response.source.name))
print(err)
def save_images_as_files(image_responses, filename="", filepath=".", include_pixel_format=False):
"""Write image responses to files.
Args:
image_responses (List[image_pb2.ImageResponse]): The list of image responses to save.
filename (string): Name prefix of the output files (made unique by an integer suffix), if None
is passed the image source name is used.
filepath(string): The directory to save the image files.
include_pixel_format(bool): append the pixel format to the image name when generating
a filename ("image-{SOURCENAME}-{PIXELFORMAT}.jpg").
"""
for index, image in enumerate(image_responses):
save_file_name = ""
if filename:
# Add a suffix of the index of the image to ensure the filename is unique.
save_file_name = filename + str(index)
if image.shot.image.format == image_pb2.Image.FORMAT_UNKNOWN:
# Don't save an image with no format.
continue
elif not image.shot.image.format == image_pb2.Image.FORMAT_JPEG:
# Save raw and rle sources as text files with the full matrix saved as text values. The matrix will
# be of size: rows X cols X color channels. Color channels is determined through the pixel format.
write_pgm_or_ppm(image, save_file_name, filepath,
include_pixel_format=include_pixel_format)
else:
# Save jpeg format as a jpeg image.
write_image_data(image, save_file_name, filepath,
include_pixel_format=include_pixel_format)
def pixel_to_camera_space(image_proto, pixel_x, pixel_y, depth=1.0):
"""Using the camera intrinsics, determine the (x,y,z) point in the camera frame for
the (u,v) pixel coordinates.
Args:
image_proto (image_pb2.ImageSource): The image source proto which the pixel coordinates are from.
Use of image_pb2.ImageCaptureAndSource or image_pb2.ImageResponse types here have been deprecated.
pixel_x (int): x-coordinate.
pixel_y (int): y-coordinate.
depth (double): The depth from the camera to the point of interest.
Returns:
An (x,y,z) tuple representing the pixel point of interest now described as a point
in the camera frame.
"""
if 'source' in image_proto.ListFields() or isinstance(
image_proto, image_pb2.ImageResponse) or isinstance(image_proto,
image_pb2.ImageCaptureAndSource):
warnings.warn(
"Use of image_pb2.ImageCaptureAndSource or image_pb2.ImageResponse types for image_proto"
" argument have been deprecated, use image_pb2.ImageSource instead. version=4.0.0",
DeprecationWarning, stacklevel=2)
image_source = image_proto.source
else:
image_source = image_proto
if not image_source.HasField('pinhole'):
raise ValueError('Requires a pinhole camera_model.')
focal_x = image_source.pinhole.intrinsics.focal_length.x
principal_x = image_source.pinhole.intrinsics.principal_point.x
focal_y = image_source.pinhole.intrinsics.focal_length.y
principal_y = image_source.pinhole.intrinsics.principal_point.y
x_rt_camera = depth * (pixel_x - principal_x) / focal_x
y_rt_camera = depth * (pixel_y - principal_y) / focal_y
return (x_rt_camera, y_rt_camera, depth)
# Depth images use PIXEL_FORMAT_DEPTH_U16. A value of 0 or MAX_DEPTH_IMAGE_RANGE
# represents invalid data.
MAX_DEPTH_IMAGE_RANGE = np.iinfo(np.uint16).max
def _depth_image_get_valid_indices(depth_array, min_dist=1, max_dist=MAX_DEPTH_IMAGE_RANGE - 1):
"""Returns an array of indices containing valid depth data.
Args:
depth_array: A numpy array representation of the depth data.
min_dist (uint16): All points in the returned point cloud will be greater than min_dist from the image plane.
max_dist (uint16): All points in the returned point cloud will be less than max_dist from the image plane.
Returns:
A numpy of indices containing valid depth data.
"""
# Saturate the input to valid values.
min_dist = np.clip(min_dist, 1, MAX_DEPTH_IMAGE_RANGE - 1)
max_dist = np.clip(max_dist, 1, MAX_DEPTH_IMAGE_RANGE - 1)
valid_range_idx = np.logical_and(depth_array >= min_dist, depth_array <= max_dist)
return valid_range_idx
def _depth_image_data_to_numpy(image_response):
"""Interprets the image data as a numpy array.
Args:
image_response (image_pb2.ImageResponse): An ImageResponse containing a depth image.
Returns:
A numpy array representation of the depth data.
"""
depth_array = np.frombuffer(image_response.shot.image.data, dtype=np.uint16)
depth_array.shape = (image_response.shot.image.rows, image_response.shot.image.cols, -1)
if depth_array.shape[2] == 1:
depth_array.shape = depth_array.shape[:2]
return depth_array
def depth_image_to_pointcloud(image_response, min_dist=0, max_dist=1000):
"""Converts a depth image into a point cloud using the camera intrinsics. The point
cloud is represented as a numpy array of (x,y,z) values. Requests can optionally filter
the results based on the points distance to the image plane. A depth image is represented
with an unsigned 16-bit integer and a scale factor to convert that distance to meters. In
addition, values of zero and 2^16 (uint 16 maximum) are used to represent invalid indices.
A (min_dist * depth_scale) value that casts to an integer value <=0 will be assigned a
value of 1 (the minimum representational distance). Similarly, a (max_dist * depth_scale)
value that casts to >= 2^16 will be assigned a value of 2^16 - 1 (the maximum
representational distance).
Args:
image_response (image_pb2.ImageResponse): An ImageResponse containing a depth image.
min_dist (double): All points in the returned point cloud will be greater than min_dist from the image plane [meters].
max_dist (double): All points in the returned point cloud will be less than max_dist from the image plane [meters].
Returns:
A numpy stack of (x,y,z) values representing depth image as a point cloud expressed in the sensor frame.
"""
if image_response.source.image_type != image_pb2.ImageSource.IMAGE_TYPE_DEPTH:
raise ValueError('requires an image_type of IMAGE_TYPE_DEPTH.')
if image_response.shot.image.pixel_format != image_pb2.Image.PIXEL_FORMAT_DEPTH_U16:
raise ValueError(
'IMAGE_TYPE_DEPTH with an unsupported format, requires PIXEL_FORMAT_DEPTH_U16.')
if not image_response.source.HasField('pinhole'):
raise ValueError('Requires a pinhole camera_model.')
source_rows = image_response.source.rows
source_cols = image_response.source.cols
fx = image_response.source.pinhole.intrinsics.focal_length.x
fy = image_response.source.pinhole.intrinsics.focal_length.y
cx = image_response.source.pinhole.intrinsics.principal_point.x
cy = image_response.source.pinhole.intrinsics.principal_point.y
depth_scale = image_response.source.depth_scale
# Convert the proto representation into a numpy array.
depth_array = _depth_image_data_to_numpy(image_response)
# Determine which indices have valid data in the user requested range.
valid_inds = _depth_image_get_valid_indices(depth_array, np.rint(min_dist * depth_scale),
np.rint(max_dist * depth_scale))
# Compute the valid data.
rows, cols = np.mgrid[0:source_rows, 0:source_cols]
depth_array = depth_array[valid_inds]
rows = rows[valid_inds]
cols = cols[valid_inds]
# Convert the valid distance data to (x,y,z) values expressed in the sensor frame.
z = depth_array / depth_scale
x = np.multiply(z, (cols - cx)) / fx
y = np.multiply(z, (rows - cy)) / fy
return np.vstack((x, y, z)).T