Skip to content

Commit f20ab8e

Browse files
authored
Add first unit tests for process_pool_worker (#3766)
The only change of note is to move the `ArgumentParser` to a function to enable writing unit tests for it. # Changed Behaviour No user-functional change. ## Type of change - Code maintenance/cleanup
1 parent a0a24da commit f20ab8e

File tree

3 files changed

+193
-50
lines changed

3 files changed

+193
-50
lines changed

parsl/executors/high_throughput/process_worker_pool.py

Lines changed: 119 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -809,63 +809,132 @@ def start_file_logger(filename, rank, name='parsl', level=logging.DEBUG, format_
809809
return logger
810810

811811

812-
if __name__ == "__main__":
813-
814-
parser = argparse.ArgumentParser()
815-
parser.add_argument("-d", "--debug", action='store_true',
816-
help="Enable logging at DEBUG level")
817-
parser.add_argument("-a", "--addresses", default='',
818-
help="Comma separated list of addresses at which the interchange could be reached")
819-
parser.add_argument("--cert_dir", required=True,
820-
help="Path to certificate directory.")
821-
parser.add_argument("-l", "--logdir", default="process_worker_pool_logs",
822-
help="Process worker pool log directory")
823-
parser.add_argument("-u", "--uid", default=str(uuid.uuid4()).split('-')[-1],
824-
help="Unique identifier string for Manager")
825-
parser.add_argument("-b", "--block_id", default=None,
826-
help="Block identifier for Manager")
827-
parser.add_argument("-c", "--cores_per_worker", default="1.0",
828-
help="Number of cores assigned to each worker process. Default=1.0")
829-
parser.add_argument("-m", "--mem_per_worker", default=0,
830-
help="GB of memory assigned to each worker process. Default=0, no assignment")
831-
parser.add_argument("-t", "--task_port", required=True,
832-
help="REQUIRED: Task port for receiving tasks from the interchange")
833-
parser.add_argument("--max_workers_per_node", default=float('inf'),
834-
help="Caps the maximum workers that can be launched, default:infinity")
835-
parser.add_argument("-p", "--prefetch_capacity", default=0,
836-
help="Number of tasks that can be prefetched to the manager. Default is 0.")
837-
parser.add_argument("--hb_period", default=30,
838-
help="Heartbeat period in seconds. Uses manager default unless set")
839-
parser.add_argument("--hb_threshold", default=120,
840-
help="Heartbeat threshold in seconds. Uses manager default unless set")
841-
parser.add_argument("--drain_period", default=None,
842-
help="Drain this pool after specified number of seconds. By default, does not drain.")
843-
parser.add_argument("--address_probe_timeout", default=30,
844-
help="Timeout to probe for viable address to interchange. Default: 30s")
845-
parser.add_argument("--poll", default=10,
846-
help="Poll period used in milliseconds")
847-
parser.add_argument("-r", "--result_port", required=True,
848-
help="REQUIRED: Result port for posting results to the interchange")
812+
def get_arg_parser() -> argparse.ArgumentParser:
849813

850814
def strategyorlist(s: str):
851-
allowed_strategies = ["none", "block", "alternating", "block-reverse"]
815+
s = s.lower()
816+
allowed_strategies = ("none", "block", "alternating", "block-reverse")
852817
if s in allowed_strategies:
853818
return s
854819
elif s[0:4] == "list":
855820
return s
856-
else:
857-
raise argparse.ArgumentTypeError("cpu-affinity must be one of {} or a list format".format(allowed_strategies))
858-
859-
parser.add_argument("--cpu-affinity", type=strategyorlist,
860-
required=True,
861-
help="Whether/how workers should control CPU affinity.")
862-
parser.add_argument("--available-accelerators", type=str, nargs="*",
863-
help="Names of available accelerators, if not given assumed to be zero accelerators available", default=[])
864-
parser.add_argument("--enable_mpi_mode", action='store_true',
865-
help="Enable MPI mode")
866-
parser.add_argument("--mpi-launcher", type=str, choices=VALID_LAUNCHERS,
867-
help="MPI launcher to use iff enable_mpi_mode=true")
821+
err_msg = f"cpu-affinity must be one of {allowed_strategies} or a list format"
822+
raise argparse.ArgumentTypeError(err_msg)
868823

824+
parser = argparse.ArgumentParser()
825+
parser.add_argument(
826+
"-d", "--debug", action='store_true', help="Enable logging at DEBUG level",
827+
)
828+
parser.add_argument(
829+
"-a",
830+
"--addresses",
831+
default='',
832+
help="Comma separated list of addresses at which the interchange could be reached",
833+
)
834+
parser.add_argument(
835+
"--cert_dir", required=True, help="Path to certificate directory."
836+
)
837+
parser.add_argument(
838+
"-l",
839+
"--logdir",
840+
default="process_worker_pool_logs",
841+
help="Process worker pool log directory",
842+
)
843+
parser.add_argument(
844+
"-u",
845+
"--uid",
846+
default=str(uuid.uuid4()).split('-')[-1],
847+
help="Unique identifier string for Manager",
848+
)
849+
parser.add_argument(
850+
"-b", "--block_id", default=None, help="Block identifier for Manager"
851+
)
852+
parser.add_argument(
853+
"-c",
854+
"--cores_per_worker",
855+
default="1.0",
856+
help="Number of cores assigned to each worker process. Default=1.0",
857+
)
858+
parser.add_argument(
859+
"-m",
860+
"--mem_per_worker",
861+
default=0,
862+
help="GB of memory assigned to each worker process. Default=0, no assignment",
863+
)
864+
parser.add_argument(
865+
"-t",
866+
"--task_port",
867+
required=True,
868+
help="Task port for receiving tasks from the interchange",
869+
)
870+
parser.add_argument(
871+
"--max_workers_per_node",
872+
default=float('inf'),
873+
help="Caps the maximum workers that can be launched, default:infinity",
874+
)
875+
parser.add_argument(
876+
"-p",
877+
"--prefetch_capacity",
878+
default=0,
879+
help="Number of tasks that can be prefetched to the manager. Default is 0.",
880+
)
881+
parser.add_argument(
882+
"--hb_period",
883+
default=30,
884+
help="Heartbeat period in seconds. Uses manager default unless set",
885+
)
886+
parser.add_argument(
887+
"--hb_threshold",
888+
default=120,
889+
help="Heartbeat threshold in seconds. Uses manager default unless set",
890+
)
891+
parser.add_argument(
892+
"--drain_period",
893+
default=None,
894+
help="Drain this pool after specified number of seconds. By default, does not drain.",
895+
)
896+
parser.add_argument(
897+
"--address_probe_timeout",
898+
default=30,
899+
help="Timeout to probe for viable address to interchange. Default: 30s",
900+
)
901+
parser.add_argument(
902+
"--poll", default=10, help="Poll period used in milliseconds"
903+
)
904+
parser.add_argument(
905+
"-r",
906+
"--result_port",
907+
required=True,
908+
help="Result port for posting results to the interchange",
909+
)
910+
parser.add_argument(
911+
"--cpu-affinity",
912+
type=strategyorlist,
913+
required=True,
914+
help="Whether/how workers should control CPU affinity.",
915+
)
916+
parser.add_argument(
917+
"--available-accelerators",
918+
type=str,
919+
nargs="*",
920+
default=[],
921+
help="Names of available accelerators, if not given assumed to be zero accelerators available",
922+
)
923+
parser.add_argument(
924+
"--enable_mpi_mode", action='store_true', help="Enable MPI mode"
925+
)
926+
parser.add_argument(
927+
"--mpi-launcher",
928+
type=str,
929+
choices=VALID_LAUNCHERS,
930+
help="MPI launcher to use iff enable_mpi_mode=true",
931+
)
932+
933+
return parser
934+
935+
936+
if __name__ == "__main__":
937+
parser = get_arg_parser()
869938
args = parser.parse_args()
870939

871940
os.makedirs(os.path.join(args.logdir, "block-{}".format(args.block_id), args.uid), exist_ok=True)

parsl/tests/unit/executors/high_throughput/__init__.py

Whitespace-only changes.
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
import sys
2+
from argparse import ArgumentError
3+
4+
import pytest
5+
6+
from parsl.executors.high_throughput import process_worker_pool
7+
8+
if sys.version_info < (3, 12):
9+
# exit_on_error bug; see https://github.com/python/cpython/issues/121018
10+
# "argparse.ArgumentParser.parses_args does not honor exit_on_error=False when
11+
# given unrecognized arguments"
12+
pytest.skip(allow_module_level=True, reason="exit_on_error argparse bug")
13+
14+
# due to above pytest.skip, mypy on < Py312 thinks this is unreachable. :facepalm:
15+
_known_required = ( # type: ignore[unreachable, unused-ignore]
16+
"--cert_dir",
17+
"--cpu-affinity",
18+
"--result_port",
19+
"--task_port",
20+
)
21+
22+
23+
@pytest.mark.local
24+
def test_arg_parser_exits_on_error():
25+
p = process_worker_pool.get_arg_parser()
26+
assert p.exit_on_error
27+
28+
29+
@pytest.mark.local
30+
def test_arg_parser_known_required():
31+
p = process_worker_pool.get_arg_parser()
32+
reqd = [a for a in p._actions if a.required]
33+
for a in reqd:
34+
assert a.option_strings[-1] in _known_required, "Update _known_required?"
35+
36+
37+
@pytest.mark.local
38+
@pytest.mark.parametrize("req", _known_required)
39+
def test_arg_parser_required(req):
40+
p = process_worker_pool.get_arg_parser()
41+
p.exit_on_error = False
42+
with pytest.raises(ArgumentError) as pyt_exc:
43+
p.parse_args([])
44+
45+
e_msg = pyt_exc.value.args[1]
46+
assert req in e_msg
47+
48+
49+
@pytest.mark.local
50+
@pytest.mark.parametrize("valid,val", (
51+
(True, "NoNe"),
52+
(True, "none"),
53+
(True, "block"),
54+
(True, "alternating"),
55+
(True, "block-reverse"),
56+
(True, "list"),
57+
(False, "asdf"),
58+
(False, ""),
59+
))
60+
def test_arg_parser_validates_cpu_affinity(valid, val):
61+
reqd_args = []
62+
reqd_args.extend(("--cert_dir", "/some/path"))
63+
reqd_args.extend(("--result_port", "123"))
64+
reqd_args.extend(("--task_port", "123"))
65+
reqd_args.extend(("--cpu-affinity", val))
66+
67+
p = process_worker_pool.get_arg_parser()
68+
p.exit_on_error = False
69+
if valid:
70+
p.parse_args(reqd_args)
71+
else:
72+
with pytest.raises(ArgumentError) as pyt_exc:
73+
p.parse_args(reqd_args)
74+
assert "must be one of" in pyt_exc.value.args[1]

0 commit comments

Comments
 (0)