1
- from flask import Flask , render_template , Response , request , jsonify
1
+ from fastapi import FastAPI , WebSocket , Request
2
+ from fastapi .responses import HTMLResponse , StreamingResponse
3
+ from fastapi .templating import Jinja2Templates
2
4
import cv2
3
5
import pytesseract
4
- import time
5
6
import threading
6
7
import serial
8
+ import asyncio
7
9
8
- app = Flask (__name__ )
10
+ app = FastAPI ()
11
+ templates = Jinja2Templates (directory = "templates" )
9
12
10
13
# All Cameras
11
- outside_garage_cam = cv2 .VideoCapture (0 )
12
- inside_garage_cam = cv2 .VideoCapture (1 )
13
- entrance_cam = cv2 .VideoCapture (2 )
14
-
14
+ cameras = [cv2 .VideoCapture (i ) for i in range (3 )]
15
15
ser = serial .Serial ('/dev/ttyUSB0' , 115200 ) # Arduino's serial port
16
16
result_message = ""
17
17
24
24
# Pytesseract configuration
25
25
pytesseract .pytesseract .tesseract_cmd = '/usr/bin/tesseract' # Path to Tesseract OCR executable
26
26
27
- def gen_frames (cam ):
27
+ def gen_frames (cam_index ):
28
+ cam = cameras [cam_index ]
28
29
while True :
30
+ if not cam .isOpened ():
31
+ cam .open (cam_index )
29
32
success , frame = cam .read ()
30
33
if not success :
31
34
break
@@ -35,9 +38,9 @@ def gen_frames(cam):
35
38
yield (b'--frame\r \n '
36
39
b'Content-Type: image/jpeg\r \n \r \n ' + frame + b'\r \n ' )
37
40
38
- def garage_control ():
41
+ async def garage_control ():
39
42
while True :
40
- ret , frame = outside_garage_cam .read ()
43
+ ret , frame = cameras [ 0 ] .read ()
41
44
if ret :
42
45
gray = cv2 .cvtColor (frame , cv2 .COLOR_BGR2GRAY )
43
46
# Perform OCR using Pytesseract
@@ -46,68 +49,59 @@ def garage_control():
46
49
# Check if the target plate number is recognized
47
50
if "your_target_plate" in text :
48
51
ser .write (b'OPEN_GARAGE\n ' )
49
- print ("Opening garage for plate 'your_target_plate'" )
50
- time .sleep (1 )
51
-
52
- def audio_detection ():
53
- while True :
54
- if detect_doorbell ():
55
- print ("ringtone detected!" )
56
- # Start camera streaming at home entrance
57
- # Notify and view the camera as needed
58
- time .sleep (1 )
52
+ print ("Garage opens for plate 'your_target_plate'" )
53
+ await asyncio .sleep (1 )
59
54
60
- def detect_doorbell ():
61
- return False
62
-
63
- def read_from_arduino ():
55
+ async def read_from_arduino ():
64
56
global result_message
65
57
while True :
66
58
if ser .in_waiting > 0 :
67
59
line = ser .readline ().decode ('utf-8' ).strip ()
68
60
if line .startswith ("RESULT:" ):
69
61
result_message = line .split ("RESULT:" )[1 ]
62
+ await asyncio .sleep (0.1 )
63
+
64
+ @app .get ("/" , response_class = HTMLResponse )
65
+ async def index (request : Request ):
66
+ return templates .TemplateResponse ("index.html" , {"request" : request })
67
+
68
+ @app .get ("/video_feed/{cam_id}" )
69
+ async def video_feed (cam_id : int ):
70
+ if 0 <= cam_id < len (cameras ):
71
+ return StreamingResponse (gen_frames (cam_id ), media_type = "multipart/x-mixed-replace; boundary=frame" )
72
+ return "Invalid Camera ID" , 404
70
73
71
- @app .route ('/video_feed/<int:cam_id>' )
72
- def video_feed (cam_id ):
73
- if cam_id == 0 :
74
- return Response (gen_frames (outside_garage_cam ), mimetype = 'multipart/x-mixed-replace; boundary=frame' )
75
- elif cam_id == 1 :
76
- return Response (gen_frames (inside_garage_cam ), mimetype = 'multipart/x-mixed-replace; boundary=frame' )
77
- elif cam_id == 2 :
78
- return Response (gen_frames (entrance_cam ), mimetype = 'multipart/x-mixed-replace; boundary=frame' )
79
- else :
80
- return "Invalid Camera ID" , 404
81
-
82
- @app .route ('/open_door' , methods = ['POST' ])
83
- def open_door ():
74
+ @app .post ("/open_door" )
75
+ async def open_door ():
84
76
ser .write (b'OPEN_DOOR\n ' )
85
- return "Door Opened" , 200
86
-
87
- @app .route ('/validate_password' , methods = ['POST' ])
88
- def validate_password ():
89
- data = request .json
90
- user = data .get ('user' )
91
- password = data .get ('password' )
92
-
77
+ return {"message" : "Door Opened" }
78
+
79
+ @app .post ("/validate_password" )
80
+ async def validate_password (user : str , password : str ):
93
81
if user in user_passwords and user_passwords [user ] == password :
94
82
ser .write (b'VALID_PASSWORD\n ' )
95
- return "Password Validated" , 200
96
- else :
97
- return "Invalid Password" , 401
83
+ return {"message" : "Password Validated" }
84
+ return {"message" : "Invalid Password" }, 401
98
85
99
- @app .route ( ' /result' )
100
- def result ():
86
+ @app .get ( " /result" )
87
+ async def result ():
101
88
global result_message
102
- return jsonify (result = result_message )
89
+ return {"result" : result_message }
90
+
91
+ @app .websocket ("/ws" )
92
+ async def websocket_endpoint (websocket : WebSocket ):
93
+ await websocket .accept ()
94
+ while True :
95
+ data = await websocket .receive_text ()
96
+ if data == "result" :
97
+ await websocket .send_text (result_message )
98
+
99
+ if __name__ == "__main__" :
100
+ threading .Thread (target = asyncio .run , args = (garage_control (),)).start ()
101
+ threading .Thread (target = asyncio .run , args = (read_from_arduino (),)).start ()
102
+ import uvicorn
103
+ uvicorn .run (app , host = "0.0.0.0" , port = 8000 )
104
+
103
105
104
- @app .route ('/' )
105
- def index ():
106
- return render_template ('index.html' )
107
106
108
- if __name__ == '__main__' :
109
- threading .Thread (target = garage_control ).start ()
110
- threading .Thread (target = audio_detection ).start ()
111
- threading .Thread (target = read_from_arduino ).start ()
112
- app .run (host = '0.0.0.0' , port = 5000 , debug = True )
113
107
0 commit comments