Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions benchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,16 @@ def file_replace(file_name: Path, old_text: str, new_text: str) -> Iterator[None
"""
file_text = ""
if old_text:
file_text = file_name.read_text()
file_text = file_name.read_text(encoding="utf-8")
if old_text not in file_text:
raise Exception("Old text {old_text!r} not found in {file_name}")
updated_text = file_text.replace(old_text, new_text)
file_name.write_text(updated_text)
file_name.write_text(updated_text, encoding="utf-8")
try:
yield
finally:
if old_text:
file_name.write_text(file_text)
file_name.write_text(file_text, encoding="utf-8")


def file_must_exist(file_name: str, kind: str = "file") -> Path:
Expand Down Expand Up @@ -624,7 +624,7 @@ def run_no_coverage(self, env: Env) -> float:
def run_with_coverage(self, env: Env, cov_ver: Coverage) -> float:
env.shell.run_command(f"{env.python} -m pip install {cov_ver.pip_args}")
pforce = Path("force.ini")
pforce.write_text("[run]\nbranch=false\n")
pforce.write_text("[run]\nbranch=false\n", encoding="utf-8")
with env.shell.set_env({"COVERAGE_FORCE_CONFIG": str(pforce.resolve())}):
env.shell.run_command(f"{env.python} -m pytest {self.FAST} --cov")
duration = env.shell.last_duration
Expand Down Expand Up @@ -907,13 +907,13 @@ def __init__(

def save_results(self) -> None:
"""Save current results to the JSON file."""
with self.results_file.open("w") as f:
with self.results_file.open("w", encoding="utf-8") as f:
json.dump({" ".join(k): v for k, v in self.result_data.items()}, f)

def load_results(self) -> dict[ResultKey, list[float]]:
"""Load results from the JSON file if it exists."""
if self.results_file.exists():
with self.results_file.open("r") as f:
with self.results_file.open("r", encoding="utf-8") as f:
data: dict[str, list[float]] = json.load(f)
return {
(k.split()[0], k.split()[1], k.split()[2]): v for k, v in data.items()
Expand Down
8 changes: 4 additions & 4 deletions coverage/html.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def data_filename(fname: str) -> str:

def read_data(fname: str) -> str:
"""Return the contents of a data file of ours."""
with open(data_filename(fname)) as data_file:
with open(data_filename(fname), encoding="utf-8") as data_file:
return data_file.read()


Expand Down Expand Up @@ -412,7 +412,7 @@ def make_local_static_report_files(self) -> None:
# .gitignore can't be copied from the source tree because if it was in
# the source tree, it would stop the static files from being checked in.
if self.directory_was_empty:
with open(os.path.join(self.directory, ".gitignore"), "w") as fgi:
with open(os.path.join(self.directory, ".gitignore"), "w", encoding="utf-8") as fgi:
fgi.write("# Created by coverage.py\n*\n")

def should_report(self, analysis: Analysis, index_page: IndexPage) -> bool:
Expand Down Expand Up @@ -706,7 +706,7 @@ def read(self) -> None:
"""Read the information we stored last time."""
try:
status_file = os.path.join(self.directory, self.STATUS_FILE)
with open(status_file) as fstatus:
with open(status_file, encoding="utf-8") as fstatus:
status = json.load(fstatus)
except (OSError, ValueError):
# Status file is missing or malformed.
Expand Down Expand Up @@ -747,7 +747,7 @@ def write(self) -> None:
for fname, finfo in self.files.items()
},
}
with open(status_file, "w") as fout:
with open(status_file, "w", encoding="utf-8") as fout:
json.dump(status_data, fout, separators=(",", ":"))

def check_global_data(self, *data: Any) -> None:
Expand Down
2 changes: 1 addition & 1 deletion coverage/pytracer.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ def __repr__(self) -> str:

def log(self, marker: str, *args: Any) -> None:
"""For hard-core logging of what this tracer is doing."""
with open("/tmp/debug_trace.txt", "a") as f:
with open("/tmp/debug_trace.txt", "a", encoding="utf-8") as f:
f.write(f"{marker} {self.id}[{len(self.data_stack)}]")
if 0: # if you want thread ids..
f.write(".{:x}.{:x}".format( # type: ignore[unreachable]
Expand Down
2 changes: 1 addition & 1 deletion coverage/sysmon.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def log(msg: str) -> None:
# f"{root}-{pid}.out",
# f"{root}-{pid}-{tslug}.out",
]:
with open(filename, "a") as f:
with open(filename, "a", encoding="utf-8") as f:
try:
print(f"{pid}:{tslug}: {msg}", file=f, flush=True)
except UnicodeError:
Expand Down
2 changes: 1 addition & 1 deletion doc/cog_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def _read_config(text, fname):
text = textwrap.dedent(text[1:])

os.makedirs("tmp", exist_ok=True)
with open(f"tmp/{fname}", "w") as f:
with open(f"tmp/{fname}", "w", encoding="utf-8") as f:
f.write(text)

config = read_coverage_config(f"tmp/{fname}", warn=cog.error)
Expand Down
2 changes: 1 addition & 1 deletion doc/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@
# missing, so only use the extension if we are specifically spell-checking.
extensions += ['sphinxcontrib.spelling']
names_file = tempfile.NamedTemporaryFile(mode='w', prefix="coverage_names_", suffix=".txt")
with open("../CONTRIBUTORS.txt") as contributors:
with open("../CONTRIBUTORS.txt", encoding="utf-8") as contributors:
names = set(re.split(r"[^\w']", contributors.read()))
names = [n for n in names if len(n) >= 2 and n[0].isupper()]
names_file.write("\n".join(names))
Expand Down
6 changes: 3 additions & 3 deletions igor.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def run_tests_with_coverage(core, *runner_args):
# There's an entry in "make clean" to get rid of this file.
pth_dir = sysconfig.get_path("purelib")
pth_path = os.path.join(pth_dir, "zzz_metacov.pth")
with open(pth_path, "w") as pth_file:
with open(pth_path, "w", encoding="utf-8") as pth_file:
pth_file.write("import coverage; coverage.process_startup()\n")

suffix = f"{make_env_id(core)}_{platform.platform()}"
Expand Down Expand Up @@ -374,14 +374,14 @@ def get_release_facts():

def update_file(fname, pattern, replacement):
"""Update the contents of a file, replacing pattern with replacement."""
with open(fname) as fobj:
with open(fname, encoding="utf-8") as fobj:
old_text = fobj.read()

new_text = re.sub(pattern, replacement, old_text, count=1)

if new_text != old_text:
print(f"Updating {fname}")
with open(fname, "w") as fobj:
with open(fname, "w", encoding="utf-8") as fobj:
fobj.write(new_text)


Expand Down
2 changes: 1 addition & 1 deletion lab/branch_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def trace(frame, event, arg):
last = this
return trace

code = open(sys.argv[1]).read()
code = open(sys.argv[1], encoding="utf-8").read()
sys.settrace(trace)
exec(code)
print(sorted(pairs))
2 changes: 1 addition & 1 deletion lab/extract_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ def f(a, b):
fname, lineno = sys.argv[1:]
lineno = int(lineno)

with open(fname) as code_file:
with open(fname, encoding="utf-8") as code_file:
lines = ["", *code_file]

# Find opening triple-quote
Expand Down
2 changes: 1 addition & 1 deletion lab/goals.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def main(argv):
print("Need either --file or --group")
return 1

with open("coverage.json") as j:
with open("coverage.json", encoding="utf-8") as j:
data = json.load(j)
all_files = list(data["files"].keys())
selected = select_files(all_files, args.pattern)
Expand Down
2 changes: 1 addition & 1 deletion lab/run_sysmon.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
print(sys.version)
the_program = sys.argv[1]

code = compile(open(the_program).read(), filename=the_program, mode="exec")
code = compile(open(the_program, encoding="utf-8").read(), filename=the_program, mode="exec")

my_id = sys.monitoring.COVERAGE_ID
sys.monitoring.use_tool_id(my_id, "run_sysmon.py")
Expand Down
2 changes: 1 addition & 1 deletion lab/run_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ def trace(frame, event, arg):
print(sys.version)
the_program = sys.argv[1]

code = open(the_program).read()
code = open(the_program, encoding="utf-8").read()
sys.settrace(trace)
exec(code)
2 changes: 1 addition & 1 deletion lab/show_pyc.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def show_pyc_file(fname):
show_code(code)

def show_py_file(fname):
text = open(fname).read().replace('\r\n', '\n')
text = open(fname, encoding="utf-8").read().replace('\r\n', '\n')
show_py_text(text, fname=fname)

def show_py_text(text, fname="<string>"):
Expand Down
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@
"""

cov_ver_py = os.path.join(os.path.split(__file__)[0], "coverage/version.py")
with open(cov_ver_py) as version_file:
with open(cov_ver_py, encoding="utf-8") as version_file:
# __doc__ will be overwritten by version.py.
doc = __doc__
# Keep pylint happy.
__version__ = __url__ = version_info = ""
# Execute the code in version.py.
exec(compile(version_file.read(), cov_ver_py, "exec", dont_inherit=True))

with open("README.rst") as readme:
with open("README.rst", encoding="utf-8") as readme:
readme_text = readme.read()

temp_url = __url__.replace("readthedocs", "@@")
Expand Down
6 changes: 3 additions & 3 deletions tests/balance_xdist_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def pytest_sessionstart(self, session):
if self.worker == "none":
if tests_csv_dir.exists():
for csv_file in tests_csv_dir.iterdir():
with csv_file.open(newline="") as fcsv:
with csv_file.open(newline="", encoding="utf-8") as fcsv:
reader = csv.reader(fcsv)
for row in reader:
self.times[row[1]] += float(row[3])
Expand All @@ -81,7 +81,7 @@ def write_duration_row(self, item, phase, duration):
"""Helper to write a row to the tracked-test csv file."""
if self.running_all:
self.tests_csv.parent.mkdir(parents=True, exist_ok=True)
with self.tests_csv.open("a", newline="") as fcsv:
with self.tests_csv.open("a", newline="", encoding="utf-8") as fcsv:
csv.writer(fcsv).writerow([self.worker, item.nodeid, phase, duration])

@pytest.hookimpl(hookwrapper=True)
Expand Down Expand Up @@ -171,7 +171,7 @@ def show_worker_times(): # pragma: debugging
tests_csv_dir = Path("tmp/tests_csv")

for csv_file in tests_csv_dir.iterdir():
with csv_file.open(newline="") as fcsv:
with csv_file.open(newline="", encoding="utf-8") as fcsv:
reader = csv.reader(fcsv)
for row in reader:
worker = row[0]
Expand Down
5 changes: 3 additions & 2 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ def pytest_sessionstart() -> None:
# Create a .pth file for measuring subprocess coverage.
pth_dir = find_writable_pth_directory()
assert pth_dir
(pth_dir / "subcover.pth").write_text("import coverage; coverage.process_startup()\n")
sub_dir = pth_dir / "subcover.pth"
sub_dir.write_text("import coverage; coverage.process_startup()\n", encoding="utf-8")
# subcover.pth is deleted by pytest_sessionfinish below.


Expand Down Expand Up @@ -137,7 +138,7 @@ def find_writable_pth_directory() -> Path | None:
for pth_dir in possible_pth_dirs(): # pragma: part covered
try_it = pth_dir / f"touch_{WORKER}.it"
try:
try_it.write_text("foo")
try_it.write_text("foo", encoding="utf-8")
except OSError: # pragma: cant happen
continue

Expand Down
16 changes: 8 additions & 8 deletions tests/goldtest.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def save_mismatch(f: str) -> None:
save_path = expected_dir.replace(os_sep("/gold/"), os_sep("/actual/"))
os.makedirs(save_path, exist_ok=True)
save_file = os.path.join(save_path, f)
with open(save_file, "w") as savef:
with open(os.path.join(actual_dir, f)) as readf:
with open(save_file, "w", encoding="utf-8") as savef:
with open(os.path.join(actual_dir, f), encoding="utf-8") as readf:
savef.write(readf.read())
print(os_sep(f"Saved actual output to '{save_file}': see tests/gold/README.rst"))

Expand All @@ -70,13 +70,13 @@ def save_mismatch(f: str) -> None:
text_diff = []
for f in diff_files:
expected_file = os.path.join(expected_dir, f)
with open(expected_file) as fobj:
with open(expected_file, encoding="utf-8") as fobj:
expected = fobj.read()
if expected_file.endswith(".xml"):
expected = canonicalize_xml(expected)

actual_file = os.path.join(actual_dir, f)
with open(actual_file) as fobj:
with open(actual_file, encoding="utf-8") as fobj:
actual = fobj.read()
if actual_file.endswith(".xml"):
actual = canonicalize_xml(actual)
Expand Down Expand Up @@ -114,7 +114,7 @@ def contains(filename: str, *strlist: str) -> None:

"""
__tracebackhide__ = True # pytest, please don't show me this function.
with open(filename) as fobj:
with open(filename, encoding="utf-8") as fobj:
text = fobj.read()
for s in strlist:
assert s in text, f"Missing content in {filename}: {s!r}"
Expand All @@ -128,7 +128,7 @@ def contains_rx(filename: str, *rxlist: str) -> None:

"""
__tracebackhide__ = True # pytest, please don't show me this function.
with open(filename) as fobj:
with open(filename, encoding="utf-8") as fobj:
lines = fobj.readlines()
for rx in rxlist:
assert any(re.search(rx, line) for line in lines), (
Expand All @@ -144,7 +144,7 @@ def contains_any(filename: str, *strlist: str) -> None:

"""
__tracebackhide__ = True # pytest, please don't show me this function.
with open(filename) as fobj:
with open(filename, encoding="utf-8") as fobj:
text = fobj.read()
for s in strlist:
if s in text:
Expand All @@ -161,7 +161,7 @@ def doesnt_contain(filename: str, *strlist: str) -> None:

"""
__tracebackhide__ = True # pytest, please don't show me this function.
with open(filename) as fobj:
with open(filename, encoding="utf-8") as fobj:
text = fobj.read()
for s in strlist:
assert s not in text, f"Forbidden content in {filename}: {s!r}"
Expand Down
15 changes: 12 additions & 3 deletions tests/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import re
import shutil
import subprocess
import sys
import textwrap
import warnings

Expand Down Expand Up @@ -42,10 +43,18 @@ def run_command(cmd: str) -> tuple[int, str]:
# Subprocesses are expensive, but convenient, and so may be over-used in
# the test suite. Use these lines to get a list of the tests using them:
if 0: # pragma: debugging
with open("/tmp/processes.txt", "a") as proctxt: # type: ignore[unreachable]
pth = "/tmp/processes.txt" # type: ignore[unreachable]
with open(pth, "a", encoding="utf-8") as proctxt:
print(os.getenv("PYTEST_CURRENT_TEST", "unknown"), file=proctxt, flush=True)

encoding = os.device_encoding(1) or locale.getpreferredencoding()
# Type checking trick due to "unreachable" being set
_locale_type_erased: Any = locale

encoding = os.device_encoding(1) or (
_locale_type_erased.getpreferredencoding()
if sys.version_info < (3, 11)
else _locale_type_erased.getencoding()
)

# In some strange cases (PyPy3 in a virtualenv!?) the stdout encoding of
# the subprocess is set incorrectly to ascii. Use an environment variable
Expand Down Expand Up @@ -113,7 +122,7 @@ def make_file(

if text and basename.endswith(".py") and SHOW_DIS: # pragma: debugging
os.makedirs("/tmp/dis", exist_ok=True)
with open(f"/tmp/dis/{basename}.dis", "w") as fdis:
with open(f"/tmp/dis/{basename}.dis", "w", encoding="utf-8") as fdis:
print(f"# {os.path.abspath(filename)}", file=fdis)
cur_test = os.getenv("PYTEST_CURRENT_TEST", "unknown")
print(f"# PYTEST_CURRENT_TEST = {cur_test}", file=fdis)
Expand Down
2 changes: 1 addition & 1 deletion tests/osinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def _VmB(key: str) -> int:
"""Read the /proc/PID/status file to find memory use."""
try:
# Get pseudo file /proc/<pid>/status
with open(f"/proc/{os.getpid()}/status") as t:
with open(f"/proc/{os.getpid()}/status", encoding="utf-8") as t:
v = t.read()
except OSError: # pragma: cant happen
return 0 # non-Linux?
Expand Down
Loading