diff --git a/facetracker_mediapipe.py b/facetracker_mediapipe.py new file mode 100644 index 0000000..4534a91 --- /dev/null +++ b/facetracker_mediapipe.py @@ -0,0 +1,491 @@ +import copy +import os +import sys +import argparse +import traceback +import gc + +# mp_drawing = mp.solutions.drawing_utils +# mp_drawing_styles = mp.solutions.drawing_styles + +parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) +parser.add_argument("-i", "--ip", help="Set IP address for sending tracking data", default="127.0.0.1") +parser.add_argument("-p", "--port", type=int, help="Set port for sending tracking data", default=11573) +if os.name == 'nt': + parser.add_argument("-l", "--list-cameras", type=int, help="Set this to 1 to list the available cameras and quit, set this to 2 or higher to output only the names", default=0) + parser.add_argument("-a", "--list-dcaps", type=int, help="Set this to -1 to list all cameras and their available capabilities, set this to a camera id to list that camera's capabilities", default=None) + parser.add_argument("-W", "--width", type=int, help="Set camera and raw RGB width", default=640) + parser.add_argument("-H", "--height", type=int, help="Set camera and raw RGB height", default=360) + parser.add_argument("-D", "--dcap", type=int, help="Set which device capability line to use or -1 to use the default camera settings (FPS still need to be set separately)", default=None) + parser.add_argument("-B", "--blackmagic", type=int, help="When set to 1, special support for Blackmagic devices is enabled", default=0) +else: + parser.add_argument("-W", "--width", type=int, help="Set raw RGB width", default=640) + parser.add_argument("-H", "--height", type=int, help="Set raw RGB height", default=360) +parser.add_argument("-F", "--fps", type=int, help="Set camera frames per second", default=24) +parser.add_argument("-c", "--capture", help="Set camera ID (0, 1...) or video file", default="0") +parser.add_argument("-M", "--mirror-input", action="store_true", help="Process a mirror image of the input video") +parser.add_argument("-m", "--max-threads", type=int, help="Set the maximum number of threads", default=1) +parser.add_argument("-t", "--threshold", type=float, help="Set minimum confidence threshold for face tracking", default=None) +parser.add_argument("-d", "--detection-threshold", type=float, help="Set minimum confidence threshold for face detection", default=0.6) +parser.add_argument("-v", "--visualize", type=int, help="Set this to 1 to visualize the tracking, to 2 to also show face ids, to 3 to add confidence values or to 4 to add numbers to the point display", default=0) +parser.add_argument("-P", "--pnp-points", type=int, help="Set this to 1 to add the 3D fitting points to the visualization", default=0) +parser.add_argument("-s", "--silent", type=int, help="Set this to 1 to prevent text output on the console", default=0) +parser.add_argument("--hands", type=int, help="Set this to 1 to enable hands tracking, 0 to disable hands tracking", default=0) +parser.add_argument("--faces", type=int, help="Set the maximum number of faces (slow)", default=1) +parser.add_argument("--scan-retinaface", type=int, help="When set to 1, scanning for additional faces will be performed using RetinaFace in a background thread, otherwise a simpler, faster face detection mechanism is used. 
When the maximum number of faces is 1, this option does nothing.", default=0) +parser.add_argument("--scan-every", type=int, help="Set after how many frames a scan for new faces should run", default=3) +parser.add_argument("--discard-after", type=int, help="Set the how long the tracker should keep looking for lost faces", default=10) +parser.add_argument("--max-feature-updates", type=int, help="This is the number of seconds after which feature min/max/medium values will no longer be updated once a face has been detected.", default=900) +parser.add_argument("--no-3d-adapt", type=int, help="When set to 1, the 3D face model will not be adapted to increase the fit", default=1) +parser.add_argument("--try-hard", type=int, help="When set to 1, the tracker will try harder to find a face", default=0) +parser.add_argument("--video-out", help="Set this to the filename of an AVI file to save the tracking visualization as a video", default=None) +parser.add_argument("--video-scale", type=int, help="This is a resolution scale factor applied to the saved AVI file", default=1, choices=[1,2,3,4]) +parser.add_argument("--video-fps", type=float, help="This sets the frame rate of the output AVI file", default=24) +parser.add_argument("--raw-rgb", type=int, help="When this is set, raw RGB frames of the size given with \"-W\" and \"-H\" are read from standard input instead of reading a video", default=0) +parser.add_argument("--log-data", help="You can set a filename to which tracking data will be logged here", default="") +parser.add_argument("--log-output", help="You can set a filename to console output will be logged here", default="") +parser.add_argument("--model", type=int, help="This can be used to select the tracking model. Higher numbers are models with better tracking quality, but slower speed, except for model 4, which is wink optimized. Models 1 and 0 tend to be too rigid for expression and blink detection. Model -2 is roughly equivalent to model 1, but faster. 
Model -3 is between models 0 and -1.", default=3, choices=[-3, -2, -1, 0, 1, 2, 3, 4]) +parser.add_argument("--model-dir", help="This can be used to specify the path to the directory containing the .onnx model files", default=None) +parser.add_argument("--gaze-tracking", type=int, help="When set to 1, gaze tracking is enabled, which makes things slightly slower", default=1) +parser.add_argument("--face-id-offset", type=int, help="When set, this offset is added to all face ids, which can be useful for mixing tracking data from multiple network sources", default=0) +parser.add_argument("--repeat-video", type=int, help="When set to 1 and a video file was specified with -c, the tracker will loop the video until interrupted", default=0) +parser.add_argument("--dump-points", type=str, help="When set to a filename, the current face 3D points are made symmetric and dumped to the given file when quitting the visualization with the \"q\" key", default="") +parser.add_argument("--benchmark", type=int, help="When set to 1, the different tracking models are benchmarked, starting with the best and ending with the fastest and with gaze tracking disabled for models with negative IDs", default=0) +parser.add_argument("--frame-data", type=int, help="When set to 1, the server is sending webcam frame data", default=0) + +if os.name == 'nt': + parser.add_argument("--use-dshowcapture", type=int, help="When set to 1, libdshowcapture will be used for video input instead of OpenCV", default=1) + parser.add_argument("--blackmagic-options", type=str, help="When set, this additional option string is passed to the blackmagic capture library", default=None) + parser.add_argument("--priority", type=int, help="When set, the process priority will be changed", default=None, choices=[0, 1, 2, 3, 4, 5]) +args = parser.parse_args() + +os.environ["OMP_NUM_THREADS"] = str(args.max_threads) + +class OutputLog(object): + def __init__(self, fh, output): + self.fh = fh + self.output = output + def write(self, buf): + if self.fh is not None: + self.fh.write(buf) + self.output.write(buf) + self.flush() + def flush(self): + if self.fh is not None: + self.fh.flush() + self.output.flush() +output_logfile = None +if args.log_output != "": + output_logfile = open(args.log_output, "w") +sys.stdout = OutputLog(output_logfile, sys.stdout) +sys.stderr = OutputLog(output_logfile, sys.stderr) + +if os.name == 'nt': + import dshowcapture + if args.blackmagic == 1: + dshowcapture.set_bm_enabled(True) + if args.blackmagic_options is not None: + dshowcapture.set_options(args.blackmagic_options) + if args.priority is not None: + import psutil + classes = [psutil.IDLE_PRIORITY_CLASS, psutil.BELOW_NORMAL_PRIORITY_CLASS, psutil.NORMAL_PRIORITY_CLASS, psutil.ABOVE_NORMAL_PRIORITY_CLASS, psutil.HIGH_PRIORITY_CLASS, psutil.REALTIME_PRIORITY_CLASS] + p = psutil.Process(os.getpid()) + p.nice(classes[args.priority]) + +if os.name == 'nt' and (args.list_cameras > 0 or args.list_dcaps is not None): + cap = dshowcapture.DShowCapture() + info = cap.get_info() + unit = 10000000.; + if args.list_dcaps is not None: + formats = {0: "Any", 1: "Unknown", 100: "ARGB", 101: "XRGB", 200: "I420", 201: "NV12", 202: "YV12", 203: "Y800", 300: "YVYU", 301: "YUY2", 302: "UYVY", 303: "HDYC (Unsupported)", 400: "MJPEG", 401: "H264" } + for cam in info: + if args.list_dcaps == -1: + type = "" + if cam['type'] == "Blackmagic": + type = "Blackmagic: " + print(f"{cam['index']}: {type}{cam['name']}") + if args.list_dcaps != -1 and args.list_dcaps != cam['index']: + continue + for 
caps in cam['caps']: + format = caps['format'] + if caps['format'] in formats: + format = formats[caps['format']] + if caps['minCX'] == caps['maxCX'] and caps['minCY'] == caps['maxCY']: + print(f" {caps['id']}: Resolution: {caps['minCX']}x{caps['minCY']} FPS: {unit/caps['maxInterval']:.3f}-{unit/caps['minInterval']:.3f} Format: {format}") + else: + print(f" {caps['id']}: Resolution: {caps['minCX']}x{caps['minCY']}-{caps['maxCX']}x{caps['maxCY']} FPS: {unit/caps['maxInterval']:.3f}-{unit/caps['minInterval']:.3f} Format: {format}") + else: + if args.list_cameras == 1: + print("Available cameras:") + for cam in info: + type = "" + if cam['type'] == "Blackmagic": + type = "Blackmagic: " + if args.list_cameras == 1: + print(f"{cam['index']}: {type}{cam['name']}") + else: + print(f"{type}{cam['name']}") + cap.destroy_capture() + sys.exit(0) + +import math +import numpy as np +import time +import cv2 +import socket +import struct +import json +from input_reader import InputReader, VideoReader, DShowCaptureReader, try_int +from tracker_mediapipe import Tracker + +max_length = 65535-28 # 28 is for UDP header + + +target_ip = args.ip +target_port = args.port + +if args.faces >= 40: + print("Transmission of tracking data over network is not supported with 40 or more faces.") + +fps = args.fps +dcap = None +use_dshowcapture_flag = False +if os.name == 'nt': + dcap = args.dcap + use_dshowcapture_flag = True if args.use_dshowcapture == 1 else False + input_reader = InputReader(args.capture, args.raw_rgb, args.width, args.height, fps, use_dshowcapture=use_dshowcapture_flag, dcap=dcap) + if args.dcap == -1 and type(input_reader) == DShowCaptureReader: + fps = min(fps, input_reader.device.get_fps()) +else: + input_reader = InputReader(args.capture, args.raw_rgb, args.width, args.height, fps) +if type(input_reader.reader) == VideoReader: + fps = 0 + +log = None +out = None +first = True +height = 0 +width = 0 +tracker = None +sock = None +total_tracking_time = 0.0 +tracking_time = 0.0 +tracking_frames = 0 +frame_count = 0 + +features = ["eye_l", "eye_r", "eyebrow_steepness_l", "eyebrow_updown_l", "eyebrow_quirk_l", "eyebrow_steepness_r", "eyebrow_updown_r", "eyebrow_quirk_r", "mouth_corner_updown_l", "mouth_corner_inout_l", "mouth_corner_updown_r", "mouth_corner_inout_r", "mouth_open", "mouth_wide"] + +if args.log_data != "": + log = open(args.log_data, "w") + log.write("Frame,Time,Width,Height,FPS,Face,FaceID,RightOpen,LeftOpen,AverageConfidence,Success3D,PnPError,RotationQuat.X,RotationQuat.Y,RotationQuat.Z,RotationQuat.W,Euler.X,Euler.Y,Euler.Z,RVec.X,RVec.Y,RVec.Z,TVec.X,TVec.Y,TVec.Z") + for i in range(66): + log.write(f",Landmark[{i}].X,Landmark[{i}].Y,Landmark[{i}].Confidence") + for i in range(66): + log.write(f",Point3D[{i}].X,Point3D[{i}].Y,Point3D[{i}].Z") + for feature in features: + log.write(f",{feature}") + log.write("\r\n") + log.flush() + +is_camera = args.capture == str(try_int(args.capture)) + +try: + attempt = 0 + frame_time = time.perf_counter() + target_duration = 0 + if fps > 0: + target_duration = 1. 
/ float(fps) + repeat = args.repeat_video != 0 and type(input_reader.reader) == VideoReader + need_reinit = 0 + failures = 0 + source_name = input_reader.name + + while repeat or input_reader.is_open(): + if not input_reader.is_open() or need_reinit == 1: + input_reader = InputReader(args.capture, args.raw_rgb, args.width, args.height, fps, use_dshowcapture=use_dshowcapture_flag, dcap=dcap) + if input_reader.name != source_name: + print(f"Failed to reinitialize camera and got {input_reader.name} instead of {source_name}.") + sys.exit(1) + need_reinit = 2 + time.sleep(0.02) + continue + if not input_reader.is_ready(): + time.sleep(0.02) + continue + + ret, frame = input_reader.read() + if ret and args.mirror_input: + frame = cv2.flip(frame, 1) + if not ret: + if repeat: + if need_reinit == 0: + need_reinit = 1 + continue + elif is_camera: + attempt += 1 + if attempt > 30: + break + else: + time.sleep(0.02) + if attempt == 3: + need_reinit = 1 + continue + else: + break; + + attempt = 0 + need_reinit = 0 + frame_count += 1 + now = time.time() + + if first: + first = False + height, width, channels = frame.shape + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + tracker = Tracker(width, height, threshold=args.threshold, max_threads=args.max_threads, max_faces=args.faces, discard_after=args.discard_after, scan_every=args.scan_every, silent=False if args.silent == 0 else True, model_type=args.model, model_dir=args.model_dir, no_gaze=False if args.gaze_tracking != 0 and args.model != -1 else True, detection_threshold=args.detection_threshold, use_retinaface=args.scan_retinaface, max_feature_updates=args.max_feature_updates, static_model=True if args.no_3d_adapt == 1 else False, try_hard=args.try_hard == 1) + if args.video_out is not None: + out = cv2.VideoWriter(args.video_out, cv2.VideoWriter_fourcc('F','F','V','1'), args.video_fps, (width * args.video_scale, height * args.video_scale)) + + try: + inference_start = time.perf_counter() + face = tracker.predict(frame) + if face is not None: + inference_time = (time.perf_counter() - inference_start) + total_tracking_time += inference_time + tracking_time += inference_time + tracking_frames += 1 + packet = bytearray() + detected = False + + if face is not None: + f = copy.copy(face) + + if f.eye_blink is None: + f.eye_blink = [1, 1] + right_state = "O" if f.eye_blink[0] > 0.30 else "-" + left_state = "O" if f.eye_blink[1] > 0.30 else "-" + # if args.silent == 0: + # print(f"Confidence: {f.conf:.4f} / 3D fitting error: {f.pnp_error:.4f} / Eyes: {left_state}, {right_state}") + # detected = True + if not f.success: + pts_3d = np.zeros((70, 3), np.float32) + # packet.extend(bytearray(struct.pack("d", now))) + # packet.extend(bytearray(struct.pack("f", width))) + # packet.extend(bytearray(struct.pack("f", height))) + # packet.extend(bytearray(struct.pack("f", f.eye_blink[0]))) + # packet.extend(bytearray(struct.pack("f", f.eye_blink[1]))) + # packet.extend(bytearray(struct.pack("B", 1 if f.success else 0))) + # packet.extend(bytearray(struct.pack("f", f.pnp_error))) + # packet.extend(bytearray(struct.pack("f", f.quaternion[0]))) + # packet.extend(bytearray(struct.pack("f", f.quaternion[1]))) + # packet.extend(bytearray(struct.pack("f", f.quaternion[2]))) + # packet.extend(bytearray(struct.pack("f", f.quaternion[3]))) + # packet.extend(bytearray(struct.pack("f", f.euler[0]))) + # packet.extend(bytearray(struct.pack("f", f.euler[1]))) + # packet.extend(bytearray(struct.pack("f", f.euler[2]))) + # packet.extend(bytearray(struct.pack("f", 
f.translation[0]))) + # packet.extend(bytearray(struct.pack("f", f.translation[1]))) + # packet.extend(bytearray(struct.pack("f", f.translation[2]))) + # if log is not None: + # log.write(f"{frame_count},{now},{width},{height},{fps},{f.id},{f.eye_blink[0]},{f.eye_blink[1]},{f.conf},{f.success},{f.pnp_error},{f.quaternion[0]},{f.quaternion[1]},{f.quaternion[2]},{f.quaternion[3]},{f.euler[0]},{f.euler[1]},{f.euler[2]},{f.rotation[0]},{f.rotation[1]},{f.rotation[2]},{f.translation[0]},{f.translation[1]},{f.translation[2]}") + # for (x,y,c) in f.lms: + # packet.extend(bytearray(struct.pack("f", c))) + # if args.visualize > 1: + # frame = cv2.putText(frame, str(f.id), (int(f.bbox[0]), int(f.bbox[1])), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255,0,255)) + # if args.visualize > 2: + # frame = cv2.putText(frame, f"{f.conf:.4f}", (int(f.bbox[0] + 18), int(f.bbox[1] - 6)), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255)) + # for pt_num, (x,y,c) in enumerate(f.lms): + # packet.extend(bytearray(struct.pack("f", y))) + # packet.extend(bytearray(struct.pack("f", x))) + # if log is not None: + # log.write(f",{y},{x},{c}") + # if pt_num == 66 and (f.eye_blink[0] < 0.30 or c < 0.20): + # continue + # if pt_num == 67 and (f.eye_blink[1] < 0.30 or c < 0.20): + # continue + # x = int(x + 0.5) + # y = int(y + 0.5) + # if args.visualize != 0 or out is not None: + # if args.visualize > 3: + # frame = cv2.putText(frame, str(pt_num), (int(y), int(x)), cv2.FONT_HERSHEY_SIMPLEX, 0.25, (255,255,0)) + # color = (0, 255, 0) + # if pt_num >= 66: + # color = (255, 255, 0) + # if not (x < 0 or y < 0 or x >= height or y >= width): + # cv2.circle(frame, (y, x), 1, color, -1) + # if args.pnp_points != 0 and (args.visualize != 0 or out is not None) and f.rotation is not None: + # if args.pnp_points > 1: + # projected = cv2.projectPoints(f.face_3d[0:66], f.rotation, f.translation, tracker.camera, tracker.dist_coeffs) + # else: + # projected = cv2.projectPoints(f.contour, f.rotation, f.translation, tracker.camera, tracker.dist_coeffs) + # for [(x,y)] in projected[0]: + # x = int(x + 0.5) + # y = int(y + 0.5) + # if not (x < 0 or y < 0 or x >= height or y >= width): + # frame[int(x), int(y)] = (0, 255, 255) + # x += 1 + # if not (x < 0 or y < 0 or x >= height or y >= width): + # frame[int(x), int(y)] = (0, 255, 255) + # y += 1 + # if not (x < 0 or y < 0 or x >= height or y >= width): + # frame[int(x), int(y)] = (0, 255, 255) + # x -= 1 + # if not (x < 0 or y < 0 or x >= height or y >= width): + # frame[int(x), int(y)] = (0, 255, 255) + # for (x,y,z) in f.pts_3d: + # packet.extend(bytearray(struct.pack("f", x))) + # packet.extend(bytearray(struct.pack("f", -y))) + # packet.extend(bytearray(struct.pack("f", -z))) + # if log is not None: + # log.write(f",{x},{-y},{-z}") + # if f.current_features is None: + # f.current_features = {} + # for feature in features: + # if not feature in f.current_features: + # f.current_features[feature] = 0 + # packet.extend(bytearray(struct.pack("f", f.current_features[feature]))) + # if log is not None: + # log.write(f",{f.current_features[feature]}") + # if log is not None: + # log.write("\r\n") + # log.flush() + + if detected and len(faces) < 40: + sock.sendto(packet, (target_ip, target_port)) + else: + sock.sendto(struct.pack("B", 0), (target_ip, target_port)) + + + if args.frame_data == 1: + cam_frame = frame if width <= 480 else cv2.resize(frame, (480, math.ceil(height * (480 / width))), interpolation=cv2.INTER_NEAREST) + retval, buffer = cv2.imencode(".jpg", cam_frame) + if retval: + # convert to byte array 
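+                    # Wire format for the frame stream (as implemented below):
+                    # first a single struct.pack("i", num_of_packs) packet with
+                    # the number of chunks, then that many JPEG chunks of at
+                    # most max_length (65535 - 28 = 65507) bytes each, all sent
+                    # to the same (target_ip, target_port) as the tracking packets.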
+ buffer = buffer.tobytes() + # get size of the frame + buffer_size = len(buffer) + + num_of_packs = 1 + if buffer_size > max_length: + num_of_packs = math.ceil(buffer_size/max_length) + + # frame_info = {"packs":num_of_packs} + frame_info = bytearray(struct.pack("i", num_of_packs)) + + # send the number of packs to be expected + # print("Number of packs:", num_of_packs) + sock.sendto(frame_info, (target_ip, target_port)) + + left = 0 + right = max_length + + for i in range(num_of_packs): + # print("left:", left) + # print("right:", right) + + # truncate data to send + data = buffer[left:right] + left = right + right += max_length + + # send the frames accordingly + sock.sendto(data, (target_ip, target_port)) + + if out is not None: + video_frame = frame + if args.video_scale != 1: + video_frame = cv2.resize(frame, (width * args.video_scale, height * args.video_scale), interpolation=cv2.INTER_NEAREST) + out.write(video_frame) + if args.video_scale != 1: + del video_frame + + + if args.visualize != 0 and args.frame_data == 0: + cv2.imshow('OpenSeeFace Visualization', frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + if args.dump_points != "" and faces is not None and len(faces) > 0: + np.set_printoptions(threshold=sys.maxsize, precision=15) + pairs = [ + (0, 16), + (1, 15), + (2, 14), + (3, 13), + (4, 12), + (5, 11), + (6, 10), + (7, 9), + (17, 26), + (18, 25), + (19, 24), + (20, 23), + (21, 22), + (31, 35), + (32, 34), + (36, 45), + (37, 44), + (38, 43), + (39, 42), + (40, 47), + (41, 46), + (48, 52), + (49, 51), + (56, 54), + (57, 53), + (58, 62), + (59, 61), + (65, 63) + ] + points = copy.copy(faces[0].face_3d) + for a, b in pairs: + x = (points[a, 0] - points[b, 0]) / 2.0 + y = (points[a, 1] + points[b, 1]) / 2.0 + z = (points[a, 2] + points[b, 2]) / 2.0 + points[a, 0] = x + points[b, 0] = -x + points[[a, b], 1] = y + points[[a, b], 2] = z + points[[8, 27, 28, 29, 33, 50, 55, 60, 64], 0] = 0.0 + points[30, :] = 0.0 + with open(args.dump_points, "w") as fh: + fh.write(repr(points)) + break + failures = 0 + except Exception as e: + if e.__class__ == KeyboardInterrupt: + if args.silent == 0: + print("Quitting") + break + traceback.print_exc() + failures += 1 + if failures > 30: + break + + collected = False + del frame + + duration = time.perf_counter() - frame_time + while duration < target_duration: + if not collected: + gc.collect() + collected = True + duration = time.perf_counter() - frame_time + sleep_time = target_duration - duration + if sleep_time > 0: + time.sleep(sleep_time) + duration = time.perf_counter() - frame_time + frame_time = time.perf_counter() +except KeyboardInterrupt: + if args.silent == 0: + print("Quitting") +if args.hands == 1 and holistic is not None: + holistic.close() +input_reader.close() +if out is not None: + out.release() +cv2.destroyAllWindows() + +if args.silent == 0 and tracking_frames > 0: + average_tracking_time = 1000 * tracking_time / tracking_frames + print(f"Average tracking time per detected face: {average_tracking_time:.2f} ms") + print(f"Tracking time: {total_tracking_time:.3f} s\nFrames: {tracking_frames}") + diff --git a/holistic.py b/holistic.py new file mode 100644 index 0000000..853fbc7 --- /dev/null +++ b/holistic.py @@ -0,0 +1,49 @@ +import cv2 +import mediapipe as mp +mp_drawing = mp.solutions.drawing_utils +mp_drawing_styles = mp.solutions.drawing_styles +mp_holistic = mp.solutions.holistic + + +# For webcam input: +print("1") +cap = cv2.VideoCapture(0) +print("2") +with mp_holistic.Holistic( + min_detection_confidence=0.5, + 
min_tracking_confidence=0.5) as holistic: + while cap.isOpened(): + print("3") + success, image = cap.read() + if not success: + print("Ignoring empty camera frame.") + # If loading a video, use 'break' instead of 'continue'. + continue + + # To improve performance, optionally mark the image as not writeable to + # pass by reference. + image.flags.writeable = False + image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) + results = holistic.process(image) + + # Draw landmark annotation on the image. + image.flags.writeable = True + image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) + mp_drawing.draw_landmarks( + image, + results.face_landmarks, + mp_holistic.FACEMESH_CONTOURS, + landmark_drawing_spec=None, + connection_drawing_spec=mp_drawing_styles + .get_default_face_mesh_contours_style()) + mp_drawing.draw_landmarks( + image, + results.pose_landmarks, + mp_holistic.POSE_CONNECTIONS, + landmark_drawing_spec=mp_drawing_styles + .get_default_pose_landmarks_style()) + # Flip the image horizontally for a selfie-view display. + cv2.imshow('MediaPipe Holistic', cv2.flip(image, 1)) + if cv2.waitKey(5) & 0xFF == 27: + break +cap.release() \ No newline at end of file diff --git a/models/face_landmarker.task b/models/face_landmarker.task new file mode 100644 index 0000000..c50c845 Binary files /dev/null and b/models/face_landmarker.task differ diff --git a/tracker_mediapipe.py b/tracker_mediapipe.py new file mode 100644 index 0000000..7ffc8eb --- /dev/null +++ b/tracker_mediapipe.py @@ -0,0 +1,544 @@ +import os +import numpy as np +import math +import cv2 +import time +import queue +import threading +import copy +from similaritytransform import SimilarityTransform +from retinaface import RetinaFaceDetector +from remedian import remedian + +import mediapipe as mp +mp_face_mesh = mp.solutions.face_mesh +mp_hands = mp.solutions.hands +mp_holistic = mp.solutions.holistic + +from mediapipe.tasks import python +from mediapipe.tasks.python import vision + +def resolve(name): + f = os.path.join(os.path.dirname(__file__), name) + return f + +def clamp_to_im(pt, w, h): + x = pt[0] + y = pt[1] + if x < 0: + x = 0 + if y < 0: + y = 0 + if x >= w: + x = w-1 + if y >= h: + y = h-1 + return (int(x), int(y+1)) + +def rotate(origin, point, a): + a = -a + ox, oy = origin + px, py = point + + qx = ox + math.cos(a) * (px - ox) - math.sin(a) * (py - oy) + qy = oy + math.sin(a) * (px - ox) + math.cos(a) * (py - oy) + return qx, qy + +def angle(p1, p2): + p1 = np.array(p1) + p2 = np.array(p2) + a = np.arctan2(*(p2 - p1)[::-1]) + return (a % (2 * np.pi)) + +def compensate(p1, p2): + a = angle(p1, p2) + return rotate(p1, p2, a), a + +def rotate_image(image, a, center): + (h, w) = image.shape[:2] + a = np.rad2deg(a) + M = cv2.getRotationMatrix2D((float(center[0]), float(center[1])), a, 1.0) + rotated = cv2.warpAffine(image, M, (w, h)) + return rotated + +def intersects(r1, r2, amount=0.3): + area1 = r1[2] * r1[3] + area2 = r2[2] * r2[3] + inter = 0.0 + total = area1 + area2 + + r1_x1, r1_y1, w, h = r1 + r1_x2 = r1_x1 + w + r1_y2 = r1_y1 + h + r2_x1, r2_y1, w, h = r2 + r2_x2 = r2_x1 + w + r2_y2 = r2_y1 + h + + left = max(r1_x1, r2_x1) + right = min(r1_x2, r2_x2) + top = max(r1_y1, r2_y1) + bottom = min(r1_y2, r2_y2) + if left < right and top < bottom: + inter = (right - left) * (bottom - top) + total -= inter + + if inter / total >= amount: + return True + + return False + + #return not (r1_x1 > r2_x2 or r1_x2 < r2_x1 or r1_y1 > r2_y2 or r1_y2 < r2_y1) + +def group_rects(rects): + rect_groups = {} + for rect in rects: + 
rect_groups[str(rect)] = [-1, -1, []] + group_id = 0 + for i, rect in enumerate(rects): + name = str(rect) + group = group_id + group_id += 1 + if rect_groups[name][0] < 0: + rect_groups[name] = [group, -1, []] + else: + group = rect_groups[name][0] + for j, other_rect in enumerate(rects): + if i == j: + continue; + inter = intersects(rect, other_rect) + if intersects(rect, other_rect): + rect_groups[str(other_rect)] = [group, -1, []] + return rect_groups + +def logit(p, factor=16.0): + if p >= 1.0: + p = 0.9999999 + if p <= 0.0: + p = 0.0000001 + p = p/(1-p) + return float(np.log(p)) / float(factor) + +def logit_arr(p, factor=16.0): + p = np.clip(p, 0.0000001, 0.9999999) + return np.log(p / (1 - p)) / float(factor) + +def matrix_to_quaternion(m): + t = 0.0 + q = [0.0, 0.0, 0, 0.0] + if m[2,2] < 0: + if m[0,0] > m[1,1]: + t = 1 + m[0,0] - m[1,1] - m[2,2] + q = [t, m[0,1]+m[1,0], m[2,0]+m[0,2], m[1,2]-m[2,1]] + else: + t = 1 - m[0,0] + m[1,1] - m[2,2] + q = [m[0,1]+m[1,0], t, m[1,2]+m[2,1], m[2,0]-m[0,2]] + else: + if m[0,0] < -m[1,1]: + t = 1 - m[0,0] - m[1,1] + m[2,2] + q = [m[2,0]+m[0,2], m[1,2]+m[2,1], t, m[0,1]-m[1,0]] + else: + t = 1 + m[0,0] + m[1,1] + m[2,2] + q = [m[1,2]-m[2,1], m[2,0]-m[0,2], m[0,1]-m[1,0], t] + q = np.array(q, np.float32) * 0.5 / np.sqrt(t) + return q + +def worker_thread(session, frame, input, crop_info, queue, input_name, idx, tracker): + output = session.run([], {input_name: input})[0] + conf, lms = tracker.landmarks(output[0], crop_info) + if conf > tracker.threshold: + try: + eye_state = tracker.get_eye_state(frame, lms) + except: + eye_state = [(1.0, 0.0, 0.0, 0.0), (1.0, 0.0, 0.0, 0.0)] + queue.put((session, conf, (lms, eye_state), crop_info, idx)) + else: + queue.put((session,)) + +class Feature(): + def __init__(self, threshold=0.15, alpha=0.2, hard_factor=0.15, decay=0.001, max_feature_updates=0): + self.median = remedian() + self.min = None + self.max = None + self.hard_min = None + self.hard_max = None + self.threshold = threshold + self.alpha = alpha + self.hard_factor = hard_factor + self.decay = decay + self.last = 0 + self.current_median = 0 + self.update_count = 0 + self.max_feature_updates = max_feature_updates + self.first_seen = -1 + self.updating = True + + def update(self, x, now=0): + if self.max_feature_updates > 0: + if self.first_seen == -1: + self.first_seen = now; + new = self.update_state(x, now=now) + filtered = self.last * self.alpha + new * (1 - self.alpha) + self.last = filtered + return filtered + + def update_state(self, x, now=0): + updating = self.updating and (self.max_feature_updates == 0 or now - self.first_seen < self.max_feature_updates) + if updating: + self.median + x + self.current_median = self.median.median() + else: + self.updating = False + median = self.current_median + + if self.min is None: + if x < median and (median - x) / median > self.threshold: + if updating: + self.min = x + self.hard_min = self.min + self.hard_factor * (median - self.min) + return -1 + return 0 + else: + if x < self.min: + if updating: + self.min = x + self.hard_min = self.min + self.hard_factor * (median - self.min) + return -1 + if self.max is None: + if x > median and (x - median) / median > self.threshold: + if updating: + self.max = x + self.hard_max = self.max - self.hard_factor * (self.max - median) + return 1 + return 0 + else: + if x > self.max: + if updating: + self.max = x + self.hard_max = self.max - self.hard_factor * (self.max - median) + return 1 + + if updating: + if self.min < self.hard_min: + self.min = self.hard_min * 
self.decay + self.min * (1 - self.decay) + if self.max > self.hard_max: + self.max = self.hard_max * self.decay + self.max * (1 - self.decay) + + if x < median: + return - (1 - (x - self.min) / (median - self.min)) + elif x > median: + return (x - median) / (self.max - median) + + return 0 + +class FeatureExtractor(): + def __init__(self, max_feature_updates=0): + self.eye_l = Feature(max_feature_updates=max_feature_updates) + self.eye_r = Feature(max_feature_updates=max_feature_updates) + self.eyebrow_updown_l = Feature(max_feature_updates=max_feature_updates) + self.eyebrow_updown_r = Feature(max_feature_updates=max_feature_updates) + self.eyebrow_quirk_l = Feature(threshold=0.05, max_feature_updates=max_feature_updates) + self.eyebrow_quirk_r = Feature(threshold=0.05, max_feature_updates=max_feature_updates) + self.eyebrow_steepness_l = Feature(threshold=0.05, max_feature_updates=max_feature_updates) + self.eyebrow_steepness_r = Feature(threshold=0.05, max_feature_updates=max_feature_updates) + self.mouth_corner_updown_l = Feature(max_feature_updates=max_feature_updates) + self.mouth_corner_updown_r = Feature(max_feature_updates=max_feature_updates) + self.mouth_corner_inout_l = Feature(threshold=0.02, max_feature_updates=max_feature_updates) + self.mouth_corner_inout_r = Feature(threshold=0.02, max_feature_updates=max_feature_updates) + self.mouth_open = Feature(max_feature_updates=max_feature_updates) + self.mouth_wide = Feature(threshold=0.02, max_feature_updates=max_feature_updates) + + def align_points(self, a, b, pts): + a = tuple(a) + b = tuple(b) + alpha = angle(a, b) + alpha = np.rad2deg(alpha) + if alpha >= 90: + alpha = - (alpha - 180) + if alpha <= -90: + alpha = - (alpha + 180) + alpha = np.deg2rad(alpha) + aligned_pts = [] + for pt in pts: + aligned_pts.append(np.array(rotate(a, pt, alpha))) + return alpha, np.array(aligned_pts) + + def update(self, pts, full=True): + features = {} + now = time.perf_counter() + + norm_distance_x = np.mean([pts[127, 0] - pts[356, 0], pts[234, 0] - pts[454, 0]]) + norm_distance_y = np.mean([pts[6, 1] - pts[197, 1], pts[197, 1] - pts[195, 1], pts[195, 1] - pts[5, 1]]) + + a1, f_pts = self.align_points(pts[33], pts[133], pts[[160,158,144,153]]) + f = abs((np.mean([f_pts[0,1], f_pts[1,1]]) - np.mean([f_pts[2,1], f_pts[3,1]])) / norm_distance_y) + features["eye_l"] = self.eye_l.update(f, now) + + a2, f_pts = self.align_points(pts[362], pts[263], pts[[385, 387, 380, 373]]) + f = abs((np.mean([f_pts[0,1], f_pts[1,1]]) - np.mean([f_pts[2,1], f_pts[3,1]])) / norm_distance_y) + features["eye_r"] = self.eye_r.update(f, now) + + if full: + a3, _ = self.align_points(pts[127], pts[356], []) + a4, _ = self.align_points(pts[131], pts[360], []) + norm_angle = np.mean(list(map(np.rad2deg, [a1, a2, a3, a4]))) + + a, f_pts = self.align_points(pts[22], pts[26], pts[[22, 23, 24, 25, 26]]) + features["eyebrow_steepness_l"] = self.eyebrow_steepness_l.update(-np.rad2deg(a) - norm_angle, now) + f = np.max(np.abs(np.array(f_pts[1:4]) - f_pts[0, 1])) / norm_distance_y + features["eyebrow_quirk_l"] = self.eyebrow_quirk_l.update(f, now) + + a, f_pts = self.align_points(pts[17], pts[21], pts[[17, 18, 19, 20, 21]]) + features["eyebrow_steepness_r"] = self.eyebrow_steepness_r.update(np.rad2deg(a) - norm_angle, now) + f = np.max(np.abs(np.array(f_pts[1:4]) - f_pts[0, 1])) / norm_distance_y + features["eyebrow_quirk_r"] = self.eyebrow_quirk_r.update(f, now) + else: + features["eyebrow_steepness_l"] = 0. + features["eyebrow_steepness_r"] = 0. 
+ features["eyebrow_quirk_l"] = 0. + features["eyebrow_quirk_r"] = 0. + + f = (np.mean([pts[285, 1], pts[276, 1]]) - pts[168, 1]) / norm_distance_y + features["eyebrow_updown_l"] = self.eyebrow_updown_l.update(f, now) + + f = (np.mean([pts[46, 1], pts[55, 1]]) - pts[168, 1]) / norm_distance_y + features["eyebrow_updown_r"] = self.eyebrow_updown_r.update(f, now) + + upper_mouth_line = np.mean([pts[37, 1], pts[0, 1], pts[267, 1]]) + center_line = np.mean([pts[6, 0], pts[5, 0], pts[4, 0], pts[0, 0], pts[13, 0], pts[14, 0], pts[17, 0]]) + + f = (upper_mouth_line - pts[62, 1]) / norm_distance_y + features["mouth_corner_updown_l"] = self.mouth_corner_updown_l.update(f, now) + if full: + f = abs(center_line - pts[62, 0]) / norm_distance_x + features["mouth_corner_inout_l"] = self.mouth_corner_inout_l.update(f, now) + else: + features["mouth_corner_inout_l"] = 0. + + f = (upper_mouth_line - pts[58, 1]) / norm_distance_y + features["mouth_corner_updown_r"] = self.mouth_corner_updown_r.update(f, now) + if full: + f = abs(center_line - pts[58, 0]) / norm_distance_x + features["mouth_corner_inout_r"] = self.mouth_corner_inout_r.update(f, now) + else: + features["mouth_corner_inout_r"] = 0. + + f = abs(np.mean(pts[[59,60,61], 1], axis=0) - np.mean(pts[[63,64,65], 1], axis=0)) / norm_distance_y + features["mouth_open"] = self.mouth_open.update(f, now) + + f = abs(pts[58, 0] - pts[62, 0]) / norm_distance_x + features["mouth_wide"] = self.mouth_wide.update(f, now) + + return features + +class FaceInfo(): + def __init__(self, id, tracker): + self.id = id + self.frame_count = -1 + self.tracker = tracker + self.reset() + self.alive = False + self.coord = None + self.base_scale_v = 1 + self.base_scale_h = 1 + + self.limit_3d_adjustment = True + self.update_count_delta = 75. + self.update_count_max = 7500. 
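+        # Carried over from the original OpenSeeFace FaceInfo; the MediaPipe
+        # based adjust_3d() below does not currently read these limits.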
+ + if self.tracker.max_feature_updates > 0: + self.features = FeatureExtractor(self.tracker.max_feature_updates) + + def reset(self): + self.alive = False + self.conf = None + self.lms = None + self.eye_state = None + self.rotation = None + self.translation = None + self.success = None + self.quaternion = None + self.euler = None + self.pnp_error = None + self.pts_3d = None + self.eye_blink = None + self.bbox = None + self.holistic_info = None + self.pnp_error = 0 + if self.tracker.max_feature_updates < 1: + self.features = FeatureExtractor(0) + self.current_features = {} + self.contour = np.zeros((21,3)) + self.update_counts = np.zeros((66,2)) + self.fail_count = 0 + + def update(self, result, frame_count): + self.frame_count = frame_count + if result is None: + self.reset() + else: + self.holistic_info = result + self.alive = True + + def adjust_3d(self): + self.pts_3d = self.normalize_pts3d(self.holistic_info.face_landmarks) + self.current_features = self.features.update(self.pts_3d[:, 0:2]) + self.eye_blink = [] + self.eye_blink.append(1 - min(max(0, -self.current_features["eye_r"]), 1)) + self.eye_blink.append(1 - min(max(0, -self.current_features["eye_l"]), 1)) + + def normalize_pts3d(self, landmarks): + print(len(landmarks.landmark)) + pts_3d = np.array([[l.x, l.y, l.z] for l in landmarks.landmark]) + # Calculate angle using nose + pts_3d[:, 0:2] -= pts_3d[30, 0:2] + alpha = angle(pts_3d[30, 0:2], pts_3d[27, 0:2]) + alpha -= np.deg2rad(90) + + R = np.matrix([[np.cos(alpha), -np.sin(alpha)], [np.sin(alpha), np.cos(alpha)]]) + pts_3d[:, 0:2] = (pts_3d - pts_3d[30])[:, 0:2].dot(R) + pts_3d[30, 0:2] + + # Vertical scale + pts_3d[:, 1] /= np.mean((pts_3d[27:30, 1] - pts_3d[28:31, 1]) / self.base_scale_v) + + # Horizontal scale + pts_3d[:, 0] /= np.mean(np.abs(pts_3d[[0, 36, 42], 0] - pts_3d[[16, 39, 45], 0]) / self.base_scale_h) + + return pts_3d + +def get_model_base_path(model_dir): + model_base_path = resolve(os.path.join("models")) + if model_dir is None: + if not os.path.exists(model_base_path): + model_base_path = resolve(os.path.join("..", "models")) + else: + model_base_path = model_dir + return model_base_path + +class Tracker(): + def __init__(self, width, height, model_type=3, detection_threshold=0.6, threshold=None, max_faces=1, discard_after=5, scan_every=3, bbox_growth=0.0, max_threads=4, silent=False, model_dir=None, no_gaze=False, use_retinaface=False, max_feature_updates=0, static_model=False, feature_level=2, try_hard=False): + self.model_type = model_type + + self.holistic = mp_holistic.Holistic(model_complexity=1,min_detection_confidence=0.82,min_tracking_confidence=0.82,enable_segmentation=False,refine_face_landmarks=True) + # self.face = mp_face_mesh.Face_Mesh + + model_base_path = get_model_base_path(None) + model = os.path.join(model_base_path, "face_landmarker.task") + + with open(model, 'rb') as f: + vision_model = f.read() + + base_options = python.BaseOptions(model_asset_buffer=vision_model) + options = vision.FaceLandmarkerOptions(base_options=base_options, + output_face_blendshapes=True, + output_facial_transformation_matrixes=True, + num_faces=1) + self.detector = vision.FaceLandmarker.create_from_options(options) + + if threshold is None: + threshold = 0.6 + if model_type < 0: + threshold = 0.87 + + self.faces = [] + + # Image normalization constants + self.mean = np.float32(np.array([0.485, 0.456, 0.406])) + self.std = np.float32(np.array([0.229, 0.224, 0.225])) + self.mean = self.mean / self.std + self.std = self.std * 255.0 + + self.mean = - self.mean 
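+        # The ImageNet mean/std are folded into a single multiply-add: with
+        # std' = 1 / (std * 255) and mean' = -mean / std, the expression
+        # im * std' + mean' equals (im / 255 - mean) / std, which is what
+        # preprocess() below relies on.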
+ self.std = 1.0 / self.std + self.mean_32 = np.tile(self.mean, [32, 32, 1]) + self.std_32 = np.tile(self.std, [32, 32, 1]) + self.mean_224 = np.tile(self.mean, [224, 224, 1]) + self.std_224 = np.tile(self.std, [224, 224, 1]) + + self.camera = np.array([[width, 0, width/2], [0, width, height/2], [0, 0, 1]], np.float32) + self.inverse_camera = np.linalg.inv(self.camera) + self.dist_coeffs = np.zeros((4,1)) + + self.frame_count = 0 + self.width = width + self.height = height + self.threshold = threshold + self.detection_threshold = detection_threshold + self.max_faces = max_faces + self.max_threads = max_threads + self.discard = 0 + self.discard_after = discard_after + self.detected = 0 + self.wait_count = 0 + self.scan_every = scan_every + self.bbox_growth = bbox_growth + self.silent = silent + self.try_hard = try_hard + + self.res = 224. + self.mean_res = self.mean_224 + self.std_res = self.std_224 + if model_type < 0: + self.res = 56. + self.mean_res = np.tile(self.mean, [56, 56, 1]) + self.std_res = np.tile(self.std, [56, 56, 1]) + if model_type < -1: + self.res = 112. + self.mean_res = np.tile(self.mean, [112, 112, 1]) + self.std_res = np.tile(self.std, [112, 112, 1]) + self.res_i = int(self.res) + self.out_res = 27. + if model_type < 0: + self.out_res = 6. + if model_type < -1: + self.out_res = 13. + self.out_res_i = int(self.out_res) + 1 + self.logit_factor = 16. + if model_type < 0: + self.logit_factor = 8. + if model_type < -1: + self.logit_factor = 16. + + self.no_gaze = no_gaze + self.debug_gaze = False + self.feature_level = feature_level + if model_type == -1: + self.feature_level = min(feature_level, 1) + self.max_feature_updates = max_feature_updates + self.static_model = static_model + self.face_info = FaceInfo(id, self) + self.fail_count = 0 + + def preprocess(self, im, crop): + x1, y1, x2, y2 = crop + im = np.float32(im[y1:y2, x1:x2,::-1]) # Crop and BGR to RGB + im = cv2.resize(im, (self.res_i, self.res_i), interpolation=cv2.INTER_LINEAR) * self.std_res + self.mean_res + im = np.expand_dims(im, 0) + im = np.transpose(im, (0,3,1,2)) + return im + + def equalize(self, im): + im_yuv = cv2.cvtColor(im, cv2.COLOR_BGR2YUV) + im_yuv[:,:,0] = cv2.equalizeHist(im_yuv[:,:,0]) + return cv2.cvtColor(im_yuv, cv2.COLOR_YUV2BGR) + + def predict(self, frame): + self.frame_count += 1 + start = time.perf_counter() + im = frame + + mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=im) + detection_result = self.detector.detect(mp_image) + + # results = self.holistic.process(im) + + duration = (time.perf_counter() - start) * 1000 + + print(detection_result.face_blendshapes[0][25]); + if not self.silent: + print(f"Took {duration:.2f}ms") + # results = sorted(results, key=lambda x: x.id) + + # self.face_info.update(results, self.frame_count) + # self.face_info.adjust_3d() + + return self.face_info
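
The new predict() currently just prints one blendshape score and returns the still-unpopulated FaceInfo. A minimal sketch of how the FaceLandmarker result could be wired into FaceInfo, under the assumptions noted in the comments (fill_face_info is a hypothetical helper, not part of this patch):

# Illustrative sketch, not part of the patch: one way to populate FaceInfo
# from a FaceLandmarker result. The eye_blink mapping assumes the ARKit-style
# blendshape names reported by the MediaPipe face landmarker
# (eyeBlinkLeft / eyeBlinkRight); the z scaling by width is a convention, not
# something this patch defines.
import numpy as np

def fill_face_info(face_info, detection_result, width, height):
    if not detection_result.face_landmarks:
        face_info.alive = False
        return face_info
    lms = detection_result.face_landmarks[0]      # normalized [0, 1] coordinates
    pts = np.array([[l.x * width, l.y * height, l.z * width] for l in lms],
                   dtype=np.float32)              # scale to pixel space
    face_info.alive = True
    face_info.lms = pts
    if detection_result.face_blendshapes:
        scores = {c.category_name: c.score
                  for c in detection_result.face_blendshapes[0]}
        # Same convention as the sender loop above: 1.0 = open, 0.0 = closed,
        # with index 0 = right eye, index 1 = left eye (as in adjust_3d()).
        face_info.eye_blink = [1.0 - scores.get("eyeBlinkRight", 0.0),
                               1.0 - scores.get("eyeBlinkLeft", 0.0)]
    return face_info

# Usage inside predict(), assuming the frame comes from OpenCV in BGR order
# (holistic.py converts to RGB the same way before processing):
#     rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
#     mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb)
#     detection_result = self.detector.detect(mp_image)
#     fill_face_info(self.face_info, detection_result, self.width, self.height)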