diff --git a/tests/test_cli.py b/tests/test_cli.py index b2d93ac3..3449959e 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -1,6 +1,8 @@ import datetime import sys from os.path import exists +from os.path import isdir +from pathlib import Path from unittest import mock from unittest.mock import call from unittest.mock import patch @@ -468,8 +470,8 @@ def test_edit_move(runner, todo_factory, default_database, tmpdir, todos): tmpdir.mkdir("another_list") default_database.paths = [ - str(tmpdir.join("default")), - str(tmpdir.join("another_list")), + Path(tmpdir.join("default")), + Path(tmpdir.join("another_list")), ] default_database.update_cache() diff --git a/tests/test_model.py b/tests/test_model.py index 922d0bbe..c0a2bbae 100644 --- a/tests/test_model.py +++ b/tests/test_model.py @@ -1,6 +1,7 @@ from datetime import date from datetime import datetime from datetime import timedelta +from pathlib import Path from unittest.mock import patch import pytest @@ -65,7 +66,7 @@ def test_change_paths(tmpdir, create): assert {t.summary for t in db.todos()} == old_todos - db.paths = [str(tmpdir.join("3"))] + db.paths = [Path(tmpdir.join("3"))] db.update_cache() assert len(list(db.lists())) == 1 diff --git a/tests/test_ui.py b/tests/test_ui.py index 02dac42b..4950d348 100644 --- a/tests/test_ui.py +++ b/tests/test_ui.py @@ -1,4 +1,5 @@ from datetime import datetime +from pathlib import Path from unittest import mock import pytest @@ -27,8 +28,8 @@ def test_todo_editor_list(default_database, todo_factory, default_formatter, tmp tmpdir.mkdir("another_list") default_database.paths = [ - str(tmpdir.join("default")), - str(tmpdir.join("another_list")), + Path(tmpdir.join("default")), + Path(tmpdir.join("another_list")), ] default_database.update_cache() diff --git a/todoman/model.py b/todoman/model.py index f324d7ad..3ef71a57 100644 --- a/todoman/model.py +++ b/todoman/model.py @@ -8,8 +8,8 @@ from datetime import datetime from datetime import time from datetime import timedelta -from os.path import normpath -from os.path import split +from pathlib import Path +from pathlib import PosixPath from typing import Iterable from uuid import uuid4 @@ -29,7 +29,20 @@ LOCAL_TIMEZONE = tzlocal() -class cached_property: +def register_adapters_and_converters(): + sqlite3.register_adapter(Path, str) + sqlite3.register_adapter(PosixPath, str) + + sqlite3.register_converter("path", lambda p: Path(p.decode())) + sqlite3.register_converter( + "timestamp", lambda d: datetime.fromtimestamp(float(d), LOCAL_TIMEZONE) + ) + + +register_adapters_and_converters() + + +class cached_property: # noqa """A read-only @property that is only evaluated once. Only usable on class instances' methods. """ @@ -274,7 +287,7 @@ def path(self) -> str: if not self.list: raise ValueError("A todo without a list does not have a path.") - return os.path.join(self.list.path, self.filename) + return self.list.path.joinpath(self.filename) def cancel(self) -> None: self.status = "CANCELLED" @@ -373,7 +386,7 @@ def _read(self, path): return component def write(self): - if os.path.exists(self.todo.path): + if self.todo.path.exists(): self._write_existing(self.todo.path) else: self._write_new(self.todo.path) @@ -422,11 +435,15 @@ class Cache: SCHEMA_VERSION = 7 - def __init__(self, path: str): - self.cache_path = str(path) - os.makedirs(os.path.dirname(self.cache_path), exist_ok=True) + def __init__(self, path: Path): + self.cache_path = path + # XXX: Use the below once we drop python3.4 + self.cache_path.parent.mkdir(parents=True, exist_ok=True) - self._conn = sqlite3.connect(self.cache_path) + self._conn = sqlite3.connect( + str(self.cache_path), + detect_types=sqlite3.PARSE_DECLTYPES, + ) self._conn.row_factory = sqlite3.Row self._conn.execute("PRAGMA foreign_keys = ON") @@ -468,7 +485,7 @@ def create_tables(self): """ CREATE TABLE IF NOT EXISTS lists ( "name" TEXT PRIMARY KEY, - "path" TEXT, + "path" path, "colour" TEXT, "mtime" INTEGER, @@ -480,7 +497,7 @@ def create_tables(self): self._conn.execute( """ CREATE TABLE IF NOT EXISTS files ( - "path" TEXT PRIMARY KEY, + "path" path PRIMARY KEY, "list_name" TEXT, "mtime" INTEGER, @@ -493,20 +510,20 @@ def create_tables(self): self._conn.execute( """ CREATE TABLE IF NOT EXISTS todos ( - "file_path" TEXT, + "file_path" path, "id" INTEGER PRIMARY KEY, "uid" TEXT, "summary" TEXT, - "due" INTEGER, + "due" TIMESTAMP, "due_dt" INTEGER, - "start" INTEGER, + "start" TIMESTAMP, "start_dt" INTEGER, "priority" INTEGER, - "created_at" INTEGER, - "completed_at" INTEGER, + "created_at" timestamp, + "completed_at" timestamp, "percent_complete" INTEGER, - "dtstamp" INTEGER, + "dtstamp" timestamp, "status" TEXT, "description" TEXT, "location" TEXT, @@ -522,7 +539,7 @@ def create_tables(self): def clear(self): self._conn.close() - os.remove(self.cache_path) + self.cache_path.unlink self._conn = None def add_list(self, name: str, path: str, colour: str, mtime: int): @@ -841,32 +858,17 @@ def todos( seen_paths.add(path) yield todo - def _datetime_from_db(self, dt) -> datetime | None: - if dt: - return datetime.fromtimestamp(dt, LOCAL_TIMEZONE) - return None - - def _date_from_db(self, dt, is_date=False) -> date | None: - """Deserialise a date (possible datetime).""" - if not dt: - return dt - - if is_date: - return datetime.fromtimestamp(dt, LOCAL_TIMEZONE).date() - else: - return datetime.fromtimestamp(dt, LOCAL_TIMEZONE) - def _todo_from_db(self, row: dict) -> Todo: todo = Todo() todo.id = row["id"] todo.uid = row["uid"] todo.summary = row["summary"] - todo.due = self._date_from_db(row["due"], row["due_dt"]) - todo.start = self._date_from_db(row["start"], row["start_dt"]) + todo.due = row["due"] + todo.start = row["start"] todo.priority = row["priority"] - todo.created_at = self._datetime_from_db(row["created_at"]) - todo.completed_at = self._datetime_from_db(row["completed_at"]) - todo.dtstamp = self._datetime_from_db(row["dtstamp"]) + todo.created_at = row["created_at"] + todo.completed_at = row["completed_at"] + todo.dtstamp = row["dtstamp"] todo.percent_complete = row["percent_complete"] todo.status = row["status"] todo.description = row["description"] @@ -874,7 +876,7 @@ def _todo_from_db(self, row: dict) -> Todo: todo.sequence = row["sequence"] todo.last_modified = row["last_modified"] todo.list = self.lists_map[row["list_name"]] - todo.filename = os.path.basename(row["path"]) + todo.filename = row["path"].name todo.rrule = row["rrule"] return todo @@ -955,18 +957,16 @@ def __init__(self, name: str, path: str, colour: str = None): @staticmethod def colour_for_path(path: str) -> str | None: try: - with open(os.path.join(path, "color")) as f: - return f.read().strip() + return path.joinpath("color").read_text().strip() except OSError: logger.debug("No colour for list %s", path) @staticmethod def name_for_path(path: str) -> str: try: - with open(os.path.join(path, "displayname")) as f: - return f.read().strip() + return path.joinpath("displayname").read_text().strip() except OSError: - return split(normpath(path))[1] + return path.name @staticmethod def mtime_for_path(path: str) -> int: @@ -1003,8 +1003,8 @@ class Database: """ def __init__(self, paths, cache_path): - self.cache = Cache(cache_path) - self.paths = [str(path) for path in paths] + self.cache = Cache(Path(cache_path)) + self.paths = [Path(path) for path in paths] self.update_cache() def update_cache(self) -> None: @@ -1021,13 +1021,11 @@ def update_cache(self) -> None: TodoList.colour_for_path(path), paths[path], ) - for entry in os.listdir(path): - if not entry.endswith(".ics"): + for entry in path.iterdir(): + if not entry.name.endswith(".ics"): continue - entry_path = os.path.join(path, entry) - mtime = _getmtime(entry_path) - paths_to_mtime[entry_path] = mtime - paths_to_list_name[entry_path] = list_name + paths_to_mtime[entry] = _getmtime(entry) + paths_to_list_name[entry] = list_name self.cache.expire_files(paths_to_mtime) @@ -1041,10 +1039,10 @@ def update_cache(self) -> None: continue try: - with open(entry_path, "rb") as f: - cal = icalendar.Calendar.from_ical(f.read()) - for component in cal.walk("VTODO"): - self.cache.add_vtodo(component, entry_path) + data = entry_path.read_bytes() + cal = icalendar.Calendar.from_ical(data) + for component in cal.walk("VTODO"): + self.cache.add_vtodo(component, entry_path) except Exception: logger.exception("Failed to read entry %s.", entry_path) @@ -1060,17 +1058,16 @@ def lists(self) -> Iterable[TodoList]: return self.cache.lists() def move(self, todo: Todo, new_list: TodoList, from_list: TodoList) -> None: - orig_path = os.path.join(from_list.path, todo.filename) - dest_path = os.path.join(new_list.path, todo.filename) + orig_path = from_list.path.joinpath(todo.filename) + dest_path = new_list.path.joinpath(todo.filename) - os.rename(orig_path, dest_path) + orig_path.rename(dest_path) def delete(self, todo: Todo) -> None: if not todo.list: raise ValueError("Cannot delete Todo without a list.") - path = os.path.join(todo.list.path, todo.filename) - os.remove(path) + todo.list.path.joinpath(todo.filename).unlink() def flush(self) -> Iterable[Todo]: for todo in self.todos(status=["ANY"]): @@ -1102,5 +1099,5 @@ def save(self, todo: Todo) -> None: def _getmtime(path: str) -> int: - stat = os.stat(path) + stat = path.stat() return getattr(stat, "st_mtime_ns", stat.st_mtime)