diff --git a/pelita/scripts/pelita_player.py b/pelita/scripts/pelita_player.py index 71e762225..f4744a496 100755 --- a/pelita/scripts/pelita_player.py +++ b/pelita/scripts/pelita_player.py @@ -205,25 +205,35 @@ def player_handle_request(socket, poller, team, team_name_override=False, silent raise RuntimeError("Created bad reply message") -def check_team_name(name): - # Team name must be ascii +def sanitize_team_name(string): + """Strip all non-ascii characters from team name""" + sane = [] + # first of all, verify that the whole thing is valid unicode + # this should always be True, but who knows where do they get + # their strings from try: - name.encode('ascii') + string.encode('utf8') except UnicodeEncodeError: - raise ValueError('Invalid team name (non ascii): "%s".'%name) - # Team name must be shorter than 25 characters - if len(name) > 25: - raise ValueError('Invalid team name (longer than 25): "%s".'%name) - if len(name) == 0: - raise ValueError('Invalid team name (too short).') - # Check every character and make sure it is either - # a letter or a number. Nothing else is allowed. - for char in name: - if (not char.isalnum()) and (char != ' '): - raise ValueError('Invalid team name (only alphanumeric ' - 'chars or blanks): "%s"'%name) - if name.isspace(): - raise ValueError('Invalid team name (no letters): "%s"'%name) + raise ValueError(f'{string} is not valid Unicode') + for c in string.strip(): + if c.isspace(): + # convert newlines and other whitespace to blanks + char = ' ' + elif int(c.isalnum()): + char = c + else: + # ignore anything else + continue + sane.append(char) + if len(sane) == 25: + # break out of the loop when we have 25 chars + break + + name = ''.join(sane) + if name == '': + return '???' + + return ''.join(sane) def load_team(spec): @@ -246,7 +256,6 @@ def load_team(spec): print('ERROR: %s' % e, file=sys.stderr) raise - check_team_name(team.team_name) return team def load_team_from_module(path: str): @@ -308,7 +317,7 @@ def team_from_module(module): """ # look for a new-style team move = module.move - name = module.TEAM_NAME + name = sanitize_team_name(module.TEAM_NAME) if not callable(move): raise TypeError("move is not a function") diff --git a/pyproject.toml b/pyproject.toml index 9d0a20a05..8c6984e72 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -59,6 +59,9 @@ test = "pytest" [tool.pytest.ini_options] # addopts = --verbose python_files = ["test/test_*.py", "contrib/test_*.py"] +markers = [ + "cleanup_test_modules: Ensure the given modules are cleaned up after a test", +] [tool.coverage.run] relative_files = true diff --git a/test/fixtures/player_bad_team_name.py b/test/fixtures/player_bad_team_name.py deleted file mode 100644 index deda109d8..000000000 --- a/test/fixtures/player_bad_team_name.py +++ /dev/null @@ -1,5 +0,0 @@ -# Player with an overly long team name - -TEAM_NAME = "123456789 123456789 123456789 123456789" -def move(b, s): - return b.position diff --git a/test/test_network.py b/test/test_network.py index b2a3f0374..2a3fd83f8 100644 --- a/test/test_network.py +++ b/test/test_network.py @@ -138,138 +138,181 @@ def stopping(bot, state): assert res[0] == "success" -@pytest.mark.parametrize("checkpoint", range(12)) -def test_client_broken(zmq_context, checkpoint): - # This test runs a test game against a (malicious) server client - # (a malicious subprocess client is harder to test) - # Depending on the checkpoint selected, the broken test client will - # run up to a particular point and then send a malicious message. +def dealer_good(q, *, num_requests, timeout): + zmq_context = zmq.Context() + sock = zmq_context.socket(zmq.DEALER) + poll = zmq.Poller() - # Depending on whether this message occurs in the game setup stage - # or during the game run, this will either set the phase to FAILURE or - # let the good team win. Pelita itself should not break in the process. + port = sock.bind_to_random_port('tcp://127.0.0.1') + q.put(port) + + poll.register(sock, zmq.POLLIN) + _available_socks = poll.poll(timeout=timeout) + request = sock.recv_json() + assert request['REQUEST'] + sock.send_json({'__status__': 'ok', '__data__': {'team_name': 'good player'}}) + + _available_socks = poll.poll(timeout=timeout) + set_initial = sock.recv_json(flags=zmq.NOBLOCK) + if set_initial['__action__'] == 'exit': + return + assert set_initial['__action__'] == "set_initial" + sock.send_json({'__uuid__': set_initial['__uuid__'], '__return__': None}) + + for _i in range(num_requests): + _available_socks = poll.poll(timeout=timeout) + game_state = sock.recv_json(flags=zmq.NOBLOCK) + msg_id = game_state['__uuid__'] - timeout = 3000 + action = game_state['__action__'] + if action == 'exit': + return + assert set_initial['__action__'] == "set_initial" - q1 = queue.Queue() - q2 = queue.Queue() + current_pos = game_state['__data__']['game_state']['team']['bot_positions'][game_state['__data__']['game_state']['bot_turn']] + sock.send_json({'__uuid__': msg_id, '__return__': {'move': current_pos}}) - def dealer_good(q): - zmq_context = zmq.Context() - sock = zmq_context.socket(zmq.DEALER) - poll = zmq.Poller() + _available_socks = poll.poll(timeout=timeout) + exit_state = sock.recv_json(flags=zmq.NOBLOCK) - port = sock.bind_to_random_port('tcp://127.0.0.1') - q.put(port) + assert exit_state['__action__'] == 'exit' - poll.register(sock, zmq.POLLIN) - _available_socks = poll.poll(timeout=timeout) - request = sock.recv_json() - assert request['REQUEST'] - sock.send_json({'__status__': 'ok', '__data__': {'team_name': 'good player'}}) +def dealer_bad(q, *, team_name=None, num_requests, checkpoint, timeout): + zmq_context = zmq.Context() + sock = zmq_context.socket(zmq.DEALER) + poll = zmq.Poller() + port = sock.bind_to_random_port('tcp://127.0.0.1') + q.put(port) + + poll.register(sock, zmq.POLLIN) + # we set our recv to raise, if there is no message (zmq.NOBLOCK), + # so we do not need to care to check whether something is in the _available_socks + _available_socks = poll.poll(timeout=timeout) + + request = sock.recv_json(flags=zmq.NOBLOCK) + assert request['REQUEST'] + if checkpoint == 1: + sock.send_string("") + return + elif checkpoint == 2: + sock.send_json({'__status__': 'ok'}) + return + else: + if team_name is None: + team_name = f'bad <{checkpoint}>' + sock.send_json({'__status__': 'ok', '__data__': {'team_name': team_name}}) + + _available_socks = poll.poll(timeout=timeout) + + set_initial = sock.recv_json(flags=zmq.NOBLOCK) + + if checkpoint == 3: + sock.send_string("") + return + elif checkpoint == 4: + sock.send_json({'__uuid__': 'ok'}) + return + else: + sock.send_json({'__uuid__': set_initial['__uuid__'], '__data__': None}) + + for _i in range(num_requests): _available_socks = poll.poll(timeout=timeout) - set_initial = sock.recv_json(flags=zmq.NOBLOCK) - if set_initial['__action__'] == 'exit': + game_state = sock.recv_json(flags=zmq.NOBLOCK) + msg_id = game_state['__uuid__'] + + action = game_state['__action__'] + if action == 'exit': return - assert set_initial['__action__'] == "set_initial" - sock.send_json({'__uuid__': set_initial['__uuid__'], '__return__': None}) - for _i in range(8): - _available_socks = poll.poll(timeout=timeout) - game_state = sock.recv_json(flags=zmq.NOBLOCK) + current_pos = game_state['__data__']['game_state']['team']['bot_positions'][game_state['__data__']['game_state']['bot_turn']] + if checkpoint == 5: + sock.send_string("No json") + return + elif checkpoint == 6: + # This is an acceptable message that will never match a request + # We can send the correct message afterwards and the match continues + sock.send_json({'__uuid__': "Bad", '__return__': "Nothing"}) + sock.send_json({'__uuid__': msg_id, '__return__': {'move': current_pos}}) + elif checkpoint == 7: + sock.send_json({'__uuid__': msg_id, '__return__': {'move': [0, 0]}}) + return + elif checkpoint == 8: + sock.send_json({'__uuid__': msg_id, '__return__': {'move': "NOTHING"}}) + return + elif checkpoint == 9: + # cannot become a tuple + sock.send_json({'__uuid__': msg_id, '__return__': {'move': 12345}}) + return + elif checkpoint == 10: + sock.send_json({'__uuid__': msg_id, '__return__': "NOT A DICT"}) + return + else: + sock.send_json({'__uuid__': msg_id, '__return__': {'move': current_pos}}) - action = game_state['__action__'] - if action == 'exit': - return - assert set_initial['__action__'] == "set_initial" - current_pos = game_state['__data__']['game_state']['team']['bot_positions'][game_state['__data__']['game_state']['bot_turn']] - sock.send_json({'__uuid__': game_state['__uuid__'], '__return__': {'move': current_pos}}) +def test_bad_team_name_is_currently_not_tested_in_backend(zmq_context): + timeout = 3000 - _available_socks = poll.poll(timeout=timeout) - exit_state = sock.recv_json(flags=zmq.NOBLOCK) + q1 = queue.Queue() + q2 = queue.Queue() - assert exit_state['__action__'] == 'exit' + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + players = [] + players.append(executor.submit(dealer_good, q1, num_requests=8, timeout=timeout)) + players.append(executor.submit(dealer_bad, q2, team_name="Long bad team name 123456789 123456789!!", checkpoint=0, num_requests=8, timeout=timeout)) - def dealer_bad(q): - zmq_context = zmq.Context() - sock = zmq_context.socket(zmq.DEALER) - poll = zmq.Poller() + port1 = q1.get() + port2 = q2.get() - port = sock.bind_to_random_port('tcp://127.0.0.1') - q.put(port) + layout = {'walls': ((0, 0), (0, 1), (0, 2), (0, 3), (0, 4), (0, 5), (1, 0), (1, 5), (2, 0), (2, 5), (3, 0), (3, 2), (3, 3), (3, 5), (4, 0), (4, 2), (4, 3), (4, 5), (5, 0), (5, 5), (6, 0), (6, 5), (7, 0), (7, 1), (7, 2), (7, 3), (7, 4), (7, 5)), 'food': [(1, 1), (1, 2), (2, 1), (2, 2), (5, 3), (5, 4), (6, 3), (6, 4)], 'bots': [(1, 3), (6, 2), (1, 4), (6, 1)], 'shape': (8, 6)} - poll.register(sock, zmq.POLLIN) - # we set our recv to raise, if there is no message (zmq.NOBLOCK), - # so we do not need to care to check whether something is in the _available_socks - _available_socks = poll.poll(timeout=timeout) + game_state = setup_game([ + f'pelita://127.0.0.1:{port1}/PLAYER1', + f'pelita://127.0.0.1:{port2}/PLAYER2' + ], + layout_dict=layout, + max_rounds=2, + timeout_length=1, + ) - request = sock.recv_json(flags=zmq.NOBLOCK) - assert request['REQUEST'] - if checkpoint == 1: - sock.send_string("") - return - elif checkpoint == 2: - sock.send_json({'__status__': 'ok'}) - return - else: - sock.send_json({'__status__': 'ok', '__data__': {'team_name': f'bad <{checkpoint}>'}}) + # check that the game_state ends in the expected phase + # assert game_state['game_phase'] == 'FAILURE' - _available_socks = poll.poll(timeout=timeout) + while game_state['game_phase'] == 'RUNNING': + game_state = play_turn(game_state) - set_initial = sock.recv_json(flags=zmq.NOBLOCK) + assert game_state['team_names'] == ['good player', 'Long bad team name 123456789 123456789!!'] - if checkpoint == 3: - sock.send_string("") - return - elif checkpoint == 4: - sock.send_json({'__uuid__': 'ok'}) - return - else: - sock.send_json({'__uuid__': set_initial['__uuid__'], '__data__': None}) - - for _i in range(8): - _available_socks = poll.poll(timeout=timeout) - game_state = sock.recv_json(flags=zmq.NOBLOCK) - - action = game_state['__action__'] - if action == 'exit': - return - - current_pos = game_state['__data__']['game_state']['team']['bot_positions'][game_state['__data__']['game_state']['bot_turn']] - if checkpoint == 5: - sock.send_string("No json") - return - elif checkpoint == 6: - # This is an acceptable message that will never match a request - # We can send the correct message afterwards and the match continues - sock.send_json({'__uuid__': "Bad", '__return__': "Nothing"}) - sock.send_json({'__uuid__': game_state['__uuid__'], '__return__': {'move': current_pos}}) - elif checkpoint == 7: - sock.send_json({'__uuid__': game_state['__uuid__'], '__return__': {'move': [0, 0]}}) - return - elif checkpoint == 8: - sock.send_json({'__uuid__': game_state['__uuid__'], '__return__': {'move': "NOTHING"}}) - return - elif checkpoint == 9: - # cannot become a tuple - sock.send_json({'__uuid__': game_state['__uuid__'], '__return__': {'move': 12345}}) - return - elif checkpoint == 10: - sock.send_json({'__uuid__': game_state['__uuid__'], '__return__': "NOT A DICT"}) - return - else: - sock.send_json({'__uuid__': game_state['__uuid__'], '__return__': {'move': current_pos}}) + # check that no player had an uncaught exception + for player in concurrent.futures.as_completed(players): + assert player.exception() is None, traceback.print_exception(player.exception(), limit=None, file=None, chain=True) + + +@pytest.mark.parametrize("checkpoint", range(12)) +def test_client_broken(zmq_context, checkpoint): + # This test runs a test game against a (malicious) server client + # (a malicious subprocess client is harder to test) + # Depending on the checkpoint selected, the broken test client will + # run up to a particular point and then send a malicious message. + + # Depending on whether this message occurs in the game setup stage + # or during the game run, this will either set the phase to FAILURE or + # let the good team win. Pelita itself should not break in the process. + + timeout = 3000 + + q1 = queue.Queue() + q2 = queue.Queue() with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: players = [] - players.append(executor.submit(dealer_good, q1)) + players.append(executor.submit(dealer_good, q1, num_requests=8, timeout=timeout)) if checkpoint == 0: - players.append(executor.submit(dealer_good, q2)) + players.append(executor.submit(dealer_good, q2, num_requests=8, timeout=timeout)) else: - players.append(executor.submit(dealer_bad, q2)) + players.append(executor.submit(dealer_bad, q2, checkpoint=checkpoint, num_requests=8, timeout=timeout)) port1 = q1.get() port2 = q2.get() diff --git a/test/test_pelita_player.py b/test/test_pelita_player.py index c3535a7c5..6942125e7 100644 --- a/test/test_pelita_player.py +++ b/test/test_pelita_player.py @@ -5,7 +5,7 @@ import pytest -from pelita.scripts.pelita_player import load_team, load_team_from_module +from pelita.scripts.pelita_player import load_team, load_team_from_module, sanitize_team_name _mswindows = (sys.platform == "win32") @@ -27,134 +27,176 @@ def move(bot, state): return bot.position """ -# TODO: The modules should be unloaded after use -# If we import modules with the same name again, the results will be very unexpected - -class TestLoadFactory: - def test_simple_module_import(self): - # modules_before = list(sys.modules.keys()) - with tempfile.TemporaryDirectory() as d: - module = Path(d) / "teamx" - module.mkdir() - initfile = module / "__init__.py" - with initfile.open(mode='w') as f: - f.write(SIMPLE_MODULE) - - spec = str(module) - load_team_from_module(spec) - - def test_simple_file_import(self): - # modules_before = list(sys.modules.keys()) - with tempfile.TemporaryDirectory() as d: - module = Path(d) / "teamy" - module.mkdir() - initfile = module / "teamyy.py" - with initfile.open(mode='w') as f: - f.write(SIMPLE_MODULE) - - spec = str(initfile) +# We do not want to clutter sys.modules too much and also avoid import errors when +# a module of the same name has been imported before. +# With cleanup_test_modules we ensure that imported names are cleared again after a test + +class AutoCleanModules: + def __init__(self, modules): + self.modules = modules + +@pytest.fixture +def cleanup_test_modules(request): + + # Use a marker like + # @pytest.mark.cleanup_test_modules(["test-team-a", "test-team-b"]) + # to override the module names that are to be cleaned up + + marker = request.node.get_closest_marker("cleanup_test_modules") + if marker is None: + modules = ['test_module'] + else: + modules = marker.args[0] + + for module in modules: + if module in sys.modules: + raise RuntimeError(f"Test module {module} is already in sys.modules.") + + auto_clean_modules = AutoCleanModules(modules) + + yield auto_clean_modules + + for module in auto_clean_modules.modules: + del sys.modules[module] + + +@pytest.mark.cleanup_test_modules(["teamx"]) +def test_simple_module_import(cleanup_test_modules): + with tempfile.TemporaryDirectory() as d: + module = Path(d) / "teamx" + module.mkdir() + initfile = module / "__init__.py" + with initfile.open(mode='w') as f: + f.write(SIMPLE_MODULE) + + spec = str(module) + load_team_from_module(spec) + +@pytest.mark.cleanup_test_modules(["teamyy"]) +def test_simple_file_import(cleanup_test_modules): + with tempfile.TemporaryDirectory() as d: + module = Path(d) / "teamy" + module.mkdir() + initfile = module / "teamyy.py" + with initfile.open(mode='w') as f: + f.write(SIMPLE_MODULE) + + spec = str(initfile) + load_team_from_module(spec) + +@pytest.mark.cleanup_test_modules(["teamz"]) +def test_failing_import(cleanup_test_modules): + with tempfile.TemporaryDirectory() as d: + module = Path(d) / "teamz" + module.mkdir() + initfile = module / "__init__.py" + with initfile.open(mode='w') as f: + f.write(SIMPLE_FAILING_MODULE) + + spec = str(module) + with pytest.raises(AttributeError): load_team_from_module(spec) - def test_failing_import(self): - # modules_before = list(sys.modules.keys()) - with tempfile.TemporaryDirectory() as d: - module = Path(d) / "teamz" - module.mkdir() - initfile = module / "__init__.py" - with initfile.open(mode='w') as f: - f.write(SIMPLE_FAILING_MODULE) - - spec = str(module) - with pytest.raises(AttributeError): - load_team_from_module(spec) - - def test_import_of_pyc(self): - with tempfile.TemporaryDirectory() as d: - module = Path(d) / "teampyc" - module.mkdir() - initfile = module / "teampycpyc.py" - with initfile.open(mode='w') as f: - f.write(SIMPLE_MODULE) - pycfile = initfile.parent / "teampycpyc.pyc" - py_compile.compile(str(initfile), cfile=str(pycfile)) - initfile.unlink() - - spec = str(pycfile) +@pytest.mark.cleanup_test_modules(["teampycpyc"]) +def test_import_of_pyc(cleanup_test_modules): + with tempfile.TemporaryDirectory() as d: + module = Path(d) / "teampyc" + module.mkdir() + initfile = module / "teampycpyc.py" + with initfile.open(mode='w') as f: + f.write(SIMPLE_MODULE) + pycfile = initfile.parent / "teampycpyc.pyc" + py_compile.compile(str(initfile), cfile=str(pycfile)) + initfile.unlink() + + spec = str(pycfile) + load_team_from_module(spec) + +# No cleanup needed. Module is not imported +def test_failing_import_importerror(): + assert "teamzab" not in sys.modules + with tempfile.TemporaryDirectory() as d: + module = Path(d) / "teamzab" + module.mkdir() + initfile = module / "__init__.py" + with initfile.open(mode='w') as f: + f.write(MODULE_IMPORT_ERROR) + broken_module = module / "broken.py" + with broken_module.open(mode='w') as f: + f.write('this is a syntax error\n') + + spec = str(module) + + with pytest.raises(SyntaxError): load_team_from_module(spec) - def test_failing_import_importerror(self): - with tempfile.TemporaryDirectory() as d: - module = Path(d) / "teamzab" - module.mkdir() - initfile = module / "__init__.py" - with initfile.open(mode='w') as f: - f.write(MODULE_IMPORT_ERROR) - broken_module = module / "broken.py" - with broken_module.open(mode='w') as f: - f.write('this is a syntax error\n') - - spec = str(module) - - with pytest.raises(SyntaxError): - load_team_from_module(spec) - -class TestLoadTeam: - def test_simple_module_import_forbidden_names(self): - names = ["", " ", "-", "∂", "0" * 26] - for idx, name in enumerate(names): - # modules_before = list(sys.modules.keys()) - with tempfile.TemporaryDirectory() as d: - module = Path(d) / ("teamx_%i" % idx) - module.mkdir() - initfile = module / "__init__.py" - with initfile.open(mode='w') as f: - try: - f.write(SIMPLE_MODULE % (name,)) - except UnicodeEncodeError: - if _mswindows: - # Ignore UnicodeEncodeErrors on Windows for this test - # It is too complicate to debug this - continue - else: - raise - - spec = str(module) - with pytest.raises(ValueError): - load_team(spec) - - def test_simple_module_import_allowed_names(self): - names = ["a", "a a", "0" * 25] - for idx, name in enumerate(names): - # modules_before = list(sys.modules.keys()) - with tempfile.TemporaryDirectory() as d: - module = Path(d) / ("teamy_%i" % idx) - module.mkdir() - initfile = module / "__init__.py" - with initfile.open(mode='w') as f: - f.write(SIMPLE_MODULE % (name,)) - - spec = str(module) - load_team(spec) - - # These test cases need to be handled in one function - # ie. not in a parametrized test, as the will need - # to be run inside the same Python session - load_team_cases = [ - ("pelita/player/StoppingPlayer", None), -# ("StoppingPlayer,StoppingPlayer", None), - ("NonExistingPlayer", ImportError), -# ("StoppingPlayer,StoppingPlayer,FoodEatingPlayer", ValueError), - #('doc/source/groupN', AttributeError), # TODO: Should be rewritten for a proper team - #('doc/source/groupN/__init__.py', ImportError), # TODO: Should be rewritten for a proper team - #('doc/source/groupN', ValueError), # Has already been imported - ] - - def test_load_team(self): - for path, result in self.load_team_cases: - print(path, result) - if result is not None: - with pytest.raises(result): - load_team(path) - else: - load_team(path) - + assert "teamzab" not in sys.modules + + +@pytest.mark.parametrize('name, expected', [ + ("a", True), + ("a a", True), + ("0" * 25, True), + ("", "???"), + (" ", "???"), + ("-", "???"), + ("∂", "???"), + ("0" * 26, "0" * 25), + (" " + "0" * 26, "0" * 25), +]) +def test_player_import_name(name, expected, cleanup_test_modules): + with tempfile.TemporaryDirectory() as d: + # we must have a unused file name + team_file = Path(d) / "test_module.py" + with team_file.open(mode='w') as f: + try: + f.write(SIMPLE_MODULE % (name,)) + except UnicodeEncodeError: + if _mswindows: + # Ignore UnicodeEncodeErrors on Windows for this test + # It is too complicate to debug this + + # No module has been imported; ensure that nothing is cleaned up + cleanup_test_modules.modules = [] + return + else: + raise + + spec = str(team_file) + if expected is True: + assert load_team(spec).team_name == name + else: + assert load_team(spec).team_name == expected + + +@pytest.mark.parametrize('name, expected', [ + ("a", True), + ("a a", True), + ("0" * 25, True), + ("", "???"), + (" ", "???"), + ("-", "???"), + ("∂", "???"), + ("0" * 26, "0" * 25), + (" " + "0" * 26, "0" * 25), +]) +def test_sanitize_team_name(name, expected): + if expected is True: + assert sanitize_team_name(name) == name + else: + assert sanitize_team_name(name) == expected + + +@pytest.mark.parametrize('team_spec, expected', [ + ("pelita/player/StoppingPlayer", None), + ("NonExistingPlayer", ImportError), + #('doc/source/groupN', AttributeError), # TODO: Should be rewritten for a proper team + #('doc/source/groupN/__init__.py', ImportError), # TODO: Should be rewritten for a proper team +]) +def test_load_team(team_spec, expected): + + if expected is not None: + with pytest.raises(expected): + load_team(team_spec) + else: + load_team(team_spec) diff --git a/test/test_remote_game.py b/test/test_remote_game.py index be4659e4c..802f46da4 100644 --- a/test/test_remote_game.py +++ b/test/test_remote_game.py @@ -94,33 +94,6 @@ def test_remote_timeout(): assert state['timeouts'][1][(2, 1)]['type'] == 'timeout' -@pytest.mark.parametrize("failing_team", [0, 1]) -def test_bad_team_name(failing_team): - layout = """ - ########## - # b y # - #a .. x# - ########## - """ - - failing_player = FIXTURE_DIR / 'player_bad_team_name.py' - good_player = "0" - - if failing_team == 0: - teams = [str(failing_player), str(good_player)] - elif failing_team == 1: - teams = [str(good_player), str(failing_player)] - - state = pelita.game.run_game(teams, - max_rounds=8, - layout_dict=pelita.layout.parse_layout(layout), - timeout_length=0.4) - - assert state['whowins'] == -1 - assert state['fatal_errors'][failing_team][0]['type'] == "RemotePlayerFailure" - assert "longer than 25" in state['fatal_errors'][failing_team][0]['description'] - - def test_remote_dumps_are_written(): layout = """ ##########