Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
241 changes: 241 additions & 0 deletions bin/check_testing_tool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
import shutil
import sys
from pathlib import Path
from typing import TYPE_CHECKING, Optional, Sequence

import config
import parallel
from program import Program
from run import Submission
from util import *

if TYPE_CHECKING: # Prevent circular import: https://stackoverflow.com/a/39757388
from problem import Problem

"""DISCLAIMER:

This tool was only made to check testing tools faster.
You should still carefully review the code of the testing tool.

For this tool to work the following things must hold:
- the testing tool must be found under `attachments/testing_tool.<ext>`
- the testing tool must be callable as `{program} -f {in_path} {submission program}`
- the testing tool must accept the downloadable samples as well as those found under
`data/testing_tool_test/` as input files
- the testing tool must exit with a non-zero exit code if something goes wrong
- the testing tool must not change the working directory
"""


class TestInput:
    """A single input file for the testing tool, with a short display name.

    For `<name>.in.download` / `<name>.in.statement` files the display name is
    `<name>.download` / `<name>.statement`; for plain `<name>.in` files it is
    just `<name>`.
    """

    def __init__(self, problem: "Problem", in_path: Path, short_path: Path):
        assert in_path.suffix in [".in", ".download", ".statement"]
        self.problem = problem
        self.in_path = in_path
        self.short_path = short_path
        suffix = self.short_path.suffix
        if suffix in [".download", ".statement"]:
            # Strip the outer extension; what remains must be a `.in` file.
            stem = self.short_path.with_suffix("")
            assert stem.suffix in [".in"]
            # Replace the `.in` with the original outer extension.
            self.name = str(stem.with_suffix(suffix))
        else:
            self.name = str(self.short_path.with_suffix(""))


