diff --git a/blinkDetect.py b/blinkDetect.py
index 23b6e53..56b1b50 100644
--- a/blinkDetect.py
+++ b/blinkDetect.py
@@ -1,8 +1,6 @@
 # -*- coding: utf-8 -*-
 """
-Created on Tue Oct 29 19:51:37 2019
-
-@author: Lenovo
+Driver Drowsiness Detection – CSV logging with drowsy duration
 """
 
 import dlib
@@ -14,16 +12,23 @@ from threading import Thread
 import playsound
 import queue
+import csv
+from datetime import datetime
+
+# 📂 CSV File create/open
+csv_file = open("blink_log.csv", mode="w", newline="")
+csv_writer = csv.writer(csv_file)
+csv_writer.writerow(["Date", "Time", "Status", "Duration (sec)"])
+
+# 👁‍🗨 Eye state tracking variables
+eye_closed_start_time = None
+is_eye_closed = False
 
+# 🔧 Model & settings
 FACE_DOWNSAMPLE_RATIO = 1.5
 RESIZE_HEIGHT = 460
+thresh = 0.27  # EAR threshold (eyes closed if below this)
 
-thresh = 0.27
-
-# IMPORTANT: You must download the shape_predictor_68_face_landmarks.dat file from
-# https://dlib.net/files/shape_predictor_68_face_landmarks.dat.bz2
-# and place it in the 'models' folder
 modelPath = "models/shape_predictor_68_face_landmarks.dat"
 sound_path = "alarm.wav"
@@ -36,15 +41,18 @@ blinkCount = 0
 drowsy = 0
 state = 0
-blinkTime = 0.15 #150ms
-drowsyTime = 1.5 #1200ms
+blinkTime = 0.15  # 150ms
+drowsyTime = 1.5  # 1500ms (1.5 sec)
 
 ALARM_ON = False
 GAMMA = 1.5
 threadStatusQ = queue.Queue()
 
-invGamma = 1.0/GAMMA
+# Gamma correction table
+invGamma = 1.0 / GAMMA
 table = np.array([((i / 255.0) ** invGamma) * 255 for i in range(0, 256)]).astype("uint8")
+
+# 📌 Utility Functions
 def gamma_correction(image):
     return cv2.LUT(image, table)
@@ -71,83 +79,39 @@ def eye_aspect_ratio(eye):
     B = dist.euclidean(eye[2], eye[4])
     C = dist.euclidean(eye[0], eye[3])
     ear = (A + B) / (2.0 * C)
-
     return ear
-
 def checkEyeStatus(landmarks):
-    mask = np.zeros(frame.shape[:2], dtype = np.float32)
+    mask = np.zeros(frame.shape[:2], dtype=np.float32)
 
-    hullLeftEye = []
-    for i in range(0, len(leftEyeIndex)):
-        hullLeftEye.append((landmarks[leftEyeIndex[i]][0], landmarks[leftEyeIndex[i]][1]))
+    hullLeftEye = [(landmarks[i][0], landmarks[i][1]) for i in leftEyeIndex]
+    hullRightEye = [(landmarks[i][0], landmarks[i][1]) for i in rightEyeIndex]
 
     cv2.fillConvexPoly(mask, np.int32(hullLeftEye), 255)
-
-    hullRightEye = []
-    for i in range(0, len(rightEyeIndex)):
-        hullRightEye.append((landmarks[rightEyeIndex[i]][0], landmarks[rightEyeIndex[i]][1]))
-
-    cv2.fillConvexPoly(mask, np.int32(hullRightEye), 255)
 
     leftEAR = eye_aspect_ratio(hullLeftEye)
     rightEAR = eye_aspect_ratio(hullRightEye)
     ear = (leftEAR + rightEAR) / 2.0
-
-    eyeStatus = 1  # 1 = Open, 0 = closed
-    if (ear < thresh):
-        eyeStatus = 0
-
-    return eyeStatus
-
-def checkBlinkStatus(eyeStatus):
-    global state, blinkCount, drowsy
-    if(state >= 0 and state <= falseBlinkLimit):
-        if(eyeStatus):
-            state = 0
-
-        else:
-            state += 1
-
-    elif(state >= falseBlinkLimit and state < drowsyLimit):
-        if(eyeStatus):
-            blinkCount += 1
-            state = 0
-
-        else:
-            state += 1
-
-
-    else:
-        if(eyeStatus):
-            state = 0
-            drowsy = 3
-            blinkCount += 1
-
-        else:
-            drowsy = 3
+    return 1 if ear >= thresh else 0  # 1=open, 0=closed
 
 def getLandmarks(im):
-    imSmall = cv2.resize(im, None,
-                         fx = 1.0/FACE_DOWNSAMPLE_RATIO,
-                         fy = 1.0/FACE_DOWNSAMPLE_RATIO,
-                         interpolation = cv2.INTER_LINEAR)
-
+    imSmall = cv2.resize(im, None, fx=1.0/FACE_DOWNSAMPLE_RATIO, fy=1.0/FACE_DOWNSAMPLE_RATIO, interpolation=cv2.INTER_LINEAR)
     rects = detector(imSmall, 0)
     if len(rects) == 0:
         return 0
 
     newRect = dlib.rectangle(int(rects[0].left() * FACE_DOWNSAMPLE_RATIO),
-                            int(rects[0].top() * FACE_DOWNSAMPLE_RATIO),
-                            int(rects[0].right() * FACE_DOWNSAMPLE_RATIO),
-                            int(rects[0].bottom() * FACE_DOWNSAMPLE_RATIO))
+                             int(rects[0].top() * FACE_DOWNSAMPLE_RATIO),
+                             int(rects[0].right() * FACE_DOWNSAMPLE_RATIO),
+                             int(rects[0].bottom() * FACE_DOWNSAMPLE_RATIO))
 
-    points = []
-    [points.append((p.x, p.y)) for p in predictor(im, newRect).parts()]
+    points = [(p.x, p.y) for p in predictor(im, newRect).parts()]
     return points
 
+
+# 🎥 Camera setup
 capture = cv2.VideoCapture(0)
 
 for i in range(10):
@@ -160,8 +124,8 @@ def getLandmarks(im):
 validFrames = 0
 dummyFrames = 100
 
-print("Caliberation in Progress!")
-while(validFrames < dummyFrames):
+print("Calibrating camera...")
+while validFrames < dummyFrames:
     validFrames += 1
     t = time.time()
     ret, frame = capture.read()
@@ -170,105 +134,85 @@ def getLandmarks(im):
         break
 
     height, width = frame.shape[:2]
-    IMAGE_RESIZE = np.float32(height)/RESIZE_HEIGHT
-    frame = cv2.resize(frame, None,
-                       fx = 1/IMAGE_RESIZE,
-                       fy = 1/IMAGE_RESIZE,
-                       interpolation = cv2.INTER_LINEAR)
+    IMAGE_RESIZE = np.float32(height) / RESIZE_HEIGHT
+    frame = cv2.resize(frame, None, fx=1/IMAGE_RESIZE, fy=1/IMAGE_RESIZE, interpolation=cv2.INTER_LINEAR)
 
-    #adjusted = gamma_correction(frame)
     adjusted = histogram_equalization(frame)
-
     landmarks = getLandmarks(adjusted)
     timeLandmarks = time.time() - t
 
     if landmarks == 0:
         validFrames -= 1
-        cv2.putText(frame, "Unable to detect face, Please check proper lighting", (10, 30), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
-        cv2.putText(frame, "or decrease FACE_DOWNSAMPLE_RATIO", (10, 50), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
-        cv2.imshow("Blink Detection Demo", frame)
-        if cv2.waitKey(1) & 0xFF == 27:
-            break
-
+        continue
     else:
         totalTime += timeLandmarks
 
-print("Caliberation Complete!")
-spf = totalTime/dummyFrames
-print("Current SPF (seconds per frame) is {:.2f} ms".format(spf * 1000))
+print("Calibration Complete!")
 
-drowsyLimit = drowsyTime/spf
-falseBlinkLimit = blinkTime/spf
-print("drowsy limit: {}, false blink limit: {}".format(drowsyLimit, falseBlinkLimit))
+spf = totalTime / dummyFrames
+print("SPF: {:.2f} ms".format(spf * 1000))
 
+# 🏁 Main Loop
 if __name__ == "__main__":
-    vid_writer = cv2.VideoWriter('output-low-light-2.avi',cv2.VideoWriter_fourcc('M','J','P','G'), 15, (frame.shape[1],frame.shape[0]))
-    while(1):
+    vid_writer = cv2.VideoWriter('output.avi', cv2.VideoWriter_fourcc('M','J','P','G'), 15, (frame.shape[1], frame.shape[0]))
+
+    while True:
         try:
-            t = time.time()
             ret, frame = capture.read()
+            if not ret:
+                break
+
             height, width = frame.shape[:2]
-            IMAGE_RESIZE = np.float32(height)/RESIZE_HEIGHT
-            frame = cv2.resize(frame, None,
-                               fx = 1/IMAGE_RESIZE,
-                               fy = 1/IMAGE_RESIZE,
-                               interpolation = cv2.INTER_LINEAR)
+            IMAGE_RESIZE = np.float32(height) / RESIZE_HEIGHT
+            frame = cv2.resize(frame, None, fx=1/IMAGE_RESIZE, fy=1/IMAGE_RESIZE, interpolation=cv2.INTER_LINEAR)
 
-            # adjusted = gamma_correction(frame)
             adjusted = histogram_equalization(frame)
-
             landmarks = getLandmarks(adjusted)
+
             if landmarks == 0:
-                validFrames -= 1
-                cv2.putText(frame, "Unable to detect face, Please check proper lighting", (10, 30), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
-                cv2.putText(frame, "or decrease FACE_DOWNSAMPLE_RATIO", (10, 50), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
-                cv2.imshow("Blink Detection Demo", frame)
+                cv2.putText(frame, "Unable to detect face", (10, 30), cv2.FONT_HERSHEY_COMPLEX, 0.5, (0, 0, 255), 1)
+                cv2.imshow("Drowsiness Detection", frame)
                 if cv2.waitKey(1) & 0xFF == 27:
                     break
                 continue
 
-            eyeStatus = checkEyeStatus(landmarks)
-            checkBlinkStatus(eyeStatus)
-
-            for i in range(0, len(leftEyeIndex)):
-                cv2.circle(frame, (landmarks[leftEyeIndex[i]][0], landmarks[leftEyeIndex[i]][1]), 1, (0, 0, 255), -1, lineType=cv2.LINE_AA)
-
-            for i in range(0, len(rightEyeIndex)):
-                cv2.circle(frame, (landmarks[rightEyeIndex[i]][0], landmarks[rightEyeIndex[i]][1]), 1, (0, 0, 255), -1, lineType=cv2.LINE_AA)
-
-            if drowsy:
-                cv2.putText(frame, "! ! ! DROWSINESS ALERT ! ! !", (70, 50), cv2.FONT_HERSHEY_COMPLEX, 1, (0, 0, 255), 2, cv2.LINE_AA)
-                if not ALARM_ON:
-                    ALARM_ON = True
-                    threadStatusQ.put(not ALARM_ON)
-                    thread = Thread(target=soundAlert, args=(sound_path, threadStatusQ,))
-                    thread.setDaemon(True)
-                    thread.start()
+            # EAR calculation
+            leftEAR = eye_aspect_ratio([landmarks[i] for i in leftEyeIndex])
+            rightEAR = eye_aspect_ratio([landmarks[i] for i in rightEyeIndex])
+            ear = (leftEAR + rightEAR) / 2.0
 
+            if ear < thresh:
+                if not is_eye_closed:
+                    eye_closed_start_time = time.time()
+                    is_eye_closed = True
             else:
-                cv2.putText(frame, "Blinks : {}".format(blinkCount), (460, 80), cv2.FONT_HERSHEY_COMPLEX, 0.8, (0,0,255), 2, cv2.LINE_AA)
-                # (0, 400)
-                ALARM_ON = False
-
-
-            cv2.imshow("Blink Detection", frame)
+                if is_eye_closed:
+                    eye_closed_duration = time.time() - eye_closed_start_time
+                    is_eye_closed = False
+
+                    if eye_closed_duration > 2.5:
+                        current_time = datetime.now()
+                        date_str = current_time.strftime("%d-%m-%Y")
+                        time_str = current_time.strftime("%H:%M:%S")
+                        csv_writer.writerow([date_str, time_str, "Drowsy", f"{eye_closed_duration:.2f}"])
+                        print(f"[LOG] Drowsy for {eye_closed_duration:.2f} sec")
+
+            for i in leftEyeIndex:
+                cv2.circle(frame, landmarks[i], 1, (0, 0, 255), -1)
+            for i in rightEyeIndex:
+                cv2.circle(frame, landmarks[i], 1, (0, 0, 255), -1)
+
+            cv2.imshow("Drowsiness Detection", frame)
             vid_writer.write(frame)
 
-            k = cv2.waitKey(1)
-            if k == ord('r'):
-                state = 0
-                drowsy = 0
-                ALARM_ON = False
-                threadStatusQ.put(not ALARM_ON)
-
-            elif k == ord('q'):
+            k = cv2.waitKey(1)
+            if k == ord('q'):
                 break
 
-            # print("Time taken", time.time() - t)
-
         except Exception as e:
-            print(e)
+            print("Error:", e)
 
     capture.release()
     vid_writer.release()
+    csv_file.close()
    cv2.destroyAllWindows()
diff --git a/blink_log.csv b/blink_log.csv
new file mode 100644
index 0000000..3f56e4a
--- /dev/null
+++ b/blink_log.csv
@@ -0,0 +1,5 @@
+Date,Time,Status,Duration (sec)
+31-07-2025,11:30:29,Drowsy,9.53
+31-07-2025,11:30:42,Drowsy,12.06
+31-07-2025,11:30:56,Drowsy,7.50
+31-07-2025,11:31:00,Drowsy,2.78
diff --git a/face-try.py b/face-try.py
index 92d9580..f5e4282 100644
--- a/face-try.py
+++ b/face-try.py
@@ -8,7 +8,7 @@
 import sys
 
 # Load Haar cascade
-cascade_path = '/home/happy/gssoc/driver-drowsiness-detection-system/models/haarcascade_frontalface_default.xml'
+cascade_path = 'models/haarcascade_frontalface_default.xml'
 face_cascade = cv2.CascadeClassifier(cascade_path)
 
 # Check if cascade loaded successfully
diff --git a/main.py b/main.py
index f7d3d16..f5e409f 100644
--- a/main.py
+++ b/main.py
@@ -7,14 +7,14 @@ def run_face_detection():
     global face_proc
     try:
-        face_proc = subprocess.Popen(["python", "face-try.py"])
+        face_proc = subprocess.Popen([r"venv\Scripts\python.exe", "face-try.py"])
     except Exception as e:
         messagebox.showerror("Error", f"Failed to run face detection:\n{e}")
 
 def run_blink_detection():
     try:
-        subprocess.call(["python", "blinkDetect.py"])
+        subprocess.call([r"venv\Scripts\python.exe", "blinkDetect.py"])
     except Exception as e:
         messagebox.showerror("Error", f"Failed to run blink detection:\n{e}")
diff --git a/models/shape_predictor_68_face_landmarks.dat b/models/shape_predictor_68_face_landmarks.dat
new file mode 100644
index 0000000..e0ec20d
Binary files /dev/null and b/models/shape_predictor_68_face_landmarks.dat differ
diff --git a/output-low-light-2.avi b/output-low-light-2.avi
new file mode 100644
index 0000000..8531541
Binary files /dev/null and b/output-low-light-2.avi differ
diff --git a/output.avi b/output.avi
new file mode 100644
index 0000000..722709c
Binary files /dev/null and b/output.avi differ
diff --git a/requirements.txt b/requirements.txt
index a812ee8..f54fb68 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -28,7 +28,7 @@ opencv-python>=4.5.0,<5.0.0
 # ====================================================================
 # FACIAL LANDMARK DETECTION AND SHAPE PREDICTION
 # ====================================================================
-dlib>=19.22.0,<20.0.0
+#dlib>=19.22.0,<20.0.0
 
 # ====================================================================
 # NUMERICAL COMPUTING AND MATHEMATICAL OPERATIONS
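Editor's note: a minimal sketch (not part of the patch) of how the blink_log.csv that blinkDetect.py now writes could be summarized offline. The filename and the Date, Time, Status, Duration (sec) columns come from the header row added in the diff above; the aggregation itself is an assumed, illustrative script.

    import csv

    # Summarize the drowsiness log produced by blinkDetect.py.
    # Column names match the header written by the patch:
    # Date, Time, Status, Duration (sec)
    with open("blink_log.csv", newline="") as f:
        rows = list(csv.DictReader(f))

    durations = [float(r["Duration (sec)"]) for r in rows if r["Status"] == "Drowsy"]

    print("Drowsy events logged:", len(durations))
    if durations:
        print("Longest closure : {:.2f} s".format(max(durations)))
        print("Average closure : {:.2f} s".format(sum(durations) / len(durations)))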