forked from huggingface/lerobot
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_keyboard_control.py
More file actions
executable file
·458 lines (398 loc) · 20.3 KB
/
test_keyboard_control.py
File metadata and controls
executable file
·458 lines (398 loc) · 20.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
#!/usr/bin/env python
"""
自动控制 Isaac Lab Arena 环境的一个自由度(关节 0)
让关节 0 在 -1 到 1 之间自动循环,其他自由度保持为 0
用于测试环境是否响应动作
"""
import os
os.environ["ACCEPT_EULA"] = "Y"
os.environ["PRIVACY_CONSENT"] = "Y"
# 设置代理(如果需要)
if "http_proxy" not in os.environ:
os.environ["http_proxy"] = "http://127.0.0.1:7890"
os.environ["https_proxy"] = "http://127.0.0.1:7890"
os.environ["HTTP_PROXY"] = "http://127.0.0.1:7890"
os.environ["HTTPS_PROXY"] = "http://127.0.0.1:7890"
import torch
import numpy as np
import time
import math
from lerobot.envs.factory import make_env
from lerobot.envs.configs import IsaaclabArenaEnv
def main():
print("=" * 60)
print("Isaac Lab Arena 自动控制测试")
print("=" * 60)
# 创建环境配置
env_cfg = IsaaclabArenaEnv(
hub_path="nvidia/isaaclab-arena-envs",
environment="gr1_microwave",
embodiment="gr1_pink",
object="mustard_bottle",
headless=False,
enable_cameras=True,
state_keys="robot_joint_pos",
camera_keys="robot_pov_cam_rgb",
state_dim=54,
action_dim=36,
device="cuda:0",
episode_length=1000,
)
print(f"\n环境配置:")
print(f" - Environment: {env_cfg.environment}")
print(f" - Action 维度: {env_cfg.action_dim}")
# 创建环境
print("\n正在加载环境(这可能需要 30-60 秒)...")
envs_dict = make_env(env_cfg, n_envs=1, trust_remote_code=True)
suite_name = next(iter(envs_dict))
env = envs_dict[suite_name][0]
print(f"✓ 环境加载成功: {suite_name}")
print(f" - 动作空间: {env.action_space}")
# 检查动作空间类型
# Isaac Lab Arena 的动作空间是 (1, 36),第一个维度是 batch 大小
if hasattr(env.action_space, 'shape'):
action_space_shape = env.action_space.shape
print(f" - 动作空间形状: {action_space_shape}")
# 如果是 2D,第二个维度是动作维度;如果是 1D,直接使用
shape_len = len(action_space_shape) if hasattr(action_space_shape, '__len__') else 1
if shape_len == 2:
# 2D shape: (batch_size, action_dim)
action_dim = int(action_space_shape[1]) # 第二个维度是实际的动作维度
batch_size = int(action_space_shape[0]) # 第一个维度是 batch 大小
print(f" - Batch 大小: {batch_size}")
print(f" - 动作维度: {action_dim}")
else:
# 1D shape: (action_dim,)
action_dim = int(action_space_shape[0])
print(f" - 动作维度: {action_dim}")
if hasattr(env.action_space, 'low') and hasattr(env.action_space, 'high'):
print(f" - 动作范围: [{env.action_space.low.min():.3f}, {env.action_space.high.max():.3f}]")
else:
action_dim = 36
print(f" - 使用默认动作维度: {action_dim}")
# 尝试获取关节名称信息
print("\n尝试获取关节名称信息...")
joint_names = None
try:
# 从环境或信息中获取关节名称
obs, info = env.reset()
# 检查 info 中是否有关节名称
if 'joint_names' in info:
joint_names = info['joint_names']
print(f"✓ 找到关节名称: {len(joint_names)} 个关节")
elif hasattr(env, 'joint_names'):
joint_names = env.joint_names
print(f"✓ 从环境获取关节名称: {len(joint_names)} 个关节")
elif hasattr(env, '_env') and hasattr(env._env, 'joint_names'):
joint_names = env._env.joint_names
print(f"✓ 从内部环境获取关节名称: {len(joint_names)} 个关节")
# 如果还是没有,尝试从 observation 键名中推断
if joint_names is None:
print(" ⚠ 无法自动获取关节名称,使用默认 GR1 映射")
# GR1 典型的关节顺序(基于 Unitree G1,GR1 应该类似)
joint_names = [
# 左腿 (0-5)
"LeftHipPitch", "LeftHipRoll", "LeftHipYaw", "LeftKnee", "LeftAnklePitch", "LeftAnkleRoll",
# 右腿 (6-11)
"RightHipPitch", "RightHipRoll", "RightHipYaw", "RightKnee", "RightAnklePitch", "RightAnkleRoll",
# 腰部 (12-14)
"WaistYaw", "WaistRoll", "WaistPitch",
# 左臂 (15-21)
"LeftShoulderPitch", "LeftShoulderRoll", "LeftShoulderYaw", "LeftElbow",
"LeftWristRoll", "LeftWristPitch", "LeftWristYaw",
# 右臂 (22-28)
"RightShoulderPitch", "RightShoulderRoll", "RightShoulderYaw", "RightElbow",
"RightWristRoll", "RightWristPitch", "RightWristYaw",
# 其他关节(如果有)(29-35)
*[f"Joint{i}" for i in range(29, action_dim)]
]
# 确保长度匹配
joint_names = joint_names[:action_dim]
except Exception as e:
print(f" ⚠ 获取关节名称时出错: {e}")
joint_names = None
if joint_names:
print(f"\n关节映射表 (共 {len(joint_names)} 个关节):")
print("=" * 80)
for i, name in enumerate(joint_names):
# 按区域分组显示
if i < 6:
region = "左腿"
elif i < 12:
region = "右腿"
elif i < 15:
region = "腰部"
elif i < 22:
region = "左臂"
elif i < 29:
region = "右臂"
else:
region = "其他"
print(f" 关节 {i:2d}: {name:30s} ({region})")
print("=" * 80)
# 探测 action 和关节的映射关系
@torch.no_grad()
def probe_action_mapping(env, device, steps=30, amp=0.02):
"""探测每个 action 维度对应哪个实际关节
对每个 action[i],只给它一个小幅动作,运行几步后看哪个关节位置变化最大
"""
print("\n" + "=" * 80)
print("探测 Action -> Joint 映射关系")
print("=" * 80)
print(f"方法: 对每个 action[i] 设置小幅动作 {amp},运行 {steps} 步")
print(f" 观察哪个关节位置变化最大")
print("=" * 80)
action_to_joint_mapping = {}
for i in range(action_dim):
obs, info = env.reset()
q0 = obs["robot_joint_pos"].clone()
a = torch.zeros((1, action_dim), device=device)
a[0, i] = amp
for _ in range(steps):
obs, *_ = env.step(a)
dq = obs["robot_joint_pos"] - q0
vals, idx = torch.topk(dq.abs()[0], k=5)
# 记录最大的关节变化
primary_joint = idx[0].item()
primary_change = vals[0].item()
action_to_joint_mapping[i] = {
'primary_joint': primary_joint,
'primary_change': primary_change,
'top5_joints': idx.tolist(),
'top5_changes': vals.tolist()
}
if primary_change > 0.001: # 只显示有明显变化的
print(f"action[{i:02d}] -> 主要影响关节 {primary_joint:2d} (变化: {primary_change:.6f})")
print(f" Top5 关节: {idx.tolist()}, 变化值: {[f'{v:.6f}' for v in vals.tolist()]}")
else:
print(f"action[{i:02d}] -> 无明显影响(变化 < 0.001)")
print("=" * 80)
return action_to_joint_mapping
# 执行探测
print("\n开始探测 Action -> Joint 映射关系(这可能需要几分钟)...")
try:
action_mapping = probe_action_mapping(env, env.device, steps=30, amp=0.02)
# 生成反向映射:joint -> action
joint_to_action = {}
for action_idx, mapping_info in action_mapping.items():
joint_idx = mapping_info['primary_joint']
if mapping_info['primary_change'] > 0.001: # 只记录有明显影响的
if joint_idx not in joint_to_action:
joint_to_action[joint_idx] = []
joint_to_action[joint_idx].append({
'action': action_idx,
'change': mapping_info['primary_change']
})
# 打印总结
print(f"\n{'=' * 80}")
print("映射关系总结")
print(f"{'=' * 80}")
print(f"发现 {len([k for k, v in action_mapping.items() if v['primary_change'] > 0.001])} 个有效的 action 维度")
print(f"这些 action 主要影响 {len(joint_to_action)} 个不同的关节")
print(f"\n按关节分组:")
for joint_idx in sorted(joint_to_action.keys()):
actions = joint_to_action[joint_idx]
action_str = ", ".join([f"action[{a['action']}]" for a in actions])
print(f" 关节 {joint_idx:2d}: {action_str}")
print(f"{'=' * 80}\n")
except Exception as e:
print(f"✗ 探测映射关系失败: {e}")
import traceback
traceback.print_exc()
action_mapping = None
# 测试:先尝试一个小的随机动作看看环境是否有响应
print("\n测试: 尝试执行一个小幅随机动作...")
try:
# Isaac Lab Arena 需要形状为 (1, 36) 的动作
test_action = torch.randn(1, action_dim, device=env.device) * 0.1 # 小幅度随机动作
test_obs, test_reward, test_terminated, test_truncated, test_info = env.step(test_action)
print(f"✓ 测试动作执行成功! Reward: {test_reward.item():.4f}")
print(" 这说明环境是响应动作的。")
except Exception as e:
print(f"✗ 测试动作执行失败: {e}")
import traceback
traceback.print_exc()
# 重置环境
print("\n正在重置环境...")
try:
obs, info = env.reset()
print("✓ 环境重置成功")
except Exception as e:
print(f"✗ 环境重置失败: {e}")
import traceback
traceback.print_exc()
env.close()
return
print("\n" + "=" * 60)
print("自动控制说明:")
print("=" * 60)
print("只控制手臂关节(左臂、右臂),自动正弦循环(幅度 -3 到 3)")
print("左右腿和腰部关节保持为 0(不动)")
print("每个关节有不同的相位,周期约 8 秒")
print("同时打印观察到的关节位置变化")
print("按 Ctrl+C 退出")
print("=" * 60)
print("\n开始自动控制循环...\n")
step_count = 0
last_print_time = time.time()
start_time = time.time()
# 控制所有 36 个关节,每个有不同的相位偏移
amplitude = 3.0 # 幅度:从 -3 到 3
# 为所有关节生成不同的相位偏移
phase_offsets = [i * 2 * math.pi / action_dim for i in range(action_dim)]
# 保存初始关节位置用于对比
initial_joint_pos = None
# 保存 action 映射信息(从探测中获得)
if 'action_mapping' not in locals():
action_mapping = None
try:
while True:
# 使用正弦函数让所有关节循环
# 周期大约 8 秒
t = (time.time() - start_time) / 8.0 # 8 秒一个周期
# 创建动作向量
action_values = np.zeros(action_dim, dtype=np.float32)
# 定义关节范围:
# 0-5: 左腿(不动)
# 6-11: 右腿(不动)
# 12-14: 腰部(不动)
# 15-21: 左臂(运动)
# 22-28: 右臂(运动)
# 29+: 其他关节(运动,如果有)
left_leg_end = 6
right_leg_end = 12
waist_end = 15
left_arm_start = 15
left_arm_end = 22
right_arm_start = 22
right_arm_end = 29
# 只控制手臂和其他关节(15 及以后)
for joint_idx in range(left_arm_start, action_dim):
phase = t * 2 * math.pi + phase_offsets[joint_idx]
action_values[joint_idx] = amplitude * math.sin(phase) # 正弦波:-3 到 3
# 左右腿和腰部保持为 0(已经在初始化时设为0了)
# 准备动作 tensor
try:
# Isaac Lab Arena 环境期望动作格式: (n_envs, action_dim) 的 torch.Tensor
action_tensor = torch.from_numpy(action_values.copy()).float()
# 确保是 2D tensor: (1, 36)
if action_tensor.dim() == 1:
action_tensor = action_tensor.unsqueeze(0) # (36,) -> (1, 36)
# 移动到正确的 device
action_tensor = action_tensor.to(env.device)
# 执行动作
obs, reward, terminated, truncated, info = env.step(action_tensor)
done = terminated | truncated
# 获取当前关节位置观察
if initial_joint_pos is None:
# 第一次:保存初始位置
if 'robot_joint_pos' in obs:
initial_joint_pos = obs['robot_joint_pos'].cpu().numpy().flatten()
print(f"\n初始关节位置形状: {initial_joint_pos.shape}")
print(f"前 8 个初始关节位置: {initial_joint_pos[:8]}")
if done.any():
print("\n环境回合结束,正在重置...")
obs, info = env.reset()
print("✓ 环境重置成功")
start_time = time.time() # 重置时间基准
initial_joint_pos = None # 重置初始位置记录
except Exception as e:
print(f"\n✗ 执行动作时出错: {e}")
print(f" 动作类型: {type(action_tensor)}")
print(f" 动作形状: {action_tensor.shape if hasattr(action_tensor, 'shape') else 'N/A'}")
import traceback
traceback.print_exc()
break
# 每 0.5 秒打印一次当前状态
current_time = time.time()
if current_time - last_print_time > 0.5:
elapsed = current_time - start_time
# 获取当前观察到的关节位置
current_joint_pos = None
joint_pos_changed = False
if 'robot_joint_pos' in obs and initial_joint_pos is not None:
current_joint_pos = obs['robot_joint_pos'].cpu().numpy().flatten()
# 计算位置变化
if current_joint_pos.shape == initial_joint_pos.shape:
joint_pos_diff = current_joint_pos - initial_joint_pos
joint_pos_changed = np.any(np.abs(joint_pos_diff) > 0.01) # 有大于 0.01 的变化
print(f"\n步数: {step_count} | 运行时间: {elapsed:.1f}s")
# 如果有关节名称或映射信息,显示所有区域的关节信息
if joint_names or action_mapping:
regions = {
"左腿": (0, 6),
"右腿": (6, 12),
"腰部": (12, 15),
"左臂": (15, 22),
"右臂": (22, 29),
"其他": (29, action_dim)
}
print(" 各区域当前状态:")
for region_name, (start, end) in regions.items():
if end <= action_dim:
region_actions = action_values[start:end]
# 对于左腿、右腿、腰部,显示所有关节都是0
if region_name in ["左腿", "右腿", "腰部"]:
# 显示前3个关节作为示例
sample_indices = list(range(start, min(start + 3, end)))
if joint_names:
sample_str = ", ".join([f"action[{j}]->关节{joint_names[j] if j < len(joint_names) else '?'}=0.0" for j in sample_indices])
elif action_mapping:
sample_str = ", ".join([f"action[{j}]->关节{action_mapping[j]['primary_joint'] if j in action_mapping else '?'}=0.0" for j in sample_indices])
else:
sample_str = ", ".join([f"action[{j}]=0.0" for j in sample_indices])
if len(sample_indices) < (end - start):
sample_str += f", ... (共 {end - start} 个 action,全部为 0)"
print(f" {region_name:6s}: {sample_str}")
else:
# 对于手臂,显示最大动作值的关节
if len(region_actions) > 0:
max_idx = start + np.argmax(np.abs(region_actions))
max_val = region_actions[np.argmax(np.abs(region_actions))]
if abs(max_val) > 0.01:
if joint_names and max_idx < len(joint_names):
joint_name = joint_names[max_idx]
elif action_mapping and max_idx in action_mapping:
joint_name = f"关节{action_mapping[max_idx]['primary_joint']}"
else:
joint_name = "未知"
print(f" {region_name:6s}: action[{max_idx:2d}] ({joint_name:25s}) = {max_val:.3f} (最大)")
print(f" 发送的 Action 摘要:")
print(f" 左腿+右腿+腰部 (0-14): 全部为 0")
print(f" 左臂 (15-21): 范围 [{action_values[15:22].min():.3f}, {action_values[15:22].max():.3f}]")
print(f" 右臂 (22-28): 范围 [{action_values[22:29].min():.3f}, {action_values[22:29].max():.3f}]")
else:
print(f" 发送的 Action 摘要:")
print(f" 左腿+右腿+腰部 (0-14): 全部为 0")
print(f" 手臂和其他关节 (15+): 范围 [{action_values[15:].min():.3f}, {action_values[15:].max():.3f}]")
print(f" 前 8 个 Action 值: {action_values[:8]}")
print(f" Action 范围: [{action_values.min():.3f}, {action_values.max():.3f}]")
print(f" 非零 Action 数: {np.count_nonzero(action_values)} / {len(action_values)}")
if current_joint_pos is not None and initial_joint_pos is not None:
print(f" 观察到的关节位置 (前 8 个): {current_joint_pos[:8]}")
print(f" 位置变化 (前 8 个): {joint_pos_diff[:8]}")
print(f" 最大位置变化: {np.abs(joint_pos_diff).max():.4f}")
if joint_pos_changed:
print(f" ✓ 关节位置在变化!")
else:
print(f" ✗ 关节位置未变化")
print(f" Reward: {reward.item():.4f}")
last_print_time = current_time
step_count += 1
time.sleep(0.01) # 控制循环频率(约 100Hz)
except KeyboardInterrupt:
print("\n\n程序被中断 (Ctrl+C)")
except Exception as e:
print(f"\n\n程序出错: {e}")
import traceback
traceback.print_exc()
finally:
env.close()
print("\n✓ 环境已关闭")
print("\n最终 Action 格式:")
print(f" - 形状: {action_values.shape}")
print(f" - 类型: {type(action_values)}")
print(f" - 前 10 个值: {action_values[:10]}")
print(f" - 动作范围: [{action_values.min():.3f}, {action_values.max():.3f}]")
if __name__ == "__main__":
main()