Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions beetsplug/replace.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import optparse
import shutil
from pathlib import Path

import mediafile

from beets import ui, util
from beets.library import Item, Library
from beets.library.exceptions import FileOperationError
from beets.plugins import BeetsPlugin


Expand All @@ -16,7 +18,9 @@ def commands(self):
cmd.func = self.run
return [cmd]

def run(self, lib: Library, args: list[str]) -> None:
def run(
self, lib: Library, _opts: optparse.Values, args: list[str]
) -> None:
if len(args) < 2:
raise ui.UserError("Usage: beet replace <query> <new_file_path>")

Expand Down Expand Up @@ -54,9 +58,9 @@ def file_check(self, filepath: Path) -> None:
except mediafile.FileTypeError as fte:
raise ui.UserError(fte)

def select_song(self, items: list[Item]):
def select_song(self, items: list[Item]) -> Item | None:
"""Present a menu of matching songs and get user selection."""
ui.print_("\nMatching songs:")
ui.print_("Matching songs:")
for i, item in enumerate(items, 1):
ui.print_(f"{i}. {util.displayable_path(item)}")

Expand All @@ -79,7 +83,7 @@ def select_song(self, items: list[Item]):
except ValueError:
ui.print_("Invalid input. Please type in a number.")

def confirm_replacement(self, new_file_path: Path, song: Item):
def confirm_replacement(self, new_file_path: Path, song: Item) -> bool:
"""Get user confirmation for the replacement."""
original_file_path: Path = Path(song.path.decode())

Expand All @@ -104,7 +108,7 @@ def replace_file(self, new_file_path: Path, song: Item) -> None:

try:
shutil.move(util.syspath(new_file_path), util.syspath(dest))
except Exception as e:
except OSError as e:
raise ui.UserError(f"Error replacing file: {e}")

if (
Expand All @@ -113,10 +117,17 @@ def replace_file(self, new_file_path: Path, song: Item) -> None:
):
try:
original_file_path.unlink()
except Exception as e:
except OSError as e:
raise ui.UserError(f"Could not delete original file: {e}")

# Update the path to point to the new file.
song.path = str(dest).encode()
song.store()

# Write the metadata in the database to the song file's tags.
try:
song.write()
except FileOperationError as e:
raise ui.UserError(f"Error writing metadata to file: {e}")

ui.print_("Replacement successful.")
3 changes: 3 additions & 0 deletions docs/plugins/replace.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,8 @@ and then type:
The plugin will show you a list of files for you to pick from, and then ask for
confirmation.

The file you pick will be replaced with the file at `path`, and the tags in the
database will be written to that file's metadata.

Consider using the ``replaygain`` command from the :doc:`/plugins/replaygain`
plugin, if you usually use it during imports.
185 changes: 162 additions & 23 deletions test/plugins/test_replace.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,131 @@
import optparse
import shutil
from collections.abc import Generator
from pathlib import Path
from unittest.mock import Mock

import pytest
from mediafile import MediaFile

from beets import ui
from beets.library import Item, Library
from beets.library.exceptions import WriteError
from beets.test import _common
from beets.test.helper import TestHelper
from beetsplug.replace import ReplacePlugin

replace = ReplacePlugin()


def always(x):
return lambda *args, **kwargs: x


def always_raise(x):
def err(*args, **kwargs):
raise x

return err


class TestReplace:
@pytest.fixture(autouse=True)
def _fake_dir(self, tmp_path):
self.fake_dir = tmp_path
@pytest.fixture
def mp3_file(self, tmp_path) -> Path:
dest = tmp_path / "full.mp3"
src = Path(_common.RSRC.decode()) / "full.mp3"
shutil.copyfile(src, dest)

mediafile = MediaFile(dest)
mediafile.title = "AAA"
mediafile.save()

return dest

@pytest.fixture
def opus_file(self, tmp_path) -> Path:
dest = tmp_path / "full.opus"
src = Path(_common.RSRC.decode()) / "full.opus"
shutil.copyfile(src, dest)

return dest

@pytest.fixture
def library(self) -> Generator[Library]:
helper = TestHelper()
helper.setup_beets()

yield helper.lib

helper.teardown_beets()

def test_run_replace_too_few_args(self):
with pytest.raises(ui.UserError):
replace.run(None, optparse.Values(), [])

def test_run_replace_no_matches(self, library):
with pytest.raises(ui.UserError):
replace.run(library, optparse.Values(), ["BBB", ""])

