-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmettabridge.py
More file actions
154 lines (143 loc) · 5.9 KB
/
mettabridge.py
File metadata and controls
154 lines (143 loc) · 5.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import sys
sys.path.append('./plugins/')
sys.path.append('./channels/')
from copy import deepcopy
from exploration import *
from narsplugin import narsplugin_init, call_nars, call_nars_tuple
from guiplugin import guiplugin_init, call_gui
from hyperon.ext import register_atoms
from hyperon import *
import time
import io
NAV_STATE_FAIL = "FAIL"
NAV_STATE_BUSY = "BUSY"
NAV_STATE_SUCCESS = "SUCCESS"
NAV_STATE = NAV_STATE_SUCCESS #"busy", "success" and "fail" (initially set to success)
def NAV_STATE_SET(NAV_STATE_ARG):
global NAV_STATE
NAV_STATE = NAV_STATE_ARG
def NAV_STATE_GET():
global NAV_STATE
return NAV_STATE
def ARM_STATE_SET(ARM_STATE_ARG):
global ARM_STATE
ARM_STATE = ARM_STATE_ARG
def ARM_STATE_GET():
global ARM_STATE
return ARM_STATE
class PatternOperation(OperationObject):
def __init__(self, name, op, unwrap=False, rec=False):
super().__init__(name, op, unwrap)
self.rec = rec
def execute(self, *args, res_typ=AtomType.UNDEFINED):
return super().execute(*args, res_typ=res_typ)
def wrapnpop(func):
def wrapper(*args):
a = [str("'"+arg) if arg is SymbolAtom else str(arg) for arg in args]
res = func(*a)
return [res]
return wrapper
MeTTaROS2Command = ""
def call_ros(*a):
global runner, MeTTaROS2Command
tokenizer = runner.tokenizer()
cmd = str(a[0])
parser = SExprParser(f"(nartech.ros.command {cmd})")
MeTTaROS2Command = cmd
return parser.parse(tokenizer)
objects = "()"
def call_ros_objects(*a):
global runner, objects
tokenizer = runner.tokenizer()
parser = SExprParser(objects)
return parser.parse(tokenizer)
def call_ros_arm(*a):
global runner, ARM_STATE
tokenizer = runner.tokenizer()
parser = SExprParser(ARM_STATE)
return parser.parse(tokenizer)
def call_ros_navigation(*a):
global runner, NAV_STATE
tokenizer = runner.tokenizer()
parser = SExprParser(NAV_STATE)
return parser.parse(tokenizer)
def space_init():
global runner
metta_code = None
for arg in sys.argv:
if arg.endswith(".metta"):
with open(arg, "r") as f:
metta_code = f.read()
runner = MeTTa()
runner.run("""
(= (nartech.ros.objects.filter $category $objects)
(collapse (let* (($obj (superpose $objects))
((detection $category (coordinates $x $y)) $obj))
$obj)))
""")
runner.register_atom("nartech.ros", G(PatternOperation('nartech.ros', wrapnpop(call_ros), unwrap=False)))
runner.register_atom("nartech.ros.arm", G(PatternOperation('nartech.ros.arm', wrapnpop(call_ros_arm), unwrap=False)))
runner.register_atom("nartech.ros.navigation", G(PatternOperation('nartech.ros.navigation', wrapnpop(call_ros_navigation), unwrap=False)))
runner.register_atom("nartech.ros.objects", G(PatternOperation('nartech.ros.objects', wrapnpop(call_ros_objects), unwrap=False)))
runner.register_atom("nartech.nars", G(PatternOperation('nartech.nars', wrapnpop(call_nars), unwrap=False)))
runner.register_atom("nartech.nars.tuple", G(PatternOperation('nartech.nars.tuple', wrapnpop(call_nars_tuple), unwrap=False)))
runner.register_atom("nartech.gui", G(PatternOperation('nartech.gui', wrapnpop(call_gui), unwrap=False)))
narsplugin_init(runner)
guiplugin_init(runner)
if metta_code is not None:
result = runner.run(metta_code)
for x in result:
print(x) # Only prints the return value
currentTime = 0
start_time = time.time()
def space_tick(node = None):
global currentTime, MeTTaROS2Command, runner, objects
elapsedTime = round(time.time() - start_time, 2)
if elapsedTime < 10 and node is not None:
return
cmd = MeTTaROS2Command
MeTTaROS2Command = ""
objects = "()"
if node:
if cmd == "right":
node.start_navigation_by_moves("right")
if cmd == "left":
node.start_navigation_by_moves("left")
if cmd == "up":
node.start_navigation_by_moves("up")
if cmd == "down":
node.start_navigation_by_moves("down")
if cmd == "drop":
node.drop()
if cmd.startswith("(go (coordinates "):
x_y = cmd.split("(go (coordinates ")[1].split(")")[0]
x = int(x_y.split(" ")[0])
y = int(x_y.split(" ")[1])
potential_label = cmd.split("(go (coordinates ")[1].split(")")[1].split(")")[0].strip()
node.start_navigation_to_coordinate((x, y), potential_label)
if cmd.startswith("(pick "):
label = cmd.split("(pick ")[1].split(")")[0]
node.pick(label)
alldetections = deepcopy(node.semantic_slam.previous_detections)
objects = "("
if "{SELF}" in node.semantic_slam.previous_detections:
(t, object_grid_x, object_grid_y, origin_x, origin_y, point_map, point_base_link, imagecoords_depth) = node.semantic_slam.previous_detections["{SELF}"]
x_y_unknown = BFS_for_nearest_unknown_cell(node.semantic_slam.low_res_grid, node.semantic_slam.new_width, node.semantic_slam.new_height, object_grid_x, object_grid_y)
if x_y_unknown:
(x_unknown,y_unknown) = x_y_unknown
alldetections["unknown"] = (time.time(), x_unknown, y_unknown, origin_x, origin_y, None, None, None)
print(alldetections)
for category in alldetections:
(t, object_grid_x, object_grid_y, origin_x, origin_y, point_map, point_base_link, imagecoords_depth) = alldetections[category]
SEXP = f"(detection {category} (coordinates {object_grid_x} {object_grid_y}))"
objects += SEXP
if category == "{SELF}":
SELF_position = (object_grid_x, object_grid_y)
objects += ")"
currentTime += 1
print("STEP", str(currentTime) +":", runner.run(f"!(Step {currentTime} {elapsedTime})"))
space_init()
if __name__ == "__main__":
while True:
space_tick()
time.sleep(0.01)