class WrappedSubmission:
    """Wraps a Submission so it can be run under the testing tool.

    The submission's real run command is replaced by a small generated Python
    wrapper script that forwards stdio and records the submission's exit code
    in a `.returncode` file in the working directory, so the exit codes of the
    testing tool and the submission can be reported separately.
    """

    def __init__(self, problem: "Problem", submission: Submission):
        self.problem = problem
        self.submission = submission
        self.name = submission.name
        # Mirror the submission's tmpdir layout under <problem tmpdir>/testing_tool/.
        self.tmpdir = (
            problem.tmpdir / "testing_tool" / submission.tmpdir.relative_to(problem.tmpdir)
        )
        self.tmpdir.mkdir(parents=True, exist_ok=True)
        # Set by build(): the command that runs the generated wrapper script.
        self.run_command: Optional[list[Path | str]] = None

    def supports_memory_limit(self) -> bool:
        """Return True iff a memory limit can be applied to both the wrapper
        command and the wrapped submission command."""
        assert self.run_command is not None
        assert self.submission.run_command is not None
        return command_supports_memory_limit(self.run_command) and command_supports_memory_limit(
            self.submission.run_command
        )

    def _wrapper_script(self) -> str:
        """Return the source of the wrapper script (see class docstring).

        The submission's command is spliced in via str.replace on the literal
        `{args}` placeholder — str.format cannot be used because the template
        itself contains braces (the f-string writing `.returncode`).
        """
        assert self.submission.run_command is not None
        args = ", ".join(map(repr, self.submission.run_command))
        # script assumes that the working directory is not changed
        script = """#!/usr/bin/env python3
import subprocess
import sys
from pathlib import Path

result = subprocess.run(
    [{args}],
    stdout=sys.stdout,
    stderr=sys.stderr,
    stdin=sys.stdin,
)
returncode_file = Path(".returncode")
# For multipass we store the first non zero return code
write_returncode = True
if returncode_file.is_file():
    raw = returncode_file.read_text()
    try:
        if int(raw) != 0:
            write_returncode = False
    except ValueError:
        pass
if write_returncode:
    returncode_file.write_text(f"{result.returncode}\\n")
sys.exit(result.returncode)
"""
        return script.replace("{args}", args)

    def build(self) -> None:
        """Write the wrapper script and point `run_command` at it."""
        wrapper_file = self.tmpdir / "wrapper.py"
        wrapper_file.write_text(self._wrapper_script())
        self.run_command = [sys.executable, wrapper_file]

    def run(self, bar: ProgressBar, testing_tool: "TestingTool", testinput: TestInput) -> bool:
        """Run the testing tool on `testinput` with this submission.

        Returns True iff both the testing tool and the submission exited
        successfully. Progress, a summary message, and the tool's output are
        reported via `bar`.
        """
        assert self.run_command is not None
        # Fresh run directory per test input; remove any stale file/dir first.
        rundir = self.tmpdir / testinput.short_path
        if rundir.is_file():
            rundir.unlink()
        elif rundir.exists():
            shutil.rmtree(rundir)
        rundir.mkdir(exist_ok=True, parents=True)

        returncode_file = rundir / ".returncode"  # written by the wrapper script
        in_path = rundir / "testcase.in"
        ensure_symlink(in_path, testinput.in_path)

        localbar = bar.start(testinput)

        result = testing_tool.run(in_path, self)
        # Recover the submission's own exit code from the wrapper's file.
        submission_returncode = None
        submission_status = None
        if returncode_file.is_file():
            raw = returncode_file.read_text()
            try:
                submission_returncode = int(raw)
                submission_status = default_exec_code_map(submission_returncode)
            except ValueError:
                pass
        # NOTE(review): if the wrapper never wrote `.returncode` (e.g. the tool
        # failed before launching the submission), submission_status stays None,
        # so `ok` is False even when the tool itself succeeded — but no message
        # is appended below and the bar shows "OK" with ok=False. Confirm this
        # is intended.
        ok = bool(result.status) and bool(submission_status)

        message = []
        if result.status == ExecStatus.TIMEOUT:
            message.append("TIMEOUT")
        elif not result.status:
            message.append(f"Testing Tool exit code: {result.returncode}")
        # Report the submission's exit code separately; a submission TIMEOUT is
        # already covered by the tool's TIMEOUT above.
        if (
            submission_status is not None
            and not submission_status
            and submission_status != ExecStatus.TIMEOUT
        ):
            message.append(f"Submission exit code: {submission_returncode}")
        if not message:
            message.append("OK")

        # Show the tool's output; label the streams when both are present.
        data = ""
        if result.out and result.err:
            data = (
                "TESTING TOOL STDERR:"
                + localbar._format_data(result.err)
                + "\nTESTING TOOL STDOUT:"
                + localbar._format_data(result.out)
                + "\n"
            )
        elif result.err:
            data = result.err
        elif result.out:
            data = result.out

        localbar.done(ok, ", ".join(message), data)
        return ok


class TestingTool(Program):
    """The testing tool program, built from the problem's attachments."""

    def __init__(self, problem: "Problem", path: Path):
        # The tool itself runs under the problem's time and memory limits.
        tool_limits = {
            "timeout": problem.limits.timeout,
            "memory": problem.limits.memory,
        }
        super().__init__(problem, path, "testing_tool", limits=tool_limits)

    def run(self, in_path: Path, submission: WrappedSubmission) -> ExecResult:
        """Execute `{tool} -f {in_path} {submission command}` in the directory
        containing `in_path` and return the execution result."""
        assert self.run_command is not None
        assert submission.run_command is not None
        full_command = [*self.run_command, "-f", in_path, *submission.run_command]
        # Only enforce the memory limit when both commands support one.
        mem_limit = self.limits["memory"] if submission.supports_memory_limit() else None
        return self._exec_command(
            full_command,
            cwd=in_path.parent,
            crop=True,
            memory=mem_limit,
        )


