Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions Lib/test/libregrtest/mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ disallow_untyped_defs = False
check_untyped_defs = False
warn_return_any = False

# Enable --strict-optional for these ASAP:
[mypy-Lib.test.libregrtest.run_workers.*]
strict_optional = False

# Various internal modules that typeshed deliberately doesn't have stubs for:
[mypy-_abc.*,_opcode.*,_overlapped.*,_testcapi.*,_testinternalcapi.*,test.*]
ignore_missing_imports = True
35 changes: 21 additions & 14 deletions Lib/test/libregrtest/run_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ def __init__(self,
super().__init__()


_NOT_RUNNING = "<not running>"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make it public, it's not a deep secret.

Copy link
Member Author

@sobolevn sobolevn Nov 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think that it needs to be public, since it is not used outside of this module and probably won't be. If you prefer to have it as public - I will change it :)



class WorkerThread(threading.Thread):
def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
super().__init__()
Expand All @@ -111,8 +114,8 @@ def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
self.output = runner.output
self.timeout = runner.worker_timeout
self.log = runner.log
self.test_name: TestName | None = None
self.start_time: float | None = None
self.test_name = _NOT_RUNNING
self.start_time = 0.0
self._popen: subprocess.Popen[str] | None = None
self._killed = False
self._stopped = False
Expand All @@ -127,9 +130,9 @@ def __repr__(self) -> str:
if test:
info.append(f'test={test}')
popen = self._popen
if popen is not None:
if popen is not None and self.start_time is not None:
dt = time.monotonic() - self.start_time
info.extend((f'pid={self._popen.pid}',
info.extend((f'pid={popen.pid}',
f'time={format_duration(dt)}'))
return '<%s>' % ' '.join(info)

Expand Down Expand Up @@ -318,6 +321,7 @@ def read_stdout(self, stdout_file: TextIO) -> str:

def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
stdout: str) -> tuple[TestResult, str]:
test_name = self.test_name
try:
if json_tmpfile is not None:
json_tmpfile.seek(0)
Expand All @@ -332,11 +336,11 @@ def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Failed to read worker process JSON: {exc}"
raise WorkerError(self.test_name, err_msg, stdout,
raise WorkerError(test_name, err_msg, stdout,
state=State.WORKER_BUG)

if not worker_json:
raise WorkerError(self.test_name, "empty JSON", stdout,
raise WorkerError(test_name, "empty JSON", stdout,
state=State.WORKER_BUG)

try:
Expand All @@ -345,7 +349,7 @@ def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Failed to parse worker process JSON: {exc}"
raise WorkerError(self.test_name, err_msg, stdout,
raise WorkerError(test_name, err_msg, stdout,
state=State.WORKER_BUG)

return (result, stdout)
Expand All @@ -363,14 +367,14 @@ def _runtest(self, test_name: TestName) -> MultiprocessResult:
stdout = self.read_stdout(stdout_file)

if retcode is None:
raise WorkerError(self.test_name, stdout=stdout,
raise WorkerError(test_name, stdout=stdout,
err_msg=None,
state=State.TIMEOUT)
if retcode != 0:
name = support.get_signal_name(retcode)
if name:
retcode = f"{retcode} ({name})"
raise WorkerError(self.test_name, f"Exit code {retcode}", stdout,
raise WorkerError(test_name, f"Exit code {retcode}", stdout,
state=State.WORKER_FAILED)

result, stdout = self.read_json(json_file, json_tmpfile, stdout)
Expand Down Expand Up @@ -401,7 +405,7 @@ def run(self) -> None:
except WorkerError as exc:
mp_result = exc.mp_result
finally:
self.test_name = None
self.test_name = _NOT_RUNNING
mp_result.result.duration = time.monotonic() - self.start_time
self.output.put((False, mp_result))

Expand All @@ -416,6 +420,9 @@ def run(self) -> None:

def _wait_completed(self) -> None:
popen = self._popen
# only needed for mypy:
if popen is None:
raise ValueError("Should never access `._popen` before calling `.run()`")

try:
popen.wait(WAIT_COMPLETED_TIMEOUT)
Expand Down Expand Up @@ -451,7 +458,7 @@ def get_running(workers: list[WorkerThread]) -> str | None:
running: list[str] = []
for worker in workers:
test_name = worker.test_name
if not test_name:
if not test_name or worker.start_time is None:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
Expand Down Expand Up @@ -483,7 +490,7 @@ def __init__(self, num_workers: int, runtests: RunTests,
self.worker_timeout: float | None = min(self.timeout * 1.5, self.timeout + 5 * 60)
else:
self.worker_timeout = None
self.workers: list[WorkerThread] | None = None
self.workers: list[WorkerThread] = []

jobs = self.runtests.get_jobs()
if jobs is not None:
Expand All @@ -503,7 +510,7 @@ def start_workers(self) -> None:
processes = plural(nworkers, "process", "processes")
msg = (f"Run {tests} in parallel using "
f"{nworkers} worker {processes}")
if self.timeout:
if self.timeout and self.worker_timeout is not None:
msg += (" (timeout: %s, worker timeout: %s)"
% (format_duration(self.timeout),
format_duration(self.worker_timeout)))
Expand Down Expand Up @@ -555,7 +562,7 @@ def display_result(self, mp_result: MultiprocessResult) -> None:
if mp_result.err_msg:
# WORKER_BUG
text += ' (%s)' % mp_result.err_msg
elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
elif (result.duration and result.duration >= PROGRESS_MIN_TIME and not pgo):
text += ' (%s)' % format_duration(result.duration)
if not pgo:
running = get_running(self.workers)
Expand Down
Loading