-
Notifications
You must be signed in to change notification settings - Fork 240
Description
Hello and good day.
I've been working on some Raspberry Pi Zero W night vision goggles for a long time now, and have been trying to record .h264 video and stream to my tft displays at the same time. First of all, here's the code:
import os
import time
import threading
import subprocess
import logging
import datetime
import numpy as np
import cv2
from picamera2 import Picamera2
from picamera2.encoders import H264Encoder
from ina226 import INA226
import RPi.GPIO as GPIO
# --------------------------
# Configuration
# --------------------------
V_FULL = 8.4
V_EMPTY = 6.4
DISP_W, DISP_H = 240, 240 # display & recording size (square)
FRAME_BYTES = DISP_W * DISP_H * 2
VIDEO_DIR = "/home/PythoErgo/Videos"
PHOTO_DIR = "/home/PythoErgo/Pictures"
SPIPUSH_CMD = ["./spipush"] # must be executable & in working dir
BUTTON_PINS = {26: "left", 19: "select", 13: "right"}
PRESS_COOLDOWN = 0.25 # seconds debounce
os.makedirs(VIDEO_DIR, exist_ok=True)
os.makedirs(PHOTO_DIR, exist_ok=True)
# --------------------------
# Helpers
# --------------------------
def voltage_to_percentage(voltage):
voltage = max(min(voltage, V_FULL), V_EMPTY)
return round((voltage - V_EMPTY) / (V_FULL - V_EMPTY) * 100)
def timestamp_str():
return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
# --------------------------
# State & locks
# --------------------------
MODES = ["STREAM", "VIDEO", "PHOTO"]
mode_lock = threading.Lock()
mode_index = 0
def current_mode():
with mode_lock:
return MODES[mode_index]
def cycle_forward():
global mode_index
with mode_lock:
mode_index = (mode_index + 1) % len(MODES)
def cycle_backward():
global mode_index
with mode_lock:
mode_index = (mode_index - 1) % len(MODES)
state_lock = threading.Lock()
is_recording = False # true once recording has started
recording_encoder = None
recording_filename = None
photo_in_progress = False
# Frame buffers
frame_lock = threading.Lock()
latest_frame = None # live frame (BGR, oriented for display)
frozen_frame = None # frozen displayed frame while recording
stop_event = threading.Event()
# Battery
battery_percent = None
# --------------------------
# SPI subprocess (spipush)
# --------------------------
try:
proc = subprocess.Popen(SPIPUSH_CMD, stdin=subprocess.PIPE)
except Exception as e:
raise SystemExit(f"Failed to start spipush: {e}")
# --------------------------
# Camera: single 240x240 stream used for display + recording
# --------------------------
picam2 = Picamera2()
cfg = picam2.create_preview_configuration(main={"size": (DISP_W, DISP_H), "format": "RGB888"})
picam2.configure(cfg)
picam2.start()
# --------------------------
# Capture loop (background)
# --------------------------
def capture_loop():
global latest_frame
while not stop_event.is_set():
try:
arr = picam2.capture_array("main") # BGR by default from Picamera2
# same orientation you used previously: rotate 90° CCW
arr = np.ascontiguousarray(arr.transpose(1, 0, 2)[::-1, :, :])
with frame_lock:
latest_frame = arr
except Exception:
# don't crash the loop; small sleep to avoid busy spin on errors
time.sleep(0.01)
capture_thread = threading.Thread(target=capture_loop, daemon=True)
capture_thread.start()
# --------------------------
# Utility: convert BGR -> RGB565 bytes for SPI
# --------------------------
def rgb888_to_rgb565_bytes(frame_bgr):
bgr565 = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2BGR565)
return bgr565.view(np.uint16).byteswap().tobytes()
# --------------------------
# INA226 battery reader (background)
# --------------------------
try:
ina = INA226(busnum=1, max_expected_amps=25, log_level=logging.INFO)
ina.configure()
ina.set_low_battery(5)
except Exception as e:
ina = None
logging.warning("INA226 init failed: %s", e)
def battery_thread():
global battery_percent
while not stop_event.is_set():
if ina is not None:
try:
ina.wake(3)
time.sleep(0.08)
if ina.is_conversion_ready():
v = ina.voltage()
battery_percent = voltage_to_percentage(v)
except Exception:
battery_percent = None
time.sleep(1.0)
threading.Thread(target=battery_thread, daemon=True).start()
# --------------------------
# Recording control (background threads)
# --------------------------
recording_lock = threading.Lock()
def _do_start_recording(filename):
"""Background thread target to start recording."""
global recording_encoder, is_recording, recording_filename
try:
# create encoder (tune bitrate/framerate if needed)
recording_encoder = H264Encoder(framerate=30, bitrate=1_800_000)
# explicitly record from the main stream
picam2.start_recording(recording_encoder, filename, name="main")
with recording_lock:
is_recording = True
recording_filename = filename
print("Recording started:", filename)
except Exception:
logging.exception("Failed to start recording")
with recording_lock:
is_recording = False
recording_encoder = None
recording_filename = None
def _do_stop_recording():
"""Background thread target to stop recording and always unfreeze display."""
global recording_encoder, is_recording, recording_filename, frozen_frame
try:
# explicit stop for main stream
picam2.stop_recording(name="main")
except Exception:
logging.exception("Failed to stop recording cleanly")
finally:
with recording_lock:
is_recording = False
recording_encoder = None
recording_filename = None
# ensure display unfreezes even if stop_recording had an error
frozen_frame = None
print("Recording stopped")
def start_recording():
global frozen_frame
# freeze currently displayed frame immediately
with frame_lock:
frozen_frame = None if latest_frame is None else latest_frame.copy()
fname = os.path.join(VIDEO_DIR, f"video_{timestamp_str()}.h264")
threading.Thread(target=_do_start_recording, args=(fname,), daemon=True).start()
def stop_recording():
threading.Thread(target=_do_stop_recording, daemon=True).start()
def toggle_recording():
with recording_lock:
if is_recording:
stop_recording()
else:
start_recording()
# --------------------------
# Photo capture
# --------------------------
def take_photo():
global photo_in_progress
with state_lock:
if photo_in_progress:
return
photo_in_progress = True
try:
ts = timestamp_str()
path = os.path.join(PHOTO_DIR, f"photo_{ts}.jpg")
with frame_lock:
frame = None if latest_frame is None else latest_frame.copy()
if frame is not None:
cv2.imwrite(path, frame)
print("Photo saved:", path)
except Exception:
logging.exception("Photo failed")
finally:
with state_lock:
photo_in_progress = False
# --------------------------
# GPIO: buttons
# --------------------------
last_press_time = {pin: 0 for pin in BUTTON_PINS}
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
for pin in BUTTON_PINS:
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
def button_callback(channel):
now = time.time()
if now - last_press_time.get(channel, 0) < PRESS_COOLDOWN:
return
last_press_time[channel] = now
btn = BUTTON_PINS.get(channel)
if btn == "left":
cycle_forward()
elif btn == "right":
cycle_backward()
elif btn == "select":
m = current_mode()
if m == "VIDEO":
toggle_recording()
elif m == "PHOTO":
threading.Thread(target=take_photo, daemon=True).start()
for pin in BUTTON_PINS:
GPIO.add_event_detect(pin, GPIO.FALLING, callback=button_callback, bouncetime=200)
# --------------------------
# Main display loop
# --------------------------
frame_count = 0
fps_timer = time.time()
fps_value = 0.0
try:
while True:
with frame_lock:
# when recording we use frozen_frame (display freeze). Otherwise use live frame.
if frozen_frame is not None:
frame = frozen_frame.copy()
else:
frame = None if latest_frame is None else latest_frame.copy()
if frame is None or frame.ndim != 3:
time.sleep(0.005)
continue
now = time.time()
# FPS counting
frame_count += 1
if now - fps_timer >= 1.0:
fps_value = frame_count / (now - fps_timer)
fps_timer = now
frame_count = 0
# Draw overlays (preserve black rectangles & %)
# FPS (top-left)
text_fps = f"{fps_value:.1f} FPS"
(tw, th), bl = cv2.getTextSize(text_fps, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
cv2.rectangle(frame, (5, 20 - th - bl), (5 + tw, 20 + bl), (0,0,0), thickness=cv2.FILLED)
cv2.putText(frame, text_fps, (5, 20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1, cv2.LINE_AA)
# Battery (top-right)
if battery_percent is not None:
text_batt = f"BAT: {battery_percent}%"
(twb, thb), blb = cv2.getTextSize(text_batt, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
x_batt = DISP_W - twb - 5
y_batt = 20
cv2.rectangle(frame, (x_batt, y_batt - thb - blb), (x_batt + twb, y_batt + blb), (0,0,0), thickness=cv2.FILLED)
cv2.putText(frame, text_batt, (x_batt, y_batt), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1, cv2.LINE_AA)
# VRNV-V2 bottom-center
text_vrnv = "VRNV-V2"
(tw2, th2), bl2 = cv2.getTextSize(text_vrnv, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
x2 = (DISP_W - tw2)//2
y2 = DISP_H - 5
cv2.rectangle(frame, (x2, y2 - th2 - bl2), (x2 + tw2, y2 + bl2), (0,0,0), thickness=cv2.FILLED)
cv2.putText(frame, text_vrnv, (x2, y2), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 1, cv2.LINE_AA)
# Crosshair center
cx, cy = DISP_W//2, DISP_H//2
cv2.circle(frame, (cx, cy), 15, (0,255,0), 1, lineType=cv2.LINE_AA)
cv2.line(frame, (cx-10, cy), (cx+10, cy), (0,255,0), 1, lineType=cv2.LINE_AA)
cv2.line(frame, (cx, cy-10), (cx, cy+10), (0,255,0), 1, lineType=cv2.LINE_AA)
# Mode text (VID / PIC) bottom-left / bottom-right (no icons)
m = current_mode()
with state_lock:
rec = is_recording
taking = photo_in_progress
if m == "VIDEO":
# red if recording, else green
color_vid = (0,0,255) if rec else (0,255,0)
text_vid = "VID"
(tvw, tvh), bvl = cv2.getTextSize(text_vid, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
x_vid, y_vid = 5, DISP_H - 5
cv2.rectangle(frame, (x_vid, y_vid - tvh - bvl), (x_vid + tvw, y_vid + bvl), (0,0,0), thickness=cv2.FILLED)
cv2.putText(frame, text_vid, (x_vid, y_vid), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color_vid, 1, cv2.LINE_AA)
elif m == "PHOTO":
color_pic = (0,0,255) if taking else (0,255,0)
text_pic = "PIC"
(tpw, tph), bpl = cv2.getTextSize(text_pic, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 1)
x_pic, y_pic = DISP_W - tpw - 5, DISP_H - 5
cv2.rectangle(frame, (x_pic, y_pic - tph - bpl), (x_pic + tpw, y_pic + bpl), (0,0,0), thickness=cv2.FILLED)
cv2.putText(frame, text_pic, (x_pic, y_pic), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color_pic, 1, cv2.LINE_AA)
# If photo in progress show border
if taking:
cv2.rectangle(frame, (0,0), (DISP_W-1, DISP_H-1), (0,0,255), 2)
# Send to SPI
try:
buf = rgb888_to_rgb565_bytes(frame)
if len(buf) == FRAME_BYTES:
CHUNK = 4096
for i in range(0, len(buf), CHUNK):
proc.stdin.write(buf[i:i+CHUNK])
proc.stdin.flush()
except Exception:
logging.exception("SPI push failed - exiting")
break
# small sleep so loop isn't 100% busy
time.sleep(0.002)
except KeyboardInterrupt:
print("Exiting...")
finally:
stop_event.set()
try:
# ensure recording stopped
with recording_lock:
if is_recording:
_do_stop_recording()
except Exception:
pass
try:
picam2.stop()
except Exception:
pass
try:
proc.stdin.close()
proc.wait(timeout=1.0)
except Exception:
pass
GPIO.cleanup()
Also here's the console messages:
(pyenv) PythoErgo@VRNV-V2:~ $ python Python/VRNV-V2_video_photo2.py
[0:38:18.324133483] [1561] INFO Camera camera_manager.cpp:326 libcamera v0.5.1+100-e53bdf1f
[INFO] Initializing display
[0:38:18.553395417] [1580] WARN RPiSdn sdn.cpp:40 Using legacy SDN tuning - please consider moving SDN inside rpi.denoise
[0:38:18.582553773] [1580] INFO RPI vc4.cpp:440 Registered camera /base/soc/i2c0mux/i2c@1/ov5647@36 to Unicam device /dev/media2 and ISP device /dev/media0
[0:38:18.590820590] [1580] INFO RPI pipeline_base.cpp:1107 Using configuration file '/usr/share/libcamera/pipeline/rpi/vc4/rpi_apps.yaml'
[0:38:18.680720603] [1561] INFO Camera camera.cpp:1205 configuring streams: (0) 240x240-RGB888/sRGB (1) 640x480-SGBRG10_CSI2P/RAW
[0:38:18.684712515] [1580] INFO RPI vc4.cpp:615 Sensor: /base/soc/i2c0mux/i2c@1/ov5647@36 - Selected sensor format: 640x480-SGBRG10_1X10 - Selected unicam format: 640x480-pGAA
2025-09-20 23:09:20,755 - INFO - INA226 calibrate called with: bus max volts: 40V, max shunt volts: 0.08V, max expected amps: 25.000A
2025-09-20 23:09:20,761 - INFO - INA226 max possible current: 40.96A
2025-09-20 23:09:20,769 - INFO - INA226 max expected current: 25.000A
2025-09-20 23:09:20,773 - INFO - INA226 expected current LSB base on max_expected_amps: 7.629e-04 A/bit
2025-09-20 23:09:20,780 - INFO - INA226 current LSB: 7.629e-04 A/bit
2025-09-20 23:09:20,788 - INFO - INA226 power LSB: 1.923e-02 W/bit
2025-09-20 23:09:20,795 - INFO - INA226 max current before overflow: 24.9992A
2025-09-20 23:09:20,802 - INFO - INA226 max shunt voltage before overflow: 49.9985mV
2025-09-20 23:09:20,810 - INFO - INA226 calibration: 0x0d1b (3355)
Photo saved: /home/PythoErgo/Pictures/photo_20250920_230940.jpg
Recording started: /home/PythoErgo/Videos/video_20250920_230947.h264
2025-09-20 23:09:56,037 - ERROR - INA226 Failed to stop recording cleanly
Traceback (most recent call last):
File "/home/PythoErgo/Python/VRNV-V2_video_photo2.py", line 188, in _do_stop_recording
picam2.stop_recording(name="main")
TypeError: Picamera2.stop_recording() got an unexpected keyword argument 'name'
Recording stopped
Recording started: /home/PythoErgo/Videos/video_20250920_231542.h264
2025-09-20 23:16:28,948 - ERROR - INA226 Failed to stop recording cleanly
Traceback (most recent call last):
File "/home/PythoErgo/Python/VRNV-V2_video_photo2.py", line 188, in _do_stop_recording
picam2.stop_recording(name="main")
TypeError: Picamera2.stop_recording() got an unexpected keyword argument 'name'
Recording stopped
Recording started: /home/PythoErgo/Videos/video_20250920_232014.h264
2025-09-20 23:20:57,285 - ERROR - INA226 Failed to stop recording cleanly
Traceback (most recent call last):
File "/home/PythoErgo/Python/VRNV-V2_video_photo2.py", line 188, in _do_stop_recording
picam2.stop_recording(name="main")
TypeError: Picamera2.stop_recording() got an unexpected keyword argument 'name'
Recording stopped
^CExiting...
2025-09-20 23:27:18,550 - INFO - INA226 Camera stopped
2025-09-20 23:27:18,956 - INFO - INA226 Failed to open /dev/dma_heap/vidbuf_cached
2025-09-20 23:27:18,958 - INFO - INA226 Camera closed successfully.
(pyenv) PythoErgo@VRNV-V2:~ $
What's really interesting to me is that while testing with this code, when I recorded the first time, the display stream froze as expected and restarted when I stopped the recording. That's expected. But then I tried recording again and the frame froze as expected, and when I stopped recording the display stream restarted. Here's the thing though, when I watched back the footage, the first video was normal, but the second video never actually stopped recording until I came back inside and CTRL+C'ed the program. That means that it actually unintentionally recorded and streamed to the display at the same time, exactly what I've been trying to do this whole time.
Could anyone explain how this happened and how I can replicate these unintentionally good results? @davidplowman maybe? Haha. (You've helped me with an FPS issue before, thank you).
Here's the first video by the way. If anyone's interested I will posting a YouTube video some time in the future on the goggles.