Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 161 additions & 88 deletions blinkDetect.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# -*- coding: utf-8 -*-
# -*- coding: utf-8 -*-
"""
Created on Tue Oct 29 19:51:37 2019

@author: Lenovo

Fixed version with proper camera error handling
"""

import dlib
Expand Down Expand Up @@ -121,11 +123,9 @@ def eye_aspect_ratio(eye):
B = dist.euclidean(eye[2], eye[4])
C = dist.euclidean(eye[0], eye[3])
ear = (A + B) / (2.0 * C)

return ear


def checkEyeStatus(landmarks):
def checkEyeStatus(landmarks, frame):
global session_tracker, current_ear
mask = np.zeros(frame.shape[:2], dtype = np.float32)

Expand All @@ -139,7 +139,6 @@ def checkEyeStatus(landmarks):
for i in range(0, len(rightEyeIndex)):
hullRightEye.append((landmarks[rightEyeIndex[i]][0], landmarks[rightEyeIndex[i]][1]))


cv2.fillConvexPoly(mask, np.int32(hullRightEye), 255)

leftEAR = eye_aspect_ratio(hullLeftEye)
Expand All @@ -161,19 +160,16 @@ def checkBlinkStatus(eyeStatus):
if(state >= 0 and state <= falseBlinkLimit):
if(eyeStatus):
state = 0

else:
state += 1

elif(state >= falseBlinkLimit and state < drowsyLimit):
if(eyeStatus):
blinkCount += 1
state = 0

else:
state += 1


else:
if(eyeStatus):
state = 0
Expand All @@ -182,7 +178,6 @@ def checkBlinkStatus(eyeStatus):
# Phase 2: Track alert when drowsiness is detected
if session_tracker:
session_tracker.add_alert()

else:
drowsy = 3
# Phase 2: Track alert when drowsiness persists
Expand All @@ -208,6 +203,55 @@ def getLandmarks(im):
[points.append((p.x, p.y)) for p in predictor(im, newRect).parts()]
return points

def initialize_camera(max_attempts=3):
"""
Initialize camera with proper error handling
Try different camera indices and handle various failure modes
"""
camera_indices = [0, 1, 2, -1] # Try multiple camera indices

for camera_idx in camera_indices:
print(f"Attempting to open camera {camera_idx}...")

for attempt in range(max_attempts):
try:
capture = cv2.VideoCapture(camera_idx)

# Check if camera opened successfully
if not capture.isOpened():
print(f"Camera {camera_idx} failed to open (attempt {attempt + 1}/{max_attempts})")
capture.release()
time.sleep(1) # Wait before retry
continue

# Try to read a frame to verify camera is actually working
ret, test_frame = capture.read()
if not ret or test_frame is None:
print(f"Camera {camera_idx} opened but can't read frames (attempt {attempt + 1}/{max_attempts})")
capture.release()
time.sleep(1)
continue

# Camera is working properly
print(f"Successfully opened camera {camera_idx}")
return capture, test_frame

except Exception as e:
print(f"Exception opening camera {camera_idx} (attempt {attempt + 1}): {e}")
if 'capture' in locals():
capture.release()
time.sleep(1)

# If all attempts failed
print("\nERROR: Failed to initialize any camera!")
print("Possible solutions:")
print("1. Check if your camera is working with other applications")
print("2. Make sure no other application is using the camera")
print("3. Try running the script with administrator privileges")
print("4. Check camera drivers and connections")

return None, None

# Phase 2: Getter functions for external access (for session_history.py)
def get_current_ear():
"""Get the current EAR value"""
Expand Down Expand Up @@ -246,26 +290,29 @@ def end_current_session():
session_tracker.end_session()
session_tracker = None

capture = cv2.VideoCapture(0)

for i in range(10):
ret, frame = capture.read()
if not capture.isOpened():
print("Error: Could not open webcam.")
sys.exit()
# FIXED: Proper camera initialization with error handling
capture, frame = initialize_camera()
if capture is None:
print("Exiting due to camera initialization failure.")
sys.exit(1)

