Skip to content
Open

1030 #2344

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ node_modules/
poetry.lock
uv.lock
Pipfile.lock

miniconda3
### Build & Distribution ###
build/
dist/
Expand Down
2 changes: 2 additions & 0 deletions cuda.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
import torch
print(torch.cuda.is_available())
72 changes: 72 additions & 0 deletions keyboard_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#!/usr/bin/env python3

"""
Simple keyboard test utility.

It collects all keys pressed on stdin and prints the batch every second.
Press Ctrl+C to exit.
"""

from __future__ import annotations

import select
import sys
import termios
import time
import tty


def _format_keys(keys: list[str]) -> str:
"""Return a readable representation of the collected keys."""
pretty = []
for key in keys:
if key == "\n":
pretty.append("<ENTER>")
elif key == "\r":
continue # handled with newline on most terminals
elif key == "\t":
pretty.append("<TAB>")
elif key == "\x7f":
pretty.append("<BACKSPACE>")
elif key.isprintable():
pretty.append(key)
else:
pretty.append(f"<0x{ord(key):02x}>")
return " ".join(pretty) if pretty else "<none>"


def main() -> None:
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)

print("Keyboard test started. Press keys (Ctrl+C to quit).")

try:
tty.setcbreak(fd)
window: list[str] = []
last_tick = time.monotonic()

while True:
# poll stdin; timeout keeps loop responsive for the 1s ticker
ready, _, _ = select.select([sys.stdin], [], [], 0.1)
if ready:
key = sys.stdin.read(1)
if key == "\x03": # Ctrl+C
raise KeyboardInterrupt
window.append(key)

now = time.monotonic()
if now - last_tick >= 1.0:
stamp = time.strftime("%H:%M:%S")
print(f"[{stamp}] { _format_keys(window) }")
window.clear()
last_tick = now

except KeyboardInterrupt:
print("\nExiting keyboard test.")
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)


if __name__ == "__main__":
main()
119 changes: 119 additions & 0 deletions rk_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#!/usr/bin/env python3
"""Command line client for the RobotKinematics HTTP service."""

from __future__ import annotations

import argparse
import json
import sys
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen

import numpy as np


def _post_json(url: str, payload: dict[str, Any]) -> dict[str, Any]:
data = json.dumps(payload).encode("utf-8")
request = Request(url, data=data, headers={"Content-Type": "application/json"})

try:
with urlopen(request) as response:
body = response.read().decode("utf-8")
return json.loads(body)
except HTTPError as exc:
error_body = exc.read().decode("utf-8", errors="ignore")
raise RuntimeError(f"Server returned HTTP {exc.code}: {error_body}") from exc
except URLError as exc:
raise RuntimeError(f"Failed to reach {url}: {exc.reason}") from exc


def command_fk(args: argparse.Namespace) -> None:
payload = {"joint_positions": args.joint_positions}
response = _post_json(f"{args.url}/fk", payload)
print(json.dumps(response, indent=2))


def command_ik(args: argparse.Namespace) -> None:
pose_matrix = np.asarray(args.pose, dtype=float).reshape(4, 4).tolist()
payload = {
"current_joint_positions": args.current_joint_positions,
"desired_pose": pose_matrix,
}
if args.position_weight is not None:
payload["position_weight"] = args.position_weight
if args.orientation_weight is not None:
payload["orientation_weight"] = args.orientation_weight

response = _post_json(f"{args.url}/ik", payload)
print(json.dumps(response, indent=2))


def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Client for the RobotKinematics HTTP service.")
parser.add_argument(
"--url",
default="http://127.0.0.1:8081",
help="Base URL for the rk_service (default: http://127.0.0.1:8081)",
)

subparsers = parser.add_subparsers(dest="command", required=True)

fk_parser = subparsers.add_parser("fk", help="Call forward kinematics.")
fk_parser.add_argument(
"--joint-positions",
nargs="+",
type=float,
required=True,
metavar="ANGLE_DEG",
help="Joint positions in degrees.",
)
fk_parser.set_defaults(func=command_fk)

