|
30 | 30 | from collections import Counter
|
31 | 31 | from contextlib import suppress
|
32 | 32 | from enum import Enum
|
| 33 | +from functools import cache |
33 | 34 | from importlib import import_module
|
34 | 35 | from multiprocessing.pool import ThreadPool
|
35 | 36 | from pathlib import Path
|
|
47 | 48 |
|
48 | 49 | from unidecode import unidecode
|
49 | 50 |
|
| 51 | +import beets |
50 | 52 | from beets.util import hidden
|
51 | 53 |
|
52 | 54 | if TYPE_CHECKING:
|
@@ -694,105 +696,87 @@ def sanitize_path(path: str, replacements: Replacements | None = None) -> str:
|
694 | 696 | return os.path.join(*comps)
|
695 | 697 |
|
696 | 698 |
|
697 |
| -def truncate_path(path: AnyStr, length: int = MAX_FILENAME_LENGTH) -> AnyStr: |
698 |
| - """Given a bytestring path or a Unicode path fragment, truncate the |
699 |
| - components to a legal length. In the last component, the extension |
700 |
| - is preserved. |
def truncate_str(s: str, length: int) -> str:
    """Truncate the string to the given byte length.

    The string is encoded with the filesystem encoding, cut to at most
    ``length`` bytes and decoded again. If the cut lands in the middle of a
    unicode character (rendering it invalid), that character is removed:

    >>> s = "🎹🎶"  # 8 bytes
    >>> truncate_str(s, 6)
    '🎹'

    A zero or negative ``length`` yields an empty string.
    """
    # Clamp the limit: a negative value in the slice below would otherwise
    # mean "drop that many bytes from the end" instead of "keep nothing".
    limit = max(length, 0)
    return os.fsencode(s)[:limit].decode(sys.getfilesystemencoding(), "ignore")
703 | 710 |
|
704 |
| - out = [c[:length] for c in comps] |
705 |
| - base, ext = os.path.splitext(comps[-1]) |
706 |
| - if ext: |
707 |
| - # Last component has an extension. |
708 |
| - base = base[: length - len(ext)] |
709 |
| - out[-1] = base + ext |
710 | 711 |
|
711 |
| - return os.path.join(*out) |
def truncate_path(str_path: str) -> str:
    """Truncate each path part to a legal length preserving the extension.

    Every parent component is cut to the byte length returned by
    `get_max_filename_length`. In the last component only the stem is
    shortened, leaving room for the extension, which is re-attached
    unchanged.
    """
    max_length = get_max_filename_length()
    path = Path(str_path)
    suffix = path.suffix
    parent_parts = [truncate_str(p, max_length) for p in path.parts[:-1]]

    # Budget the extension in bytes, the same unit `truncate_str` works in,
    # and never pass a negative length if the extension alone is too long.
    stem_length = max(max_length - len(os.fsencode(suffix)), 0)
    stem = truncate_str(path.stem, stem_length)

    # Append the extension as plain text: `Path.with_suffix` would replace
    # whatever follows the last dot in the stem (e.g. "01. Title" -> "01")
    # and raises ValueError when the stem is empty.
    return str(Path(*parent_parts, stem + suffix))
712 | 719 |
|
713 | 720 |
|
def _legalize_stage(
    path: str, replacements: Replacements | None, extension: str
) -> tuple[str, bool]:
    """Run a single pass of the path legalization steps.

    The pass consists of, in order:

    1. sanitation (including the given replacements)
    2. appending the lower-cased extension
    3. truncation.

    Return the resulting path and whether truncation changed it.
    """
    # Sanitize first (honouring user replacements), then attach the extension.
    candidate = sanitize_path(path, replacements) + extension.lower()

    # Shorten too-long components and report whether anything changed.
    legalized = truncate_path(candidate)
    return legalized, legalized != candidate
|
742 | 742 |
|
743 | 743 |
|
def legalize_path(
    path: str, replacements: Replacements | None, extension: str
) -> tuple[str, bool]:
    """Produce a legal path from a path-like Unicode string.

    Return the legalized path and a flag indicating whether the user
    replacements had to be ignored (see below).

    Legalization is delegated to `_legalize_stage`; see its documentation
    for the individual steps. That function is invoked up to three times,
    because truncation can conflict with replacements (for example when
    truncation leaves whitespace at the end of the string).

    Capping the number of passes rules out an endless cycle of sanitation
    and truncation, which replacement rules that lengthen the string could
    otherwise trigger.

    The returned flag is set when the path had to be truncated a second time
    (meaning the replacements made the string longer again after the first
    truncation); the application should probably log some sort of warning.
    """
    suffix = as_string(extension)

    # First pass; drop the extension again before feeding the result back in.
    initial, _ = _legalize_stage(path, replacements, suffix)
    stem, _ = os.path.splitext(initial)

    # Re-sanitize following truncation (including user replacements).
    resanitized, retruncated = _legalize_stage(stem, replacements, suffix)
    if retruncated:
        # The path was truncated once more: discard the user replacements
        # and run through one last legalization stage.
        fallback, _ = _legalize_stage(stem, None, suffix)
        return fallback, True

    return resanitized, False
796 | 780 |
|
797 | 781 |
|
798 | 782 | def str2bool(value: str) -> bool:
|
@@ -871,16 +855,21 @@ def command_output(cmd: list[BytesOrStr], shell: bool = False) -> CommandOutput:
|
871 | 855 | return CommandOutput(stdout, stderr)
|
872 | 856 |
|
873 | 857 |
|
874 |
| -def max_filename_length(path: BytesOrStr, limit=MAX_FILENAME_LENGTH) -> int: |
| 858 | +@cache |
| 859 | +def get_max_filename_length() -> int: |
875 | 860 | """Attempt to determine the maximum filename length for the
|
876 | 861 | filesystem containing `path`. If the value is greater than `limit`,
|
877 | 862 | then `limit` is used instead (to prevent errors when a filesystem
|
878 | 863 | misreports its capacity). If it cannot be determined (e.g., on
|
879 | 864 | Windows), return `limit`.
|
880 | 865 | """
|
| 866 | + if length := beets.config["max_filename_length"].get(int): |
| 867 | + return length |
| 868 | + |
| 869 | + limit = MAX_FILENAME_LENGTH |
881 | 870 | if hasattr(os, "statvfs"):
|
882 | 871 | try:
|
883 |
| - res = os.statvfs(path) |
| 872 | + res = os.statvfs(beets.config["directory"].as_str()) |
884 | 873 | except OSError:
|
885 | 874 | return limit
|
886 | 875 | return min(res[9], limit)
|
|
0 commit comments