# Continue with calibration using the verified working camera
totalTime = 0.0
validFrames = 0
dummyFrames = 100

print("Caliberation in Progress!")
print("Calibration in Progress!")
while(validFrames < dummyFrames):
validFrames += 1
t = time.time()
ret, frame = capture.read()

# Additional safety check during calibration
if not ret or frame is None:
print("Error: Could not read frame from webcam.")
break
print("Error: Lost camera connection during calibration.")
capture.release()
cv2.destroyAllWindows()
sys.exit(1)

height, width = frame.shape[:2]
IMAGE_RESIZE = np.float32(height)/RESIZE_HEIGHT
Expand All @@ -287,10 +334,10 @@ def end_current_session():
cv2.imshow("Blink Detection Demo", frame)
if cv2.waitKey(1) & 0xFF == 27:
break

else:
totalTime += timeLandmarks
print("Caliberation Complete!")

print("Calibration Complete!")

spf = totalTime/dummyFrames
print("Current SPF (seconds per frame) is {:.2f} ms".format(spf * 1000))
Expand All @@ -304,76 +351,102 @@ def end_current_session():

if __name__ == "__main__":
vid_writer = cv2.VideoWriter('output-low-light-2.avi',cv2.VideoWriter_fourcc('M','J','P','G'), 15, (frame.shape[1],frame.shape[0]))
while(1):
try:
t = time.time()
ret, frame = capture.read()
height, width = frame.shape[:2]
IMAGE_RESIZE = np.float32(height)/RESIZE_HEIGHT
frame = cv2.resize(frame, None,
fx = 1/IMAGE_RESIZE,
fy = 1/IMAGE_RESIZE,
interpolation = cv2.INTER_LINEAR)

# adjusted = gamma_correction(frame)
adjusted = histogram_equalization(frame)