ik_parser = subparsers.add_parser("ik", help="Call inverse kinematics.")
ik_parser.add_argument(
"--current-joint-positions",
nargs="+",
type=float,
required=True,
metavar="ANGLE_DEG",
help="Current joint positions in degrees used as the IK seed.",
)
ik_parser.add_argument(
"--pose",
nargs=16,
type=float,
required=True,
metavar="POSE_VALUE",
help="Desired end-effector 4x4 pose matrix in row-major order (16 values).",
)
ik_parser.add_argument(
"--position-weight",
type=float,
default=None,
help="Optional position weight for IK.",
)
ik_parser.add_argument(
"--orientation-weight",
type=float,
default=None,
help="Optional orientation weight for IK.",
)
ik_parser.set_defaults(func=command_ik)

return parser


def main(argv: list[str] | None = None) -> None:
parser = build_parser()
args = parser.parse_args(argv)

try:
args.func(args)
except RuntimeError as exc:
parser.error(str(exc))


if __name__ == "__main__":
main(sys.argv[1:])

192 changes: 192 additions & 0 deletions rk_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
#!/usr/bin/env python3
"""Simple HTTP service exposing RobotKinematics forward and inverse kinematics.

Run on a machine that has the `placo` dependency installed so RobotKinematics works:

python rk_service.py --urdf-path /path/to/robot.urdf --port 8000

Endpoints:
POST /fk
Body: {"joint_positions": [deg, ...]}
Response: {"pose": [[...], ...]}

POST /ik
Body: {
"current_joint_positions": [deg, ...],
"desired_pose": [[...], ...],
"position_weight": 1.0, # optional
"orientation_weight": 0.01 # optional
}
Response: {"joint_positions": [deg, ...]}
"""

from __future__ import annotations

import argparse
import json
import logging
from http import HTTPStatus
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any, ClassVar

import numpy as np

from lerobot.model.kinematics import RobotKinematics


LOGGER = logging.getLogger("rk_service")


def _json_response(handler: BaseHTTPRequestHandler, status: HTTPStatus, payload: dict[str, Any]) -> None:
"""Serialize payload and send it as the HTTP response."""
body = json.dumps(payload).encode("utf-8")
handler.send_response(status.value)
handler.send_header("Content-Type", "application/json")
handler.send_header("Content-Length", str(len(body)))
handler.end_headers()
handler.wfile.write(body)


class KinematicsRequestHandler(BaseHTTPRequestHandler):
"""HTTP request handler exposing RobotKinematics operations."""

kinematics: ClassVar[RobotKinematics]
suppress_logging: ClassVar[bool] = False

def log_message(self, format: str, *args: Any) -> None: # noqa: D401 - `BaseHTTPRequestHandler` signature
if not self.suppress_logging:
super().log_message(format, *args)

def do_GET(self) -> None: # noqa: N802 D401 - `BaseHTTPRequestHandler` signature
if self.path == "/healthz":
_json_response(self, HTTPStatus.OK, {"status": "ok"})
else:
_json_response(self, HTTPStatus.NOT_FOUND, {"error": "Endpoint not found"})

def do_POST(self) -> None: # noqa: N802 D401 - `BaseHTTPRequestHandler` signature
content_length = int(self.headers.get("Content-Length", "0"))
body = self.rfile.read(content_length) if content_length else b""

try:
payload = json.loads(body.decode("utf-8"))
except json.JSONDecodeError as exc:
LOGGER.exception("Failed to decode request body as JSON")
_json_response(self, HTTPStatus.BAD_REQUEST, {"error": f"Invalid JSON: {exc}"})
return

if self.path == "/fk":
self._handle_forward_kinematics(payload)
elif self.path == "/ik":
self._handle_inverse_kinematics(payload)
else:
_json_response(self, HTTPStatus.NOT_FOUND, {"error": "Endpoint not found"})

