Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 79 additions & 135 deletions blinkDetect.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# -*- coding: utf-8 -*-
"""
Created on Tue Oct 29 19:51:37 2019

@author: Lenovo
Driver Drowsiness Detection – CSV logging with drowsy duration
"""

import dlib
Expand All @@ -14,16 +12,23 @@
from threading import Thread
import playsound
import queue
import csv
from datetime import datetime

# 📂 CSV File create/open
csv_file = open("blink_log.csv", mode="w", newline="")
csv_writer = csv.writer(csv_file)
csv_writer.writerow(["Date", "Time", "Status", "Duration (sec)"])

# 👁‍🗨 Eye state tracking variables
eye_closed_start_time = None
is_eye_closed = False

# 🔧 Model & settings
FACE_DOWNSAMPLE_RATIO = 1.5
RESIZE_HEIGHT = 460
thresh = 0.27 # EAR threshold (eyes closed if below this)

thresh = 0.27

# IMPORTANT: You must download the shape_predictor_68_face_landmarks.dat file from
# https://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2
# and place it in the 'models' folder
modelPath = "models/shape_predictor_68_face_landmarks.dat"
sound_path = "alarm.wav"

Expand All @@ -36,15 +41,18 @@
blinkCount = 0
drowsy = 0
state = 0
blinkTime = 0.15 #150ms
drowsyTime = 1.5 #1200ms
blinkTime = 0.15 # 150ms
drowsyTime = 1.5 # 1500ms (1.5 sec)
ALARM_ON = False
GAMMA = 1.5
threadStatusQ = queue.Queue()

invGamma = 1.0/GAMMA
# Gamma correction table
invGamma = 1.0 / GAMMA
table = np.array([((i / 255.0) ** invGamma) * 255 for i in range(0, 256)]).astype("uint8")


# 📌 Utility Functions
def gamma_correction(image):
return cv2.LUT(image, table)

Expand All @@ -71,83 +79,39 @@ def eye_aspect_ratio(eye):
B = dist.euclidean(eye[2], eye[4])
C = dist.euclidean(eye[0], eye[3])
ear = (A + B) / (2.0 * C)

return ear


def checkEyeStatus(landmarks):
mask = np.zeros(frame.shape[:2], dtype = np.float32)
mask = np.zeros(frame.shape[:2], dtype=np.float32)

hullLeftEye = []
for i in range(0, len(leftEyeIndex)):
hullLeftEye.append((landmarks[leftEyeIndex[i]][0], landmarks[leftEyeIndex[i]][1]))
hullLeftEye = [(landmarks[i][0], landmarks[i][1]) for i in leftEyeIndex]
hullRightEye = [(landmarks[i][0], landmarks[i][1]) for i in rightEyeIndex]

cv2.fillConvexPoly(mask, np.int32(hullLeftEye), 255)

hullRightEye = []
for i in range(0, len(rightEyeIndex)):
hullRightEye.append((landmarks[rightEyeIndex[i]][0], landmarks[rightEyeIndex[i]][1]))


cv2.fillConvexPoly(mask, np.int32(hullRightEye), 255)

leftEAR = eye_aspect_ratio(hullLeftEye)
rightEAR = eye_aspect_ratio(hullRightEye)

ear = (leftEAR + rightEAR) / 2.0

eyeStatus = 1 # 1 = Open, 0 = closed
if (ear < thresh):
eyeStatus = 0

return eyeStatus

def checkBlinkStatus(eyeStatus):
global state, blinkCount, drowsy
if(state >= 0 and state <= falseBlinkLimit):
if(eyeStatus):
state = 0

else:
state += 1

elif(state >= falseBlinkLimit and state < drowsyLimit):
if(eyeStatus):
blinkCount += 1
state = 0

else:
state += 1


else:
if(eyeStatus):
state = 0
drowsy = 3
blinkCount += 1

else:
drowsy = 3
return 1 if ear >= thresh else 0 # 1=open, 0=closed

def getLandmarks(im):
imSmall = cv2.resize(im, None,
fx = 1.0/FACE_DOWNSAMPLE_RATIO,
fy = 1.0/FACE_DOWNSAMPLE_RATIO,
interpolation = cv2.INTER_LINEAR)

imSmall = cv2.resize(im, None, fx=1.0/FACE_DOWNSAMPLE_RATIO, fy=1.0/FACE_DOWNSAMPLE_RATIO, interpolation=cv2.INTER_LINEAR)
rects = detector(imSmall, 0)
if len(rects) == 0:
return 0

