Skip to content

Commit cb3a53a

Browse files
committed
[feat] 1. Add realworld deployment code on robot. 2. Add mpc and pid controller. 3. InternVLA-N1 client
1 parent d1cbf71 commit cb3a53a

File tree

3 files changed

+559
-0
lines changed

3 files changed

+559
-0
lines changed

scripts/realworld/controllers.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
#!/usr/bin/env python
2+
# coding=utf-8
3+
4+
import casadi as ca
5+
import numpy as np
6+
import time
7+
import math
8+
import os
9+
import sys
10+
sys.path.append(os.path.dirname(os.path.abspath(__file__)))
11+
from scipy.interpolate import interp1d
12+
13+
class Mpc_controller:
14+
def __init__(self, global_planed_traj, N = 20, desired_v = 0.3, v_max = 0.4, w_max = 0.4, ref_gap = 4):
15+
self.N, self.desired_v, self.ref_gap, self.T = N, desired_v, ref_gap, 0.1
16+
self.ref_traj = self.make_ref_denser(global_planed_traj)
17+
self.ref_traj_len = N // ref_gap + 1
18+
19+
# setup mpc problem
20+
opti = ca.Opti()
21+
opt_controls = opti.variable(N, 2)
22+
v, w = opt_controls[:, 0], opt_controls[:, 1]
23+
24+
opt_states = opti.variable(N+1, 3)
25+
x, y, theta = opt_states[:, 0], opt_states[:, 1], opt_states[:, 2]
26+
27+
# parameters
28+
opt_x0 = opti.parameter(3)
29+
opt_xs = opti.parameter(3 * self.ref_traj_len) # the intermidia state may also be the parameter
30+
31+
# system dynamics for mobile manipulator
32+
f = lambda x_, u_: ca.vertcat(*[u_[0]*ca.cos(x_[2]), u_[0]*ca.sin(x_[2]), u_[1]])
33+
34+
# init_condition
35+
opti.subject_to(opt_states[0, :] == opt_x0.T)
36+
for i in range(N):
37+
x_next = opt_states[i, :] + f(opt_states[i, :], opt_controls[i, :]).T*self.T
38+
opti.subject_to(opt_states[i+1, :]==x_next)
39+
40+
# define the cost function
41+
Q = np.diag([10.0,10.0,0.0])
42+
R = np.diag([0.05,0.2])
43+
obj = 0
44+
for i in range(N):
45+
obj = obj +ca.mtimes([opt_controls[i, :], R, opt_controls[i, :].T])
46+
if i % ref_gap == 0:
47+
nn = i // ref_gap
48+
obj = obj + ca.mtimes([(opt_states[i, :]-opt_xs[nn*3:nn*3+3].T), Q, (opt_states[i, :]-opt_xs[nn*3:nn*3+3].T).T])
49+
50+
opti.minimize(obj)
51+
52+
# boundrary and control conditions
53+
opti.subject_to(opti.bounded(0, v, v_max))
54+
opti.subject_to(opti.bounded(-w_max, w, w_max))
55+
56+
opts_setting = {'ipopt.max_iter':100, 'ipopt.print_level':0, 'print_time':0, 'ipopt.acceptable_tol':1e-8, 'ipopt.acceptable_obj_change_tol':1e-6}
57+
opti.solver('ipopt', opts_setting)
58+
# opts_setting = { 'qpsol':'osqp','hessian_approximation':'limited-memory','max_iter':200,'convexify_strategy':'regularize','beta':0.5,'c1':1e-4,'tol_du':1e-3,'tol_pr':1e-6}
59+
# opti.solver('sqpmethod',opts_setting)
60+
61+
self.opti = opti
62+
self.opt_xs = opt_xs
63+
self.opt_x0 = opt_x0
64+
self.opt_controls = opt_controls
65+
self.opt_states = opt_states
66+
self.last_opt_x_states = None
67+
self.last_opt_u_controls = None
68+
def make_ref_denser(self, ref_traj, ratio = 50):
69+
x_orig = np.arange(len(ref_traj))
70+
new_x = np.linspace(0, len(ref_traj) - 1, num=len(ref_traj) * ratio)
71+
72+
interp_func_x = interp1d(x_orig, ref_traj[:, 0], kind='linear')
73+
interp_func_y = interp1d(x_orig, ref_traj[:, 1], kind='linear')
74+
75+
uniform_x = interp_func_x(new_x)
76+
uniform_y = interp_func_y(new_x)
77+
ref_traj = np.stack((uniform_x, uniform_y), axis=1)
78+
79+
return ref_traj
80+
81+
def update_ref_traj(self, global_planed_traj):
82+
self.ref_traj = self.make_ref_denser(global_planed_traj)
83+
self.ref_traj_len = self.N // self.ref_gap + 1
84+
85+
def solve(self, x0):
86+
ref_traj = self.find_reference_traj(x0, self.ref_traj)
87+
# fake a yaw angle
88+
ref_traj = np.concatenate((ref_traj, np.zeros((ref_traj.shape[0], 1))), axis=1).reshape(-1, 1)
89+
90+
self.opti.set_value(self.opt_xs, ref_traj.reshape(-1, 1))
91+
u0 = np.zeros((self.N, 2)) if self.last_opt_u_controls is None else self.last_opt_u_controls
92+
x00 = np.zeros((self.N+1, 3)) if self.last_opt_x_states is None else self.last_opt_x_states
93+
94+
self.opti.set_value(self.opt_x0, x0)
95+
self.opti.set_initial(self.opt_controls, u0)
96+
self.opti.set_initial(self.opt_states, x00)
97+
98+
sol = self.opti.solve()
99+
100+
self.last_opt_u_controls = sol.value(self.opt_controls)
101+
self.last_opt_x_states = sol.value(self.opt_states)
102+
103+
return self.last_opt_u_controls, self.last_opt_x_states
104+
def reset(self):
105+
self.last_opt_x_states = None
106+
self.last_opt_u_controls = None
107+
108+
def find_reference_traj(self, x0, global_planed_traj):
109+
ref_traj_pts = []
110+
# find the nearest point in global_planed_traj
111+
nearest_idx = np.argmin(np.linalg.norm(global_planed_traj - x0[:2].reshape((1, 2)), axis=1))
112+
desire_arc_length = self.desired_v * self.ref_gap * self.T
113+
cum_dist = np.cumsum(np.linalg.norm(np.diff(global_planed_traj, axis=0), axis=1))
114+
115+
# select the reference points from the nearest point to the end of global_planed_traj
116+
for i in range(nearest_idx, len(global_planed_traj) - 1):
117+
if cum_dist[i] - cum_dist[nearest_idx] >= desire_arc_length * len(ref_traj_pts):
118+
ref_traj_pts.append(global_planed_traj[i, :])
119+
if len(ref_traj_pts) == self.ref_traj_len:
120+
break
121+
# if the target is reached before the reference trajectory is complete, add the last point of global_planed_traj
122+
while len(ref_traj_pts) < self.ref_traj_len:
123+
ref_traj_pts.append(global_planed_traj[-1, :])
124+
return np.array(ref_traj_pts)
125+
126+
127+
class PID_controller:
128+
def __init__(self, Kp_trans=1.0, Kd_trans=0.1, Kp_yaw=1.0, Kd_yaw=1.0, max_v=1.0, max_w=1.2):
129+
self.Kp_trans = Kp_trans
130+
self.Kd_trans = Kd_trans
131+
self.Kp_yaw = Kp_yaw
132+
self.Kd_yaw = Kd_yaw
133+
self.max_v = max_v
134+
self.max_w = max_w
135+
136+
def solve(self, odom, target, vel=np.zeros(2)):
137+
translation_error, yaw_error = self.calculate_errors(odom, target)
138+
v, w = self.pd_step(translation_error, yaw_error, vel[0], vel[1])
139+
return v, w, translation_error, yaw_error
140+
141+
def pd_step(self, translation_error, yaw_error, linear_vel, angular_vel):
142+
translation_error = max(-1.0, min(1.0, translation_error))
143+
yaw_error = max(-1.0, min(1.0, yaw_error))
144+
145+
linear_velocity = self.Kp_trans * translation_error - self.Kd_trans * linear_vel
146+
angular_velocity = self.Kp_yaw * yaw_error - self.Kd_yaw * angular_vel
147+
148+
linear_velocity = max(-self.max_v, min(self.max_v, linear_velocity))
149+
angular_velocity = max(-self.max_w, min(self.max_w, angular_velocity))
150+
151+
return linear_velocity, angular_velocity
152+
153+
def calculate_errors(self, odom, target):
154+
155+
dx = target[0, 3] - odom[0, 3]
156+
dy = target[1, 3] - odom[1, 3]
157+
158+
odom_yaw = math.atan2(odom[1, 0], odom[0, 0])
159+
target_yaw = math.atan2(target[1, 0], target[0, 0])
160+
161+
translation_error = dx * np.cos(odom_yaw) + dy * np.sin(odom_yaw)
162+
163+
yaw_error = target_yaw - odom_yaw
164+
yaw_error = (yaw_error + math.pi) % (2 * math.pi) - math.pi
165+
166+
return translation_error, yaw_error

0 commit comments

Comments
 (0)