def _handle_forward_kinematics(self, payload: dict[str, Any]) -> None:
try:
joints = payload["joint_positions"]
joint_array = np.asarray(joints, dtype=float)
except (KeyError, TypeError, ValueError) as exc:
_json_response(self, HTTPStatus.BAD_REQUEST, {"error": f"Invalid joint_positions: {exc}"})
return

try:
pose_matrix = self.kinematics.forward_kinematics(joint_array)
except Exception as exc: # placo may raise C++ exceptions that surface as generic Exception
LOGGER.exception("Forward kinematics failed")
_json_response(self, HTTPStatus.INTERNAL_SERVER_ERROR, {"error": f"forward_kinematics failed: {exc}"})
return

_json_response(self, HTTPStatus.OK, {"pose": pose_matrix.tolist()})

def _handle_inverse_kinematics(self, payload: dict[str, Any]) -> None:
try:
current = np.asarray(payload["current_joint_positions"], dtype=float)
desired_pose = np.asarray(payload["desired_pose"], dtype=float)
except (KeyError, TypeError, ValueError) as exc:
_json_response(
self,
HTTPStatus.BAD_REQUEST,
{"error": f"Invalid current_joint_positions or desired_pose: {exc}"},
)
return

if desired_pose.shape != (4, 4):
_json_response(
self,
HTTPStatus.BAD_REQUEST,
{"error": f"desired_pose must be 4x4 matrix, got shape {desired_pose.shape}"},
)
return

position_weight = float(payload.get("position_weight", 1.0))
orientation_weight = float(payload.get("orientation_weight", 0.01))

try:
joint_solution = self.kinematics.inverse_kinematics(
current_joint_pos=current,
desired_ee_pose=desired_pose,
position_weight=position_weight,
orientation_weight=orientation_weight,
)
except Exception as exc: # placo errors
LOGGER.exception("Inverse kinematics failed")
_json_response(self, HTTPStatus.INTERNAL_SERVER_ERROR, {"error": f"inverse_kinematics failed: {exc}"})
return

_json_response(self, HTTPStatus.OK, {"joint_positions": joint_solution.tolist()})


def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Expose RobotKinematics via a simple HTTP service.")
parser.add_argument("--host", default="127.0.0.1", help="Interface to bind the HTTP server (default: 127.0.0.1)")
parser.add_argument("--port", type=int, default=8081, help="Port for the HTTP server (default: 8081)")
parser.add_argument("--urdf-path", default="Simulation/SO101/so101_new_calib.urdf", help="Path to the robot URDF file.")
parser.add_argument(
"--target-frame",
default="gripper_frame_link",
help="End-effector frame name (default: gripper_frame_link)",
)
parser.add_argument(
"--joint-names",
nargs="+",
default=None,
help="Optional list of joint names; defaults to URDF order when omitted.",
)
parser.add_argument(
"--suppress-request-logs",
action="store_true",
help="Disable per-request logging emitted by the HTTP handler.",
)
return parser


def main() -> None:
args = build_arg_parser().parse_args()

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(name)s: %(message)s")

LOGGER.info("Loading RobotKinematics with URDF: %s", args.urdf_path)
kinematics = RobotKinematics(
urdf_path=args.urdf_path,
target_frame_name=args.target_frame,
joint_names=args.joint_names,
)

KinematicsRequestHandler.kinematics = kinematics
KinematicsRequestHandler.suppress_logging = bool(args.suppress_request_logs)

server_address = (args.host, args.port)
httpd = ThreadingHTTPServer(server_address, KinematicsRequestHandler)
LOGGER.info("RobotKinematics service listening on http://%s:%d", args.host, args.port)

try:
httpd.serve_forever()
except KeyboardInterrupt:
LOGGER.info("Shutting down RobotKinematics service.")
finally:
httpd.server_close()


if __name__ == "__main__":
main()

Loading