diff --git a/bin/problem.py b/bin/problem.py index 69696272b..fd19a45de 100644 --- a/bin/problem.py +++ b/bin/problem.py @@ -6,7 +6,7 @@ from collections.abc import Callable, Sequence from pathlib import Path -from typing import Any, Final, Literal, Optional, overload, TYPE_CHECKING +from typing import Any, Final, Literal, Optional, overload, TypeVar, TYPE_CHECKING if TYPE_CHECKING: # Prevent circular import: https://stackoverflow.com/a/39757388 from program import Program @@ -34,6 +34,66 @@ def check_unknown_keys(yaml_data: dict[str, Any], sub_key: Optional[str] = None) warn(f"found unknown problem.yaml key: {key} in {f'`{sub_key}`' if sub_key else 'root'}") +T = TypeVar("T") + + +def parse_optional_setting(yaml_data: dict[str, Any], key: str, t: type[T]) -> Optional[T]: + if key in yaml_data: + value = yaml_data.pop(key) + if isinstance(value, int) and t is float: + value = float(value) + if isinstance(value, t): + return value + if value == "" and (t is list or t is dict): + # handle empty yaml keys + return t() + warn(f"incompatible value for key '{key}' in problem.yaml. SKIPPED.") + return None + + +def parse_setting( + yaml_data: dict[str, Any], key: str, default: T, constraint: Optional[str] = None +) -> T: + value = parse_optional_setting(yaml_data, key, type(default)) + result = default if value is None else value + if constraint: + assert isinstance(result, (float, int)) + assert eval(f"{default} {constraint}") + if not eval(f"{result} {constraint}"): + warn( + f"value for '{key}' in problem.yaml should be {constraint} but is {result}. SKIPPED." + ) + return default + return result + + +def parse_optional_list_setting(yaml_data: dict[str, Any], key: str, t: type[T]) -> list[T]: + if key in yaml_data: + value = yaml_data.pop(key) + if isinstance(value, t): + return [value] + if isinstance(value, list): + if not all(isinstance(v, t) for v in value): + warn( + f"some values for key '{key}' in problem.yaml do not have type {t.__name__}. SKIPPED." + ) + return [] + if not value: + warn(f"value for '{key}' in problem.yaml should not be an empty list.") + return value + warn(f"incompatible value for key '{key}' in problem.yaml. SKIPPED.") + return [] + + +def parse_deprecated_setting( + yaml_data: dict[str, Any], key: str, new: Optional[str] = None +) -> None: + if key in yaml_data: + use = f", use '{new}' instead" if new else "" + warn(f"key '{key}' is deprecated{use}. SKIPPED.") + yaml_data.pop(key) + + class Person: def __init__(self, yaml_data: str | dict[str, Any]): if isinstance(yaml_data, dict): diff --git a/bin/program.py b/bin/program.py index 7786a3c1d..d48235502 100644 --- a/bin/program.py +++ b/bin/program.py @@ -1,11 +1,12 @@ import re import shutil import stat +import shlex import subprocess import threading from colorama import Fore from pathlib import Path -from typing import Final, Optional, TYPE_CHECKING +from typing import Any, Final, Mapping, Optional, Sequence, TypeVar, TYPE_CHECKING import config from util import * @@ -13,68 +14,150 @@ if TYPE_CHECKING: # Prevent circular import: https://stackoverflow.com/a/39757388 from problem import Problem -EXTRA_LANGUAGES: Final[str] = """ -checktestdata: - name: 'Checktestdata' - priority: 1 - files: '*.ctd' - run: 'checktestdata {mainfile}' - -viva: - name: 'Viva' - priority: 2 - files: '*.viva' - run: 'java -jar {viva_jar} {mainfile}' - -manual: - name: 'manual' - priority: 9999 - files: 'build run' - compile: '{build}' - run: '{run}' -""" - -SANITIZER_FLAGS: Final[str] = """ -cpp: - compile: -fsanitize=undefined,address -""" + +class Language: + def __init__(self, lang_id: str, conf: dict[str, Any]): + self.ok = True + self.id = lang_id + + T = TypeVar("T") + + def get_optional_value(key: str, t: type[T]) -> Optional[T]: + if key in conf: + value = conf.pop(key) + if isinstance(value, t): + return value + warn( + f"incompatible value for key '{key}' in languages.yaml for '{lang_id}'. SKIPPED." + ) + return None + + def get_value(key: str, t: type[T]) -> T: + if key in conf: + value = conf.pop(key) + if isinstance(value, t): + return value + error(f"incompatible value for key '{key}' in languages.yaml for '{lang_id}'") + else: + error(f"missing key '{key}' in languages.yaml for '{lang_id}'") + self.ok = False + return t() + + self.name = get_value("name", str) + self.priority = get_value("priority", int) + self.files = (get_optional_value("files", str) or "").split() + self.shebang = None + shebang = get_optional_value("shebang", str) + if shebang is not None: + try: + self.shebang = re.compile(shebang) + except re.error: + warn(f"invalid shebang in languages.yaml for '{lang_id}'. SKIPPED.") + self.compile = get_optional_value("compile", str) + self.run = get_value("run", str) + + def get_exe(key: str, command: str) -> Optional[str]: + try: + exe = shlex.split(command)[0] + if exe and exe[0] != "{": + return exe + except (IndexError, ValueError): + error(f"invalid value for key '{key}' in languages.yaml for '{lang_id}'") + self.ok = False + return None + + self.compile_exe = get_exe("compile", self.compile) if self.compile else None + self.run_exe = get_exe("run", self.run) + + for key in conf: + assert isinstance(key, str) + warn(f"found unknown languages.yaml key: '{key}' for '{lang_id}'") + + def __lt__(self, other: "Language") -> bool: + return self.id > other.id + + # Returns true when file f matches the shebang regex. + def matches_shebang(self, f: Path) -> bool: + if self.shebang is None: + return True + with f.open() as o: + return self.shebang.search(o.readline()) is not None + + +CHECKTESTDATA: Final[Language] = Language( + "BAPCtools:checktestdata", + { + "name": "Checktestdata", + "priority": 1, + "files": "*.ctd", + "run": "checktestdata {mainfile}", + }, +) +VIVA: Final[Language] = Language( + "BAPCtools:viva", + { + "name": "Viva", + "priority": 2, + "files": "*.viva", + "run": "java -jar {viva_jar} {mainfile}", + }, +) +EXTRA_LANGUAGES: Final[Sequence[Language]] = [ + CHECKTESTDATA, + VIVA, + Language( + "BAPCtools:manual", + { + "name": "manual", + "priority": 9999, + "files": "build run", + "compile": "{build}", + "run": "{run}", + }, + ), +] # The cached languages.yaml for the current contest. -_languages = None -_sanitizer = None +_languages: Optional[list[Language]] = None _program_config_lock = threading.Lock() -def languages(): +def languages() -> Sequence[Language]: global _languages, _program_config_lock with _program_config_lock: if _languages is not None: return _languages if Path("languages.yaml").is_file(): - _languages = read_yaml(Path("languages.yaml")) + raw_languages = read_yaml(Path("languages.yaml")) else: - _languages = read_yaml(config.TOOLS_ROOT / "config/languages.yaml") + raw_languages = read_yaml(config.TOOLS_ROOT / "config/languages.yaml") + if not isinstance(raw_languages, dict): + fatal("could not parse languages.yaml.") + + languages = [Language(lang_id, lang_conf) for lang_id, lang_conf in raw_languages.items()] + priorities: dict[int, str] = {} + _languages = [] + for lang in languages: + if not lang.ok: + continue + if lang.priority in priorities: + warn( + f"'{lang.id}' and '{priorities[lang.priority]}' have the same priority in languages.yaml." + ) + _languages.append(lang) + priorities[lang.priority] = lang.id - # Add custom languages. - extra_langs = parse_yaml(EXTRA_LANGUAGES) - for lang in extra_langs: - assert lang not in _languages - _languages[lang] = extra_langs[lang] + for lang in EXTRA_LANGUAGES: + assert lang.ok + _languages.append(lang) return _languages -def sanitizer(): - global _sanitizer, _program_config_lock - with _program_config_lock: - if _sanitizer is not None: - return _sanitizer - - # Read sanitizer extra flags - _sanitizer = parse_yaml(SANITIZER_FLAGS) - - return _sanitizer +SANITIZER_FLAGS: Final[Mapping[str, Mapping[str, str]]] = { + "c++": {"compile": "-fsanitize=undefined,address"}, +} # A Program is class that wraps a program (file/directory) on disk. A program is usually one of: @@ -108,8 +191,6 @@ def sanitizer(): # # build() will return the true if building was successfull. class Program: - input_files: list[Path] # Populated in Program.build - def __init__( self, problem: "Problem", @@ -128,6 +209,9 @@ def __init__( assert self.__class__ is not Program # Program is abstract and may not be instantiated + # read and parse languages.yaml + languages() + # Make sure we never try to build the same program twice. That'd be stupid. if not skip_double_build_warning: if path in problem._programs: @@ -179,82 +263,62 @@ def __init__( self.source_files = [] self.has_deps = False + self.input_files: list[Path] # Populated in Program.build + self.language: Language # Populated in Program.build + # is file at path executable @staticmethod def _is_executable(path: Path) -> bool: - return bool( - path.is_file() and (path.stat().st_mode & (stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH)) + return path.is_file() and bool( + path.stat().st_mode & (stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH) ) - # Returns true when file f matches the given shebang regex. - @staticmethod - def _matches_shebang(f: Path, shebang: Optional[re.Pattern]) -> bool: - if shebang is None: - return True - with f.open() as o: - return shebang.search(o.readline()) is not None - # Do not warn for the same fallback language multiple times. warn_cache: set[str] = set() - language: Optional[str] - # Sets self.language and self.env['mainfile'] def _get_language(self, bar: ProgressBar) -> bool: fallback = False candidates = [] for lang in languages(): - lang_conf = languages()[lang] - globs = lang_conf["files"].split() or [] - shebang = re.compile(lang_conf["shebang"]) if lang_conf.get("shebang") else None - priority = int(lang_conf["priority"]) - matching_files = [] for f in self.input_files: - if any(f.match(glob) for glob in globs) and Program._matches_shebang(f, shebang): + if any(f.match(glob) for glob in lang.files) and lang.matches_shebang(f): matching_files.append(f) if len(matching_files) == 0: continue candidates.append( - (priority // 1000, len(matching_files), priority, lang, matching_files) + ((lang.priority // 1000, len(matching_files), lang.priority), lang, matching_files) ) candidates.sort(reverse=True) - for _, _, priority, lang, files in candidates: - lang_conf = languages()[lang] - name = lang_conf["name"] - + for _, lang, files in candidates: + name = lang.name + # Make sure we can compile programs for this language. + if lang.compile_exe is not None and shutil.which(lang.compile_exe) is None: + fallback = True + if lang.compile_exe not in Program.warn_cache and config.args.verbose: + Program.warn_cache.add(lang.compile_exe) + bar.debug( + f"Compile program {lang.compile_exe} not found for language {name}. Falling back to lower priority languages." + ) + continue # Make sure we can run programs for this language. - if "compile" in lang_conf: - exe = lang_conf["compile"].split()[0] - if exe[0] != "{" and shutil.which(exe) is None: - fallback = True - if exe not in Program.warn_cache: - if config.args.verbose: - bar.debug( - f"Compile program {exe} not found for language {name}. Falling back to lower priority languages." - ) - Program.warn_cache.add(exe) - continue - assert "run" in lang_conf - exe = lang_conf["run"].split()[0] - if exe[0] != "{" and shutil.which(exe) is None: + if lang.run_exe is not None and shutil.which(lang.run_exe) is None: fallback = True - if exe not in Program.warn_cache: - if config.args.verbose: - Program.warn_cache.add(exe) - bar.debug( - f"Run program {exe} not found for language {name}. Falling back to lower priority languages." - ) + if lang.run_exe not in Program.warn_cache and config.args.verbose: + Program.warn_cache.add(lang.run_exe) + bar.debug( + f"Run program {lang.run_exe} not found for language {name}. Falling back to lower priority languages." + ) continue if fallback: - if lang not in Program.warn_cache: - if config.args.verbose: - Program.warn_cache.add(lang) - bar.debug(f"Falling back to {languages()[lang]['name']}.") + if lang.id not in Program.warn_cache and config.args.verbose: + Program.warn_cache.add(lang.id) + bar.debug(f"Falling back to {name}.") if len(files) == 0: self.ok = False @@ -308,7 +372,7 @@ def _checks(self, bar: ProgressBar) -> None: ) # Make sure C++ does not depend on stdc++.h, because it's not portable. - if self.language == "cpp": + if "c++" in self.language.name.lower(): for f in self.source_files: try: if f.read_text().find("bits/stdc++.h") != -1: @@ -324,7 +388,7 @@ def _checks(self, bar: ProgressBar) -> None: from validate import Validator if isinstance(self, Generator) or isinstance(self, Validator): - if self.language == "cpp": + if "c++" in self.language.name.lower(): for f in self.source_files: try: text = f.read_text() @@ -354,7 +418,7 @@ def _checks(self, bar: ProgressBar) -> None: ) except UnicodeDecodeError: pass - if self.language and "py" in self.language: + if "python" in self.language.name.lower(): for f in self.source_files: try: text = f.read_text() @@ -475,24 +539,22 @@ def build(self, bar: ProgressBar) -> bool: # A file containing the compile command and hash. meta_path = self.tmpdir / "meta_.yaml" - lang_config = languages()[self.language] - sanitizer_config = sanitizer() - - compile_command = lang_config["compile"] if "compile" in lang_config else "" - run_command = lang_config["run"] + compile_command = self.language.compile or "" + run_command = self.language.run if ( self.subdir == "submissions" and config.args.sanitizer - and self.language in sanitizer_config + and self.language.name in SANITIZER_FLAGS ): - if "compile" in sanitizer_config[self.language]: - compile_command += " " + sanitizer_config[self.language]["compile"] - if "run" in sanitizer_config[self.language]: - run_command += " " + sanitizer_config[self.language]["run"] + sanitizer = SANITIZER_FLAGS[self.language.name] + if "compile" in sanitizer: + compile_command += " " + sanitizer["compile"] + if "run" in sanitizer: + run_command += " " + sanitizer["run"] - self.compile_command = compile_command.format(**self.env).split() - self.run_command = run_command.format(**self.env).split() + self.compile_command = shlex.split(compile_command.format(**self.env)) + self.run_command = shlex.split(run_command.format(**self.env)) # Compare the hash to the last build. up_to_date = False @@ -519,7 +581,7 @@ def build(self, bar: ProgressBar) -> bool: return True - def _exec_command(self, *args, **kwargs) -> ExecResult: + def _exec_command(self, *args: Any, **kwargs: Any) -> ExecResult: if "timeout" not in kwargs and "timeout" in self.limits: kwargs["timeout"] = self.limits["timeout"] if "memory" not in kwargs and "memory" in self.limits: @@ -527,14 +589,14 @@ def _exec_command(self, *args, **kwargs) -> ExecResult: return exec_command(*args, **kwargs) @staticmethod - def add_callback(problem: "Problem", path: Path, c: Callable[["Program"], Any]): + def add_callback(problem: "Problem", path: Path, c: Callable[["Program"], None]) -> None: if path not in problem._program_callbacks: problem._program_callbacks[path] = [] problem._program_callbacks[path].append(c) class Generator(Program): - def __init__(self, problem: "Problem", path: Path, **kwargs): + def __init__(self, problem: "Problem", path: Path, **kwargs: Any): super().__init__( problem, path, @@ -547,7 +609,9 @@ def __init__(self, problem: "Problem", path: Path, **kwargs): # Run the generator in the given working directory. # May write files in |cwd| and stdout is piped to {name}.in if it's not written already. # Returns ExecResult. Success when result.status == ExecStatus.ACCEPTED. - def run(self, bar: ProgressBar, cwd: Path, name: str, args: list[str] = []) -> ExecResult: + def run( + self, bar: ProgressBar, cwd: Path, name: str, args: list[str | Path] = [] + ) -> ExecResult: assert self.run_command is not None in_path = cwd / (name + ".in") diff --git a/bin/stats.py b/bin/stats.py index ba357d128..eb7671b28 100644 --- a/bin/stats.py +++ b/bin/stats.py @@ -94,25 +94,23 @@ def problem_stats(problems: list[Problem]) -> None: ("subs", lambda p: len(glob(p.path, "submissions/*/*")), 6), ] languages = { - " c(++)": ["C", "C++"], - "py": ["Python 2", "Python 3", "CPython 2", "CPython 3"], - "java": ["Java"], - "kt": ["Kotlin"], + " c(++)": ["c", "c++"], + "py": ["python 2", "python 3", "cpython 2", "cpython 3"], + "java": ["java"], + "kt": ["kotlin"], } for column, lang_names in languages.items(): paths = [] lang_defined = False - for lang_id, lang_definition in program.languages().items(): - if lang_definition["name"] in lang_names: + for lang in program.languages(): + if lang.name.lower() in lang_names: lang_defined = True - # dict.get() returns None if key 'files' is not declared - lang_globs = lang_definition.get("files") + lang_globs = lang.files if lang_globs: - paths += [f"submissions/accepted/{glob}" for glob in lang_globs.split()] + paths += [f"submissions/accepted/{glob}" for glob in lang_globs] else: warn( - f"Language {lang_id} ('{lang_definition['name']}') " - "does not define `files:` in languages.yaml" + f"Language {lang.id} ('{lang.name}') does not define `files:` in languages.yaml" ) if paths: stats.append((column, list(set(paths)), 1)) @@ -358,11 +356,11 @@ def format_value( def format_row(*values: Optional[str | float | int | timedelta]) -> str: return format_string.format(*[format_value(value) for value in values]) - languages: dict[str, list[str] | Literal[True]] = { - "C(++)": ["C", "C++"], - "Python": ["Python 2", "Python 3", "CPython 2", "CPython 3"], - "Java": ["Java"], - "Kotlin": ["Kotlin"], + languages = { + "C(++)": ["c", "c++"], + "Python": ["python 2", "python 3", "cpython 2", "cpython 3"], + "Java": ["java"], + "Kotlin": ["kotlin"], } def get_submissions_row( @@ -373,9 +371,9 @@ def get_submissions_row( paths.append("accepted/*") else: assert isinstance(names, list) - for config in program.languages().values(): - if config["name"] in names: - globs = config["files"].split() or [] + for config in program.languages(): + if config.name.lower() in names: + globs = config.files paths += [f"accepted/{glob}" for glob in globs] paths = list(set(paths)) diff --git a/bin/util.py b/bin/util.py index 991b8f7c2..57376599f 100644 --- a/bin/util.py +++ b/bin/util.py @@ -818,61 +818,6 @@ def write_yaml( return None -T = TypeVar("T") - - -def parse_optional_setting(yaml_data: dict[str, Any], key: str, t: type[T]) -> Optional[T]: - if key in yaml_data: - value = yaml_data.pop(key) - if isinstance(value, int) and t is float: - value = float(value) - if isinstance(value, t): - return value - if value == "" and (t is list or t is dict): - # handle empty yaml keys - return t() - warn(f"incompatible value for key '{key}' in problem.yaml. SKIPPED.") - return None - - -def parse_setting( - yaml_data: dict[str, Any], key: str, default: T, constraint: Optional[str] = None -) -> T: - value = parse_optional_setting(yaml_data, key, type(default)) - result = default if value is None else value - if constraint and not eval(f"{result} {constraint}"): - warn(f"value for '{key}' in problem.yaml should be {constraint} but is {value}. SKIPPED.") - return default - return result - - -def parse_optional_list_setting(yaml_data: dict[str, Any], key: str, t: type[T]) -> list[T]: - if key in yaml_data: - value = yaml_data.pop(key) - if isinstance(value, t): - return [value] - if isinstance(value, list): - if not all(isinstance(v, t) for v in value): - warn( - f"some values for key '{key}' in problem.yaml do not have type {t.__name__}. SKIPPED." - ) - return [] - if not value: - warn(f"value for '{key}' in problem.yaml should not be an empty list.") - return value - warn(f"incompatible value for key '{key}' in problem.yaml. SKIPPED.") - return [] - - -def parse_deprecated_setting( - yaml_data: dict[str, Any], key: str, new: Optional[str] = None -) -> None: - if key in yaml_data: - use = f", use '{new}' instead" if new else "" - warn(f"key '{key}' is deprecated{use}. SKIPPED.") - yaml_data.pop(key) - - def _ask_variable(name: str, default: Optional[str] = None, allow_empty: bool = False) -> str: if config.args.defaults: if not default and not allow_empty: diff --git a/bin/validate.py b/bin/validate.py index 71d4fe56c..3e640c604 100644 --- a/bin/validate.py +++ b/bin/validate.py @@ -88,7 +88,10 @@ class Validator(program.Program): ExecResult.status == ExecStatus.REJECTED if the validator rejected. """ - FORMAT_VALIDATOR_LANGUAGES: Final[Sequence[str]] = ["checktestdata", "viva"] + FORMAT_VALIDATOR_LANGUAGES: Final[Sequence[program.Language]] = [ + program.CHECKTESTDATA, + program.VIVA, + ] def __repr__(self): return type(self).__name__ + ": " + str(self.path) @@ -173,7 +176,7 @@ def format_exec_code_map(returncode): return ExecStatus.TIMEOUT return ExecStatus.ERROR - if self.language == "checktestdata": + if self.language == program.CHECKTESTDATA: with main_path.open("rb") as main_file: return self._exec_command( self.run_command, @@ -182,7 +185,7 @@ def format_exec_code_map(returncode): cwd=cwd, ) - if self.language == "viva": + if self.language == program.VIVA: # Called as `viva validator.viva testcase.in`. result = self._exec_command( self.run_command + [main_path.absolute()], diff --git a/bin/verdicts.py b/bin/verdicts.py index 6a4a5f923..4a2ecd3b9 100644 --- a/bin/verdicts.py +++ b/bin/verdicts.py @@ -4,16 +4,16 @@ import threading from enum import Enum from pathlib import Path -from typing import Literal, Sequence, TYPE_CHECKING +from typing import Any, Literal, Optional, Sequence, TextIO, TYPE_CHECKING from colorama import Fore, Style import config import testcase -from util import ProgressBar +from util import ITEM_TYPE, ProgressBar if TYPE_CHECKING: - import run + pass class Verdict(Enum): @@ -26,7 +26,7 @@ class Verdict(Enum): VALIDATOR_CRASH = 5 COMPILER_ERROR = 6 - def __str__(self): + def __str__(self) -> str: return { Verdict.ACCEPTED: "ACCEPTED", Verdict.WRONG_ANSWER: "WRONG ANSWER", @@ -36,10 +36,10 @@ def __str__(self): Verdict.COMPILER_ERROR: "COMPILER ERROR", }[self] - def __lt__(self, other): + def __lt__(self, other: "Verdict") -> bool: return self.value < other.value - def short(self): + def short(self) -> str: return { Verdict.ACCEPTED: "AC", Verdict.WRONG_ANSWER: "WA", @@ -49,7 +49,7 @@ def short(self): Verdict.COMPILER_ERROR: "CE", }[self] - def color(self): + def color(self) -> str: return { Verdict.ACCEPTED: Fore.GREEN, Verdict.WRONG_ANSWER: Fore.RED, @@ -78,7 +78,7 @@ class RunUntil(Enum): ALL = 3 -def to_char(v: Verdict | None | Literal[False], lower: bool = False): +def to_char(v: Verdict | None | Literal[False], lower: bool = False) -> str: if v is None or v is False: return f"{Fore.BLUE}?{Style.RESET_ALL}" else: @@ -86,7 +86,7 @@ def to_char(v: Verdict | None | Literal[False], lower: bool = False): return f"{v.color()}{char}{Style.RESET_ALL}" -def to_string(v: Verdict | None | Literal[False]): +def to_string(v: Verdict | None | Literal[False]) -> str: if v is None or v is False: return to_char(v) else: @@ -199,10 +199,10 @@ def __init__( self.children[tg] = sorted(self.children[tg]) # Allow `with self` to lock. - def __enter__(self): + def __enter__(self) -> None: self.lock.__enter__() - def __exit__(self, *args): + def __exit__(self, *args) -> None: self.lock.__exit__(*args) def is_test_group(self, node: str) -> bool: @@ -217,7 +217,7 @@ def is_test_case(self, node: str) -> bool: """ return node not in self.children - def set(self, test_case: str, verdict: str | Verdict, duration: float): + def set(self, test_case: str, verdict: str | Verdict, duration: float) -> None: """Set the verdict and duration of the given test case (implying possibly others) verdict can be given as a Verdict or as a string using either long or @@ -229,7 +229,7 @@ def set(self, test_case: str, verdict: str | Verdict, duration: float): self.duration[test_case] = duration self._set_verdict_for_node(test_case, verdict, duration >= self.timeout) - def __getitem__(self, test_node) -> Verdict | None | Literal[False]: + def __getitem__(self, test_node: str) -> Verdict | None | Literal[False]: with self: return self.verdict[test_node] @@ -297,7 +297,7 @@ def aggregate(self, test_group: str) -> Verdict: assert first_error is not False return first_error - def _set_verdict_for_node(self, test_node: str, verdict: Verdict, timeout: bool): + def _set_verdict_for_node(self, test_node: str, verdict: Verdict, timeout: bool) -> None: # This assumes self.lock is already held. if timeout: assert verdict != Verdict.ACCEPTED @@ -374,8 +374,8 @@ def __init__(self, length: int, text: str): self.length = length self.text = text - def __iter__(self): - yield from [self.length, self.text] + def tuple(self) -> tuple[int, str]: + return (self.length, self.text) def __init__( self, @@ -420,7 +420,8 @@ def __init__( verdicts[-1].text += "s" if test_case in self.samples else "-" printed = self.name_width + 1 - for length, tmp in verdicts: + for verdict_value in verdicts: + length, tmp = verdict_value.tuple() if printed + 1 + length > self.width: lines.append(f"{str():{self.name_width + 1}}") printed = self.name_width + 1 @@ -443,18 +444,18 @@ def __init__( file=sys.stderr, ) - def next_submission(self, verdicts: Verdicts): + def next_submission(self, verdicts: Verdicts) -> None: self.results.append(verdicts) self.current_test_cases = set() - def add_test_case(self, test_case: str): + def add_test_case(self, test_case: str) -> None: self.current_test_cases.add(test_case) - def update_verdicts(self, test_case: str, verdict: str | Verdict, duration: float): + def update_verdicts(self, test_case: str, verdict: str | Verdict, duration: float) -> None: self.results[-1].set(test_case, verdict, duration) self.current_test_cases.discard(test_case) - def _clear(self, *, force: bool = True): + def _clear(self, *, force: bool = True) -> None: if force or self.print_without_force: if self.last_printed: actual_width = ProgressBar.columns @@ -480,7 +481,7 @@ def _get_verdict(self, s: int, test_case: str, check_sample: bool = True) -> str res = Style.DIM + to_char(None) return res - def print(self, **kwargs): + def print(self, **kwargs: Any) -> None: if config.args.tree: self._print_tree(**kwargs) else: @@ -492,7 +493,7 @@ def _print_tree( force: bool = True, new_lines: int = 1, printed_lengths: list[int] | None = None, - ): + ) -> None: if printed_lengths is None: printed_lengths = [] if force or self.print_without_force: @@ -551,7 +552,8 @@ def _print_tree( width = -1 if ProgressBar.columns - pref_len < 10 else self.width space = "" - for length, group in grouped: + for grouped_value in grouped: + length, group = grouped_value.tuple() if width >= 0 and printed + 1 + length > width: printed_text.append( f"\n{Style.DIM}{indent}{pipe} {pipe2} {Style.RESET_ALL}" @@ -592,7 +594,7 @@ def _print_table( force: bool = True, new_lines: int = 2, printed_lengths: list[int] | None = None, - ): + ) -> None: if printed_lengths is None: printed_lengths = [] if force or self.print_without_force: @@ -615,7 +617,8 @@ def _print_table( verdicts[-1].length += 1 verdicts[-1].text += self._get_verdict(s, test_case) - for length, tmp in verdicts: + for verdict_value in verdicts: + length, tmp = verdict_value.tuple() if self.width >= 0 and printed + 1 + length > self.width: printed_text.append(f"\n{str():{self.name_width + 1}}") printed_lengths.append(printed) @@ -632,12 +635,12 @@ def _print_table( def ProgressBar( self, - prefix, - max_len=None, - count=None, + prefix: str, + max_len: Optional[int] = None, + count: Optional[int] = None, *, - items=None, - needs_leading_newline=False, + items: Optional[Sequence[ITEM_TYPE]] = None, + needs_leading_newline: bool = False, ) -> "TableProgressBar": return TableProgressBar( self, @@ -650,7 +653,16 @@ def ProgressBar( class TableProgressBar(ProgressBar): - def __init__(self, table, prefix, max_len, count, *, items, needs_leading_newline): + def __init__( + self, + table: VerdictTable, + prefix: str, + max_len: Optional[int], + count: Optional[int], + *, + items: Optional[Sequence[ITEM_TYPE]], + needs_leading_newline: bool, + ): super().__init__( prefix, max_len, @@ -661,7 +673,7 @@ def __init__(self, table, prefix, max_len, count, *, items, needs_leading_newlin self.table = table # at the begin of any IO the progress bar locks so we can clear the table at this point - def __enter__(self): + def __enter__(self) -> None: super().__enter__() if ProgressBar.lock_depth == 1: if isinstance(sys.stderr, io.TextIOWrapper): @@ -670,7 +682,7 @@ def __enter__(self): self.table._clear(force=False) # at the end of any IO the progress bar unlocks so we can reprint the table at this point - def __exit__(self, *args): + def __exit__(self, *args: Any) -> None: if ProgressBar.lock_depth == 1: # ProgressBar.columns is just an educated guess for the number of printed chars # in the ProgressBar @@ -680,20 +692,43 @@ def __exit__(self, *args): print(end="", flush=True, file=sys.stderr) super().__exit__(*args) - def _print(self, *objects, sep="", end="\n", file=sys.stderr, flush=True): + def _print( + self, + *objects: Any, + sep: str = "", + end: str = "\n", + file: TextIO = sys.stderr, + flush: bool = True, + ) -> None: assert self._is_locked() # drop all flushes... print(*objects, sep=sep, end=end, file=file, flush=False) - # TODO #102: item has type `str` in the base class, but type `run.Run` here. - def start(self, item: "run.Run"): # type: ignore[override] - self.table.add_test_case(item.testcase.name) - return super().start(item) + def start(self, item: ITEM_TYPE = "") -> "TableProgressBar": + from run import Run - def done(self, success=True, message="", data="", print_item=True): - return super().done(success, message, data, print_item) + assert isinstance(item, Run) + self.table.add_test_case(item.testcase.name) + copy = super().start(item) + assert isinstance(copy, TableProgressBar) + return copy - def finalize(self, *, print_done=True, message=None, suppress_newline=False): + def done( + self, + success: bool = True, + message: str = "", + data: Optional[str] = None, + print_item: bool = True, + ) -> None: + super().done(success, message, data, print_item) + + def finalize( + self, + *, + print_done: bool = True, + message: Optional[str] = None, + suppress_newline: bool = False, + ) -> bool: with self: res = super().finalize( print_done=print_done, diff --git a/config/languages.yaml b/config/languages.yaml index 484a40e89..0bfb0af0f 100644 --- a/config/languages.yaml +++ b/config/languages.yaml @@ -169,7 +169,7 @@ haskell: java: name: 'Java' - priority: 800 + priority: 850 files: '*.java' compile: 'javac -encoding UTF-8 -sourcepath {path} -d {path} {files}' run: 'java -Dfile.encoding=UTF-8 -XX:+UseSerialGC -Xss64m -Xms{memlim}m -Xmx{memlim}m -cp {path} {mainclass}' @@ -281,7 +281,7 @@ cpython3: # Use .cpy to explicitly invoke CPython 3 cpython: name: 'Python 3' - priority: 800 + priority: 803 files: '*.cpy' compile: 'python3 -m py_compile {files}' run: 'python3 {mainfile}' @@ -313,13 +313,13 @@ scala: # Executes any .sh file. shell: name: 'Shell' - priority: 900 + priority: 910 files: '*.sh' run: 'sh {mainfile}' # Executes any .bash file. bash: name: 'Bash' - priority: 900 + priority: 920 files: '*.bash' run: 'bash {mainfile}'