def test_run_replace_no_song_selected(
self, library, mp3_file, opus_file, monkeypatch
):
monkeypatch.setattr(replace, "file_check", always(None))
monkeypatch.setattr(replace, "select_song", always(None))

item = Item.from_path(mp3_file)
library.add(item)

replace.run(library, optparse.Values(), ["AAA", str(opus_file)])

assert mp3_file.exists()
assert opus_file.exists()

def test_run_replace_not_confirmed(
self, library, mp3_file, opus_file, monkeypatch
):
monkeypatch.setattr(replace, "file_check", always(None))
monkeypatch.setattr(replace, "confirm_replacement", always(False))

item = Item.from_path(mp3_file)
library.add(item)

@pytest.fixture(autouse=True)
def _fake_file(self, tmp_path):
self.fake_file = tmp_path
monkeypatch.setattr(replace, "select_song", always(item))

def test_path_is_dir(self):
fake_directory = self.fake_dir / "fakeDir"
replace.run(library, optparse.Values(), ["AAA", str(opus_file)])

assert mp3_file.exists()
assert opus_file.exists()

def test_run_replace(self, library, mp3_file, opus_file, monkeypatch):
replace_file = Mock(replace.replace_file, return_value=None)
monkeypatch.setattr(replace, "replace_file", replace_file)

monkeypatch.setattr(replace, "file_check", always(None))
monkeypatch.setattr(replace, "confirm_replacement", always(True))

item = Item.from_path(mp3_file)
library.add(item)

monkeypatch.setattr(replace, "select_song", always(item))

replace.run(library, optparse.Values(), ["AAA", str(opus_file)])

replace_file.assert_called_once()

def test_path_is_dir(self, tmp_path):
fake_directory = tmp_path / "fakeDir"
fake_directory.mkdir()
with pytest.raises(ui.UserError):
replace.file_check(fake_directory)

def test_path_is_unsupported_file(self):
fake_file = self.fake_file / "fakefile.txt"
def test_path_is_unsupported_file(self, tmp_path):
fake_file = tmp_path / "fakefile.txt"
fake_file.write_text("test", encoding="utf-8")
with pytest.raises(ui.UserError):
replace.file_check(fake_file)

def test_path_is_supported_file(self):
dest = self.fake_file / "full.mp3"
src = Path(_common.RSRC.decode()) / "full.mp3"
shutil.copyfile(src, dest)

mediafile = MediaFile(dest)
mediafile.albumartist = "AAA"
mediafile.disctitle = "DDD"
mediafile.genres = ["a", "b", "c"]
mediafile.composer = None
mediafile.save()

replace.file_check(Path(str(dest)))
def test_path_is_supported_file(self, mp3_file):
replace.file_check(mp3_file)

def test_select_song_valid_choice(self, monkeypatch, capfd):
songs = ["Song A", "Song B", "Song C"]
Expand Down Expand Up @@ -113,3 +194,61 @@ class Song:
song = Song()

assert replace.confirm_replacement("test", song) is False

def test_replace_file_move_fails(self, tmp_path):
item = Item()
item.path = bytes(tmp_path / "not_a_song.mp3")

with pytest.raises(ui.UserError):
replace.replace_file(tmp_path / "not_a_file.opus", item)

def test_replace_file_delete_fails(
self, library, mp3_file, opus_file, monkeypatch
):
monkeypatch.setattr(Path, "unlink", always_raise(OSError))

item = Item.from_path(mp3_file)
library.add(item)

with pytest.raises(ui.UserError):
replace.replace_file(opus_file, item)

def test_replace_file_write_fails(
self, library, mp3_file, opus_file, monkeypatch
):
monkeypatch.setattr(
Item, "write", always_raise(WriteError("path", "reason"))
)

item = Item.from_path(mp3_file)
library.add(item)

with pytest.raises(ui.UserError):
replace.replace_file(opus_file, item)

def test_replace_file(
self, mp3_file: Path, opus_file: Path, library: Library
):
old_mediafile = MediaFile(mp3_file)
old_mediafile.albumartist = "ABC"
old_mediafile.disctitle = "DEF"
old_mediafile.genre = "GHI"
old_mediafile.save()

item = Item.from_path(mp3_file)
library.add(item)

replace.replace_file(opus_file, item)

# Check that the file has been replaced.
assert opus_file.exists()
assert not mp3_file.exists()

# Check that the database path has been updated.
assert item.path == bytes(opus_file)

# Check that the new file has the old file's metadata.
new_mediafile = MediaFile(opus_file)
assert new_mediafile.albumartist == "ABC"
assert new_mediafile.disctitle == "DEF"
assert new_mediafile.genre == "GHI"