diff --git a/beetsplug/replace.py b/beetsplug/replace.py index 0c570877b8..e1ceceec83 100644 --- a/beetsplug/replace.py +++ b/beetsplug/replace.py @@ -1,3 +1,4 @@ +import optparse import shutil from pathlib import Path @@ -5,6 +6,7 @@ from beets import ui, util from beets.library import Item, Library +from beets.library.exceptions import FileOperationError from beets.plugins import BeetsPlugin @@ -16,7 +18,9 @@ def commands(self): cmd.func = self.run return [cmd] - def run(self, lib: Library, args: list[str]) -> None: + def run( + self, lib: Library, _opts: optparse.Values, args: list[str] + ) -> None: if len(args) < 2: raise ui.UserError("Usage: beet replace ") @@ -54,9 +58,9 @@ def file_check(self, filepath: Path) -> None: except mediafile.FileTypeError as fte: raise ui.UserError(fte) - def select_song(self, items: list[Item]): + def select_song(self, items: list[Item]) -> Item | None: """Present a menu of matching songs and get user selection.""" - ui.print_("\nMatching songs:") + ui.print_("Matching songs:") for i, item in enumerate(items, 1): ui.print_(f"{i}. {util.displayable_path(item)}") @@ -79,7 +83,7 @@ def select_song(self, items: list[Item]): except ValueError: ui.print_("Invalid input. Please type in a number.") - def confirm_replacement(self, new_file_path: Path, song: Item): + def confirm_replacement(self, new_file_path: Path, song: Item) -> bool: """Get user confirmation for the replacement.""" original_file_path: Path = Path(song.path.decode()) @@ -104,7 +108,7 @@ def replace_file(self, new_file_path: Path, song: Item) -> None: try: shutil.move(util.syspath(new_file_path), util.syspath(dest)) - except Exception as e: + except OSError as e: raise ui.UserError(f"Error replacing file: {e}") if ( @@ -113,10 +117,17 @@ def replace_file(self, new_file_path: Path, song: Item) -> None: ): try: original_file_path.unlink() - except Exception as e: + except OSError as e: raise ui.UserError(f"Could not delete original file: {e}") + # Update the path to point to the new file. song.path = str(dest).encode() song.store() + # Write the metadata in the database to the song file's tags. + try: + song.write() + except FileOperationError as e: + raise ui.UserError(f"Error writing metadata to file: {e}") + ui.print_("Replacement successful.") diff --git a/docs/plugins/replace.rst b/docs/plugins/replace.rst index 7216f83991..931b018afb 100644 --- a/docs/plugins/replace.rst +++ b/docs/plugins/replace.rst @@ -15,5 +15,8 @@ and then type: The plugin will show you a list of files for you to pick from, and then ask for confirmation. +The file you pick will be replaced with the file at `path`, and the tags in the +database will be written to that file's metadata. + Consider using the ``replaygain`` command from the :doc:`/plugins/replaygain` plugin, if you usually use it during imports. diff --git a/test/plugins/test_replace.py b/test/plugins/test_replace.py index a247e317ad..11d7d85cff 100644 --- a/test/plugins/test_replace.py +++ b/test/plugins/test_replace.py @@ -1,50 +1,131 @@ +import optparse import shutil +from collections.abc import Generator from pathlib import Path +from unittest.mock import Mock import pytest from mediafile import MediaFile from beets import ui +from beets.library import Item, Library +from beets.library.exceptions import WriteError from beets.test import _common +from beets.test.helper import TestHelper from beetsplug.replace import ReplacePlugin replace = ReplacePlugin() +def always(x): + return lambda *args, **kwargs: x + + +def always_raise(x): + def err(*args, **kwargs): + raise x + + return err + + class TestReplace: - @pytest.fixture(autouse=True) - def _fake_dir(self, tmp_path): - self.fake_dir = tmp_path + @pytest.fixture + def mp3_file(self, tmp_path) -> Path: + dest = tmp_path / "full.mp3" + src = Path(_common.RSRC.decode()) / "full.mp3" + shutil.copyfile(src, dest) + + mediafile = MediaFile(dest) + mediafile.title = "AAA" + mediafile.save() + + return dest + + @pytest.fixture + def opus_file(self, tmp_path) -> Path: + dest = tmp_path / "full.opus" + src = Path(_common.RSRC.decode()) / "full.opus" + shutil.copyfile(src, dest) + + return dest + + @pytest.fixture + def library(self) -> Generator[Library]: + helper = TestHelper() + helper.setup_beets() + + yield helper.lib + + helper.teardown_beets() + + def test_run_replace_too_few_args(self): + with pytest.raises(ui.UserError): + replace.run(None, optparse.Values(), []) + + def test_run_replace_no_matches(self, library): + with pytest.raises(ui.UserError): + replace.run(library, optparse.Values(), ["BBB", ""]) + + def test_run_replace_no_song_selected( + self, library, mp3_file, opus_file, monkeypatch + ): + monkeypatch.setattr(replace, "file_check", always(None)) + monkeypatch.setattr(replace, "select_song", always(None)) + + item = Item.from_path(mp3_file) + library.add(item) + + replace.run(library, optparse.Values(), ["AAA", str(opus_file)]) + + assert mp3_file.exists() + assert opus_file.exists() + + def test_run_replace_not_confirmed( + self, library, mp3_file, opus_file, monkeypatch + ): + monkeypatch.setattr(replace, "file_check", always(None)) + monkeypatch.setattr(replace, "confirm_replacement", always(False)) + + item = Item.from_path(mp3_file) + library.add(item) - @pytest.fixture(autouse=True) - def _fake_file(self, tmp_path): - self.fake_file = tmp_path + monkeypatch.setattr(replace, "select_song", always(item)) - def test_path_is_dir(self): - fake_directory = self.fake_dir / "fakeDir" + replace.run(library, optparse.Values(), ["AAA", str(opus_file)]) + + assert mp3_file.exists() + assert opus_file.exists() + + def test_run_replace(self, library, mp3_file, opus_file, monkeypatch): + replace_file = Mock(replace.replace_file, return_value=None) + monkeypatch.setattr(replace, "replace_file", replace_file) + + monkeypatch.setattr(replace, "file_check", always(None)) + monkeypatch.setattr(replace, "confirm_replacement", always(True)) + + item = Item.from_path(mp3_file) + library.add(item) + + monkeypatch.setattr(replace, "select_song", always(item)) + + replace.run(library, optparse.Values(), ["AAA", str(opus_file)]) + + replace_file.assert_called_once() + + def test_path_is_dir(self, tmp_path): + fake_directory = tmp_path / "fakeDir" fake_directory.mkdir() with pytest.raises(ui.UserError): replace.file_check(fake_directory) - def test_path_is_unsupported_file(self): - fake_file = self.fake_file / "fakefile.txt" + def test_path_is_unsupported_file(self, tmp_path): + fake_file = tmp_path / "fakefile.txt" fake_file.write_text("test", encoding="utf-8") with pytest.raises(ui.UserError): replace.file_check(fake_file) - def test_path_is_supported_file(self): - dest = self.fake_file / "full.mp3" - src = Path(_common.RSRC.decode()) / "full.mp3" - shutil.copyfile(src, dest) - - mediafile = MediaFile(dest) - mediafile.albumartist = "AAA" - mediafile.disctitle = "DDD" - mediafile.genres = ["a", "b", "c"] - mediafile.composer = None - mediafile.save() - - replace.file_check(Path(str(dest))) + def test_path_is_supported_file(self, mp3_file): + replace.file_check(mp3_file) def test_select_song_valid_choice(self, monkeypatch, capfd): songs = ["Song A", "Song B", "Song C"] @@ -113,3 +194,61 @@ class Song: song = Song() assert replace.confirm_replacement("test", song) is False + + def test_replace_file_move_fails(self, tmp_path): + item = Item() + item.path = bytes(tmp_path / "not_a_song.mp3") + + with pytest.raises(ui.UserError): + replace.replace_file(tmp_path / "not_a_file.opus", item) + + def test_replace_file_delete_fails( + self, library, mp3_file, opus_file, monkeypatch + ): + monkeypatch.setattr(Path, "unlink", always_raise(OSError)) + + item = Item.from_path(mp3_file) + library.add(item) + + with pytest.raises(ui.UserError): + replace.replace_file(opus_file, item) + + def test_replace_file_write_fails( + self, library, mp3_file, opus_file, monkeypatch + ): + monkeypatch.setattr( + Item, "write", always_raise(WriteError("path", "reason")) + ) + + item = Item.from_path(mp3_file) + library.add(item) + + with pytest.raises(ui.UserError): + replace.replace_file(opus_file, item) + + def test_replace_file( + self, mp3_file: Path, opus_file: Path, library: Library + ): + old_mediafile = MediaFile(mp3_file) + old_mediafile.albumartist = "ABC" + old_mediafile.disctitle = "DEF" + old_mediafile.genre = "GHI" + old_mediafile.save() + + item = Item.from_path(mp3_file) + library.add(item) + + replace.replace_file(opus_file, item) + + # Check that the file has been replaced. + assert opus_file.exists() + assert not mp3_file.exists() + + # Check that the database path has been updated. + assert item.path == bytes(opus_file) + + # Check that the new file has the old file's metadata. + new_mediafile = MediaFile(opus_file) + assert new_mediafile.albumartist == "ABC" + assert new_mediafile.disctitle == "DEF" + assert new_mediafile.genre == "GHI"