diff --git a/examples/exampleDoubleBodyTracking.py b/examples/exampleDoubleBodyTracking.py new file mode 100644 index 0000000..546f241 --- /dev/null +++ b/examples/exampleDoubleBodyTracking.py @@ -0,0 +1,62 @@ +import sys +import cv2 + +sys.path.insert(1, '../') +import pykinect_azure as pykinect + +if __name__ == "__main__": + + # Initialize the library, if the library is not found, add the library path as argument + pykinect.initialize_libraries(track_body=True) + + # Modify camera configuration + device_config = pykinect.default_configuration + device_config.color_resolution = pykinect.K4A_COLOR_RESOLUTION_OFF + device_config.depth_mode = pykinect.K4A_DEPTH_MODE_WFOV_2X2BINNED + # print(device_config) + + # Start device + device = pykinect.start_device(config=device_config, device_index=0) + device1 = pykinect.start_device(config=device_config, device_index=1) + + # Start body tracker + bodyTracker = pykinect.start_body_tracker() + bodyTracker1 = pykinect.start_body_tracker() + + cv2.namedWindow('Depth image with skeleton', cv2.WINDOW_NORMAL) + cv2.namedWindow('Depth image with skeleton1', cv2.WINDOW_NORMAL) + while True: + + # Get capture + capture = device.update() + capture1 = device1.update() + + # Get body tracker frame + body_frame = bodyTracker.update(capture) + body_frame1 = bodyTracker1.update(capture1) + + # Get the color depth image from the capture + ret, depth_color_image = capture.get_colored_depth_image() + ret, depth_color_image1 = capture1.get_colored_depth_image() + + # Get the colored body segmentation + ret, body_image_color = body_frame.get_segmentation_image() + ret, body_image_color1 = body_frame1.get_segmentation_image() + if not ret: + continue + + # Combine both images + combined_image = cv2.addWeighted(depth_color_image, 0.6, body_image_color, 0.4, 0) + combined_image1 = cv2.addWeighted(depth_color_image1, 0.6, body_image_color1, 0.4, 0) + + # Draw the skeletons + combined_image = body_frame.draw_bodies(combined_image) + combined_image1 = body_frame1.draw_bodies(combined_image1) + + # Overlay body segmentation on depth image + cv2.imshow('Depth image with skeleton', combined_image) + cv2.imshow('Depth image with skeleton1', combined_image1) + + # Press q key to stop + if cv2.waitKey(1) == ord('q'): + break diff --git a/pykinect_azure/k4a/capture.py b/pykinect_azure/k4a/capture.py index 85e4d11..9e59335 100644 --- a/pykinect_azure/k4a/capture.py +++ b/pykinect_azure/k4a/capture.py @@ -1,116 +1,112 @@ -import cv2 +import cv2 from pykinect_azure.k4a import _k4a from pykinect_azure.k4a.image import Image from pykinect_azure.k4a.transformation import Transformation from pykinect_azure.utils.postProcessing import smooth_depth_image -class Capture: - - def __init__(self, capture_handle, calibration_handle): - - self._handle = capture_handle - self.calibration_handle = calibration_handle - self.camera_transform = Transformation(calibration_handle) - - def __del__(self): - self.reset() - def is_valid(self): - return self._handle - - def handle(self): - return self._handle - - def reset(self): - if self.is_valid(): - self.release_handle() - self._handle = None +class Capture: - def release_handle(self): - if self.is_valid(): - _k4a.k4a_capture_release(self._handle) + def __init__(self, capture_handle, calibration_handle): - @staticmethod - def create(): + self._handle = capture_handle + self.calibration_handle = calibration_handle + self.camera_transform = Transformation(calibration_handle) - handle = _k4a.k4a_capture_t - _k4a.VERIFY(Capture._k4a.k4a_capture_create(handle),"Create capture failed!") + def __del__(self): + self.reset() - return Capture(handle) + def is_valid(self): + return self._handle - def get_color_image_object(self): - - return Image(_k4a.k4a_capture_get_color_image(self._handle)) + def handle(self): + return self._handle - def get_depth_image_object(self): + def reset(self): + if self.is_valid(): + self.release_handle() + self._handle = None - return Image(_k4a.k4a_capture_get_depth_image(self._handle)) + def release_handle(self): + if self.is_valid(): + _k4a.k4a_capture_release(self._handle) - def get_ir_image_object(self): + @staticmethod + def create(): - return Image(_k4a.k4a_capture_get_ir_image(self._handle)) + handle = _k4a.k4a_capture_t + _k4a.VERIFY(Capture._k4a.k4a_capture_create(handle), "Create capture failed!") - def get_transformed_depth_object(self): - return self.camera_transform.depth_image_to_color_camera(self.get_depth_image_object()) + return Capture(handle) - def get_transformed_color_object(self): - return self.camera_transform.color_image_to_depth_camera(self.get_depth_image_object(),self.get_color_image_object()) + def get_color_image_object(self): - def get_pointcloud_object(self, calibration_type = _k4a.K4A_CALIBRATION_TYPE_DEPTH): - return self.camera_transform.depth_image_to_point_cloud(self.get_depth_image_object(), calibration_type) + return Image(_k4a.k4a_capture_get_color_image(self._handle)) - def get_color_image(self): - return self.get_color_image_object().to_numpy() + def get_depth_image_object(self): - def get_depth_image(self): + return Image(_k4a.k4a_capture_get_depth_image(self._handle)) - return self.get_depth_image_object().to_numpy() + def get_ir_image_object(self): - def get_colored_depth_image(self): - ret, depth_image = self.get_depth_image() - if not ret: - return ret, None + return Image(_k4a.k4a_capture_get_ir_image(self._handle)) - return ret, self.color_depth_image(depth_image) + def get_transformed_depth_object(self): + return self.camera_transform.depth_image_to_color_camera(self.get_depth_image_object()) - def get_ir_image(self): - return self.get_ir_image_object().to_numpy() + def get_transformed_color_object(self): + return self.camera_transform.color_image_to_depth_camera(self.get_depth_image_object(), + self.get_color_image_object()) - def get_transformed_depth_image(self): - return self.get_transformed_depth_object().to_numpy() + def get_pointcloud_object(self, calibration_type=_k4a.K4A_CALIBRATION_TYPE_DEPTH): + return self.camera_transform.depth_image_to_point_cloud(self.get_depth_image_object(), calibration_type) - def get_transformed_colored_depth_image(self): - ret, transformed_depth_image = self.get_transformed_depth_image() + def get_color_image(self): + return self.get_color_image_object().to_numpy() - return ret, self.color_depth_image(transformed_depth_image) + def get_depth_image(self): - def get_transformed_color_image(self): - return self.get_transformed_color_object().to_numpy() + return self.get_depth_image_object().to_numpy() - def get_smooth_depth_image(self, maximum_hole_size=10): - ret, depth_image = self.get_depth_image() - return ret, smooth_depth_image(depth_image,maximum_hole_size) + def get_colored_depth_image(self): + ret, depth_image = self.get_depth_image() + if not ret: + return ret, None - def get_smooth_colored_depth_image(self, maximum_hole_size=10): - ret, smooth_depth_image = self.get_smooth_depth_image(maximum_hole_size) - return ret, self.color_depth_image(smooth_depth_image) + return ret, self.color_depth_image(depth_image) - def get_pointcloud(self, calibration_type = _k4a.K4A_CALIBRATION_TYPE_DEPTH): - ret, points = self.get_pointcloud_object(calibration_type).to_numpy() - points = points.reshape((-1, 3)) - return ret, points + def get_ir_image(self): + return self.get_ir_image_object().to_numpy() - @staticmethod - def color_depth_image(depth_image): - depth_color_image = cv2.convertScaleAbs (depth_image, alpha=0.05) #alpha is fitted by visual comparison with Azure k4aviewer results - depth_color_image = cv2.applyColorMap(depth_color_image, cv2.COLORMAP_JET) + def get_transformed_depth_image(self): + return self.get_transformed_depth_object().to_numpy() - return depth_color_image + def get_transformed_colored_depth_image(self): + ret, transformed_depth_image = self.get_transformed_depth_image() + return ret, self.color_depth_image(transformed_depth_image) + def get_transformed_color_image(self): + return self.get_transformed_color_object().to_numpy() + def get_smooth_depth_image(self, maximum_hole_size=10): + ret, depth_image = self.get_depth_image() + return ret, smooth_depth_image(depth_image, maximum_hole_size) + def get_smooth_colored_depth_image(self, maximum_hole_size=10): + ret, smooth_depth_image = self.get_smooth_depth_image(maximum_hole_size) + return ret, self.color_depth_image(smooth_depth_image) + def get_pointcloud(self, calibration_type=_k4a.K4A_CALIBRATION_TYPE_DEPTH): + ret, points = self.get_pointcloud_object(calibration_type).to_numpy() + points = points.reshape((-1, 3)) + return ret, points + @staticmethod + def color_depth_image(depth_image): + depth_color_image = cv2.convertScaleAbs(depth_image, + alpha=0.05) # alpha is fitted by visual comparison with Azure k4aviewer results + depth_color_image = cv2.applyColorMap(depth_color_image, cv2.COLORMAP_JET) + return depth_color_image diff --git a/pykinect_azure/k4a/device.py b/pykinect_azure/k4a/device.py index 25fbcc4..efbb9fe 100644 --- a/pykinect_azure/k4a/device.py +++ b/pykinect_azure/k4a/device.py @@ -7,153 +7,185 @@ from pykinect_azure.k4a.configuration import Configuration from pykinect_azure.k4arecord.record import Record from pykinect_azure.k4a._k4atypes import K4A_WAIT_INFINITE +from dataclasses import dataclass +from typing import * + +@dataclass +class DeviceData: + calibration: Calibration = None + capture: Capture = None + imu_sample: ImuSample = None + class Device: - calibration = None - capture = None - imu_sample = None + data_dict: List[DeviceData] = {} + + # calibration = None + # capture = None + # imu_sample = None + + def __init__(self, index=0): + self._handle = None + self._handle = self.open(index) + self.recording = False + self.index = index + Device.data_dict[self.index] = DeviceData() + + def __del__(self): + self.close() + + def get_data_dict(self): + return Device.data_dict[self.index] + + def is_valid(self): + return self._handle + + def is_capture_initialized(self): + # return Device.capture + return self.get_data_dict().capture + + def is_imu_sample_initialized(self): + # return Device.imu_sample + return self.get_data_dict().imu_sample + + def handle(self): + return self._handle + + def start(self, configuration, record=False, record_filepath="output.mkv"): + self.configuration = configuration + self.start_cameras(configuration) + self.start_imu() + + if record: + self.record = Record(self._handle, self.configuration.handle(), record_filepath) + self.recording = True - def __init__(self, index=0): - self._handle = None - self._handle = self.open(index) - self.recording = False + def close(self): + if self.is_valid(): + self.stop_imu() + self.stop_cameras() + _k4a.k4a_device_close(self._handle) - def __del__(self): - self.close() + # Clear members + self._handle = None + self.record = None + self.recording = False - def is_valid(self): - return self._handle + def update(self, timeout_in_ms=K4A_WAIT_INFINITE): + # Get cameras capture + capture_handle = self.get_capture(timeout_in_ms) - def is_capture_initialized(self): - return Device.capture + if self.is_capture_initialized(): + self.get_data_dict().capture._handle = capture_handle + else: + self.get_data_dict().capture = Capture(capture_handle, self.get_data_dict().calibration.handle()) - def is_imu_sample_initialized(self): - return Device.imu_sample + # Write capture if recording + if self.recording: + self.record.write_capture(self.get_data_dict().capture.handle()) - def handle(self): - return self._handle + return self.get_data_dict().capture - def start(self, configuration, record=False, record_filepath="output.mkv"): - self.configuration = configuration - self.start_cameras(configuration) - self.start_imu() + def update_imu(self, timeout_in_ms=K4A_WAIT_INFINITE): - if record: - self.record = Record(self._handle, self.configuration.handle(), record_filepath) - self.recording = True + # Get imu sample + imu_sample = self.get_imu_sample(timeout_in_ms) - def close(self): - if self.is_valid(): - self.stop_imu() - self.stop_cameras() - _k4a.k4a_device_close(self._handle) + if self.is_imu_sample_initialized(): + self.get_data_dict().imu_sample._struct = imu_sample + self.get_data_dict().imu_sample.parse_data() + else: + self.get_data_dict().imu_sample = ImuSample(imu_sample) - # Clear members - self._handle = None - self.record = None - self.recording = False + return self.get_data_dict().imu_sample - def update(self, timeout_in_ms=K4A_WAIT_INFINITE): - # Get cameras capture - capture_handle = self.get_capture(timeout_in_ms) + def get_capture(self, timeout_in_ms=_k4a.K4A_WAIT_INFINITE): - if self.is_capture_initialized(): - Device.capture._handle = capture_handle - else : - Device.capture = Capture(capture_handle, Device.calibration.handle()) - - # Write capture if recording - if self.recording: - self.record.write_capture(Device.capture.handle()) - - return Device.capture + # Release current handle + if self.is_capture_initialized(): + self.get_data_dict().capture.release_handle() - def update_imu(self, timeout_in_ms=K4A_WAIT_INFINITE): - - # Get imu sample - imu_sample = self.get_imu_sample(timeout_in_ms) + capture_handle = _k4a.k4a_capture_t() + _k4a.VERIFY(_k4a.k4a_device_get_capture(self._handle, capture_handle, timeout_in_ms), "Get capture failed!") - if self.is_imu_sample_initialized(): - Device.imu_sample._struct = imu_sample - Device.imu_sample.parse_data() - else : - Device.imu_sample = ImuSample(imu_sample) - - return Device.imu_sample + return capture_handle - def get_capture(self, timeout_in_ms=_k4a.K4A_WAIT_INFINITE): + def get_imu_sample(self, timeout_in_ms=_k4a.K4A_WAIT_INFINITE): - # Release current handle - if self.is_capture_initialized(): - Device.capture.release_handle() + imu_sample = _k4a.k4a_imu_sample_t() - capture_handle = _k4a.k4a_capture_t() - _k4a.VERIFY(_k4a.k4a_device_get_capture(self._handle, capture_handle, timeout_in_ms),"Get capture failed!") - - return capture_handle + _k4a.VERIFY(_k4a.k4a_device_get_imu_sample(self._handle, imu_sample, timeout_in_ms), "Get IMU failed!") - def get_imu_sample(self, timeout_in_ms=_k4a.K4A_WAIT_INFINITE): + return imu_sample - imu_sample = _k4a.k4a_imu_sample_t() + def start_cameras(self, device_config): + self.get_data_dict().calibration = self.get_calibration(device_config.depth_mode, + device_config.color_resolution) - _k4a.VERIFY(_k4a.k4a_device_get_imu_sample(self._handle,imu_sample,timeout_in_ms),"Get IMU failed!") + _k4a.VERIFY(_k4a.k4a_device_start_cameras(self._handle, device_config.handle()), "Start K4A cameras failed!") - return imu_sample + def stop_cameras(self): - def start_cameras(self, device_config): - Device.calibration = self.get_calibration(device_config.depth_mode, device_config.color_resolution) + _k4a.k4a_device_stop_cameras(self._handle) - _k4a.VERIFY(_k4a.k4a_device_start_cameras(self._handle, device_config.handle()),"Start K4A cameras failed!") + def start_imu(self): - def stop_cameras(self): + _k4a.VERIFY(_k4a.k4a_device_start_imu(self._handle), "Start K4A IMU failed!") - _k4a.k4a_device_stop_cameras(self._handle) + def stop_imu(self): - def start_imu(self): + _k4a.k4a_device_stop_imu(self._handle) - _k4a.VERIFY(_k4a.k4a_device_start_imu(self._handle),"Start K4A IMU failed!") + def get_serialnum(self): - def stop_imu(self): + serial_number_size = ctypes.c_size_t() + result = _k4a.k4a_device_get_serialnum(self._handle, None, serial_number_size) - _k4a.k4a_device_stop_imu(self._handle) + if result == _k4a.K4A_BUFFER_RESULT_TOO_SMALL: + serial_number = ctypes.create_string_buffer(serial_number_size.value) - def get_serialnum(self): + _k4a.VERIFY(_k4a.k4a_device_get_serialnum(self._handle, serial_number, serial_number_size), + "Read serial number failed!") - serial_number_size = ctypes.c_size_t() - result = _k4a.k4a_device_get_serialnum(self._handle, None, serial_number_size) + return serial_number.value.decode("utf-8") - if result == _k4a.K4A_BUFFER_RESULT_TOO_SMALL: - serial_number = ctypes.create_string_buffer(serial_number_size.value) + def get_calibration(self, depth_mode, color_resolution): - _k4a.VERIFY(_k4a.k4a_device_get_serialnum(self._handle,serial_number,serial_number_size),"Read serial number failed!") + calibration_handle = _k4a.k4a_calibration_t() - return serial_number.value.decode("utf-8") + _k4a.VERIFY(_k4a.k4a_device_get_calibration(self._handle, depth_mode, color_resolution, calibration_handle), + "Get calibration failed!") - def get_calibration(self, depth_mode, color_resolution): + return Calibration(calibration_handle) - calibration_handle = _k4a.k4a_calibration_t() + def get_version(self): - _k4a.VERIFY(_k4a.k4a_device_get_calibration(self._handle,depth_mode,color_resolution,calibration_handle),"Get calibration failed!") - - return Calibration(calibration_handle) + version = _k4a.k4a_hardware_version_t() - def get_version(self): + _k4a.VERIFY(_k4a.k4a_device_get_version(self._handle, version), "Get version failed!") - version = _k4a.k4a_hardware_version_t() + return version - _k4a.VERIFY(_k4a.k4a_device_get_version(self._handle,version),"Get version failed!") + @staticmethod + def open(index=0): + device_handle = _k4a.k4a_device_t() - return version + _k4a.VERIFY(_k4a.k4a_device_open(index, device_handle), "Open K4A Device failed!") - @staticmethod - def open(index=0): - device_handle = _k4a.k4a_device_t() + return device_handle - _k4a.VERIFY(_k4a.k4a_device_open(index, device_handle),"Open K4A Device failed!") + @staticmethod + def device_get_installed_count(): + return int(_k4a.k4a_device_get_installed_count()) - return device_handle + @staticmethod + def get_device_capture(index=0): + return Device.data_dict[index].capture - @staticmethod - def device_get_installed_count(): - return int(_k4a.k4a_device_get_installed_count()) + @staticmethod + def get_device_calibration(index=0): + return Device.data_dict[index].calibration + @staticmethod + def get_device_dict(index=0): + return Device.data_dict[index] diff --git a/pykinect_azure/pykinect.py b/pykinect_azure/pykinect.py index c874c86..1c3fbe2 100644 --- a/pykinect_azure/pykinect.py +++ b/pykinect_azure/pykinect.py @@ -6,9 +6,7 @@ from pykinect_azure.k4arecord import _k4arecord from pykinect_azure.k4arecord.playback import Playback from pykinect_azure.utils import get_k4a_module_path, get_k4abt_module_path, get_k4arecord_module_path - def initialize_libraries(module_k4a_path=None, module_k4abt_path=None, track_body=False): - # Search the module path for k4a if not available if module_k4a_path is None: module_k4a_path = get_k4a_module_path() @@ -29,25 +27,25 @@ def initialize_libraries(module_k4a_path=None, module_k4abt_path=None, track_bod # Initialize k4abt related wrappers init_k4abt(module_k4abt_path) -def init_k4a(module_k4a_path): +def init_k4a(module_k4a_path): _k4a.setup_library(module_k4a_path) -def init_k4abt(module_k4abt_path): +def init_k4abt(module_k4abt_path): _k4abt.setup_library(module_k4abt_path) try: ctypes.cdll.LoadLibrary("C:/Program Files/Azure Kinect Body Tracking SDK/tools/directml.dll") except Exception as e: - k4abt_tracker_default_configuration.processing_mode = K4ABT_TRACKER_PROCESSING_MODE_GPU_CUDA + k4abt_tracker_default_configuration.processing_mode = K4ABT_TRACKER_PROCESSING_MODE_GPU_CUDA -def init_k4arecord(module_k4arecord_path): +def init_k4arecord(module_k4arecord_path): _k4arecord.setup_library(module_k4arecord_path) + def start_device(device_index=0, config=default_configuration, record=False, record_filepath="output.mkv"): - # Create device object device = Device(device_index) @@ -56,14 +54,13 @@ def start_device(device_index=0, config=default_configuration, record=False, rec return device + def start_body_tracker(model_type=_k4abt.K4ABT_DEFAULT_MODEL, calibration=None): if calibration: return Tracker(calibration, model_type) else: - return Tracker(Device.calibration, model_type) + return Tracker(Device.get_device_calibration(), model_type) -def start_playback(filepath): +def start_playback(filepath): return Playback(filepath) - -