Skip to content

Commit 764bee8

Browse files
authored
feat(abr-testing): Automate GoPro Recordings (#19074)
<!-- Thanks for taking the time to open a Pull Request (PR)! Please make sure you've read the "Opening Pull Requests" section of our Contributing Guide: https://github.com/Opentrons/opentrons/blob/edge/CONTRIBUTING.md#opening-pull-requests GitHub provides robust markdown to format your PR. Links, diagrams, pictures, and videos along with text formatting make it possible to create a rich and informative PR. For more information on GitHub markdown, see: https://docs.github.com/en/get-started/writing-on-github/getting-started-with-writing-and-formatting-on-github/basic-writing-and-formatting-syntax To ensure your code is reviewed quickly and thoroughly, please fill out the sections below to the best of your ability! --> # Overview Automate Deletion of GoPro Files and Recording Start ## Test Plan and Hands on Testing - Tested on macOS and Windows ## Changelog - Added a script that automatically turns on go pros, deletes files, and starts recording ## Review requests <!-- - What do you need from reviewers to feel confident this PR is ready to merge? - Ask questions. --> ## Risk assessment - Go pro wifi only connects if computer is in ABR room in close proximity to cameras
1 parent 2a61803 commit 764bee8

File tree

4 files changed

+269
-25
lines changed

4 files changed

+269
-25
lines changed
Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
"""Classes for controlling GoPros."""
2+
import requests
3+
from typing import Dict, Union
4+
from urllib.parse import urlparse
5+
import subprocess
6+
import time
7+
import argparse
8+
import os
9+
import json
10+
import sys
11+
import platform
12+
13+
14+
def connect_to_wifi(network_name: str, password: str) -> bool:
15+
"""Connect to a Wi-Fi network on macOS or Windows."""
16+
system = platform.system()
17+
18+
if system == "Darwin": # macOS
19+
airport_cmd = (
20+
"/System/Library/PrivateFrameworks/Apple80211.framework"
21+
"/Versions/Current/Resources/airport"
22+
)
23+
print("🔍 [macOS] Scanning for available Wi-Fi networks...\n")
24+
try:
25+
subprocess.run([airport_cmd, "-s"], check=True)
26+
except FileNotFoundError:
27+
print("❌ 'airport' tool not found.")
28+
return False
29+
except subprocess.CalledProcessError as e:
30+
print(f"❌ Error scanning networks: {e.stderr if e.stderr else str(e)}")
31+
return False
32+
33+
try:
34+
subprocess.run(
35+
["networksetup", "-setairportnetwork", "en0", network_name, password],
36+
check=True,
37+
capture_output=True,
38+
text=True,
39+
)
40+
41+
# Wait and verify connection
42+
time.sleep(5) # give it a few seconds to connect
43+
result = subprocess.run(
44+
["netsh", "wlan", "show", "interfaces"],
45+
capture_output=True,
46+
text=True,
47+
)
48+
49+
if network_name in result.stdout:
50+
print(f"✅ Connected to '{network_name}' on macOS")
51+
return True
52+
else:
53+
print(f"❌ Attempted to connect, but not connected to '{network_name}'.")
54+
return False
55+
except subprocess.CalledProcessError as e:
56+
print(
57+
f"""❌ Failed to connect to '{network_name}':
58+
{e.stderr.strip() if e.stderr else str(e)}"""
59+
)
60+
return False
61+
elif system == "Windows":
62+
print(
63+
f"🔍 [Windows] Attempting to connect to Wi-Fi network '{network_name}'...\n"
64+
)
65+
try:
66+
# Create Wi-Fi profile XML
67+
profile_xml = f"""<?xml version="1.0"?>
68+
<WLANProfile xmlns="http://www.microsoft.com/networking/WLAN/profile/v1">
69+
<name>{network_name}</name>
70+
<SSIDConfig>
71+
<SSID>
72+
<name>{network_name}</name>
73+
</SSID>
74+
</SSIDConfig>
75+
<connectionType>ESS</connectionType>
76+
<connectionMode>manual</connectionMode>
77+
<MSM>
78+
<security>
79+
<authEncryption>
80+
<authentication>WPA2PSK</authentication>
81+
<encryption>AES</encryption>
82+
<useOneX>false</useOneX>
83+
</authEncryption>
84+
<sharedKey>
85+
<keyType>passPhrase</keyType>
86+
<protected>false</protected>
87+
<keyMaterial>{password}</keyMaterial>
88+
</sharedKey>
89+
</security>
90+
</MSM>
91+
</WLANProfile>"""
92+
93+
profile_path = os.path.join(
94+
os.getenv("TEMP", "/tmp"), f"{network_name}.xml"
95+
)
96+
97+
with open(profile_path, "w") as f:
98+
f.write(profile_xml)
99+
100+
subprocess.run(
101+
["netsh", "wlan", "add", "profile", f"filename={profile_path}"],
102+
check=True,
103+
capture_output=True,
104+
text=True,
105+
)
106+
subprocess.run(
107+
["netsh", "wlan", "connect", f"name={network_name}"],
108+
check=True,
109+
capture_output=True,
110+
text=True,
111+
)
112+
# Wait and verify connection
113+
time.sleep(5) # give it a few seconds to connect
114+
result = subprocess.run(
115+
["netsh", "wlan", "show", "interfaces"],
116+
capture_output=True,
117+
text=True,
118+
)
119+
120+
if network_name in result.stdout:
121+
print(f"✅ Connected to '{network_name}' on Windows")
122+
return True
123+
else:
124+
print(f"❌ Attempted to connect, but not connected to '{network_name}'.")
125+
return False
126+
except subprocess.CalledProcessError as e:
127+
print(
128+
f"""❌ Failed to connect to '{network_name}':
129+
{e.stderr.strip() if e.stderr else str(e)}"""
130+
)
131+
return False
132+
except Exception as e:
133+
print(f"❌ Unexpected error: {e}")
134+
return False
135+
else:
136+
print(f"❌ Unsupported operating system: {system}")
137+
return False
138+
139+
140+
class GoProCamera:
141+
"""Commands for GoPro Control."""
142+
143+
def __init__(self, ip_address: str) -> None:
144+
"""Connect to GoPro."""
145+
parsed = urlparse(f"http://{ip_address}")
146+
self.ip = parsed.hostname # Strips port if given
147+
self.control_url = f"http://{self.ip}:8080/gp/gpControl"
148+
self.status_url = f"{self.control_url}:8080/status"
149+
self.media_url = f"http://{self.ip}:8080/gopro/media/list"
150+
151+
def start_recording(self) -> Dict[str, Union[str, bool]]:
152+
"""Start Recording."""
153+
print("📷 Started Recording")
154+
return self._send_command("command/shutter", {"p": "1"})
155+
156+
def stop_recording(self) -> Dict[str, Union[str, bool]]:
157+
"""Stop Recording."""
158+
print("🎬 Stopped Recording.")
159+
return self._send_command("command/shutter", {"p": "0"})
160+
161+
def get_status(self) -> Dict[str, Union[str | bool]]:
162+
"""Get status of gopro."""
163+
try:
164+
r = requests.get(self.status_url, timeout=3)
165+
return r.json() if r.ok else {"error": r.text}
166+
except Exception as e:
167+
return {"error": str(e)}
168+
169+
def get_files(self) -> int:
170+
"""Get all files."""
171+
print("📂 Getting Files.")
172+
self.stop_recording()
173+
time.sleep(5)
174+
total_files = 0
175+
try:
176+
response = requests.get(self.media_url, timeout=5)
177+
response.raise_for_status()
178+
media_data = response.json()
179+
try:
180+
total_files = len(media_data["media"][0])
181+
except IndexError:
182+
total_files = 0
183+
print(f"total files {total_files}")
184+
except Exception as e:
185+
print(f"Error: {e}")
186+
return total_files
187+
188+
def delete_files(self) -> None:
189+
"""Delete Files."""
190+
delete_url = f"http://{self.ip}/gp/gpControl/command/storage/delete/all"
191+
total_files = self.get_files()
192+
if total_files > 0:
193+
try:
194+
response = requests.get(delete_url, timeout=60)
195+
if response.status_code == 200:
196+
print("🗑️ Deleted Files.")
197+
except Exception as e:
198+
print(f"Failed to delete files. Status code: {e}")
199+
200+
def _send_command(
201+
self, endpoint: str, params: Dict[str, Union[str | bool]]
202+
) -> Dict[str, Union[str | bool]]:
203+
"""Send command."""
204+
url = f"{self.control_url}/{endpoint}"
205+
try:
206+
r = requests.get(url, params=params, timeout=3)
207+
r.raise_for_status()
208+
return {"success": True}
209+
except requests.RequestException as e:
210+
return {"error": str(e)}
211+
212+
213+
def run(storage_directory: str) -> None:
214+
"""Run script."""
215+
ip_json_file = os.path.join(storage_directory, "IPs.json")
216+
try:
217+
ip_file = json.load(open(ip_json_file))
218+
robot_dict = ip_file.get("ip_address_list")
219+
except FileNotFoundError:
220+
print(f"Add .json file with robot IPs to: {storage_directory}.")
221+
sys.exit()
222+
# Build dictionary: robot -> password (only if password exists and is not empty)
223+
robot_passwords = {
224+
values[0]: values[-1]
225+
for _, values in robot_dict.items()
226+
if len(values) >= 3 and values[-1]
227+
}
228+
gopro_ip = "10.5.5.9:8080"
229+
for robot, password in robot_passwords.items():
230+
connected = connect_to_wifi(robot, password)
231+
if connected:
232+
camera = GoProCamera(gopro_ip)
233+
camera.stop_recording()
234+
camera.delete_files()
235+
for robot, password in robot_passwords.items():
236+
connected = connect_to_wifi(robot, password)
237+
if connected:
238+
camera = GoProCamera(gopro_ip)
239+
camera.start_recording()
240+
241+
242+
if __name__ == "__main__":
243+
"""Connect to GoPros, Erase Footage and Start Recording."""
244+
parser = argparse.ArgumentParser(description="Read run logs on google drive.")
245+
parser.add_argument(
246+
"storage_directory",
247+
metavar="STORAGE_DIRECTORY",
248+
type=str,
249+
nargs=1,
250+
help="Path to long term storage directory for run logs.",
251+
)
252+
args = parser.parse_args()
253+
storage_directory = args.storage_directory[0]
254+
run(storage_directory)

abr-testing/abr_testing/data_collection/abr_calibration_logs.py

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -290,29 +290,14 @@ def run(
290290
except FileNotFoundError:
291291
print(f"Add .json file with robot IPs to: {storage_directory}.")
292292
sys.exit()
293-
ip_or_all = ""
294-
while not ip_or_all:
295-
ip_or_all = input("IP Address or ALL: ")
296-
calibration_data = []
297-
if ip_or_all.upper() == "ALL":
298-
ip_address_list = list(robot_dict.keys())
299-
for ip in ip_address_list:
300-
saved_file_path, calibration = read_robot_logs.get_calibration_offsets(
301-
ip, storage_directory
302-
)
303-
calibration_data.append(calibration)
304-
else:
305-
try:
306-
(
307-
saved_file_path,
308-
calibration,
309-
) = read_robot_logs.get_calibration_offsets(
310-
ip_or_all, storage_directory
311-
)
312-
calibration_data.append(calibration)
313-
except Exception:
314-
print("Invalid IP try again")
315-
ip_or_all = ""
293+
calibration_data = []
294+
ip_address_list = list(robot_dict.keys())
295+
for ip in ip_address_list:
296+
saved_file_path, calibration = read_robot_logs.get_calibration_offsets(
297+
ip, storage_directory
298+
)
299+
calibration_data.append(calibration)
300+
316301
try:
317302
upload_calibration_offsets(
318303
calibration_data,

abr-testing/abr_testing/data_collection/abr_hepauv.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,8 +159,10 @@ def run(
159159
robot_names_and_hepa_serials = get_hepa_serials(storage_directory)
160160
if turning_hepa_fan == "on":
161161
print("𖣘 TIME STAMPING START OF HEPA FANS & UV LIGHT.")
162-
else:
162+
elif turning_hepa_fan == "off":
163163
print("𖣘 TIME STAMPING END OF HEPA FANS & UV LIGHT.")
164+
else:
165+
return
164166

165167
answer = input("Do you want to exclude any robots (y/n)?")
166168
if answer.lower == "y":

abr-testing/abr_testing/tools/abr_setup.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from datetime import datetime, timedelta
88
from typing import Any
99
from hardware_testing.scripts import ABRAsairScript # type: ignore
10-
from abr_testing.automation import google_sheets_tool
10+
from abr_testing.automation import google_sheets_tool, gopro
1111
from abr_testing.data_collection import (
1212
get_run_logs,
1313
abr_google_drive,
@@ -189,6 +189,9 @@ def main(configurations: configparser.ConfigParser) -> None:
189189
"Storage, Email, Drive Folder, or Sheet name is missing, please fix configs"
190190
)
191191
sys.exit(1)
192+
# Set up go pros
193+
storage_directory = configurations["RUN-LOG"]["Storage"]
194+
gopro.run(storage_directory)
192195

193196

194197
if __name__ == "__main__":

0 commit comments

Comments
 (0)