Skip to content

Commit 3ae6c46

Browse files
committed
More test coverage
1 parent 4556842 commit 3ae6c46

File tree

5 files changed

+219
-65
lines changed

5 files changed

+219
-65
lines changed

gcaa/core/allocation.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import numpy as np
2+
3+
4+
def remove_completed_tasks(pos_t: np.ndarray, ind):
5+
"""
6+
pos_t: (nt, 2) array of task positions
7+
ind: list/array of completed task indices (1-based in MATLAB)
8+
"""
9+
10+
# Convert MATLAB's 1-based indices → Python 0-based
11+
ind_set = {ind}
12+
13+
# Keep only rows whose index is NOT in ind_set
14+
mask = [i not in ind_set for i in range(pos_t.shape[0])]
15+
pos_t_new = pos_t[mask, :]
16+
17+
return pos_t_new
18+
19+
20+
def update_path(p, pos_a, pos_t, time_step, agents, nt):
21+
"""
22+
p: list of lists of task indices (0-based)
23+
pos_a: (na, 2) agent positions
24+
pos_t: (nt, 2) task positions
25+
agents: object with Speed and Lt fields
26+
nt: number of remaining tasks
27+
"""
28+
29+
ind_completed_tasks = []
30+
31+
for i in range(pos_a.shape[0]):
32+
33+
if len(p[i]) == 0:
34+
continue # no remaining tasks for agent i
35+
36+
# Next task index (Python 0-based)
37+
task_idx = p[i][0]
38+
39+
d_a_t = pos_t[task_idx] - pos_a[i]
40+
dist = np.linalg.norm(d_a_t)
41+
42+
# Can agent reach this task within this timestep?
43+
if dist < time_step * agents.Speed[i]:
44+
45+
# Snap agent to task
46+
pos_a[i] = pos_t[task_idx]
47+
48+
nt -= 1
49+
agents.Lt[i] -= 1
50+
51+
# Record completed task
52+
ind_completed_tasks.append(p[i][0])
53+
54+
# Remove completed task from path
55+
p[i] = p[i][1:]
56+
57+
else:
58+
# Move agent toward task
59+
direction = d_a_t / dist
60+
pos_a[i] = pos_a[i] + direction * time_step * agents.Speed[i]
61+
62+
return p, pos_a, ind_completed_tasks, nt, agents

gcaa/core/dta.py

Lines changed: 4 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
import time
32
from dataclasses import dataclass
43
from textwrap import wrap
@@ -7,12 +6,12 @@
76
import numpy as np
87

98
from gcaa.algorithms.greedy import GCAASolution
10-
from gcaa.tools.constants import SIMU_DIR
119
from gcaa.core.control import ComputeCommandParamsWithVelocity, \
1210
OptimalControlSolution
1311
from gcaa.core.utility import CalcTaskUtility
1412
from gcaa.tools.basic import PrettyDict
15-
from gcaa.tools.disk import mkdir, dump_json
13+
from gcaa.tools.constants import SIMU_DIR
14+
from gcaa.tools.disk import dump_json
1615
from gcaa.tools.plotting import plotMapAllocation, PlotAgentRange
1716
from gcaa.tools.serialize import make_json_serializable
1817

@@ -36,67 +35,6 @@ class Agents:
3635
kdrag: float
3736

3837

