Skip to content

Commit f07a4cd

Browse files
authored
Add rich based spinner (#13451)
The benefit of this spinner over our legacy spinners is that Rich will update and render the spinner automatically for us. This is much nicer than having to call .spin() manually.
1 parent b817b2b commit f07a4cd

File tree

2 files changed

+142
-4
lines changed

2 files changed

+142
-4
lines changed

src/pip/_internal/cli/spinners.py

Lines changed: 77 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,26 @@
66
import sys
77
import time
88
from collections.abc import Generator
9-
from typing import IO
9+
from typing import IO, Final
10+
11+
from pip._vendor.rich.console import (
12+
Console,
13+
ConsoleOptions,
14+
RenderableType,
15+
RenderResult,
16+
)
17+
from pip._vendor.rich.live import Live
18+
from pip._vendor.rich.measure import Measurement
19+
from pip._vendor.rich.text import Text
1020

1121
from pip._internal.utils.compat import WINDOWS
12-
from pip._internal.utils.logging import get_indentation
22+
from pip._internal.utils.logging import get_console, get_indentation
1323

1424
logger = logging.getLogger(__name__)
1525

26+
SPINNER_CHARS: Final = r"-\|/"
27+
SPINS_PER_SECOND: Final = 8
28+
1629

1730
class SpinnerInterface:
1831
def spin(self) -> None:
@@ -27,9 +40,9 @@ def __init__(
2740
self,
2841
message: str,
2942
file: IO[str] | None = None,
30-
spin_chars: str = "-\\|/",
43+
spin_chars: str = SPINNER_CHARS,
3144
# Empirically, 8 updates/second looks nice
32-
min_update_interval_seconds: float = 0.125,
45+
min_update_interval_seconds: float = 1 / SPINS_PER_SECOND,
3346
):
3447
self._message = message
3548
if file is None:
@@ -139,6 +152,66 @@ def open_spinner(message: str) -> Generator[SpinnerInterface, None, None]:
139152
spinner.finish("done")
140153

141154

155+
class _PipRichSpinner:
156+
"""
157+
Custom rich spinner that matches the style of the legacy spinners.
158+
159+
(*) Updates will be handled in a background thread by a rich live panel
160+
which will call render() automatically at the appropriate time.
161+
"""
162+
163+
def __init__(self, label: str) -> None:
164+
self.label = label
165+
self._spin_cycle = itertools.cycle(SPINNER_CHARS)
166+
self._spinner_text = ""
167+
self._finished = False
168+
self._indent = get_indentation() * " "
169+
170+
def __rich_console__(
171+
self, console: Console, options: ConsoleOptions
172+
) -> RenderResult:
173+
yield self.render()
174+
175+
def __rich_measure__(
176+
self, console: Console, options: ConsoleOptions
177+
) -> Measurement:
178+
text = self.render()
179+
return Measurement.get(console, options, text)
180+
181+
def render(self) -> RenderableType:
182+
if not self._finished:
183+
self._spinner_text = next(self._spin_cycle)
184+
185+
return Text.assemble(self._indent, self.label, " ... ", self._spinner_text)
186+
187+
def finish(self, status: str) -> None:
188+
"""Stop spinning and set a final status message."""
189+
self._spinner_text = status
190+
self._finished = True
191+
192+
193+
@contextlib.contextmanager
194+
def open_rich_spinner(label: str, console: Console | None = None) -> Generator[None]:
195+
if not logger.isEnabledFor(logging.INFO):
196+
# Don't show spinner if --quiet is given.
197+
yield
198+
return
199+
200+
console = console or get_console()
201+
spinner = _PipRichSpinner(label)
202+
with Live(spinner, refresh_per_second=SPINS_PER_SECOND, console=console):
203+
try:
204+
yield
205+
except KeyboardInterrupt:
206+
spinner.finish("canceled")
207+
raise
208+
except Exception:
209+
spinner.finish("error")
210+
raise
211+
else:
212+
spinner.finish("done")
213+
214+
142215
HIDE_CURSOR = "\x1b[?25l"
143216
SHOW_CURSOR = "\x1b[?25h"
144217

tests/unit/test_cli_spinners.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from collections.abc import Generator
5+
from contextlib import contextmanager
6+
from io import StringIO
7+
from typing import Callable
8+
from unittest.mock import Mock
9+
10+
import pytest
11+
12+
from pip._vendor.rich.console import Console
13+
14+
from pip._internal.cli import spinners
15+
from pip._internal.cli.spinners import open_rich_spinner
16+
17+
18+
@contextmanager
19+
def patch_logger_level(level: int) -> Generator[None]:
20+
"""Patch the spinner logger level temporarily."""
21+
original_level = spinners.logger.level
22+
spinners.logger.setLevel(level)
23+
try:
24+
yield
25+
finally:
26+
spinners.logger.setLevel(original_level)
27+
28+
29+
class TestRichSpinner:
30+
@pytest.mark.parametrize(
31+
"status, func",
32+
[
33+
("done", lambda: None),
34+
("error", lambda: 1 / 0),
35+
("canceled", Mock(side_effect=KeyboardInterrupt)),
36+
],
37+
)
38+
def test_finish(self, status: str, func: Callable[[], None]) -> None:
39+
"""
40+
Check that the spinner finish message is set correctly depending
41+
on how the spinner came to a stop.
42+
"""
43+
stream = StringIO()
44+
try:
45+
with patch_logger_level(logging.INFO):
46+
with open_rich_spinner("working", Console(file=stream)):
47+
func()
48+
except BaseException:
49+
pass
50+
51+
output = stream.getvalue()
52+
assert output == f"working ... {status}"
53+
54+
@pytest.mark.parametrize(
55+
"level, visible",
56+
[(logging.ERROR, False), (logging.INFO, True), (logging.DEBUG, True)],
57+
)
58+
def test_verbosity(self, level: int, visible: bool) -> None:
59+
"""Is the spinner hidden at the appropriate verbosity?"""
60+
stream = StringIO()
61+
with patch_logger_level(level):
62+
with open_rich_spinner("working", Console(file=stream)):
63+
pass
64+
65+
assert bool(stream.getvalue()) == visible

0 commit comments

Comments
 (0)