Skip to content
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions Lib/sqlite3/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from argparse import ArgumentParser
from code import InteractiveConsole
from textwrap import dedent
from _colorize import get_theme, theme_no_color


def execute(c, sql, suppress_errors=True):
def execute(c, sql, suppress_errors=True, theme=theme_no_color):
"""Helper that wraps execution of SQL code.

This is used both by the REPL and by direct execution from the CLI.
Expand All @@ -25,22 +25,28 @@ def execute(c, sql, suppress_errors=True):
for row in c.execute(sql):
print(row)
except sqlite3.Error as e:
t = theme.traceback
tp = type(e).__name__
try:
print(f"{tp} ({e.sqlite_errorname}): {e}", file=sys.stderr)
tp += f" ({e.sqlite_errorname})"
except AttributeError:
print(f"{tp}: {e}", file=sys.stderr)
pass
print(
f"{t.type}{tp}{t.reset}: {t.message}{e}{t.reset}",
file=sys.stderr,
)
if not suppress_errors:
sys.exit(1)


class SqliteInteractiveConsole(InteractiveConsole):
"""A simple SQLite REPL."""

def __init__(self, connection):
def __init__(self, connection, use_color=False):
super().__init__()
self._con = connection
self._cur = connection.cursor()
self._use_color = use_color

def runsource(self, source, filename="<input>", symbol="single"):
"""Override runsource, the core of the InteractiveConsole REPL.
Expand All @@ -58,7 +64,8 @@ def runsource(self, source, filename="<input>", symbol="single"):
case _:
if not sqlite3.complete_statement(source):
return True
execute(self._cur, source)
theme = get_theme(force_no_color=not self._use_color)
execute(self._cur, source, theme=theme)
return False


Expand Down Expand Up @@ -105,8 +112,11 @@ def main(*args):
Each command will be run using execute() on the cursor.
Type ".help" for more information; type ".quit" or {eofkey} to quit.
""").strip()
sys.ps1 = "sqlite> "
sys.ps2 = " ... "

s = get_theme().syntax

sys.ps1 = f"{s.prompt}sqlite> {s.reset}"
sys.ps2 = f"{s.prompt} ... {s.reset}"

con = sqlite3.connect(args.filename, isolation_level=None)
try:
Expand All @@ -115,7 +125,7 @@ def main(*args):
execute(con, args.sql, suppress_errors=False)
else:
# No SQL provided; start the REPL.
console = SqliteInteractiveConsole(con)
console = SqliteInteractiveConsole(con, use_color=True)
try:
import readline # noqa: F401
except ImportError:
Expand Down
10 changes: 10 additions & 0 deletions Lib/test/test_sqlite3/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
captured_stderr,
captured_stdin,
force_not_colorized,
force_not_colorized_test_class,
)


Expand Down Expand Up @@ -69,6 +70,7 @@ def test_cli_on_disk_db(self):
self.assertIn("(0,)", out)


@force_not_colorized_test_class
class InteractiveSession(unittest.TestCase):
MEMORY_DB_MSG = "Connected to a transient in-memory database"
PS1 = "sqlite> "
Expand Down Expand Up @@ -158,6 +160,14 @@ def test_interact_on_disk_file(self):
out, _ = self.run_cli(TESTFN, commands=("SELECT count(t) FROM t;",))
self.assertIn("(0,)\n", out)

def test_color(self):
with unittest.mock.patch("_colorize.can_colorize", return_value=True):
out, err = self.run_cli(commands="\n")
self.assertIn("\x1b[1;35msqlite> \x1b[0m", out)
self.assertIn("\x1b[1;35m ... \x1b[0m\x1b", out)
out, err = self.run_cli(commands=("sel;",))
self.assertIn('\x1b[1;35mOperationalError (SQLITE_ERROR)\x1b[0m: '
'\x1b[35mnear "sel": syntax error\x1b[0m', err)

if __name__ == "__main__":
unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add basic color to :mod:`sqlite3` CLI interface.
Loading