Skip to content

Commit 6a60405

Browse files
authored
Merge pull request #89 from makerforgetech/feature/imx500
Feature/imx500
2 parents 6c0db5d + cba3c39 commit 6a60405

File tree

7 files changed

+229
-9
lines changed

7 files changed

+229
-9
lines changed

config/servos.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
servos:
2+
port: /dev/ttyAMA0
23
conf:
34
leg_l_hip: { id: 0, pin: 9, range: [0, 180], start: 40 }
45
leg_l_knee: { id: 1, pin: 10, range: [0, 180], start: 10 }

install.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
# sudo apt-get install libatlas-base-dev
2626

2727
# CREATE VIRTUAL ENVIRONMENT
28-
python3 -m venv myenv
28+
python3 -m venv --system-site-package myenv
2929
source myenv/bin/activate
3030

3131
# INSTALL ALL PYTHON DEPENDENCIES

main.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,11 @@ def main():
7272

7373
# Throw exception to safely exit script when terminated
7474
signal.signal(signal.SIGTERM, Config.exit)
75-
7675
# GPIO
7776
# gpio = pigpio.pi()
7877

7978
# Arduino connection
80-
serial = ArduinoSerial()
79+
serial = ArduinoSerial(port=Config.get('servos','port'))
8180

8281
servos = dict()
8382
servo_conf = Config.get('servos','conf')
@@ -184,7 +183,12 @@ def main():
184183
minute_loop = time()
185184
loop = True
186185
# pub.sendMessage('speak', msg='hi')
187-
# pub.sendMessage('animate', action='celebrate')
186+
# print('taking over servos')
187+
# pub.sendMessage('servo:pan:mvabs', percentage=50)
188+
# print('done')
189+
# print('nodding')
190+
# pub.sendMessage('animate', action='head_nod')
191+
# print('done')
188192

189193
try:
190194
pub.sendMessage('log', msg="[Main] Loop started")

main_viam.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,9 @@ async def main():
159159
# print(f"my-webcam get_image return value: {my_webcam_return_value}")
160160
# roboflow_test = VisionClient.from_robot(robot, "roboflow-test")
161161

162-
162+
# print('taking over servos')
163+
# pub.sendMessage('servo:pan:mvabs', percentage=50)
164+
# print('done')
163165

164166
try:
165167
while loop:

modules/arduinoserial.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@ class ArduinoSerial:
1616
DEVICE_SERVO_RELATIVE = 4
1717
ORDER_RECEIVED = 5
1818
def __init__(self, **kwargs):
19-
self.serial_file = ArduinoSerial.initialise()
19+
self.port = kwargs.get('port', '/dev/ttyAMA0')
20+
self.baudrate = kwargs.get('baudrate', 115200)
21+
self.serial_file = ArduinoSerial.initialise(self.port, self.baudrate)
2022
self.file = None
2123
pub.subscribe(self.send, 'serial')
2224

2325
@staticmethod
24-
def initialise():
26+
def initialise(port, baudrate):
2527
try:
26-
serial_file = open_serial_port(baudrate=115200, timeout=None)
28+
print('Trying to select port ' + port)
29+
serial_file = open_serial_port(serial_port=port, baudrate=baudrate, timeout=None)
2730
except Exception as e:
2831
raise e
2932
is_connected = True # assume connection