39-
def remove_completed_tasks(pos_t: np.ndarray, ind):
40-
"""
41-
pos_t: (nt, 2) array of task positions
42-
ind: list/array of completed task indices (1-based in MATLAB)
43-
"""
44-
45-
# Convert MATLAB's 1-based indices → Python 0-based
46-
ind_set = {ind}
47-
48-
# Keep only rows whose index is NOT in ind_set
49-
mask = [i not in ind_set for i in range(pos_t.shape[0])]
50-
pos_t_new = pos_t[mask, :]
51-
52-
return pos_t_new
53-
54-
55-
def update_path(p, pos_a, pos_t, time_step, agents, nt):
56-
"""
57-
p: list of lists of task indices (0-based)
58-
pos_a: (na, 2) agent positions
59-
pos_t: (nt, 2) task positions
60-
agents: object with Speed and Lt fields
61-
nt: number of remaining tasks
62-
"""
63-
64-
ind_completed_tasks = []
65-
66-
for i in range(pos_a.shape[0]):
67-
68-
if len(p[i]) == 0:
69-
continue # no remaining tasks for agent i
70-
71-
# Next task index (Python 0-based)
72-
task_idx = p[i][0]
73-
74-
d_a_t = pos_t[task_idx] - pos_a[i]
75-
dist = np.linalg.norm(d_a_t)
76-
77-
# Can agent reach this task within this timestep?
78-
if dist < time_step * agents.Speed[i]:
79-
80-
# Snap agent to task
81-
pos_a[i] = pos_t[task_idx]
82-
83-
nt -= 1
84-
agents.Lt[i] -= 1
85-
86-
# Record completed task
87-
ind_completed_tasks.append(p[i][0])
88-
89-
# Remove completed task from path
90-
p[i] = p[i][1:]
91-
92-
else:
93-
# Move agent toward task
94-
direction = d_a_t / dist
95-
pos_a[i] = pos_a[i] + direction * time_step * agents.Speed[i]
96-
97-
return p, pos_a, ind_completed_tasks, nt, agents
98-
99-
10038
def optimal_control_dta(
10139
na=5,
10240
nt=4,
@@ -118,7 +56,8 @@ def optimal_control_dta(
11856
"""
11957

12058
Lt = 1
121-
nt_loiter = int(np.ceil(0.0 * nt)) if use_GCAA else 0
59+
loiter_ratio=.0
60+
nt_loiter = int(np.ceil(loiter_ratio * nt)) if use_GCAA else 0
12261
task_type = np.zeros(nt, dtype=int)
12362
task_type[:nt_loiter] = 1
12463
lambda_ = 1

gcaa/tests/test_allocation.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import numpy as np
2+
3+
from gcaa.core.allocation import remove_completed_tasks, update_path
4+
5+
6+
def test_remove_completed_tasks_single_index_removal():
7+
pos_t = np.array([
8+
[0.0, 0.0], # 0
9+
[1.0, 1.0], # 1
10+
[2.0, 2.0], # 2
11+
])
12+
13+
# Current implementation treats 'ind' as a single 0-based index
14+
out = remove_completed_tasks(pos_t, 1)
15+
# Expect row index 1 removed
16+
expected = np.array([[0.0, 0.0], [2.0, 2.0]])
17+
assert np.array_equal(out, expected)
18+
19+
# Index that doesn't match any (e.g., outside range) should keep mask true for all
20+
out2 = remove_completed_tasks(pos_t, 5)
21+
assert np.array_equal(out2, pos_t)
22+
23+
24+
def test_update_path_completion_and_motion():
25+
# Two agents, three tasks
26+
pos_a = np.array([
27+
[0.0, 0.0], # agent 0 near task 0
28+
[0.0, 0.0], # agent 1 far from task 2
29+
], dtype=float)
30+
31+
pos_t = np.array([
32+
[0.05, 0.0], # task 0 close to agent 0
33+
[1.0, 0.0], # task 1 (unused)
34+
[1.0, 0.0], # task 2 targeted by agent 1
35+
], dtype=float)
36+
37+
# Paths: agent 0 -> task 0, agent 1 -> task 2
38+
p = [[0], [2]]
39+
40+
class AgentsStub:
41+
def __init__(self):
42+
self.Speed = np.array([1.0, 0.1]) # agent 0 fast, agent 1 slow
43+
self.Lt = np.array([1, 1])
44+
45+
agents = AgentsStub()
46+
47+
time_step = 0.1
48+
nt = 3
49+
50+
p2, pos_a2, completed, nt2, agents2 = update_path(p, pos_a.copy(), pos_t, time_step, agents, nt)
51+
52+
# Agent 0 should complete task 0 since dist=0.05 < time_step*Speed=0.1
53+
assert 0 in completed
54+
assert np.allclose(pos_a2[0], pos_t[0])
55+
assert p2[0] == []
56+
assert agents2.Lt[0] == 0
57+
58+
# Agent 1 should move towards task 2 but not reach it
59+
# Initial dist is 1.0; step size is 0.1*0.1 = 0.01 along x-axis
60+
assert 2 not in completed
61+
assert np.allclose(pos_a2[1], [0.01, 0.0])
62+
assert p2[1] == [2]
63+
64+
# nt reduced by number of completed tasks (1)
65+
assert nt2 == nt - 1

gcaa/tests/test_dta.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,13 @@ class TestDTA(TestCase):
99
def setUp(self):
1010
np.random.seed(0)
1111

12+
def test_fixed_alloc(self):
13+
optimal_control_dta(
14+
n_rounds=4,
15+
na=3,
16+
nt=2,
17+
)
18+
1219
def test(self):
1320
results = optimal_control_dta(
1421
n_rounds=20,

gcaa/tests/test_utility.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import numpy as np
2+
3+
from gcaa.core.utility import MinimumCostAlongLoitering
4+
5+
6+
def test_minimum_cost_along_loitering_basic():
7+
agent_pos = np.array([0.0, 0.0])
8+
agent_va = np.array([0.0, 0.0])
9+
10+
task_pos = np.array([
11+
[1.0, 1.0],
12+
[2.0, -1.0]
13+
])
14+
15+
task_radius = np.array([0.5, 1.0])
16+
task_tloiter = np.array([1.0, 2.0])
17+
task_tf = np.array([5.0, 10.0])
18+
kdrag = 0.1
19+
j = 0
20+
21+
rin, vt, rho = MinimumCostAlongLoitering(
22+
agent_pos,
23+
agent_va,
24+
task_pos,
25+
task_radius,
26+
task_tloiter,
27+
task_tf,
28+
j,
29+
kdrag
30+
)
31+
32+
assert rin.shape == (2,)
33+
assert vt.shape == (2,)
34+
assert np.isfinite(rho)
35+
assert rho > 0
36+
37+
38+
def test_minimum_cost_monotonic_radius():
39+
agent_pos = np.array([0.0, 0.0])
40+
agent_va = np.array([0.0, 0.0])
41+
task_pos = np.array([[1.0, 0.0]])
42+
task_tloiter = np.array([1.0])
43+
task_tf = np.array([5.0])
44+
kdrag = 0.1
45+
j = 0
46+
47+
r_small = np.array([0.2])
48+
r_large = np.array([2.0])
49+
50+
_, _, rho_small = MinimumCostAlongLoitering(
51+
agent_pos, agent_va, task_pos,
52+
r_small, task_tloiter, task_tf, j, kdrag
53+
)
54+
_, _, rho_large = MinimumCostAlongLoitering(
55+
agent_pos, agent_va, task_pos,
56+
r_large, task_tloiter, task_tf, j, kdrag
57+
)
58+
59+
# Larger radius → longer loiter → higher cost
60+
assert rho_large > rho_small
61+
62+
63+
def test_minimum_cost_repeatability():
64+
# Ensure deterministic behavior (no randomness)
65+
agent_pos = np.array([0.0, 0.0])
66+
agent_va = np.array([0.0, 0.0])
67+
task_pos = np.array([[0.5, 0.8]])
68+
radius = np.array([1.0])
69+
tloiter = np.array([1.2])
70+
tf = np.array([4.0])
71+
j = 0
72+
kdrag = 0.2
73+
74+
r1 = MinimumCostAlongLoitering(agent_pos, agent_va, task_pos,
75+
radius, tloiter, tf, j, kdrag)
76+
r2 = MinimumCostAlongLoitering(agent_pos, agent_va, task_pos,
77+
radius, tloiter, tf, j, kdrag)
78+
79+
assert np.allclose(r1[0], r2[0])
80+
assert np.allclose(r1[1], r2[1])
81+
assert np.isclose(r1[2], r2[2])

0 commit comments

Comments
 (0)