newRect = dlib.rectangle(int(rects[0].left() * FACE_DOWNSAMPLE_RATIO),
int(rects[0].top() * FACE_DOWNSAMPLE_RATIO),
int(rects[0].right() * FACE_DOWNSAMPLE_RATIO),
int(rects[0].bottom() * FACE_DOWNSAMPLE_RATIO))
int(rects[0].top() * FACE_DOWNSAMPLE_RATIO),
int(rects[0].right() * FACE_DOWNSAMPLE_RATIO),
int(rects[0].bottom() * FACE_DOWNSAMPLE_RATIO))

points = []
[points.append((p.x, p.y)) for p in predictor(im, newRect).parts()]
points = [(p.x, p.y) for p in predictor(im, newRect).parts()]
return points


# 🎥 Camera setup
capture = cv2.VideoCapture(0)

for i in range(10):
Expand All @@ -160,8 +124,8 @@ def getLandmarks(im):
validFrames = 0
dummyFrames = 100

print("Caliberation in Progress!")
while(validFrames < dummyFrames):
print("Calibrating camera...")
while validFrames < dummyFrames:
validFrames += 1
t = time.time()
ret, frame = capture.read()
Expand All @@ -170,105 +134,85 @@ def getLandmarks(im):
break

height, width = frame.shape[:2]
IMAGE_RESIZE = np.float32(height)/RESIZE_HEIGHT
frame = cv2.resize(frame, None,
fx = 1/IMAGE_RESIZE,
fy = 1/IMAGE_RESIZE,
interpolation = cv2.INTER_LINEAR)
IMAGE_RESIZE = np.float32(height) / RESIZE_HEIGHT
frame = cv2.resize(frame, None, fx=1/IMAGE_RESIZE, fy=1/IMAGE_RESIZE, interpolation=cv2.INTER_LINEAR)

#adjusted = gamma_correction(frame)
adjusted = histogram_equalization(frame)

landmarks = getLandmarks(adjusted)
timeLandmarks = time.time() - t

