Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions Lib/test/libregrtest/mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ disallow_untyped_defs = False
check_untyped_defs = False
warn_return_any = False

# Enable --strict-optional for these ASAP:
[mypy-Lib.test.libregrtest.run_workers.*]
strict_optional = False

# Various internal modules that typeshed deliberately doesn't have stubs for:
[mypy-_abc.*,_opcode.*,_overlapped.*,_testcapi.*,_testinternalcapi.*,test.*]
ignore_missing_imports = True
39 changes: 25 additions & 14 deletions Lib/test/libregrtest/run_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,13 @@ def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
self._killed = False
self._stopped = False

def current_test_name(self) -> TestName:
if self.test_name is None:
raise ValueError(
'Should never call `.current_test_name()` before calling `.run()`'
)
return self.test_name

def __repr__(self) -> str:
info = [f'WorkerThread #{self.worker_id}']
if self.is_alive():
Expand All @@ -127,9 +134,9 @@ def __repr__(self) -> str:
if test:
info.append(f'test={test}')
popen = self._popen
if popen is not None:
if popen is not None and self.start_time is not None:
dt = time.monotonic() - self.start_time
info.extend((f'pid={self._popen.pid}',
info.extend((f'pid={popen.pid}',
f'time={format_duration(dt)}'))
return '<%s>' % ' '.join(info)

Expand Down Expand Up @@ -311,13 +318,14 @@ def read_stdout(self, stdout_file: TextIO) -> str:
except Exception as exc:
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
raise WorkerError(self.test_name,
raise WorkerError(self.current_test_name(),
f"Cannot read process stdout: {exc}",
stdout=None,
state=State.WORKER_BUG)

def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
stdout: str) -> tuple[TestResult, str]:
test_name = self.current_test_name()
try:
if json_tmpfile is not None:
json_tmpfile.seek(0)
Expand All @@ -332,11 +340,11 @@ def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Failed to read worker process JSON: {exc}"
raise WorkerError(self.test_name, err_msg, stdout,
raise WorkerError(test_name, err_msg, stdout,
state=State.WORKER_BUG)

if not worker_json:
raise WorkerError(self.test_name, "empty JSON", stdout,
raise WorkerError(test_name, "empty JSON", stdout,
state=State.WORKER_BUG)

try:
Expand All @@ -345,7 +353,7 @@ def read_json(self, json_file: JsonFile, json_tmpfile: TextIO | None,
# gh-101634: Catch UnicodeDecodeError if stdout cannot be
# decoded from encoding
err_msg = f"Failed to parse worker process JSON: {exc}"
raise WorkerError(self.test_name, err_msg, stdout,
raise WorkerError(test_name, err_msg, stdout,
state=State.WORKER_BUG)

return (result, stdout)
Expand All @@ -363,14 +371,14 @@ def _runtest(self, test_name: TestName) -> MultiprocessResult:
stdout = self.read_stdout(stdout_file)

if retcode is None:
raise WorkerError(self.test_name, stdout=stdout,
raise WorkerError(test_name, stdout=stdout,
err_msg=None,
state=State.TIMEOUT)
if retcode != 0:
name = support.get_signal_name(retcode)
if name:
retcode = f"{retcode} ({name})"
raise WorkerError(self.test_name, f"Exit code {retcode}", stdout,
raise WorkerError(test_name, f"Exit code {retcode}", stdout,
state=State.WORKER_FAILED)

result, stdout = self.read_json(json_file, json_tmpfile, stdout)
Expand All @@ -394,15 +402,15 @@ def run(self) -> None:
except StopIteration:
break

self.start_time = time.monotonic()
self.start_time = start_time = time.monotonic()
self.test_name = test_name
try:
mp_result = self._runtest(test_name)
except WorkerError as exc:
mp_result = exc.mp_result
finally:
self.test_name = None
mp_result.result.duration = time.monotonic() - self.start_time
mp_result.result.duration = time.monotonic() - start_time
self.output.put((False, mp_result))

if mp_result.result.must_stop(fail_fast, fail_env_changed):
Expand All @@ -416,6 +424,9 @@ def run(self) -> None:

def _wait_completed(self) -> None:
popen = self._popen
# only needed for mypy:
if popen is None:
raise ValueError("Should never access `._popen` before calling `.run()`")

try:
popen.wait(WAIT_COMPLETED_TIMEOUT)
Expand Down Expand Up @@ -451,7 +462,7 @@ def get_running(workers: list[WorkerThread]) -> str | None:
running: list[str] = []
for worker in workers:
test_name = worker.test_name
if not test_name:
if not test_name or worker.start_time is None:
continue
dt = time.monotonic() - worker.start_time
if dt >= PROGRESS_MIN_TIME:
Expand Down Expand Up @@ -483,7 +494,7 @@ def __init__(self, num_workers: int, runtests: RunTests,
self.worker_timeout: float | None = min(self.timeout * 1.5, self.timeout + 5 * 60)
else:
self.worker_timeout = None
self.workers: list[WorkerThread] | None = None
self.workers: list[WorkerThread] = []

jobs = self.runtests.get_jobs()
if jobs is not None:
Expand All @@ -503,7 +514,7 @@ def start_workers(self) -> None:
processes = plural(nworkers, "process", "processes")
msg = (f"Run {tests} in parallel using "
f"{nworkers} worker {processes}")
if self.timeout:
if self.timeout and self.worker_timeout is not None:
msg += (" (timeout: %s, worker timeout: %s)"
% (format_duration(self.timeout),
format_duration(self.worker_timeout)))
Expand Down Expand Up @@ -555,7 +566,7 @@ def display_result(self, mp_result: MultiprocessResult) -> None:
if mp_result.err_msg:
# WORKER_BUG
text += ' (%s)' % mp_result.err_msg
elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
elif (result.duration and result.duration >= PROGRESS_MIN_TIME and not pgo):
text += ' (%s)' % format_duration(result.duration)
if not pgo:
running = get_running(self.workers)
Expand Down
Loading