Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion marvain_cli/argparse_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ def run(argv: list[str]) -> int:
m_revoke.add_argument("--hub-rest-api-base", default=None)
m_revoke.add_argument("--dry-run", action="store_true")

devices = sub.add_parser("devices", help="Device token management (Hub API)")
devices = sub.add_parser("devices", help="Device token management and detection")
devices_sub = devices.add_subparsers(dest="devices_cmd")
d_register = devices_sub.add_parser("register", help="Register a new device (mint device token)")
d_register.add_argument("--agent-id", required=True)
Expand All @@ -266,6 +266,11 @@ def run(argv: list[str]) -> int:
d_register.add_argument("--hub-rest-api-base", default=None)
d_register.add_argument("--dry-run", action="store_true")

d_detect = devices_sub.add_parser("detect", help="Detect USB and direct-attach devices")
d_detect.add_argument("--type", "-t", dest="device_type", default=None, help="Filter: video, audio_input, audio_output, serial")
d_detect.add_argument("--connection", "-c", dest="connection_type", default=None, help="Filter: usb, direct")
d_detect.add_argument("--format", "-f", dest="output_format", default="table", help="Output: table, json")

args = ap.parse_args(argv)

if args.version:
Expand Down Expand Up @@ -644,6 +649,27 @@ def run(argv: list[str]) -> int:
if not bool(args.dry_run):
print(json.dumps(data, indent=2, sort_keys=True))
return 0
if args.devices_cmd == "detect":
from marvain_cli.ops import list_detected_devices

detected = list_detected_devices(
device_type=args.device_type,
connection_type=args.connection_type,
output_format=args.output_format,
)
if args.output_format == "json":
print(json.dumps(detected, indent=2))
else:
if not detected:
print("No devices detected.")
return 0
print(f"{'TYPE':<14} {'CONNECTION':<10} {'NAME':<40} {'PATH'}")
print("-" * 90)
for d in detected:
name = d["name"][:38] + ".." if len(d["name"]) > 40 else d["name"]
print(f"{d['device_type']:<14} {d['connection_type']:<10} {name:<40} {d['path']}")
print(f"\nTotal: {len(detected)} device(s) detected")
return 0
devices.print_help()
return 2

