Skip to content

Commit 447bf40

Browse files
authored
add test_time back (#2367)
1 parent 53ce294 commit 447bf40

File tree

1 file changed

+317
-0
lines changed

1 file changed

+317
-0
lines changed

tests/test_time.py

Lines changed: 317 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,317 @@
1+
"""Test the advanced schedulers."""
2+
3+
import unittest
4+
from unittest import TestCase, mock
5+
6+
from mesa.agent import Agent
7+
from mesa.model import Model
8+
from mesa.time import (
9+
BaseScheduler,
10+
RandomActivation,
11+
RandomActivationByType,
12+
SimultaneousActivation,
13+
StagedActivation,
14+
)
15+
16+
RANDOM = "random"
17+
STAGED = "staged"
18+
SIMULTANEOUS = "simultaneous"
19+
RANDOM_BY_TYPE = "random_by_type"
20+
21+
22+
class MockAgent(Agent):
23+
"""Minimalistic agent for testing purposes."""
24+
25+
def __init__(self, model): # noqa: D107
26+
super().__init__(model)
27+
self.steps = 0
28+
self.advances = 0
29+
30+
def kill_other_agent(self): # noqa: D102
31+
for agent in self.model.schedule.agents:
32+
if agent is not self:
33+
agent.remove()
34+
35+
def stage_one(self): # noqa: D102
36+
if self.model.enable_kill_other_agent:
37+
self.kill_other_agent()
38+
self.model.log.append(f"{self.unique_id}_1")
39+
40+
def stage_two(self): # noqa: D102
41+
self.model.log.append(f"{self.unique_id}_2")
42+
43+
def advance(self): # noqa: D102
44+
self.advances += 1
45+
46+
def step(self): # noqa: D102
47+
if self.model.enable_kill_other_agent:
48+
self.kill_other_agent()
49+
self.steps += 1
50+
self.model.log.append(self.unique_id)
51+
52+
53+
class MockModel(Model): # Noqa: D101
54+
def __init__(
55+
self, shuffle=False, activation=STAGED, enable_kill_other_agent=False, seed=None
56+
):
57+
"""Creates a Model instance with a schedule.
58+
59+
Args:
60+
shuffle (Bool): whether or not to instantiate a scheduler
61+
with shuffling.
62+
This option is only used for
63+
StagedActivation schedulers.
64+
65+
activation (str): which kind of scheduler to use.
66+
'random' creates a RandomActivation scheduler.
67+
'staged' creates a StagedActivation scheduler.
68+
The default scheduler is a BaseScheduler.
69+
enable_kill_other_agent: whether or not to enable kill_other_agent
70+
seed : rng
71+
"""
72+
super().__init__(seed=seed)
73+
self.log = []
74+
self.enable_kill_other_agent = enable_kill_other_agent
75+
76+
# Make scheduler
77+
if activation == STAGED:
78+
model_stages = ["stage_one", "model.model_stage", "stage_two"]
79+
self.schedule = StagedActivation(
80+
self, stage_list=model_stages, shuffle=shuffle
81+
)
82+
elif activation == RANDOM:
83+
self.schedule = RandomActivation(self)
84+
elif activation == SIMULTANEOUS:
85+
self.schedule = SimultaneousActivation(self)
86+
elif activation == RANDOM_BY_TYPE:
87+
self.schedule = RandomActivationByType(self)
88+
else:
89+
self.schedule = BaseScheduler(self)
90+
91+
# Make agents
92+
for _ in range(2):
93+
agent = MockAgent(self)
94+
self.schedule.add(agent)
95+
96+
def step(self): # noqa: D102
97+
self.schedule.step()
98+
99+
def model_stage(self): # noqa: D102
100+
self.log.append("model_stage")
101+
102+
103+
class TestStagedActivation(TestCase):
104+
"""Test the staged activation."""
105+
106+
expected_output = ["1_1", "1_1", "model_stage", "1_2", "1_2"]
107+
108+
def test_no_shuffle(self):
109+
"""Testing the staged activation without shuffling."""
110+
model = MockModel(shuffle=False)
111+
model.step()
112+
model.step()
113+
assert all(i == j for i, j in zip(model.log[:5], model.log[5:]))
114+
115+
def test_shuffle(self):
116+
"""Test the staged activation with shuffling."""
117+
model = MockModel(shuffle=True)
118+
model.step()
119+
for output in self.expected_output[:2]:
120+
assert output in model.log[:2]
121+
for output in self.expected_output[3:]:
122+
assert output in model.log[3:]
123+
assert self.expected_output[2] == model.log[2]
124+
125+
def test_shuffle_shuffles_agents(self): # noqa: D102
126+
model = MockModel(shuffle=True)
127+
a = mock.Mock()
128+
model.schedule._agents.random = a
129+
assert a.shuffle.call_count == 0
130+
model.step()
131+
assert a.shuffle.call_count == 1
132+
133+
def test_remove(self):
134+
"""Test the staged activation can remove an agent."""
135+
model = MockModel(shuffle=True)
136+
agents = list(model.schedule._agents)
137+
agent = agents[0]
138+
model.schedule.remove(agents[0])
139+
assert agent not in model.schedule.agents
140+
141+
def test_intrastep_remove(self):
142+
"""Test the staged activation can remove an agent in a step of another agent.
143+
144+
so that the one removed doesn't step.
145+
"""
146+
model = MockModel(shuffle=True, enable_kill_other_agent=True)
147+
model.step()
148+
assert len(model.log) == 3
149+
150+
def test_add_existing_agent(self): # noqa: D102
151+
model = MockModel()
152+
agent = model.schedule.agents[0]
153+
with self.assertRaises(Exception):
154+
model.schedule.add(agent)
155+
156+
157+
class TestRandomActivation(TestCase):
158+
"""Test the random activation."""
159+
160+
def test_init(self): # noqa: D102
161+
model = Model()
162+
agents = [MockAgent(model) for _ in range(10)]
163+
164+
scheduler = RandomActivation(model, agents)
165+
assert all(agent in scheduler.agents for agent in agents)
166+
167+
def test_random_activation_step_shuffles(self):
168+
"""Test the random activation step."""
169+
model = MockModel(activation=RANDOM)
170+
a = mock.Mock()
171+
model.schedule._agents.random = a
172+
model.schedule.step()
173+
assert a.shuffle.call_count == 1
174+
175+
def test_random_activation_step_increments_step_and_time_counts(self):
176+
"""Test the random activation step increments step and time counts."""
177+
model = MockModel(activation=RANDOM)
178+
assert model.schedule.steps == 0
179+
assert model.schedule.time == 0
180+
model.schedule.step()
181+
assert model.schedule.steps == 1
182+
assert model.schedule.time == 1
183+
184+
def test_random_activation_step_steps_each_agent(self):
185+
"""Test the random activation step causes each agent to step."""
186+
model = MockModel(activation=RANDOM)
187+
model.step()
188+
agent_steps = [i.steps for i in model.schedule.agents]
189+
# one step for each of 2 agents
190+
assert all(x == 1 for x in agent_steps)
191+
192+
def test_intrastep_remove(self):
193+
"""Test the random activation can remove an agent in a step of another agent.
194+
195+
so that the one removed doesn't step.
196+
"""
197+
model = MockModel(activation=RANDOM, enable_kill_other_agent=True)
198+
model.step()
199+
assert len(model.log) == 1
200+
201+
def test_get_agent_keys(self): # noqa: D102
202+
model = MockModel(activation=RANDOM)
203+
204+
keys = model.schedule.get_agent_keys()
205+
agent_ids = [agent.unique_id for agent in model.agents]
206+
assert all(entry_i == entry_j for entry_i, entry_j in zip(keys, agent_ids))
207+
208+
keys = model.schedule.get_agent_keys(shuffle=True)
209+
agent_ids = {agent.unique_id for agent in model.agents}
210+
assert all(entry in agent_ids for entry in keys)
211+
212+
def test_not_sequential(self): # noqa: D102
213+
model = MockModel(activation=RANDOM)
214+
# Create 10 agents
215+
for _ in range(10):
216+
model.schedule.add(MockAgent(model))
217+
# Run 3 steps
218+
for _ in range(3):
219+
model.step()
220+
# Filter out non-integer elements from the log
221+
filtered_log = [item for item in model.log if isinstance(item, int)]
222+
223+
# Check that there are no 18 consecutive agents id's in the filtered log
224+
total_agents = 10
225+
assert not any(
226+
all(
227+
(filtered_log[(i + j) % total_agents] - filtered_log[i]) % total_agents
228+
== j % total_agents
229+
for j in range(18)
230+
)
231+
for i in range(len(filtered_log))
232+
), f"Agents are activated sequentially:\n{filtered_log}"
233+
234+
235+
class TestSimultaneousActivation(TestCase):
236+
"""Test the simultaneous activation."""
237+
238+
def test_simultaneous_activation_step_steps_and_advances_each_agent(self):
239+
"""Test the simultaneous activation step causes each agent to step."""
240+
model = MockModel(activation=SIMULTANEOUS)
241+
model.step()
242+
# one step for each of 2 agents
243+
agent_steps = [i.steps for i in model.schedule.agents]
244+
agent_advances = [i.advances for i in model.schedule.agents]
245+
assert all(x == 1 for x in agent_steps)
246+
assert all(x == 1 for x in agent_advances)
247+
248+
249+
class TestRandomActivationByType(TestCase):
250+
"""Test the random activation by type.
251+
252+
TODO implement at least 2 types of agents, and test that step_type only
253+
does step for one type of agents, not the entire agents.
254+
"""
255+
256+
def test_init(self): # noqa: D102
257+
model = Model()
258+
agents = [MockAgent(model) for _ in range(10)]
259+
agents += [Agent(model) for _ in range(10)]
260+
261+
scheduler = RandomActivationByType(model, agents)
262+
assert all(agent in scheduler.agents for agent in agents)
263+
264+
def test_random_activation_step_shuffles(self):
265+
"""Test the random activation by type step."""
266+
model = MockModel(activation=RANDOM_BY_TYPE)
267+
a = mock.Mock()
268+
model.random = a
269+
for agentset in model.schedule._agents_by_type.values():
270+
agentset.random = a
271+
model.schedule.step()
272+
assert a.shuffle.call_count == 2
273+
274+
def test_random_activation_step_increments_step_and_time_counts(self):
275+
"""Test the random activation by type step increments step and time counts."""
276+
model = MockModel(activation=RANDOM_BY_TYPE)
277+
assert model.schedule.steps == 0
278+
assert model.schedule.time == 0
279+
model.schedule.step()
280+
assert model.schedule.steps == 1
281+
assert model.schedule.time == 1
282+
283+
def test_random_activation_step_steps_each_agent(self):
284+
"""Test the random activation by type step causes each agent to step."""
285+
model = MockModel(activation=RANDOM_BY_TYPE)
286+
model.step()
287+
agent_steps = [i.steps for i in model.schedule.agents]
288+
# one step for each of 2 agents
289+
assert all(x == 1 for x in agent_steps)
290+
291+
def test_random_activation_counts(self):
292+
"""Test the random activation by type step causes each agent to step."""
293+
model = MockModel(activation=RANDOM_BY_TYPE)
294+
295+
agent_types = model.agent_types
296+
for agent_type in agent_types:
297+
assert model.schedule.get_type_count(agent_type) == len(
298+
model.agents_by_type[agent_type]
299+
)
300+
301+
# def test_add_non_unique_ids(self):
302+
# """
303+
# Test that adding agent with duplicate ids result in an error.
304+
# TODO: we need to run this test on all schedulers, not just
305+
# TODO:: identical IDs is something for the agent, not the scheduler and should be tested there
306+
# RandomActivationByType.
307+
# """
308+
# model = MockModel(activation=RANDOM_BY_TYPE)
309+
# a = MockAgent(0, model)
310+
# b = MockAgent(0, model)
311+
# model.schedule.add(a)
312+
# with self.assertRaises(Exception):
313+
# model.schedule.add(b)
314+
315+
316+
if __name__ == "__main__":
317+
unittest.main()

0 commit comments

Comments
 (0)