modules/picamimx500.py

Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
import argparse
2+
import sys
3+
from functools import lru_cache
4+
5+
import cv2, json
6+
import numpy as np
7+
8+
from picamera2 import MappedArray, Picamera2
9+
from picamera2.devices import IMX500
10+
from picamera2.devices.imx500 import (NetworkIntrinsics,
11+
postprocess_nanodet_detection)
12+
from libcamera import Transform
13+
14+
from pubsub import pub
15+
16+
17+
class Detection:
18+
def __init__(self, imx500, picam2, selfref, coords, category, conf, metadata):
19+
"""Create a Detection object, recording the bounding box, category and confidence."""
20+
self.category = category
21+
self.conf = conf
22+
self.box = imx500.convert_inference_coords(coords, metadata, picam2)
23+
self.piCamImx500 = selfref
24+
def display(self):
25+
label = f"{self.piCamImx500.get_labels()[int(self.category)]} ({self.conf:.2f}%): {self.box}"
26+
print(label)
27+
print("")
28+
def json_out(self):
29+
return {
30+
'category': self.piCamImx500.get_labels()[int(self.category)],
31+
'confidence': self.conf,
32+
'box': self.box
33+
}
34+
35+
36+
37+
class PiCamImx500:
38+
def __init__(self, **kwargs):
39+
self.last_detections = []
40+
self.last_results = []
41+
42+
self.args = PiCamImx500.get_args()
43+
44+
# This must be called before instantiation of Picamera2
45+
self.imx500 = IMX500(self.args.model)
46+
self.intrinsics = self.imx500.network_intrinsics
47+
if not self.intrinsics:
48+
self.intrinsics = NetworkIntrinsics()
49+
self.intrinsics.task = "object detection"
50+
elif self.intrinsics.task != "object detection":
51+
print("Network is not an object detection task", file=sys.stderr)
52+
exit()
53+
54+
# Override self.intrinsics from self.args
55+
for key, value in vars(self.args).items():
56+
if key == 'labels' and value is not None:
57+
with open(value, 'r') as f:
58+
self.intrinsics.labels = f.read().splitlines()
59+
elif hasattr(self.intrinsics, key) and value is not None:
60+
setattr(self.intrinsics, key, value)
61+
62+
# Defaults
63+
if self.intrinsics.labels is None:
64+
with open("assets/coco_labels.txt", "r") as f:
65+
self.intrinsics.labels = f.read().splitlines()
66+
self.intrinsics.update_with_defaults()
67+
68+
# if self.args.print_self.intrinsics:
69+
# print(self.intrinsics)
70+
# exit()
71+
72+
self.picam2 = Picamera2(self.imx500.camera_num)
73+
config = self.picam2.create_preview_configuration(controls={"FrameRate": self.intrinsics.inference_rate}, buffer_count=12, transform=Transform(vflip=False, hflip=False))
74+
75+
self.imx500.show_network_fw_progress_bar()
76+
self.picam2.start(config, show_preview=False)
77+
78+
if self.intrinsics.preserve_aspect_ratio:
79+
self.imx500.set_auto_aspect_ratio()
80+
81+
self.picam2.pre_callback = self.draw_detections
82+
83+
pub.subscribe(self.scan, 'vision:detect')
84+
85+
def scan(self, captures=1):
86+
json_array = []
87+
for i in range(captures):
88+
self.last_results = self.parse_detections(self.picam2.capture_metadata())
89+
for i in self.last_results:
90+
this_capture = [obj.json_out() for obj in self.last_results]
91+
if captures > 1:
92+
json_array = json_array + [this_capture]
93+
else:
94+
json_array = this_capture
95+
96+
pub.sendMessage('vision:detections', data=json_array)
97+
return json_array
98+
99+
def parse_detections(self, metadata: dict):
100+
"""Parse the output tensor into a number of detected objects, scaled to the ISP out."""
101+
bbox_normalization = self.intrinsics.bbox_normalization
102+
threshold = self.args.threshold
103+
iou = self.args.iou
104+
max_detections = self.args.max_detections
105+
106+
np_outputs = self.imx500.get_outputs(metadata, add_batch=True)
107+
input_w, input_h = self.imx500.get_input_size()
108+
if np_outputs is None:
109+
return self.last_detections
110+
if self.intrinsics.postprocess == "nanodet":
111+
boxes, scores, classes = \
112+
postprocess_nanodet_detection(outputs=np_outputs[0], conf=threshold, iou_thres=iou,
113+
max_out_dets=max_detections)[0]
114+
from picamera2.devices.imx500.postprocess import scale_boxes
115+
boxes = scale_boxes(boxes, 1, 1, input_h, input_w, False, False)
116+
else:
117+
boxes, scores, classes = np_outputs[0][0], np_outputs[1][0], np_outputs[2][0]
118+
if bbox_normalization:
119+
boxes = boxes / input_h
120+
121+
boxes = np.array_split(boxes, 4, axis=1)
122+
boxes = zip(*boxes)
123+
124+
self.last_detections = [
125+
Detection(self.imx500, self.picam2, self, box, category, score, metadata)
126+
for box, score, category in zip(boxes, scores, classes)
127+
if score > threshold
128+
]
129+
return self.last_detections
130+
131+
@lru_cache
132+
def get_labels(self):
133+
labels = self.intrinsics.labels
134+
135+
if self.intrinsics.ignore_dash_labels:
136+
labels = [label for label in labels if label and label != "-"]
137+
return labels
138+
139+
def draw_detections(self, request, stream="main"):
140+
"""Draw the detections for this request onto the ISP output."""
141+
detections = self.last_results
142+
if detections is None:
143+
return
144+
labels = self.get_labels()
145+
with MappedArray(request, stream) as m:
146+
for detection in detections:
147+
x, y, w, h = detection.box
148+
label = f"{labels[int(detection.category)]} ({detection.conf:.2f})"
149+
150+
# Calculate text size and position
151+
(text_width, text_height), baseline = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
152+
text_x = x + 5
153+
text_y = y + 15
154+
155+
# Create a copy of the array to draw the background with opacity
156+
overlay = m.array.copy()
157+
158+
# Draw the background rectangle on the overlay
159+
cv2.rectangle(overlay,
160+
(text_x, text_y - text_height),
161+
(text_x + text_width, text_y + baseline),
162+
(255, 255, 255), # Background color (white)
163+
cv2.FILLED)
164+
165+
alpha = 0.30
166+
cv2.addWeighted(overlay, alpha, m.array, 1 - alpha, 0, m.array)
167+
168+
# Draw text on top of the background
169+
cv2.putText(m.array, label, (text_x, text_y),
170+
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)
171+
172+
# Draw detection box
173+
cv2.rectangle(m.array, (x, y), (x + w, y + h), (0, 255, 0, 0), thickness=2)
174+
175+
if self.intrinsics.preserve_aspect_ratio:
176+
b_x, b_y, b_w, b_h = self.imx500.get_roi_scaled(request)
177+
color = (255, 0, 0) # red
178+
cv2.putText(m.array, "ROI", (b_x + 5, b_y + 15), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
179+
cv2.rectangle(m.array, (b_x, b_y), (b_x + b_w, b_y + b_h), (255, 0, 0, 0))
180+
181+
@staticmethod
182+
def get_args():
183+
parser = argparse.ArgumentParser()
184+
parser.add_argument("--model", type=str, help="Path of the model",
185+
default="/usr/share/imx500-models/imx500_network_ssd_mobilenetv2_fpnlite_320x320_pp.rpk")
186+
parser.add_argument("--fps", type=int, help="Frames per second")
187+
parser.add_argument("--bbox-normalization", action=argparse.BooleanOptionalAction, help="Normalize bbox")
188+
parser.add_argument("--threshold", type=float, default=0.55, help="Detection threshold")
189+
parser.add_argument("--iou", type=float, default=0.65, help="Set iou threshold")
190+
parser.add_argument("--max-detections", type=int, default=10, help="Set max detections")
191+
parser.add_argument("--ignore-dash-labels", action=argparse.BooleanOptionalAction, help="Remove '-' labels ")
192+
parser.add_argument("--postprocess", choices=["", "nanodet"],
193+
default=None, help="Run post process of type")
194+
parser.add_argument("-r", "--preserve-aspect-ratio", action=argparse.BooleanOptionalAction,
195+
help="preserve the pixel aspect ratio of the input tensor")
196+
parser.add_argument("--labels", type=str,
197+
help="Path to the labels file")
198+
parser.add_argument("--print-self.intrinsics", action="store_true",
199+
help="Print JSON network_intrinsics then exit")
200+
return parser.parse_args()
201+
202+
203+
if __name__ == "__main__":
204+
mycam = PiCamImx500()
205+
206+
# while True:
207+
print(mycam.scan(1))

modules/robust_serial/utils.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ def get_serial_ports():
5656
try:
5757
s = serial.Serial(port)
5858
s.close()
59+
print('[Serial] Found ' + port)
5960
results.append(port)
6061
except (OSError, serial.SerialException):
6162
pass
@@ -73,9 +74,11 @@ def open_serial_port(serial_port=None, baudrate=115200, timeout=0, write_timeout
7374
:param write_timeout: (int)
7475
:return: (Serial Object)
7576
"""
77+
ports = get_serial_ports()
78+
print('Serial port sent: ' + serial_port)
7679
# Open serial port (for communication with Arduino)
7780
if serial_port is None:
78-
serial_port = get_serial_ports()[0]
81+
serial_port = ports[0]
7982
# timeout=0 non-blocking mode, return immediately in any case, returning zero or more,
8083
# up to the requested number of bytes
8184
return serial.Serial(port=serial_port, baudrate=baudrate, timeout=timeout, writeTimeout=write_timeout)

0 commit comments

Comments
 (0)