def run(
    problem: "Problem", testinputs: Sequence[TestInput], submissions: Sequence[Submission]
) -> bool:
    """Check the testing tool by running every submission on every test input.

    Locates the tool under `attachments/testing_tool` (directory) or
    `attachments/testing_tool.<ext>` (single file), builds it, then runs it
    for each (submission, testinput) pair in parallel. Returns True iff the
    tool was found and built and every run succeeded.
    """
    wrapped_submissions = [WrappedSubmission(problem, submission) for submission in submissions]
    for submission in wrapped_submissions:
        submission.build()

    # Exactly one of the directory form and the single-file form must exist.
    tool_dir = problem.path / "attachments" / "testing_tool"
    tool_files = list((problem.path / "attachments").glob("testing_tool.*"))
    if (tool_dir.is_dir() and tool_files) or len(tool_files) > 1:
        error("Multiple testing tools found!")
        return False
    elif not tool_dir.is_dir() and not tool_files:
        error("No testing tool found!")
        return False

    if tool_dir.is_dir():
        testing_tool = TestingTool(problem, tool_dir)
    else:
        testing_tool = TestingTool(problem, tool_files[0])

    bar = ProgressBar("Building testing tool", items=[testing_tool])
    localbar = bar.start(testing_tool)
    if not testing_tool.build(bar):
        localbar.done(False)
        return False
    localbar.done()
    bar.finalize(print_done=False)

    ok = True

    # Column widths for aligned progress output. Generators avoid throwaway
    # lists, and default=0 keeps empty sequences from raising ValueError.
    max_submission_len = max((len(x.name) for x in wrapped_submissions), default=0)
    max_testinput_len = max((len(x.name) for x in testinputs), default=0)

    # When True, the ProgressBar will print a newline before the first error log.
    needs_leading_newline = not config.args.verbose
    for submission in wrapped_submissions:
        bar = ProgressBar(
            submission.name,
            count=len(testinputs),
            max_len=max_testinput_len + max_submission_len - len(submission.name),
            needs_leading_newline=needs_leading_newline,
        )
        cur_ok = True

        def run_submission(testinput: TestInput) -> None:
            nonlocal cur_ok
            # skip after first error
            if not cur_ok and not config.args.all:
                bar.skip()
                return
            if not submission.run(bar, testing_tool, testinput):
                # just writing False is thread safe
                cur_ok = False

        parallel.run_tasks(run_submission, testinputs, pin=True)
        ok &= cur_ok
        needs_leading_newline = bar.finalize()

    return ok
26 changes: 17 additions & 9 deletions bin/constraints.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
import sys
from collections import defaultdict
from typing import Optional

