diff --git a/coverage/sqldata.py b/coverage/sqldata.py index 6b3f0dbff..314d4c86b 100644 --- a/coverage/sqldata.py +++ b/coverage/sqldata.py @@ -18,6 +18,7 @@ import sys import textwrap import threading +import uuid import zlib from collections.abc import Collection, Mapping, Sequence from typing import Any, Callable, cast @@ -263,7 +264,7 @@ def _debug_dataio(self, msg: str, filename: str) -> None: def _choose_filename(self) -> None: """Set self._filename based on inited attributes.""" if self._no_disk: - self._filename = ":memory:" + self._filename = f"file:coverage-{uuid.uuid4()}?mode=memory&cache=shared" else: self._filename = self._basename suffix = filename_suffix(self._suffix) @@ -289,7 +290,7 @@ def close(self, force: bool = False) -> None: def _open_db(self) -> None: """Open an existing db file, and read its metadata.""" self._debug_dataio("Opening data file", self._filename) - self._dbs[threading.get_ident()] = SqliteDb(self._filename, self._debug) + self._dbs[threading.get_ident()] = SqliteDb(self._filename, self._debug, self._no_disk) self._read_db() def _read_db(self) -> None: @@ -402,7 +403,7 @@ def loads(self, data: bytes) -> None: f"Unrecognized serialization: {data[:40]!r} (head of {len(data)} bytes)", ) script = zlib.decompress(data[1:]).decode("utf-8") - self._dbs[threading.get_ident()] = db = SqliteDb(self._filename, self._debug) + self._dbs[threading.get_ident()] = db = SqliteDb(self._filename, self._debug, self._no_disk) with db: db.executescript(script) self._read_db() diff --git a/coverage/sqlitedb.py b/coverage/sqlitedb.py index 544594eb8..c80702290 100644 --- a/coverage/sqlitedb.py +++ b/coverage/sqlitedb.py @@ -28,9 +28,10 @@ class SqliteDb: etc(a, b) """ - def __init__(self, filename: str, debug: TDebugCtl) -> None: + def __init__(self, filename: str, debug: TDebugCtl, no_disk: bool = False) -> None: self.debug = debug self.filename = filename + self.no_disk = no_disk self.nest = 0 self.con: sqlite3.Connection | None = None @@ -49,7 +50,11 @@ def _connect(self) -> None: if self.debug.should("sql"): self.debug.write(f"Connecting to {self.filename!r}") try: - self.con = sqlite3.connect(self.filename, check_same_thread=False) + # Use uri=True when connecting to memory URIs + if self.filename.startswith("file:"): + self.con = sqlite3.connect(self.filename, check_same_thread=False, uri=True) + else: + self.con = sqlite3.connect(self.filename, check_same_thread=False) except sqlite3.Error as exc: raise DataError(f"Couldn't use data file {self.filename!r}: {exc}") from exc @@ -78,7 +83,7 @@ def _connect(self) -> None: def close(self, force: bool = False) -> None: """If needed, close the connection.""" if self.con is not None: - if force or self.filename != ":memory:": + if force or not self.no_disk: if self.debug.should("sql"): self.debug.write(f"Closing {self.con!r} on {self.filename!r}") self.con.close() @@ -120,7 +125,7 @@ def _execute(self, sql: str, parameters: Iterable[Any]) -> sqlite3.Cursor: return self.con.execute(sql, parameters) # type: ignore[arg-type] except sqlite3.Error as exc: msg = str(exc) - if self.filename != ":memory:": + if not self.no_disk: try: # `execute` is the first thing we do with the database, so try # hard to provide useful hints if something goes wrong now.