-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathsynchronization.py
More file actions
82 lines (66 loc) · 3.92 KB
/
synchronization.py
File metadata and controls
82 lines (66 loc) · 3.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import os
import sys

# Make both the current working directory and the project root (the parent
# of this script's directory) importable. Guard EACH path so repeated runs
# or re-imports never add duplicate sys.path entries (the original guarded
# only the first one).
for _extra_path in (
    os.getcwd(),
    os.path.abspath(os.path.join(os.path.dirname(__file__), '..')),
):
    if _extra_path not in sys.path:
        sys.path.append(_extra_path)

from pyeyesweb.data_models.sliding_window import SlidingWindow
import cv2
import mediapipe as mp
from pyeyesweb.analysis_primitives.synchronization import Synchronization
def extract_wrist_coordinates(results, keypoint_idx, image_width, image_height):
    """Return the pixel y-coordinate of a pose keypoint, or None.

    Only the vertical coordinate is returned because downstream analysis
    tracks vertical wrist movement as a time series.

    Args:
        results: Mediapipe Pose output; must have non-None ``pose_landmarks``
            (callers check this before calling).
        keypoint_idx: Index into the landmark list (e.g. 15/16 for wrists).
        image_width: Frame width in pixels. Unused since only y is returned;
            kept for interface compatibility with existing callers.
        image_height: Frame height in pixels.

    Returns:
        int pixel y-coordinate when the landmark's visibility exceeds 0.5,
        otherwise None (landmark not reliably visible).
    """
    keypoint = results.pose_landmarks.landmark[keypoint_idx]
    # Skip landmarks Mediapipe reports as poorly visible (occluded/off-frame).
    if keypoint.visibility > 0.5:
        # Convert the normalized [0, 1] y coordinate to a pixel row.
        # (The original also computed an unused x pixel value; removed.)
        return int(keypoint.y * image_height)
    return None
def main():
    """Capture webcam frames, track both wrists with Mediapipe Pose, and
    stream their vertical positions into the Synchronization analysis."""
    # Mediapipe landmark indices for the left and right wrists.
    LEFT_WRIST_IDX, RIGHT_WRIST_IDX = 15, 16

    # Real-time pose detector; 0.5 thresholds balance latency and stability.
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5)

    # filter_params=None: feed the raw time series with no preprocessing.
    sync = Synchronization(sensitivity=100, output_phase=True, filter_params=None)
    sliding_window = SlidingWindow(max_length=50, n_columns=2)

    # Default webcam.
    cap = cv2.VideoCapture(0)

    try:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                print("Failed to grab frame")
                break

            # Mirror the view — more intuitive for a user facing the camera.
            frame = cv2.flip(frame, 1)
            # Mediapipe expects RGB; OpenCV delivers BGR.
            rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose.process(rgb)
            height, width, _ = rgb.shape

            if results.pose_landmarks:
                left_y = extract_wrist_coordinates(results, LEFT_WRIST_IDX, width, height)
                right_y = extract_wrist_coordinates(results, RIGHT_WRIST_IDX, width, height)
                # Only feed the analysis when BOTH wrists are reliably visible.
                if left_y is not None and right_y is not None:
                    sliding_window.append([left_y, right_y])
                    sync(sliding_window)

            cv2.imshow('MediaPipe Pose', frame)
            # Quit on 'q'.
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release the camera, GUI windows, and Mediapipe resources on any exit.
        cap.release()
        cv2.destroyAllWindows()
        pose.close()


if __name__ == "__main__":
    main()