import json
import logging
import os
import pickle
import platform
import subprocess
import time

import psutil
import pytest
import zmq

import parsl.executors.high_throughput.zmq_pipes as zmq_pipes
from parsl.executors.high_throughput.executor import DEFAULT_INTERCHANGE_LAUNCH_CMD
from parsl.executors.high_throughput.manager_selector import RandomManagerSelector
from parsl.version import VERSION as PARSL_VERSION

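# Poll period handed to the interchange, in milliseconds; kept short so the
# test reacts quickly.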
P_ms = 10


@pytest.mark.local
def test_exit_with_bad_registration(tmpd_cwd, try_assert):
    """Test that the interchange exits when it receives a bad registration message.
    This complements parsl/tests/test_scaling/test_worker_interchange_bad_messages_3262.py
    which tests that the interchange is resistant to other forms of bad message.
    """

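    # Create the three client-side ZMQ pipes that a HighThroughputExecutor
    # would normally own: task submission, result receipt, and the command
    # channel. Each is given the address, a port range to pick a port from,
    # and a certificate directory (None here, disabling encryption).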
    outgoing_q = zmq_pipes.TasksOutgoing(
        "127.0.0.1", (49152, 65535), None
    )
    incoming_q = zmq_pipes.ResultsIncoming(
        "127.0.0.1", (49152, 65535), None
    )
    command_client = zmq_pipes.CommandClient(
        "127.0.0.1", (49152, 65535), None
    )

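    # Assemble the configuration that the executor would normally construct,
    # pointing the interchange back at the client pipes created above.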
    interchange_config = {"client_address": "127.0.0.1",
                          "client_ports": (outgoing_q.port,
                                           incoming_q.port,
                                           command_client.port),
                          "interchange_address": "127.0.0.1",
                          "worker_ports": None,
                          "worker_port_range": (50000, 60000),
                          "hub_address": None,
                          "hub_zmq_port": None,
                          "logdir": tmpd_cwd,
                          "heartbeat_threshold": 120,
                          "poll_period": P_ms,
                          "logging_level": logging.DEBUG,
                          "cert_dir": None,
                          "manager_selector": RandomManagerSelector(),
                          "run_id": "test",
                          }

    config_pickle = pickle.dumps(interchange_config)

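    # Launch the interchange as a subprocess, delivering the pickled
    # configuration over its stdin in the same way that the executor does.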
    interchange_proc = subprocess.Popen(DEFAULT_INTERCHANGE_LAUNCH_CMD, stdin=subprocess.PIPE)
    stdin = interchange_proc.stdin
    assert stdin is not None, "Popen should have created an IO object (vs default None) because of PIPE mode"

    stdin.write(config_pickle)
    stdin.flush()
    stdin.close()

    # Wait for the interchange to come alive, by waiting for the command
    # thread to become responsive. If the interchange process did not start
    # up far enough to get the command thread running, this will time out.

    (task_port, result_port) = command_client.run("WORKER_PORTS", timeout_s=120)

    # Now we'll assume that if the interchange command thread is responding,
    # then the worker polling code is also running and that the interchange
    # has started successfully.

    # Send a bad registration message as if from a new worker pool. The
    # badness here is that the reported Python version does not match the
    # real Python version, which, unlike some other bad interchange
    # messages, should cause the interchange to shut down.

    msg = {'type': 'registration',
           'parsl_v': PARSL_VERSION,
           'python_v': "{}.{}.{}".format(1, 1, 1),  # this is the bad bit
           'worker_count': 1,
           'uid': 'testuid',
           'block_id': 0,
           'start_time': time.time(),
           'prefetch_capacity': 0,
           'max_capacity': 1,
           'os': platform.system(),
           'hostname': platform.node(),
           'dir': os.getcwd(),
           'cpu_count': psutil.cpu_count(logical=False),
           'total_memory': psutil.virtual_memory().total,
           }
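    # The remaining fields carry plausible values for a real worker pool, so
    # that the Python version mismatch is the only thing wrong with this
    # registration.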

    # Connect to the interchange's worker task port and send this message.

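    # A DEALER socket with an explicit identity mimics the way a worker
    # pool's manager connects to the interchange.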
    context = zmq.Context()
    channel_timeout = 10000  # in milliseconds
    task_channel = context.socket(zmq.DEALER)
    task_channel.setsockopt(zmq.LINGER, 0)
    task_channel.setsockopt(zmq.IDENTITY, b'testid')

    task_channel.set_hwm(0)
    task_channel.setsockopt(zmq.SNDTIMEO, channel_timeout)
    task_channel.connect(f"tcp://127.0.0.1:{task_port}")

    b_msg = json.dumps(msg).encode('utf-8')

    task_channel.send(b_msg)

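    # try_assert re-evaluates the condition until it becomes true or the
    # timeout expires, so this tolerates the interchange taking a moment to
    # notice the bad registration and exit.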
    # Check that the interchange exits within some reasonable time.
    try_assert(lambda: interchange_proc.poll() is not None,
               "Interchange did not exit after receiving a bad registration message",
               timeout_ms=5000)

    # See issue #3697 - ideally the interchange would exit cleanly, but it does not.
    # assert interchange_proc.poll() == 0, "Interchange exited with an error code, not 0"

    task_channel.close()
    context.term()