landmarks = getLandmarks(adjusted)
if landmarks == 0:
validFrames -= 1
cv2.putText(frame, "Unable to detect face, Please check proper lighting", (10, 30), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
cv2.putText(frame, "or decrease FACE_DOWNSAMPLE_RATIO", (10, 50), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
cv2.imshow("Blink Detection Demo", frame)
if cv2.waitKey(1) & 0xFF == 27:
break
continue

eyeStatus = checkEyeStatus(landmarks)
checkBlinkStatus(eyeStatus)

for i in range(0, len(leftEyeIndex)):
cv2.circle(frame, (landmarks[leftEyeIndex[i]][0], landmarks[leftEyeIndex[i]][1]), 1, (0, 0, 255), -1, lineType=cv2.LINE_AA)

for i in range(0, len(rightEyeIndex)):
cv2.circle(frame, (landmarks[rightEyeIndex[i]][0], landmarks[rightEyeIndex[i]][1]), 1, (0, 0, 255), -1, lineType=cv2.LINE_AA)

if drowsy:
cv2.putText(frame, "! ! ! DROWSINESS ALERT ! ! !", (70, 50), cv2.FONT_HERSHEY_COMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
if not ALARM_ON:
ALARM_ON = True

try:
while(1):
try:
t = time.time()
ret, frame = capture.read()

# FIXED: Better frame reading error handling
if not ret or frame is None:
print("Warning: Failed to read frame from camera")
# Try to reconnect
capture.release()
capture, frame = initialize_camera()
if capture is None:
print("Failed to reconnect to camera. Exiting...")
break
continue

height, width = frame.shape[:2]
IMAGE_RESIZE = np.float32(height)/RESIZE_HEIGHT
frame = cv2.resize(frame, None,
fx = 1/IMAGE_RESIZE,
fy = 1/IMAGE_RESIZE,
interpolation = cv2.INTER_LINEAR)

# adjusted = gamma_correction(frame)
adjusted = histogram_equalization(frame)

landmarks = getLandmarks(adjusted)
if landmarks == 0:
cv2.putText(frame, "Unable to detect face, Please check proper lighting", (10, 30), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
cv2.putText(frame, "or decrease FACE_DOWNSAMPLE_RATIO", (10, 50), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
cv2.imshow("Blink Detection Demo", frame)
if cv2.waitKey(1) & 0xFF == 27:
break
continue

eyeStatus = checkEyeStatus(landmarks, frame)
checkBlinkStatus(eyeStatus)

for i in range(0, len(leftEyeIndex)):
cv2.circle(frame, (landmarks[leftEyeIndex[i]][0], landmarks[leftEyeIndex[i]][1]), 1, (0, 0, 255), -1, lineType=cv2.LINE_AA)

for i in range(0, len(rightEyeIndex)):
cv2.circle(frame, (landmarks[rightEyeIndex[i]][0], landmarks[rightEyeIndex[i]][1]), 1, (0, 0, 255), -1, lineType=cv2.LINE_AA)

if drowsy:
cv2.putText(frame, "! ! ! DROWSINESS ALERT ! ! !", (70, 50), cv2.FONT_HERSHEY_COMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
if not ALARM_ON:
ALARM_ON = True
threadStatusQ.put(not ALARM_ON)
thread = Thread(target=soundAlert, args=(sound_path, threadStatusQ,))
thread.setDaemon(True)
thread.start()
else:
cv2.putText(frame, "Blinks : {}".format(blinkCount), (460, 80), cv2.FONT_HERSHEY_COMPLEX, 0.8, (0,0,255), 2, cv2.LINE_AA)
ALARM_ON = False

cv2.imshow("Blink Detection", frame)
vid_writer.write(frame)

k = cv2.waitKey(1)
if k == ord('r'):
state = 0
drowsy = 0
ALARM_ON = False
threadStatusQ.put(not ALARM_ON)
thread = Thread(target=soundAlert, args=(sound_path, threadStatusQ,))
thread.setDaemon(True)
thread.start()

else:
cv2.putText(frame, "Blinks : {}".format(blinkCount), (460, 80), cv2.FONT_HERSHEY_COMPLEX, 0.8, (0,0,255), 2, cv2.LINE_AA)
# (0, 400)
ALARM_ON = False


cv2.imshow("Blink Detection", frame)
vid_writer.write(frame)
elif k == ord('q'):
break

k = cv2.waitKey(1)
if k == ord('r'):
state = 0
drowsy = 0
ALARM_ON = False
threadStatusQ.put(not ALARM_ON)
# print("Time taken", time.time() - t)

elif k == ord('q'):
except Exception as e:
print(f"Error in main loop: {e}")
# Try to continue or break based on error type
import traceback
traceback.print_exc()
break

# print("Time taken", time.time() - t)

except Exception as e:
print(e)

# Phase 2: End session when detection stops
if session_tracker:
session_tracker.end_session()

capture.release()
vid_writer.release()
cv2.destroyAllWindows()
except KeyboardInterrupt:
print("\nInterrupted by user")

finally:
# Cleanup
print("Cleaning up...")

# Phase 2: End session when detection stops
if session_tracker:
session_tracker.end_session()

if 'capture' in locals() and capture is not None:
capture.release()

if 'vid_writer' in locals() and vid_writer is not None:
vid_writer.release()

cv2.destroyAllWindows()
print("Cleanup complete.")
13 changes: 11 additions & 2 deletions face-try.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,22 @@
print(f"[ERROR] Failed to load cascade from {cascade_path}")
sys.exit(1)

# Start video capture
print("[INFO] Trying to open default webcam (0)...")
cap = cv2.VideoCapture(0)

if not cap.isOpened():
print("[ERROR] Cannot access webcam.")
print("[WARNING] Default webcam not available. Trying external camera (1)...")
cap = cv2.VideoCapture(1)

if not cap.isOpened():
print("[WARNING] No webcam found. Falling back to test video (test2.mp4)...")
cap = cv2.VideoCapture("test2.mp4")

if not cap.isOpened():
print("[ERROR] No camera or video source available.")
sys.exit(1)


try:
while True:
ret, img = cap.read()
Expand Down