Expand Down
293 changes: 293 additions & 0 deletions marvain_cli/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -2468,3 +2468,296 @@ def cognito_get_user(
return client.admin_get_user(UserPoolId=pool_id, Username=email)
except client.exceptions.UserNotFoundException:
return None


# ---------------------------------------------------------------------------
# Device Detection (USB and Direct-Attach)
# ---------------------------------------------------------------------------


@dataclass(frozen=True)
class DetectedDevice:
"""A detected local device (USB or direct-attach)."""

device_type: str # "video", "audio_input", "audio_output", "serial"
name: str
path: str # e.g., /dev/video0, /dev/ttyUSB0
connection_type: str # "usb" or "direct"
vendor_id: str | None = None
product_id: str | None = None
serial: str | None = None


def detect_local_devices() -> list[DetectedDevice]:
"""Detect USB and direct-attach devices on the local machine.

Detects:
- Video devices (cameras, webcams)
- Audio input devices (microphones)
- Audio output devices (speakers)
- Serial ports (USB-to-serial adapters)

Returns a list of DetectedDevice objects.
"""
devices: list[DetectedDevice] = []

# Detect video devices
devices.extend(_detect_video_devices())

# Detect audio devices
devices.extend(_detect_audio_devices())

# Detect serial ports
devices.extend(_detect_serial_ports())

return devices


def _detect_video_devices() -> list[DetectedDevice]:
"""Detect video capture devices (cameras, webcams)."""
devices: list[DetectedDevice] = []

if sys.platform == "darwin":
# macOS: Use system_profiler to list cameras
try:
result = subprocess.run(
["system_profiler", "SPCameraDataType", "-json"],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0:
data = json.loads(result.stdout)
cameras = data.get("SPCameraDataType", [])
for i, cam in enumerate(cameras):
name = cam.get("_name", f"Camera {i}")
# macOS doesn't expose /dev paths for cameras directly
# Use AVFoundation index as identifier
path = f"avfoundation:{i}"
conn_type = "usb" if "usb" in name.lower() else "direct"
devices.append(
DetectedDevice(
device_type="video",
name=name,
path=path,
connection_type=conn_type,
vendor_id=cam.get("spcamera_vendor-id"),
product_id=cam.get("spcamera_model-id"),
)
)
except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError):
pass

else:
# Linux: Check /dev/video* devices
import glob

for video_path in sorted(glob.glob("/dev/video*")):
try:
# Try to get device name from v4l2
result = subprocess.run(
["v4l2-ctl", "--device", video_path, "--info"],
capture_output=True,
text=True,
timeout=5,
)
name = video_path
if result.returncode == 0:
for line in result.stdout.split("\n"):
if "Card type" in line:
name = line.split(":", 1)[-1].strip()
break

# Check if USB device
conn_type = "usb" if "usb" in video_path.lower() or os.path.exists(
f"/sys/class/video4linux/{os.path.basename(video_path)}/device/driver"
) else "direct"
Comment on lines +2571 to +2574

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Use a real USB check for Linux video devices

On Linux, conn_type is set to "usb" whenever /sys/class/video4linux/<device>/device/driver exists. That path exists for essentially every v4l2 device (PCI, platform, virtual, etc.), so non‑USB cameras will still be marked as USB. In practice this makes --connection direct return no results even when direct‑attach devices are present, and mislabels devices in the output. Consider checking the sysfs device ancestry for a USB bus (e.g., via /sys/class/video4linux/<device>/device/subsystem or udevadm) instead of only the driver path.

Useful? React with 👍 / 👎.


devices.append(
DetectedDevice(
device_type="video",
name=name,
path=video_path,
connection_type=conn_type,
)
)
except (subprocess.TimeoutExpired, FileNotFoundError):
# v4l2-ctl not installed, just add the device path
devices.append(
DetectedDevice(
device_type="video",
name=video_path,
path=video_path,
connection_type="usb",
)
)

return devices


def _detect_audio_devices() -> list[DetectedDevice]:
"""Detect audio input/output devices."""
devices: list[DetectedDevice] = []

if sys.platform == "darwin":
# macOS: Use system_profiler for audio devices
try:
result = subprocess.run(
["system_profiler", "SPAudioDataType", "-json"],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0:
data = json.loads(result.stdout)
audio_devices = data.get("SPAudioDataType", [])
for device in audio_devices:
items = device.get("_items", [])
for item in items:
name = item.get("_name", "Unknown Audio Device")
# Determine if input or output
dev_type = "audio_input" if "input" in name.lower() or "microphone" in name.lower() else "audio_output"
# macOS uses CoreAudio, no /dev path
path = f"coreaudio:{name}"
conn_type = "usb" if "usb" in name.lower() else "direct"
devices.append(
DetectedDevice(
device_type=dev_type,
name=name,
path=path,
connection_type=conn_type,
)
)
except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError):
pass

else:
# Linux: Check ALSA devices
try:
# List capture devices (microphones)
result = subprocess.run(
["arecord", "-l"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
for line in result.stdout.split("\n"):
if line.startswith("card"):
# Parse: "card 0: PCH [HDA Intel PCH], device 0: ALC..."
parts = line.split(":")
if len(parts) >= 2:
card_info = parts[0].split()
card_num = card_info[1] if len(card_info) > 1 else "0"
name = parts[1].strip().split(",")[0].strip()
path = f"hw:{card_num}"
devices.append(
DetectedDevice(
device_type="audio_input",
name=name,
path=path,
connection_type="direct",
)
)
except (subprocess.TimeoutExpired, FileNotFoundError):
pass

try:
# List playback devices (speakers)
result = subprocess.run(
["aplay", "-l"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
for line in result.stdout.split("\n"):
if line.startswith("card"):
parts = line.split(":")
if len(parts) >= 2:
card_info = parts[0].split()
card_num = card_info[1] if len(card_info) > 1 else "0"
name = parts[1].strip().split(",")[0].strip()
path = f"hw:{card_num}"
devices.append(
DetectedDevice(
device_type="audio_output",
name=name,
path=path,
connection_type="direct",
)
)
except (subprocess.TimeoutExpired, FileNotFoundError):
pass

return devices


def _detect_serial_ports() -> list[DetectedDevice]:
"""Detect serial ports (USB-to-serial adapters, etc.)."""
devices: list[DetectedDevice] = []
import glob

# Common serial port patterns
patterns = [
"/dev/ttyUSB*", # USB-to-serial adapters (Linux)
"/dev/ttyACM*", # Arduino, etc. (Linux)
"/dev/tty.usb*", # USB serial (macOS)
"/dev/cu.usb*", # USB serial (macOS)
"/dev/tty.Bluetooth*", # Bluetooth serial (macOS)
]

for pattern in patterns:
for port_path in sorted(glob.glob(pattern)):
name = os.path.basename(port_path)
conn_type = "usb" if "usb" in port_path.lower() else "direct"
devices.append(
DetectedDevice(
device_type="serial",
name=name,
path=port_path,
connection_type=conn_type,
)
)

return devices


def list_detected_devices(
*,
device_type: str | None = None,
connection_type: str | None = None,
output_format: str = "table",
) -> list[dict[str, Any]]:
"""List detected local devices with optional filtering.

Args:
device_type: Filter by type ("video", "audio_input", "audio_output", "serial")
connection_type: Filter by connection ("usb", "direct")
output_format: Output format ("table", "json")

Returns:
List of device dictionaries.
"""
devices = detect_local_devices()

# Apply filters
if device_type:
devices = [d for d in devices if d.device_type == device_type]
if connection_type:
devices = [d for d in devices if d.connection_type == connection_type]

# Convert to dicts
result = []
for d in devices:
result.append({
"device_type": d.device_type,
"name": d.name,
"path": d.path,
"connection_type": d.connection_type,
"vendor_id": d.vendor_id,
"product_id": d.product_id,
"serial": d.serial,
})

return result
Loading
Loading