Skip to content

Commit 9a7b474

Browse files
committed
Initial working code
1 parent e8379b8 commit 9a7b474

File tree

1 file changed

+261
-0
lines changed

1 file changed

+261
-0
lines changed

modules/picamimx500.py

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
import argparse
2+
import sys
3+
from functools import lru_cache
4+
5+
import cv2, json
6+
import numpy as np
7+
8+
from picamera2 import MappedArray, Picamera2
9+
from picamera2.devices import IMX500
10+
from picamera2.devices.imx500 import (NetworkIntrinsics,
11+
postprocess_nanodet_detection)
12+
from libcamera import Transform
13+
14+
from pubsub import pub
15+
16+
17+
class Detection:
18+
def __init__(self, coords, category, conf, metadata):
19+
"""Create a Detection object, recording the bounding box, category and confidence."""
20+
self.category = category
21+
self.conf = conf
22+
self.box = imx500.convert_inference_coords(coords, metadata, picam2)
23+
def display(self):
24+
label = f"{PiCamImx500.get_labels()[int(self.category)]} ({self.conf:.2f}%): {self.box}"
25+
print(label)
26+
print("")
27+
def json_out(self):
28+
return {
29+
'category': PiCamImx500.get_labels()[int(self.category)],
30+
'confidence': self.conf,
31+
'box': self.box
32+
}
33+
34+
35+
36+
class PiCamImx500:
37+
def __init__(self, **kwargs):
38+
self.last_detections = []
39+
# self.translator = kwargs.get('translator', None)
40+
# self.service = kwargs.get('service', 'pyttsx3')
41+
# if self.service == 'elevenlabs':
42+
# self.init_elevenlabs(kwargs.get('voice_id', ''))
43+
# else:
44+
# self.init_pyttsx3()
45+
# # Set subscribers
46+
# pub.subscribe(self.speak, 'tts')
47+
48+
# def speak(self, msg):
49+
# if self.service == 'elevenlabs':
50+
# self.speak_elevenlabs(msg)
51+
# else:
52+
# self.speak_pyttsx3(msg)#
53+
#
54+
55+
def detect(self, captures):
56+
# This must be called before instantiation of Picamera2
57+
imx500 = IMX500(args.model)
58+
intrinsics = imx500.network_intrinsics
59+
if not intrinsics:
60+
intrinsics = NetworkIntrinsics()
61+
intrinsics.task = "object detection"
62+
elif intrinsics.task != "object detection":
63+
print("Network is not an object detection task", file=sys.stderr)
64+
exit()
65+
66+
# Override intrinsics from args
67+
for key, value in vars(args).items():
68+
if key == 'labels' and value is not None:
69+
with open(value, 'r') as f:
70+
intrinsics.labels = f.read().splitlines()
71+
elif hasattr(intrinsics, key) and value is not None:
72+
setattr(intrinsics, key, value)
73+
74+
# Defaults
75+
if intrinsics.labels is None:
76+
with open("assets/coco_labels.txt", "r") as f:
77+
intrinsics.labels = f.read().splitlines()
78+
intrinsics.update_with_defaults()
79+
80+
if args.print_intrinsics:
81+
print(intrinsics)
82+
exit()
83+
84+
picam2 = Picamera2(imx500.camera_num)
85+
config = picam2.create_preview_configuration(controls={"FrameRate": intrinsics.inference_rate}, buffer_count=12, transform=Transform(vflip=True, hflip=True))
86+
87+
imx500.show_network_fw_progress_bar()
88+
picam2.start(config, show_preview=False)
89+
90+
if intrinsics.preserve_aspect_ratio:
91+
imx500.set_auto_aspect_ratio()
92+
93+
last_results = None
94+
mycam = PiCamImx500()
95+
picam2.pre_callback = PiCamImx500.draw_detections
96+
97+
json_array = []
98+
for i in range(captures):
99+
last_results = mycam.parse_detections(picam2.capture_metadata())
100+
for i in last_results:
101+
this_capture = [obj.json_out() for obj in last_results]
102+
json_array.push(this_capture)
103+
return json_array
104+
105+
def parse_detections(self, metadata: dict):
106+
"""Parse the output tensor into a number of detected objects, scaled to the ISP out."""
107+
bbox_normalization = intrinsics.bbox_normalization
108+
threshold = args.threshold
109+
iou = args.iou
110+
max_detections = args.max_detections
111+
112+
np_outputs = imx500.get_outputs(metadata, add_batch=True)
113+
input_w, input_h = imx500.get_input_size()
114+
if np_outputs is None:
115+
return self.last_detections
116+
if intrinsics.postprocess == "nanodet":
117+
boxes, scores, classes = \
118+
postprocess_nanodet_detection(outputs=np_outputs[0], conf=threshold, iou_thres=iou,
119+
max_out_dets=max_detections)[0]
120+
from picamera2.devices.imx500.postprocess import scale_boxes
121+
boxes = scale_boxes(boxes, 1, 1, input_h, input_w, False, False)
122+
else:
123+
boxes, scores, classes = np_outputs[0][0], np_outputs[1][0], np_outputs[2][0]
124+
if bbox_normalization:
125+
boxes = boxes / input_h
126+
127+
boxes = np.array_split(boxes, 4, axis=1)
128+
boxes = zip(*boxes)
129+
130+
self.last_detections = [
131+
Detection(box, category, score, metadata)
132+
for box, score, category in zip(boxes, scores, classes)
133+
if score > threshold
134+
]
135+
return self.last_detections
136+
137+
@staticmethod
138+
@lru_cache
139+
def get_labels():
140+
labels = intrinsics.labels
141+
142+
if intrinsics.ignore_dash_labels:
143+
labels = [label for label in labels if label and label != "-"]
144+
return labels
145+
146+
147+
@staticmethod
148+
def draw_detections(request, stream="main"):
149+
"""Draw the detections for this request onto the ISP output."""
150+
detections = last_results
151+
if detections is None:
152+
return
153+
labels = PiCamImx500.get_labels()
154+
with MappedArray(request, stream) as m:
155+
for detection in detections:
156+
x, y, w, h = detection.box
157+
label = f"{labels[int(detection.category)]} ({detection.conf:.2f})"
158+
159+
# Calculate text size and position
160+
(text_width, text_height), baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
161+
text_x = x + 5
162+
text_y = y + 15
163+
164+
# Create a copy of the array to draw the background with opacity
165+
overlay = m.array.copy()
166+
167+
# Draw the background rectangle on the overlay
168+
cv2.rectangle(overlay,
169+
(text_x, text_y - text_height),
170+
(text_x + text_width, text_y + baseline),
171+
(255, 255, 255), # Background color (white)
172+
cv2.FILLED)
173+
174+
alpha = 0.30
175+
cv2.addWeighted(overlay, alpha, m.array, 1 - alpha, 0, m.array)
176+
177+
# Draw text on top of the background
178+
cv2.putText(m.array, label, (text_x, text_y),
179+
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
180+
181+
# Draw detection box
182+
cv2.rectangle(m.array, (x, y), (x + w, y + h), (0, 255, 0, 0), thickness=2)
183+
184+
if intrinsics.preserve_aspect_ratio:
185+
b_x, b_y, b_w, b_h = imx500.get_roi_scaled(request)
186+
color = (255, 0, 0) # red
187+
cv2.putText(m.array, "ROI", (b_x + 5, b_y + 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
188+
cv2.rectangle(m.array, (b_x, b_y), (b_x + b_w, b_y + b_h), (255, 0, 0, 0))
189+
190+
@staticmethod
191+
def get_args():
192+
parser = argparse.ArgumentParser()
193+
parser.add_argument("--model", type=str, help="Path of the model",
194+
default="/usr/share/imx500-models/imx500_network_ssd_mobilenetv2_fpnlite_320x320_pp.rpk")
195+
parser.add_argument("--fps", type=int, help="Frames per second")
196+
parser.add_argument("--bbox-normalization", action=argparse.BooleanOptionalAction, help="Normalize bbox")
197+
parser.add_argument("--threshold", type=float, default=0.55, help="Detection threshold")
198+
parser.add_argument("--iou", type=float, default=0.65, help="Set iou threshold")
199+
parser.add_argument("--max-detections", type=int, default=10, help="Set max detections")
200+
parser.add_argument("--ignore-dash-labels", action=argparse.BooleanOptionalAction, help="Remove '-' labels ")
201+
parser.add_argument("--postprocess", choices=["", "nanodet"],
202+
default=None, help="Run post process of type")
203+
parser.add_argument("-r", "--preserve-aspect-ratio", action=argparse.BooleanOptionalAction,
204+
help="preserve the pixel aspect ratio of the input tensor")
205+
parser.add_argument("--labels", type=str,
206+
help="Path to the labels file")
207+
parser.add_argument("--print-intrinsics", action="store_true",
208+
help="Print JSON network_intrinsics then exit")
209+
return parser.parse_args()
210+
211+
212+
if __name__ == "__main__":
213+
mycam = PiCamImx500()
214+
args = PiCamImx500.get_args()
215+
216+
# This must be called before instantiation of Picamera2
217+
imx500 = IMX500(args.model)
218+
intrinsics = imx500.network_intrinsics
219+
if not intrinsics:
220+
intrinsics = NetworkIntrinsics()
221+
intrinsics.task = "object detection"
222+
elif intrinsics.task != "object detection":
223+
print("Network is not an object detection task", file=sys.stderr)
224+
exit()
225+
226+
# Override intrinsics from args
227+
for key, value in vars(args).items():
228+
if key == 'labels' and value is not None:
229+
with open(value, 'r') as f:
230+
intrinsics.labels = f.read().splitlines()
231+
elif hasattr(intrinsics, key) and value is not None:
232+
setattr(intrinsics, key, value)
233+
234+
# Defaults
235+
if intrinsics.labels is None:
236+
with open("assets/coco_labels.txt", "r") as f:
237+
intrinsics.labels = f.read().splitlines()
238+
intrinsics.update_with_defaults()
239+
240+
if args.print_intrinsics:
241+
print(intrinsics)
242+
exit()
243+
244+
picam2 = Picamera2(imx500.camera_num)
245+
config = picam2.create_preview_configuration(controls={"FrameRate": intrinsics.inference_rate}, buffer_count=12, transform=Transform(vflip=True, hflip=True))
246+
247+
imx500.show_network_fw_progress_bar()
248+
picam2.start(config, show_preview=False)
249+
250+
if intrinsics.preserve_aspect_ratio:
251+
imx500.set_auto_aspect_ratio()
252+
253+
last_results = None
254+
255+
picam2.pre_callback = PiCamImx500.draw_detections
256+
while True:
257+
258+
last_results = mycam.parse_detections(picam2.capture_metadata())
259+
for i in last_results:
260+
json_array = [obj.json_out() for obj in last_results]
261+
print(json_array)

0 commit comments

Comments
 (0)