if landmarks == 0:
validFrames -= 1
cv2.putText(frame, "Unable to detect face, Please check proper lighting", (10, 30), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
cv2.putText(frame, "or decrease FACE_DOWNSAMPLE_RATIO", (10, 50), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
cv2.imshow("Blink Detection Demo", frame)
if cv2.waitKey(1) & 0xFF == 27:
break

continue
else:
totalTime += timeLandmarks
print("Caliberation Complete!")

spf = totalTime/dummyFrames
print("Current SPF (seconds per frame) is {:.2f} ms".format(spf * 1000))
print("Calibration Complete!")

drowsyLimit = drowsyTime/spf
falseBlinkLimit = blinkTime/spf
print("drowsy limit: {}, false blink limit: {}".format(drowsyLimit, falseBlinkLimit))
spf = totalTime / dummyFrames
print("SPF: {:.2f} ms".format(spf * 1000))

# 🏁 Main Loop
if __name__ == "__main__":
vid_writer = cv2.VideoWriter('output-low-light-2.avi',cv2.VideoWriter_fourcc('M','J','P','G'), 15, (frame.shape[1],frame.shape[0]))
while(1):
vid_writer = cv2.VideoWriter('output.avi', cv2.VideoWriter_fourcc('M','J','P','G'), 15, (frame.shape[1], frame.shape[0]))

while True:
try:
t = time.time()
ret, frame = capture.read()
if not ret:
break

height, width = frame.shape[:2]
IMAGE_RESIZE = np.float32(height)/RESIZE_HEIGHT
frame = cv2.resize(frame, None,
fx = 1/IMAGE_RESIZE,
fy = 1/IMAGE_RESIZE,
interpolation = cv2.INTER_LINEAR)
IMAGE_RESIZE = np.float32(height) / RESIZE_HEIGHT
frame = cv2.resize(frame, None, fx=1/IMAGE_RESIZE, fy=1/IMAGE_RESIZE, interpolation=cv2.INTER_LINEAR)

# adjusted = gamma_correction(frame)
adjusted = histogram_equalization(frame)

landmarks = getLandmarks(adjusted)

if landmarks == 0:
validFrames -= 1
cv2.putText(frame, "Unable to detect face, Please check proper lighting", (10, 30), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
cv2.putText(frame, "or decrease FACE_DOWNSAMPLE_RATIO", (10, 50), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
cv2.imshow("Blink Detection Demo", frame)
cv2.putText(frame, "Unable to detect face", (10, 30), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1)
cv2.imshow("Drowsiness Detection", frame)
if cv2.waitKey(1) & 0xFF == 27:
break
continue

eyeStatus = checkEyeStatus(landmarks)
checkBlinkStatus(eyeStatus)

for i in range(0, len(leftEyeIndex)):
cv2.circle(frame, (landmarks[leftEyeIndex[i]][0], landmarks[leftEyeIndex[i]][1]), 1, (0, 0, 255), -1, lineType=cv2.LINE_AA)

for i in range(0, len(rightEyeIndex)):
cv2.circle(frame, (landmarks[rightEyeIndex[i]][0], landmarks[rightEyeIndex[i]][1]), 1, (0, 0, 255), -1, lineType=cv2.LINE_AA)

if drowsy:
cv2.putText(frame, "! ! ! DROWSINESS ALERT ! ! !", (70, 50), cv2.FONT_HERSHEY_COMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
if not ALARM_ON:
ALARM_ON = True
threadStatusQ.put(not ALARM_ON)
thread = Thread(target=soundAlert, args=(sound_path, threadStatusQ,))
thread.setDaemon(True)
thread.start()
# EAR calculation
leftEAR = eye_aspect_ratio([landmarks[i] for i in leftEyeIndex])
rightEAR = eye_aspect_ratio([landmarks[i] for i in rightEyeIndex])
ear = (leftEAR + rightEAR) / 2.0

if ear < thresh:
if not is_eye_closed:
eye_closed_start_time = time.time()
is_eye_closed = True
else:
cv2.putText(frame, "Blinks : {}".format(blinkCount), (460, 80), cv2.FONT_HERSHEY_COMPLEX, 0.8, (0,0,255), 2, cv2.LINE_AA)
# (0, 400)
ALARM_ON = False


cv2.imshow("Blink Detection", frame)
if is_eye_closed:
eye_closed_duration = time.time() - eye_closed_start_time
is_eye_closed = False

if eye_closed_duration > 2.5:
current_time = datetime.now()
date_str = current_time.strftime("%d-%m-%Y")
time_str = current_time.strftime("%H:%M:%S")
csv_writer.writerow([date_str, time_str, "Drowsy", f"{eye_closed_duration:.2f}"])
print(f"[LOG] Drowsy for {eye_closed_duration:.2f} sec")

for i in leftEyeIndex:
cv2.circle(frame, landmarks[i], 1, (0, 0, 255), -1)
for i in rightEyeIndex:
cv2.circle(frame, landmarks[i], 1, (0, 0, 255), -1)

cv2.imshow("Drowsiness Detection", frame)
vid_writer.write(frame)

k = cv2.waitKey(1)
if k == ord('r'):
state = 0
drowsy = 0
ALARM_ON = False
threadStatusQ.put(not ALARM_ON)

elif k == ord('q'):
k = cv2.waitKey(1)
if k == ord('q'):
break

# print("Time taken", time.time() - t)

except Exception as e:
print(e)
print("Error:", e)

capture.release()
vid_writer.release()
csv_file.close()
cv2.destroyAllWindows()
5 changes: 5 additions & 0 deletions blink_log.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Date,Time,Status,Duration (sec)
31-07-2025,11:30:29,Drowsy,9.53
31-07-2025,11:30:42,Drowsy,12.06
31-07-2025,11:30:56,Drowsy,7.50
31-07-2025,11:31:00,Drowsy,2.78
2 changes: 1 addition & 1 deletion face-try.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import sys

# Load Haar cascade
cascade_path = '/home/happy/gssoc/driver-drowsiness-detection-system/models/haarcascade_frontalface_default.xml'
cascade_path = 'models/haarcascade_frontalface_default.xml'
face_cascade = cv2.CascadeClassifier(cascade_path)

# Check if cascade loaded successfully
Expand Down
4 changes: 2 additions & 2 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
def run_face_detection():
global face_proc
try:
face_proc = subprocess.Popen(["python", "face-try.py"])
face_proc = subprocess.Popen([r"venv\Scripts\python.exe", "face-try.py"])
except Exception as e:
messagebox.showerror("Error", f"Failed to run face detection:\n{e}")


def run_blink_detection():
try:
subprocess.call(["python", "blinkDetect.py"])
subprocess.call([r"venv\Scripts\python.exe", "blinkDetect.py"])
except Exception as e:
messagebox.showerror("Error", f"Failed to run blink detection:\n{e}")

Expand Down
Binary file added models/shape_predictor_68_face_landmarks.dat
Binary file not shown.
Binary file added output-low-light-2.avi
Binary file not shown.
Binary file added output.avi
Binary file not shown.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ opencv-python>=4.5.0,<5.0.0
# ====================================================================
# FACIAL LANDMARK DETECTION AND SHAPE PREDICTION
# ====================================================================
dlib>=19.22.0,<20.0.0
#dlib>=19.22.0,<20.0.0

# ====================================================================
# NUMERICAL COMPUTING AND MATHEMATICAL OPERATIONS
Expand Down