From 5af4f857926c67a38277d0a832a4033540857139 Mon Sep 17 00:00:00 2001 From: mzuenni Date: Mon, 20 Oct 2025 03:46:18 +0200 Subject: [PATCH 01/12] add asserts to ensure safer use of eval --- bin/util.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/bin/util.py b/bin/util.py index eb584f6c..9b0cc450 100644 --- a/bin/util.py +++ b/bin/util.py @@ -840,9 +840,14 @@ def parse_setting( ) -> T: value = parse_optional_setting(yaml_data, key, type(default)) result = default if value is None else value - if constraint and not eval(f"{result} {constraint}"): - warn(f"value for '{key}' in problem.yaml should be {constraint} but is {value}. SKIPPED.") - return default + if constraint: + assert isinstance(result, (float, int)) + assert eval(f"{default} {constraint}") + if not eval(f"{result} {constraint}"): + warn( + f"value for '{key}' in problem.yaml should be {constraint} but is {result}. SKIPPED." + ) + return default return result From 0e37ad701ac1acb938f373604198284af61a4ef5 Mon Sep 17 00:00:00 2001 From: mzuenni Date: Mon, 20 Oct 2025 03:54:37 +0200 Subject: [PATCH 02/12] reworked languages parsing --- bin/program.py | 272 ++++++++++++++++++++++++++++++------------------- 1 file changed, 166 insertions(+), 106 deletions(-) diff --git a/bin/program.py b/bin/program.py index cde1633d..eb6ab1f8 100644 --- a/bin/program.py +++ b/bin/program.py @@ -1,11 +1,12 @@ import re import shutil import stat +import shlex import subprocess import threading from colorama import Fore from pathlib import Path -from typing import Final, Optional, TYPE_CHECKING +from typing import Any, Final, Mapping, Optional, Sequence, TYPE_CHECKING import config from util import * @@ -13,68 +14,145 @@ if TYPE_CHECKING: # Prevent circular import: https://stackoverflow.com/a/39757388 from problem import Problem -EXTRA_LANGUAGES: Final[str] = """ -checktestdata: - name: 'Checktestdata' - priority: 1 - files: '*.ctd' - run: 'checktestdata {mainfile}' - -viva: - name: 'Viva' - priority: 2 - files: '*.viva' - run: 'java -jar {viva_jar} {mainfile}' - -manual: - name: 'manual' - priority: 9999 - files: 'build run' - compile: '{build}' - run: '{run}' -""" - -SANITIZER_FLAGS: Final[str] = """ -cpp: - compile: -fsanitize=undefined,address -""" + +class Language: + def __init__(self, lang_id: str, conf: dict[str, Any]): + self.ok = True + + def get_optional_value(key: str, t: type[T]) -> Optional[T]: + if key in conf: + value = conf.pop(key) + if isinstance(value, t): + return value + warn( + f"incompatible value for key '{key}' in languages.yaml for '{lang_id}'. SKIPPED." + ) + return None + + def get_value(key: str, t: type[T]) -> T: + if key in conf: + value = conf.pop(key) + if isinstance(value, t): + return value + error(f"incompatible value for key '{key}' in languages.yaml for '{lang_id}'") + else: + error(f"missing key '{key}' in languages.yaml for '{lang_id}'") + self.ok = False + return t() + + self.id = lang_id + self.name = get_value("name", str) + self.priority = get_value("priority", int) + self.files = (get_optional_value("files", str) or "").split() + self.shebang = None + shebang = get_optional_value("shebang", str) + if shebang is not None: + try: + self.shebang = re.compile(shebang) + except re.error: + warn(f"invalid shebang in languages.yaml for '{lang_id}'. SKIPPED.") + self.compile = get_optional_value("compile", str) + self.run = get_value("run", str) + + def get_exe(command: str) -> str: + try: + return shlex.split(command)[0] + except (IndexError, ValueError) as e: + print(e) + self.ok = False + return "" + + self.compile_exe = get_exe(self.compile) if self.compile is not None else None + self.run_exe = get_exe(self.run) + + for key in conf: + assert isinstance(key, str) + warn(f"found unknown languages.yaml key: '{key}' for '{lang_id}'") + + # Returns true when file f matches the shebang regex. + def matches_shebang(self, f: Path) -> bool: + if self.shebang is None: + return True + with f.open() as o: + return self.shebang.search(o.readline()) is not None + + @staticmethod + def command_is_exe(command: Optional[str]) -> bool: + return bool(command and command[0] != "{") + + +EXTRA_LANGUAGES: Final[Sequence[Language]] = [ + Language( + "BAPCtools:checktestdata", + { + "name": "Checktestdata", + "priority": 1, + "files": "*.ctd", + "run": "checktestdata {mainfile}", + }, + ), + Language( + "BAPCtools:viva", + { + "name": "Viva", + "priority": 2, + "files": "*.viva", + "run": "java -jar {viva_jar} {mainfile}", + }, + ), + Language( + "BAPCtools:manual", + { + "name": "manual", + "priority": 9999, + "files": "build run", + "compile": "{build}", + "run": "{run}", + }, + ), +] # The cached languages.yaml for the current contest. -_languages = None -_sanitizer = None +_languages: Optional[list[Language]] = None _program_config_lock = threading.Lock() -def languages(): +def languages() -> Sequence[Language]: global _languages, _program_config_lock with _program_config_lock: if _languages is not None: return _languages if Path("languages.yaml").is_file(): - _languages = read_yaml(Path("languages.yaml")) + raw_languages = read_yaml(Path("languages.yaml")) else: - _languages = read_yaml(config.TOOLS_ROOT / "config/languages.yaml") + raw_languages = read_yaml(config.TOOLS_ROOT / "config/languages.yaml") + if not isinstance(raw_languages, dict): + fatal("could not parse languages.yaml.") + + languages = [Language(lang_id, lang_conf) for lang_id, lang_conf in raw_languages.items()] + priorities: dict[int, str] = {} + _languages = [] + for lang in languages: + if not lang.ok: + continue + if lang.priority in priorities: + warn( + f"'{lang.id}' and '{priorities[lang.priority]}' have the same priority in languages.yaml." + ) + _languages.append(lang) + priorities[lang.priority] = lang.id - # Add custom languages. - extra_langs = parse_yaml(EXTRA_LANGUAGES) - for lang in extra_langs: - assert lang not in _languages - _languages[lang] = extra_langs[lang] + for lang in EXTRA_LANGUAGES: + assert lang.ok + _languages.append(lang) return _languages -def sanitizer(): - global _sanitizer, _program_config_lock - with _program_config_lock: - if _sanitizer is not None: - return _sanitizer - - # Read sanitizer extra flags - _sanitizer = parse_yaml(SANITIZER_FLAGS) - - return _sanitizer +SANITIZER_FLAGS: Final[Mapping[str, Mapping[str, str]]] = { + "c++": {"compile": "-fsanitize=undefined,address"}, +} # A Program is class that wraps a program (file/directory) on disk. A program is usually one of: @@ -108,8 +186,6 @@ def sanitizer(): # # build() will return the (run_command, message) pair. class Program: - input_files: list[Path] # Populated in Program.build - def __init__( self, problem: "Problem", @@ -128,6 +204,9 @@ def __init__( assert self.__class__ is not Program # Program is abstract and may not be instantiated + # read and parse languages.yaml + languages() + # Make sure we never try to build the same program twice. That'd be stupid. if not skip_double_build_warning: if path in problem._programs: @@ -179,82 +258,65 @@ def __init__( self.source_files = [] self.has_deps = False + self.input_files: list[Path] # Populated in Program.build + self.language: Language # Populated in Program.build + # is file at path executable @staticmethod - def _is_executable(path): - return path.is_file() and ( + def _is_executable(path: Path) -> bool: + return path.is_file() and bool( path.stat().st_mode & (stat.S_IEXEC | stat.S_IXGRP | stat.S_IXOTH) ) - # Returns true when file f matches the given shebang regex. - @staticmethod - def _matches_shebang(f, shebang): - if shebang is None: - return True - with f.open() as o: - return shebang.search(o.readline()) - # Do not warn for the same fallback language multiple times. warn_cache: set[str] = set() - language: Optional[str] - # Sets self.language and self.env['mainfile'] def _get_language(self, bar: ProgressBar): fallback = False candidates = [] for lang in languages(): - lang_conf = languages()[lang] - globs = lang_conf["files"].split() or [] - shebang = re.compile(lang_conf["shebang"]) if lang_conf.get("shebang") else None - priority = int(lang_conf["priority"]) - matching_files = [] for f in self.input_files: - if any(f.match(glob) for glob in globs) and Program._matches_shebang(f, shebang): + if any(f.match(glob) for glob in lang.files) and lang.matches_shebang(f): matching_files.append(f) if len(matching_files) == 0: continue candidates.append( - (priority // 1000, len(matching_files), priority, lang, matching_files) + ((lang.priority // 1000, len(matching_files), lang.priority), lang, matching_files) ) candidates.sort(reverse=True) - for _, _, priority, lang, files in candidates: - lang_conf = languages()[lang] - name = lang_conf["name"] - - # Make sure we can run programs for this language. - if "compile" in lang_conf: - exe = lang_conf["compile"].split()[0] - if exe[0] != "{" and shutil.which(exe) is None: + for _, lang, files in candidates: + name = lang.name + # Make sure we can compile programs for this language. + if lang.compile_exe is not None: + exe = lang.compile_exe + if Language.command_is_exe(exe) and shutil.which(exe) is None: fallback = True - if exe not in Program.warn_cache: - if config.args.verbose: - bar.debug( - f"Compile program {exe} not found for language {name}. Falling back to lower priority languages." - ) - Program.warn_cache.add(exe) - continue - assert "run" in lang_conf - exe = lang_conf["run"].split()[0] - if exe[0] != "{" and shutil.which(exe) is None: - fallback = True - if exe not in Program.warn_cache: - if config.args.verbose: + if exe not in Program.warn_cache and config.args.verbose: Program.warn_cache.add(exe) bar.debug( - f"Run program {exe} not found for language {name}. Falling back to lower priority languages." + f"Compile program {exe} not found for language {name}. Falling back to lower priority languages." ) + continue + exe = lang.run_exe + # Make sure we can compile programs for this language. + if Language.command_is_exe(exe) and shutil.which(exe) is None: + fallback = True + if exe not in Program.warn_cache and config.args.verbose: + Program.warn_cache.add(exe) + bar.debug( + f"Run program {exe} not found for language {name}. Falling back to lower priority languages." + ) continue if fallback: - if lang not in Program.warn_cache: - if config.args.verbose: - Program.warn_cache.add(lang) - bar.debug(f"Falling back to {languages()[lang]['name']}.") + if lang.id not in Program.warn_cache and config.args.verbose: + Program.warn_cache.add(lang.id) + bar.debug(f"Falling back to {name}.") if len(files) == 0: self.ok = False @@ -308,7 +370,7 @@ def _checks(self, bar: ProgressBar): ) # Make sure C++ does not depend on stdc++.h, because it's not portable. - if self.language == "cpp": + if "c++" in self.language.name.lower(): for f in self.source_files: try: if f.read_text().find("bits/stdc++.h") != -1: @@ -324,7 +386,7 @@ def _checks(self, bar: ProgressBar): from validate import Validator if isinstance(self, Generator) or isinstance(self, Validator): - if self.language == "cpp": + if "c++" in self.language.name.lower(): for f in self.source_files: try: text = f.read_text() @@ -354,7 +416,7 @@ def _checks(self, bar: ProgressBar): ) except UnicodeDecodeError: pass - if self.language and "py" in self.language: + if "python" in self.language.name.lower(): for f in self.source_files: try: text = f.read_text() @@ -475,21 +537,19 @@ def build(self, bar: ProgressBar): # A file containing the compile command and hash. meta_path = self.tmpdir / "meta_.yaml" - lang_config = languages()[self.language] - sanitizer_config = sanitizer() - - compile_command = lang_config["compile"] if "compile" in lang_config else "" - run_command = lang_config["run"] + compile_command = self.language.compile or "" + run_command = self.language.run if ( self.subdir == "submissions" and config.args.sanitizer - and self.language in sanitizer_config + and self.language.name in SANITIZER_FLAGS ): - if "compile" in sanitizer_config[self.language]: - compile_command += " " + sanitizer_config[self.language]["compile"] - if "run" in sanitizer_config[self.language]: - run_command += " " + sanitizer_config[self.language]["run"] + sanitizer = SANITIZER_FLAGS[self.language.name] + if "compile" in sanitizer: + compile_command += " " + sanitizer["compile"] + if "run" in sanitizer: + run_command += " " + sanitizer["run"] self.compile_command = compile_command.format(**self.env).split() self.run_command = run_command.format(**self.env).split() From 598a1ae1e8f03b31ce2fddd98fed837dc806b49c Mon Sep 17 00:00:00 2001 From: mzuenni Date: Mon, 20 Oct 2025 03:55:27 +0200 Subject: [PATCH 03/12] make priorities disjoint --- config/languages.yaml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/config/languages.yaml b/config/languages.yaml index 484a40e8..0bfb0af0 100644 --- a/config/languages.yaml +++ b/config/languages.yaml @@ -169,7 +169,7 @@ haskell: java: name: 'Java' - priority: 800 + priority: 850 files: '*.java' compile: 'javac -encoding UTF-8 -sourcepath {path} -d {path} {files}' run: 'java -Dfile.encoding=UTF-8 -XX:+UseSerialGC -Xss64m -Xms{memlim}m -Xmx{memlim}m -cp {path} {mainclass}' @@ -281,7 +281,7 @@ cpython3: # Use .cpy to explicitly invoke CPython 3 cpython: name: 'Python 3' - priority: 800 + priority: 803 files: '*.cpy' compile: 'python3 -m py_compile {files}' run: 'python3 {mainfile}' @@ -313,13 +313,13 @@ scala: # Executes any .sh file. shell: name: 'Shell' - priority: 900 + priority: 910 files: '*.sh' run: 'sh {mainfile}' # Executes any .bash file. bash: name: 'Bash' - priority: 900 + priority: 920 files: '*.bash' run: 'bash {mainfile}' From 93272c9b0887fd7081268d817c24cf17e1774f09 Mon Sep 17 00:00:00 2001 From: mzuenni Date: Mon, 20 Oct 2025 04:05:14 +0200 Subject: [PATCH 04/12] fix stats --- bin/stats.py | 36 +++++++++++++++++------------------- 1 file changed, 17 insertions(+), 19 deletions(-) diff --git a/bin/stats.py b/bin/stats.py index 9f0740d7..4fb01bf4 100644 --- a/bin/stats.py +++ b/bin/stats.py @@ -94,25 +94,23 @@ def problem_stats(problems: list[Problem]) -> None: ("subs", lambda p: len(glob(p.path, "submissions/*/*")), 6), ] languages = { - " c(++)": ["C", "C++"], - "py": ["Python 2", "Python 3", "CPython 2", "CPython 3"], - "java": ["Java"], - "kt": ["Kotlin"], + " c(++)": ["c", "c++"], + "py": ["python 2", "python 3", "cpython 2", "cpython 3"], + "java": ["java"], + "kt": ["kotlin"], } for column, lang_names in languages.items(): paths = [] lang_defined = False - for lang_id, lang_definition in program.languages().items(): - if lang_definition["name"] in lang_names: + for lang in program.languages(): + if lang.name.lower() in lang_names: lang_defined = True - # dict.get() returns None if key 'files' is not declared - lang_globs = lang_definition.get("files") + lang_globs = lang.files if lang_globs: - paths += [f"submissions/accepted/{glob}" for glob in lang_globs.split()] + paths += [f"submissions/accepted/{glob}" for glob in lang_globs] else: warn( - f"Language {lang_id} ('{lang_definition['name']}') " - "does not define `files:` in languages.yaml" + f"Language {lang.id} ('{lang.name}') does not define `files:` in languages.yaml" ) if paths: stats.append((column, list(set(paths)), 1)) @@ -358,11 +356,11 @@ def format_value( def format_row(*values: Optional[str | float | int | timedelta]) -> str: return format_string.format(*[format_value(value) for value in values]) - languages: dict[str, list[str] | Literal[True]] = { - "C(++)": ["C", "C++"], - "Python": ["Python 2", "Python 3", "CPython 2", "CPython 3"], - "Java": ["Java"], - "Kotlin": ["Kotlin"], + languages = { + "C(++)": ["c", "c++"], + "Python": ["python 2", "python 3", "cpython 2", "cpython 3"], + "Java": ["java"], + "Kotlin": ["kotlin"], } def get_submissions_row( @@ -373,9 +371,9 @@ def get_submissions_row( paths.append("accepted/*") else: assert isinstance(names, list) - for config in program.languages().values(): - if config["name"] in names: - globs = config["files"].split() or [] + for config in program.languages(): + if config.name.lower() in names: + globs = config.files paths += [f"accepted/{glob}" for glob in globs] paths = list(set(paths)) From 17a5be2f126b9ca6d91375c5e2d61d98c0a7c2a5 Mon Sep 17 00:00:00 2001 From: mzuenni Date: Mon, 20 Oct 2025 04:07:29 +0200 Subject: [PATCH 05/12] restore ids --- bin/program.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/bin/program.py b/bin/program.py index eb6ab1f8..5122e199 100644 --- a/bin/program.py +++ b/bin/program.py @@ -83,7 +83,7 @@ def command_is_exe(command: Optional[str]) -> bool: EXTRA_LANGUAGES: Final[Sequence[Language]] = [ Language( - "BAPCtools:checktestdata", + "checktestdata", { "name": "Checktestdata", "priority": 1, @@ -92,7 +92,7 @@ def command_is_exe(command: Optional[str]) -> bool: }, ), Language( - "BAPCtools:viva", + "viva", { "name": "Viva", "priority": 2, @@ -101,7 +101,7 @@ def command_is_exe(command: Optional[str]) -> bool: }, ), Language( - "BAPCtools:manual", + "manual", { "name": "manual", "priority": 9999, From 5f553cf31103545e54d9787aaca22cf406eef88b Mon Sep 17 00:00:00 2001 From: mzuenni Date: Mon, 20 Oct 2025 11:38:47 +0200 Subject: [PATCH 06/12] change how languages are compared --- bin/program.py | 41 +++++++++++++++++++++++------------------ bin/validate.py | 9 ++++++--- 2 files changed, 29 insertions(+), 21 deletions(-) diff --git a/bin/program.py b/bin/program.py index 5122e199..208756bc 100644 --- a/bin/program.py +++ b/bin/program.py @@ -69,6 +69,9 @@ def get_exe(command: str) -> str: assert isinstance(key, str) warn(f"found unknown languages.yaml key: '{key}' for '{lang_id}'") + def __lt__(self, other: "Language") -> bool: + return self.id > other.id + # Returns true when file f matches the shebang regex. def matches_shebang(self, f: Path) -> bool: if self.shebang is None: @@ -81,25 +84,27 @@ def command_is_exe(command: Optional[str]) -> bool: return bool(command and command[0] != "{") +CHECKTESTDATA: Final[Language] = Language( + "BAPCtools:checktestdata", + { + "name": "Checktestdata", + "priority": 1, + "files": "*.ctd", + "run": "checktestdata {mainfile}", + }, +) +VIVA: Final[Language] = Language( + "BAPCtools:checktestdata", + { + "name": "Checktestdata", + "priority": 1, + "files": "*.ctd", + "run": "checktestdata {mainfile}", + }, +) EXTRA_LANGUAGES: Final[Sequence[Language]] = [ - Language( - "checktestdata", - { - "name": "Checktestdata", - "priority": 1, - "files": "*.ctd", - "run": "checktestdata {mainfile}", - }, - ), - Language( - "viva", - { - "name": "Viva", - "priority": 2, - "files": "*.viva", - "run": "java -jar {viva_jar} {mainfile}", - }, - ), + CHECKTESTDATA, + VIVA, Language( "manual", { diff --git a/bin/validate.py b/bin/validate.py index 71d4fe56..3e640c60 100644 --- a/bin/validate.py +++ b/bin/validate.py @@ -88,7 +88,10 @@ class Validator(program.Program): ExecResult.status == ExecStatus.REJECTED if the validator rejected. """ - FORMAT_VALIDATOR_LANGUAGES: Final[Sequence[str]] = ["checktestdata", "viva"] + FORMAT_VALIDATOR_LANGUAGES: Final[Sequence[program.Language]] = [ + program.CHECKTESTDATA, + program.VIVA, + ] def __repr__(self): return type(self).__name__ + ": " + str(self.path) @@ -173,7 +176,7 @@ def format_exec_code_map(returncode): return ExecStatus.TIMEOUT return ExecStatus.ERROR - if self.language == "checktestdata": + if self.language == program.CHECKTESTDATA: with main_path.open("rb") as main_file: return self._exec_command( self.run_command, @@ -182,7 +185,7 @@ def format_exec_code_map(returncode): cwd=cwd, ) - if self.language == "viva": + if self.language == program.VIVA: # Called as `viva validator.viva testcase.in`. result = self._exec_command( self.run_command + [main_path.absolute()], From af4633a758ac88e19c5a069dc3db3addbad991a5 Mon Sep 17 00:00:00 2001 From: mzuenni Date: Mon, 20 Oct 2025 11:50:21 +0200 Subject: [PATCH 07/12] fix viva --- bin/program.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/bin/program.py b/bin/program.py index 208756bc..6f8daf53 100644 --- a/bin/program.py +++ b/bin/program.py @@ -94,12 +94,12 @@ def command_is_exe(command: Optional[str]) -> bool: }, ) VIVA: Final[Language] = Language( - "BAPCtools:checktestdata", + "BAPCtools:viva", { - "name": "Checktestdata", - "priority": 1, - "files": "*.ctd", - "run": "checktestdata {mainfile}", + "name": "Viva", + "priority": 2, + "files": "*.viva", + "run": "java -jar {viva_jar} {mainfile}", }, ) EXTRA_LANGUAGES: Final[Sequence[Language]] = [ From 7021f3977eab8c50780f6b63d89b65b503aad95a Mon Sep 17 00:00:00 2001 From: mzuenni Date: Mon, 20 Oct 2025 14:22:39 +0200 Subject: [PATCH 08/12] move methods (the error message has problem.yaml hardcoded) --- bin/problem.py | 62 +++++++++++++++++++++++++++++++++++++++++++++++++- bin/program.py | 8 ++++--- bin/util.py | 60 ------------------------------------------------ 3 files changed, 66 insertions(+), 64 deletions(-) diff --git a/bin/problem.py b/bin/problem.py index 6e890cf7..32fabdf2 100644 --- a/bin/problem.py +++ b/bin/problem.py @@ -6,7 +6,7 @@ from collections.abc import Callable, Sequence from pathlib import Path -from typing import Any, Final, Literal, Optional, overload, TYPE_CHECKING +from typing import Any, Final, Literal, Optional, overload, TypeVar, TYPE_CHECKING if TYPE_CHECKING: # Prevent circular import: https://stackoverflow.com/a/39757388 from program import Program @@ -33,6 +33,66 @@ def check_unknown_keys(yaml_data: dict[str, Any], sub_key: Optional[str] = None) warn(f"found unknown problem.yaml key: {key} in {f'`{sub_key}`' if sub_key else 'root'}") +T = TypeVar("T") + + +def parse_optional_setting(yaml_data: dict[str, Any], key: str, t: type[T]) -> Optional[T]: + if key in yaml_data: + value = yaml_data.pop(key) + if isinstance(value, int) and t is float: + value = float(value) + if isinstance(value, t): + return value + if value == "" and (t is list or t is dict): + # handle empty yaml keys + return t() + warn(f"incompatible value for key '{key}' in problem.yaml. SKIPPED.") + return None + + +def parse_setting( + yaml_data: dict[str, Any], key: str, default: T, constraint: Optional[str] = None +) -> T: + value = parse_optional_setting(yaml_data, key, type(default)) + result = default if value is None else value + if constraint: + assert isinstance(result, (float, int)) + assert eval(f"{default} {constraint}") + if not eval(f"{result} {constraint}"): + warn( + f"value for '{key}' in problem.yaml should be {constraint} but is {result}. SKIPPED." + ) + return default + return result + + +def parse_optional_list_setting(yaml_data: dict[str, Any], key: str, t: type[T]) -> list[T]: + if key in yaml_data: + value = yaml_data.pop(key) + if isinstance(value, t): + return [value] + if isinstance(value, list): + if not all(isinstance(v, t) for v in value): + warn( + f"some values for key '{key}' in problem.yaml do not have type {t.__name__}. SKIPPED." + ) + return [] + if not value: + warn(f"value for '{key}' in problem.yaml should not be an empty list.") + return value + warn(f"incompatible value for key '{key}' in problem.yaml. SKIPPED.") + return [] + + +def parse_deprecated_setting( + yaml_data: dict[str, Any], key: str, new: Optional[str] = None +) -> None: + if key in yaml_data: + use = f", use '{new}' instead" if new else "" + warn(f"key '{key}' is deprecated{use}. SKIPPED.") + yaml_data.pop(key) + + class Person: def __init__(self, yaml_data: str | dict[str, Any]): if isinstance(yaml_data, dict): diff --git a/bin/program.py b/bin/program.py index 6f8daf53..5ffa66a6 100644 --- a/bin/program.py +++ b/bin/program.py @@ -6,7 +6,7 @@ import threading from colorama import Fore from pathlib import Path -from typing import Any, Final, Mapping, Optional, Sequence, TYPE_CHECKING +from typing import Any, Final, Mapping, Optional, Sequence, TypeVar, TYPE_CHECKING import config from util import * @@ -18,6 +18,9 @@ class Language: def __init__(self, lang_id: str, conf: dict[str, Any]): self.ok = True + self.id = lang_id + + T = TypeVar("T") def get_optional_value(key: str, t: type[T]) -> Optional[T]: if key in conf: @@ -40,7 +43,6 @@ def get_value(key: str, t: type[T]) -> T: self.ok = False return t() - self.id = lang_id self.name = get_value("name", str) self.priority = get_value("priority", int) self.files = (get_optional_value("files", str) or "").split() @@ -106,7 +108,7 @@ def command_is_exe(command: Optional[str]) -> bool: CHECKTESTDATA, VIVA, Language( - "manual", + "BAPCtools:manual", { "name": "manual", "priority": 9999, diff --git a/bin/util.py b/bin/util.py index 9b0cc450..894e148e 100644 --- a/bin/util.py +++ b/bin/util.py @@ -818,66 +818,6 @@ def write_yaml( return None -T = TypeVar("T") - - -def parse_optional_setting(yaml_data: dict[str, Any], key: str, t: type[T]) -> Optional[T]: - if key in yaml_data: - value = yaml_data.pop(key) - if isinstance(value, int) and t is float: - value = float(value) - if isinstance(value, t): - return value - if value == "" and (t is list or t is dict): - # handle empty yaml keys - return t() - warn(f"incompatible value for key '{key}' in problem.yaml. SKIPPED.") - return None - - -def parse_setting( - yaml_data: dict[str, Any], key: str, default: T, constraint: Optional[str] = None -) -> T: - value = parse_optional_setting(yaml_data, key, type(default)) - result = default if value is None else value - if constraint: - assert isinstance(result, (float, int)) - assert eval(f"{default} {constraint}") - if not eval(f"{result} {constraint}"): - warn( - f"value for '{key}' in problem.yaml should be {constraint} but is {result}. SKIPPED." - ) - return default - return result - - -def parse_optional_list_setting(yaml_data: dict[str, Any], key: str, t: type[T]) -> list[T]: - if key in yaml_data: - value = yaml_data.pop(key) - if isinstance(value, t): - return [value] - if isinstance(value, list): - if not all(isinstance(v, t) for v in value): - warn( - f"some values for key '{key}' in problem.yaml do not have type {t.__name__}. SKIPPED." - ) - return [] - if not value: - warn(f"value for '{key}' in problem.yaml should not be an empty list.") - return value - warn(f"incompatible value for key '{key}' in problem.yaml. SKIPPED.") - return [] - - -def parse_deprecated_setting( - yaml_data: dict[str, Any], key: str, new: Optional[str] = None -) -> None: - if key in yaml_data: - use = f", use '{new}' instead" if new else "" - warn(f"key '{key}' is deprecated{use}. SKIPPED.") - yaml_data.pop(key) - - def _ask_variable(name: str, default: Optional[str] = None, allow_empty: bool = False) -> str: if config.args.defaults: if not default and not allow_empty: From 101d8660964af0bdeacb9be0fd4452b8cfa44588 Mon Sep 17 00:00:00 2001 From: mzuenni Date: Mon, 20 Oct 2025 14:30:42 +0200 Subject: [PATCH 09/12] add types --- bin/program.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/bin/program.py b/bin/program.py index 5ffa66a6..08328e5c 100644 --- a/bin/program.py +++ b/bin/program.py @@ -279,7 +279,7 @@ def _is_executable(path: Path) -> bool: warn_cache: set[str] = set() # Sets self.language and self.env['mainfile'] - def _get_language(self, bar: ProgressBar): + def _get_language(self, bar: ProgressBar) -> bool: fallback = False candidates = [] for lang in languages(): @@ -369,7 +369,7 @@ def _get_language(self, bar: ProgressBar): bar.error(f"No language detected for {self.path}.") return False - def _checks(self, bar: ProgressBar): + def _checks(self, bar: ProgressBar) -> None: for f in self.source_files: if f.stat().st_size >= config.ICPC_FILE_LIMIT * 1024**2: bar.warn( @@ -436,7 +436,7 @@ def _checks(self, bar: ProgressBar): pass # Return True on success. - def _compile(self, bar: ProgressBar): + def _compile(self, bar: ProgressBar) -> bool: meta_path = self.tmpdir / "meta_.yaml" # Remove all non-source files. @@ -484,7 +484,7 @@ def _compile(self, bar: ProgressBar): return True # Return True on success, False on failure. - def build(self, bar: ProgressBar): + def build(self, bar: ProgressBar) -> bool: assert not self.built self.built = True @@ -586,7 +586,7 @@ def build(self, bar: ProgressBar): return True - def _exec_command(self, *args, **kwargs) -> ExecResult: + def _exec_command(self, *args: Any, **kwargs: Any) -> ExecResult: if "timeout" not in kwargs and "timeout" in self.limits: kwargs["timeout"] = self.limits["timeout"] if "memory" not in kwargs and "memory" in self.limits: @@ -594,14 +594,14 @@ def _exec_command(self, *args, **kwargs) -> ExecResult: return exec_command(*args, **kwargs) @staticmethod - def add_callback(problem, path, c): + def add_callback(problem: "Problem", path: Path, c: Callable[["Program"], None]) -> None: if path not in problem._program_callbacks: problem._program_callbacks[path] = [] problem._program_callbacks[path].append(c) class Generator(Program): - def __init__(self, problem: "Problem", path: Path, **kwargs): + def __init__(self, problem: "Problem", path: Path, **kwargs: Any): super().__init__( problem, path, @@ -614,7 +614,9 @@ def __init__(self, problem: "Problem", path: Path, **kwargs): # Run the generator in the given working directory. # May write files in |cwd| and stdout is piped to {name}.in if it's not written already. # Returns ExecResult. Success when result.status == ExecStatus.ACCEPTED. - def run(self, bar, cwd, name, args=[]): + def run( + self, bar: ProgressBar, cwd: Path, name: str, args: list[str | Path] = [] + ) -> ExecResult: assert self.run_command is not None in_path = cwd / (name + ".in") From 916600dd48fa0f40fcdc11b39a15a0def2c019c4 Mon Sep 17 00:00:00 2001 From: mzuenni Date: Mon, 20 Oct 2025 15:19:23 +0200 Subject: [PATCH 10/12] refactor exe handling --- bin/program.py | 56 +++++++++++++++++++++++++------------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/bin/program.py b/bin/program.py index 08328e5c..e89f03eb 100644 --- a/bin/program.py +++ b/bin/program.py @@ -56,16 +56,23 @@ def get_value(key: str, t: type[T]) -> T: self.compile = get_optional_value("compile", str) self.run = get_value("run", str) - def get_exe(command: str) -> str: + def get_exe(key: str, command: str, optional: bool = True) -> Optional[str]: try: - return shlex.split(command)[0] - except (IndexError, ValueError) as e: - print(e) - self.ok = False - return "" + exe = shlex.split(command)[0] + if exe and exe[0] != "{": + return exe + if optional: + return None + except (IndexError, ValueError): + pass + error(f"invalid value for key '{key}' in languages.yaml for '{lang_id}'") + self.ok = False + return None - self.compile_exe = get_exe(self.compile) if self.compile is not None else None - self.run_exe = get_exe(self.run) + self.compile_exe = get_exe("compile", self.compile) if self.compile is not None else None + run_exe = get_exe("run", self.run, optional=False) + assert run_exe is not None + self.run_exe = run_exe for key in conf: assert isinstance(key, str) @@ -81,10 +88,6 @@ def matches_shebang(self, f: Path) -> bool: with f.open() as o: return self.shebang.search(o.readline()) is not None - @staticmethod - def command_is_exe(command: Optional[str]) -> bool: - return bool(command and command[0] != "{") - CHECKTESTDATA: Final[Language] = Language( "BAPCtools:checktestdata", @@ -299,24 +302,21 @@ def _get_language(self, bar: ProgressBar) -> bool: for _, lang, files in candidates: name = lang.name # Make sure we can compile programs for this language. - if lang.compile_exe is not None: - exe = lang.compile_exe - if Language.command_is_exe(exe) and shutil.which(exe) is None: - fallback = True - if exe not in Program.warn_cache and config.args.verbose: - Program.warn_cache.add(exe) - bar.debug( - f"Compile program {exe} not found for language {name}. Falling back to lower priority languages." - ) - continue - exe = lang.run_exe - # Make sure we can compile programs for this language. - if Language.command_is_exe(exe) and shutil.which(exe) is None: + if lang.compile_exe is not None and shutil.which(lang.compile_exe) is None: + fallback = True + if lang.compile_exe not in Program.warn_cache and config.args.verbose: + Program.warn_cache.add(lang.compile_exe) + bar.debug( + f"Compile program {lang.compile_exe} not found for language {name}. Falling back to lower priority languages." + ) + continue + # Make sure we can run programs for this language. + if shutil.which(lang.run_exe) is None: fallback = True - if exe not in Program.warn_cache and config.args.verbose: - Program.warn_cache.add(exe) + if lang.run_exe not in Program.warn_cache and config.args.verbose: + Program.warn_cache.add(lang.run_exe) bar.debug( - f"Run program {exe} not found for language {name}. Falling back to lower priority languages." + f"Run program {lang.run_exe} not found for language {name}. Falling back to lower priority languages." ) continue From 284e7e30ba9e291fb85cee7210289a39b075e520 Mon Sep 17 00:00:00 2001 From: mzuenni Date: Mon, 20 Oct 2025 15:31:24 +0200 Subject: [PATCH 11/12] fix command split --- bin/program.py | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/bin/program.py b/bin/program.py index e89f03eb..1bfa9366 100644 --- a/bin/program.py +++ b/bin/program.py @@ -56,23 +56,18 @@ def get_value(key: str, t: type[T]) -> T: self.compile = get_optional_value("compile", str) self.run = get_value("run", str) - def get_exe(key: str, command: str, optional: bool = True) -> Optional[str]: + def get_exe(key: str, command: str) -> Optional[str]: try: exe = shlex.split(command)[0] if exe and exe[0] != "{": return exe - if optional: - return None except (IndexError, ValueError): - pass - error(f"invalid value for key '{key}' in languages.yaml for '{lang_id}'") - self.ok = False + error(f"invalid value for key '{key}' in languages.yaml for '{lang_id}'") + self.ok = False return None - self.compile_exe = get_exe("compile", self.compile) if self.compile is not None else None - run_exe = get_exe("run", self.run, optional=False) - assert run_exe is not None - self.run_exe = run_exe + self.compile_exe = get_exe("compile", self.compile) if self.compile else None + self.run_exe = get_exe("run", self.run) for key in conf: assert isinstance(key, str) @@ -311,7 +306,7 @@ def _get_language(self, bar: ProgressBar) -> bool: ) continue # Make sure we can run programs for this language. - if shutil.which(lang.run_exe) is None: + if lang.run_exe is not None and shutil.which(lang.run_exe) is None: fallback = True if lang.run_exe not in Program.warn_cache and config.args.verbose: Program.warn_cache.add(lang.run_exe) @@ -558,8 +553,8 @@ def build(self, bar: ProgressBar) -> bool: if "run" in sanitizer: run_command += " " + sanitizer["run"] - self.compile_command = compile_command.format(**self.env).split() - self.run_command = run_command.format(**self.env).split() + self.compile_command = shlex.split(compile_command.format(**self.env)) + self.run_command = shlex.split(run_command.format(**self.env)) # Compare the hash to the last build. up_to_date = False From ba467c866a3ec5361837d2972f9355d6c5690b48 Mon Sep 17 00:00:00 2001 From: mzuenni Date: Mon, 20 Oct 2025 16:32:14 +0200 Subject: [PATCH 12/12] more types --- bin/verdicts.py | 119 +++++++++++++++++++++++++++++++----------------- 1 file changed, 77 insertions(+), 42 deletions(-) diff --git a/bin/verdicts.py b/bin/verdicts.py index c4996d2c..20c28808 100644 --- a/bin/verdicts.py +++ b/bin/verdicts.py @@ -4,16 +4,16 @@ import threading from enum import Enum from pathlib import Path -from typing import Literal, TYPE_CHECKING +from typing import Any, Literal, Optional, Sequence, TextIO, TYPE_CHECKING from colorama import Fore, Style import config import testcase -from util import ProgressBar +from util import ITEM_TYPE, ProgressBar if TYPE_CHECKING: - import run + pass class Verdict(Enum): @@ -26,7 +26,7 @@ class Verdict(Enum): VALIDATOR_CRASH = 5 COMPILER_ERROR = 6 - def __str__(self): + def __str__(self) -> str: return { Verdict.ACCEPTED: "ACCEPTED", Verdict.WRONG_ANSWER: "WRONG ANSWER", @@ -36,10 +36,10 @@ def __str__(self): Verdict.COMPILER_ERROR: "COMPILER ERROR", }[self] - def __lt__(self, other): + def __lt__(self, other: "Verdict") -> bool: return self.value < other.value - def short(self): + def short(self) -> str: return { Verdict.ACCEPTED: "AC", Verdict.WRONG_ANSWER: "WA", @@ -49,7 +49,7 @@ def short(self): Verdict.COMPILER_ERROR: "CE", }[self] - def color(self): + def color(self) -> str: return { Verdict.ACCEPTED: Fore.GREEN, Verdict.WRONG_ANSWER: Fore.RED, @@ -78,7 +78,7 @@ class RunUntil(Enum): ALL = 3 -def to_char(v: Verdict | None | Literal[False], lower: bool = False): +def to_char(v: Verdict | None | Literal[False], lower: bool = False) -> str: if v is None or v is False: return f"{Fore.BLUE}?{Style.RESET_ALL}" else: @@ -86,7 +86,7 @@ def to_char(v: Verdict | None | Literal[False], lower: bool = False): return f"{v.color()}{char}{Style.RESET_ALL}" -def to_string(v: Verdict | None | Literal[False]): +def to_string(v: Verdict | None | Literal[False]) -> str: if v is None or v is False: return to_char(v) else: @@ -199,10 +199,10 @@ def __init__( self.children[tg] = sorted(self.children[tg]) # Allow `with self` to lock. - def __enter__(self): + def __enter__(self) -> None: self.lock.__enter__() - def __exit__(self, *args): + def __exit__(self, *args) -> None: self.lock.__exit__(*args) def is_test_group(self, node: str) -> bool: @@ -217,7 +217,7 @@ def is_test_case(self, node: str) -> bool: """ return node not in self.children - def set(self, test_case: str, verdict: str | Verdict, duration: float): + def set(self, test_case: str, verdict: str | Verdict, duration: float) -> None: """Set the verdict and duration of the given test case (implying possibly others) verdict can be given as a Verdict or as a string using either long or @@ -229,7 +229,7 @@ def set(self, test_case: str, verdict: str | Verdict, duration: float): self.duration[test_case] = duration self._set_verdict_for_node(test_case, verdict, duration >= self.timeout) - def __getitem__(self, test_node) -> Verdict | None | Literal[False]: + def __getitem__(self, test_node: str) -> Verdict | None | Literal[False]: with self: return self.verdict[test_node] @@ -297,7 +297,7 @@ def aggregate(self, test_group: str) -> Verdict: assert first_error is not False return first_error - def _set_verdict_for_node(self, test_node: str, verdict: Verdict, timeout: bool): + def _set_verdict_for_node(self, test_node: str, verdict: Verdict, timeout: bool) -> None: # This assumes self.lock is already held. if timeout: assert verdict != Verdict.ACCEPTED @@ -374,8 +374,8 @@ def __init__(self, length: int, text: str): self.length = length self.text = text - def __iter__(self): - yield from [self.length, self.text] + def tuple(self) -> tuple[int, str]: + return (self.length, self.text) def __init__( self, @@ -420,7 +420,8 @@ def __init__( verdicts[-1].text += "s" if test_case in self.samples else "-" printed = self.name_width + 1 - for length, tmp in verdicts: + for verdict_value in verdicts: + length, tmp = verdict_value.tuple() if printed + 1 + length > self.width: lines.append(f"{str():{self.name_width + 1}}") printed = self.name_width + 1 @@ -443,18 +444,18 @@ def __init__( file=sys.stderr, ) - def next_submission(self, verdicts: Verdicts): + def next_submission(self, verdicts: Verdicts) -> None: self.results.append(verdicts) self.current_test_cases = set() - def add_test_case(self, test_case: str): + def add_test_case(self, test_case: str) -> None: self.current_test_cases.add(test_case) - def update_verdicts(self, test_case: str, verdict: str | Verdict, duration: float): + def update_verdicts(self, test_case: str, verdict: str | Verdict, duration: float) -> None: self.results[-1].set(test_case, verdict, duration) self.current_test_cases.discard(test_case) - def _clear(self, *, force: bool = True): + def _clear(self, *, force: bool = True) -> None: if force or self.print_without_force: if self.last_printed: actual_width = ProgressBar.columns @@ -480,7 +481,7 @@ def _get_verdict(self, s: int, test_case: str, check_sample: bool = True) -> str res = Style.DIM + to_char(None) return res - def print(self, **kwargs): + def print(self, **kwargs: Any) -> None: if config.args.tree: self._print_tree(**kwargs) else: @@ -492,7 +493,7 @@ def _print_tree( force: bool = True, new_lines: int = 1, printed_lengths: list[int] | None = None, - ): + ) -> None: if printed_lengths is None: printed_lengths = [] if force or self.print_without_force: @@ -551,7 +552,8 @@ def _print_tree( width = -1 if ProgressBar.columns - pref_len < 10 else self.width space = "" - for length, group in grouped: + for grouped_value in grouped: + length, group = grouped_value.tuple() if width >= 0 and printed + 1 + length > width: printed_text.append( f"\n{Style.DIM}{indent}{pipe} {pipe2} {Style.RESET_ALL}" @@ -592,7 +594,7 @@ def _print_table( force: bool = True, new_lines: int = 2, printed_lengths: list[int] | None = None, - ): + ) -> None: if printed_lengths is None: printed_lengths = [] if force or self.print_without_force: @@ -615,7 +617,8 @@ def _print_table( verdicts[-1].length += 1 verdicts[-1].text += self._get_verdict(s, test_case) - for length, tmp in verdicts: + for verdict_value in verdicts: + length, tmp = verdict_value.tuple() if self.width >= 0 and printed + 1 + length > self.width: printed_text.append(f"\n{str():{self.name_width + 1}}") printed_lengths.append(printed) @@ -632,12 +635,12 @@ def _print_table( def ProgressBar( self, - prefix, - max_len=None, - count=None, + prefix: str, + max_len: Optional[int] = None, + count: Optional[int] = None, *, - items=None, - needs_leading_newline=False, + items: Optional[Sequence[ITEM_TYPE]] = None, + needs_leading_newline: bool = False, ) -> "TableProgressBar": return TableProgressBar( self, @@ -650,7 +653,16 @@ def ProgressBar( class TableProgressBar(ProgressBar): - def __init__(self, table, prefix, max_len, count, *, items, needs_leading_newline): + def __init__( + self, + table: VerdictTable, + prefix: str, + max_len: Optional[int], + count: Optional[int], + *, + items: Optional[Sequence[ITEM_TYPE]], + needs_leading_newline: bool, + ): super().__init__( prefix, max_len, @@ -661,7 +673,7 @@ def __init__(self, table, prefix, max_len, count, *, items, needs_leading_newlin self.table = table # at the begin of any IO the progress bar locks so we can clear the table at this point - def __enter__(self): + def __enter__(self) -> None: super().__enter__() if ProgressBar.lock_depth == 1: if isinstance(sys.stderr, io.TextIOWrapper): @@ -670,7 +682,7 @@ def __enter__(self): self.table._clear(force=False) # at the end of any IO the progress bar unlocks so we can reprint the table at this point - def __exit__(self, *args): + def __exit__(self, *args: Any) -> None: if ProgressBar.lock_depth == 1: # ProgressBar.columns is just an educated guess for the number of printed chars # in the ProgressBar @@ -680,20 +692,43 @@ def __exit__(self, *args): print(end="", flush=True, file=sys.stderr) super().__exit__(*args) - def _print(self, *objects, sep="", end="\n", file=sys.stderr, flush=True): + def _print( + self, + *objects: Any, + sep: str = "", + end: str = "\n", + file: TextIO = sys.stderr, + flush: bool = True, + ) -> None: assert self._is_locked() # drop all flushes... print(*objects, sep=sep, end=end, file=file, flush=False) - # TODO #102: item has type `str` in the base class, but type `run.Run` here. - def start(self, item: "run.Run"): # type: ignore[override] - self.table.add_test_case(item.testcase.name) - return super().start(item) + def start(self, item: ITEM_TYPE = "") -> "TableProgressBar": + from run import Run - def done(self, success=True, message="", data="", print_item=True): - return super().done(success, message, data, print_item) + assert isinstance(item, Run) + self.table.add_test_case(item.testcase.name) + copy = super().start(item) + assert isinstance(copy, TableProgressBar) + return copy - def finalize(self, *, print_done=True, message=None, suppress_newline=False): + def done( + self, + success: bool = True, + message: str = "", + data: Optional[str] = None, + print_item: bool = True, + ) -> None: + super().done(success, message, data, print_item) + + def finalize( + self, + *, + print_done: bool = True, + message: Optional[str] = None, + suppress_newline: bool = False, + ) -> bool: with self: res = super().finalize( print_done=print_done,