Expand Down Expand Up @@ -29,7 +30,7 @@ def check_validators(
problem.validate_data(validate.Mode.ANSWER, constraints=ans_constraints)
if not problem.settings.ans_is_output and not ans_constraints:
log("No constraint validation of answer values found in answer or output validators.")
print()
print(file=sys.stderr)

validator_values: set[int | float] = set()
validator_defs: list[str | tuple[int | float, str, int | float]] = []
Expand Down Expand Up @@ -275,6 +276,7 @@ def check_constraints(problem: Problem) -> bool:
print(
"{:^{width}}|{:^40}".format("VALIDATORS", "PROBLEM STATEMENT", width=left_width),
sep="",
file=sys.stderr,
)

while statement_defs or validator_defs:
Expand All @@ -292,28 +294,29 @@ def check_constraints(problem: Problem) -> bool:
if val is not None:
validator_defs.remove(val)
if isinstance(val, str):
print("{:^{width}}".format(val, width=left_width), sep="", end="")
print("{:^{width}}".format(val, width=left_width), sep="", end="", file=sys.stderr)
else:
print(
"{:>{value_len}_} <= {:^{name_len}} <= {:<{value_len}_}".format(
*val, name_len=name_len, value_len=value_len
),
sep="",
end="",
file=sys.stderr,
)
else:
print("{:^{width}}".format("", width=left_width), sep="", end="")
print("|", end="")
print("{:^{width}}".format("", width=left_width), sep="", end="", file=sys.stderr)
print("|", end="", file=sys.stderr)
if st is not None:
languages = ",".join(statement_defs[st])
print("{:^40} {}".format(st, languages), sep="", end="")
print("{:^40} {}".format(st, languages), sep="", end="", file=sys.stderr)
else:
print("{:^40}".format(""), sep="", end="")
print()
print("{:^40}".format(""), sep="", end="", file=sys.stderr)
print(file=sys.stderr)
if st is not None:
statement_defs.pop(st)

print()
print(file=sys.stderr)

warned = False
for value in validator_values:
Expand All @@ -323,7 +326,11 @@ def check_constraints(problem: Problem) -> bool:
if not warned:
warned = True
warn("Values in validators but missing in some statement:")
print(f"{Fore.YELLOW}{value}{Style.RESET_ALL} missing in", ",".join(missing))
print(
f"{Fore.YELLOW}{value}{Style.RESET_ALL} missing in",
",".join(missing),
file=sys.stderr,
)

extra_in_statement = set(statement_values.keys()).difference(validator_values)
if extra_in_statement:
Expand All @@ -332,6 +339,7 @@ def check_constraints(problem: Problem) -> bool:
print(
f"{Fore.YELLOW}{value}{Style.RESET_ALL} in",
",".join(sorted(statement_values[value])),
file=sys.stderr,
)

return True
16 changes: 14 additions & 2 deletions bin/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,9 @@ def validate_in(t, problem: Problem, testcase: Testcase, meta_yaml: dict, bar: P
infile = problem.tmpdir / "data" / t.hash / "testcase.in"
assert infile.is_file()

if testcase.root == "testing_tool_test":
return True

input_validator_hashes = testcase.validator_hashes(validate.InputValidator, bar)
if all(h in meta_yaml["input_validator_hashes"] for h in input_validator_hashes):
return True
Expand Down Expand Up @@ -705,7 +708,7 @@ def validate_ans_and_out(
infile = problem.tmpdir / "data" / t.hash / "testcase.in"
assert infile.is_file()

if testcase.root == "invalid_input":
if testcase.root in ["invalid_input", "testing_tool_test"]:
return True

ansfile = infile.with_suffix(".ans")
Expand Down Expand Up @@ -939,7 +942,11 @@ def generate_from_rule():
def generate_from_solution(testcase: Testcase, bar: ProgressBar):
nonlocal meta_yaml

if testcase.root in [*config.INVALID_CASE_DIRECTORIES, "valid_output"]:
if testcase.root in [
*config.INVALID_CASE_DIRECTORIES,
"valid_output",
"testing_tool_test",
]:
return True
if config.args.no_solution:
return True
Expand Down Expand Up @@ -1021,6 +1028,8 @@ def generate_visualization(testcase: Testcase, bar: ProgressBar):

if testcase.root in config.INVALID_CASE_DIRECTORIES:
return True
if testcase.root == "testing_tool_test":
return True
if config.args.no_visualizer:
return True

Expand Down Expand Up @@ -1182,6 +1191,7 @@ def add_test_case_to_cache():

# consider specific files for the uniqueness of this testcase
relevant_files = {
"testing_tool_test": [".in"],
"invalid_input": [".in"],
"invalid_answer": [".in", ".ans"],
"invalid_output": [".in", ".ans", ".out"],
Expand Down Expand Up @@ -2155,6 +2165,8 @@ def reorder(self):
warn(f"{d} is used for invalid test data. Skipping.")
elif parts[0] == "valid_output":
warn(f"{d} is used for valid test data. Skipping.")
elif parts[0] == "testing_tool_test":
warn(f"{d} is used to test the testing tool. Skipping.")
elif path not in self.known_directories:
warn(f"{d} is not a generated directory. Skipping.")
elif not self.known_directories[path].numbered:
Expand Down
Loading
Loading