diff --git a/Doc/whatsnew/3.15.rst b/Doc/whatsnew/3.15.rst index 4b176d6c8e6034..efaeffba624bca 100644 --- a/Doc/whatsnew/3.15.rst +++ b/Doc/whatsnew/3.15.rst @@ -452,6 +452,9 @@ sqlite3 details. (Contributed by Stan Ulbrych and Ɓukasz Langa in :gh:`133461`.) + * Table, index, trigger, view, column, function, and schema completion on . + (Contributed by Long Tan in :gh:`136101`.) + ssl --- diff --git a/Lib/sqlite3/__main__.py b/Lib/sqlite3/__main__.py index 093b38c0001387..b3746ed757332f 100644 --- a/Lib/sqlite3/__main__.py +++ b/Lib/sqlite3/__main__.py @@ -143,7 +143,7 @@ def main(*args): execute(con, args.sql, suppress_errors=False, theme=theme) else: # No SQL provided; start the REPL. - with completer(): + with completer(con): console = SqliteInteractiveConsole(con, use_color=True) console.interact(banner, exitmsg="") finally: diff --git a/Lib/sqlite3/_completer.py b/Lib/sqlite3/_completer.py index b3e8c0e5f36208..ba580f968bf92d 100644 --- a/Lib/sqlite3/_completer.py +++ b/Lib/sqlite3/_completer.py @@ -1,3 +1,4 @@ +from _sqlite3 import OperationalError from contextlib import contextmanager try: @@ -10,23 +11,84 @@ _completion_matches = [] -def _complete(text, state): +def _complete(con, text, state): global _completion_matches if state == 0: - if text.startswith('.'): - _completion_matches = [c for c in CLI_COMMANDS if c.startswith(text)] + if text.startswith("."): + _completion_matches = [ + c + " " for c in CLI_COMMANDS if c.startswith(text) + ] else: text_upper = text.upper() - _completion_matches = [c for c in SQLITE_KEYWORDS if c.startswith(text_upper)] + _completion_matches = [ + c + " " for c in SQLITE_KEYWORDS if c.startswith(text_upper) + ] + + cursor = con.cursor() + schemata = tuple(row[1] for row + in cursor.execute("PRAGMA database_list")) + # tables, indexes, triggers, and views + # escape '_' which can appear in attached database names + select_clauses = ( + f"""\ + SELECT name || ' ' FROM \"{schema}\".sqlite_master + WHERE name LIKE REPLACE(:text, '_', '^_') || '%' ESCAPE '^'""" + for schema in schemata + ) + _completion_matches.extend( + row[0] + for row in cursor.execute( + " UNION ".join(select_clauses), {"text": text} + ) + ) + # columns + try: + select_clauses = ( + f"""\ + SELECT pti.name || ' ' FROM "{schema}".sqlite_master AS sm + JOIN pragma_table_xinfo(sm.name,'{schema}') AS pti + WHERE sm.type='table' AND + pti.name LIKE REPLACE(:text, '_', '^_') || '%' ESCAPE '^'""" + for schema in schemata + ) + _completion_matches.extend( + row[0] + for row in cursor.execute( + " UNION ".join(select_clauses), {"text": text} + ) + ) + except OperationalError: + # skip on SQLite<3.16.0 where pragma table-valued function is + # not supported yet + pass + # functions + try: + _completion_matches.extend( + row[0] for row in cursor.execute("""\ + SELECT DISTINCT UPPER(name) || '(' + FROM pragma_function_list() + WHERE name NOT IN ('->', '->>') AND + name LIKE REPLACE(:text, '_', '^_') || '%' ESCAPE '^'""", + {"text": text}, + ) + ) + except OperationalError: + # skip on SQLite<3.30.0 where function_list is not supported yet + pass + # schemata + text_lower = text.lower() + _completion_matches.extend(c for c in schemata + if c.lower().startswith(text_lower)) + _completion_matches = sorted(set(_completion_matches)) try: - return _completion_matches[state] + " " + return _completion_matches[state] except IndexError: return None @contextmanager -def completer(): +def completer(con): try: import readline except ImportError: @@ -34,8 +96,10 @@ def completer(): return old_completer = readline.get_completer() + def complete(text, state): + return _complete(con, text, state) try: - readline.set_completer(_complete) + readline.set_completer(complete) if readline.backend == "editline": # libedit uses "^I" instead of "tab" command_string = "bind ^I rl_complete" diff --git a/Lib/test/test_sqlite3/test_cli.py b/Lib/test/test_sqlite3/test_cli.py index 5926cec0569ddb..dcd05d545b7b77 100644 --- a/Lib/test/test_sqlite3/test_cli.py +++ b/Lib/test/test_sqlite3/test_cli.py @@ -216,10 +216,6 @@ class Completion(unittest.TestCase): @classmethod def setUpClass(cls): - _sqlite3 = import_module("_sqlite3") - if not hasattr(_sqlite3, "SQLITE_KEYWORDS"): - raise unittest.SkipTest("unable to determine SQLite keywords") - readline = import_module("readline") if readline.backend == "editline": raise unittest.SkipTest("libedit readline is not supported") @@ -229,12 +225,24 @@ def write_input(self, input_, env=None): import readline from sqlite3.__main__ import main + # Configure readline to ...: + # - hide control sequences surrounding each candidate + # - hide "Display all xxx possibilities? (y or n)" + # - show candidates one per line readline.parse_and_bind("set colored-completion-prefix off") + readline.parse_and_bind("set completion-query-items 0") + readline.parse_and_bind("set page-completions off") + readline.parse_and_bind("set completion-display-width 0") + main() """) return run_pty(script, input_, env) def test_complete_sql_keywords(self): + _sqlite3 = import_module("_sqlite3") + if not hasattr(_sqlite3, "SQLITE_KEYWORDS"): + raise unittest.SkipTest("unable to determine SQLite keywords") + # List candidates starting with 'S', there should be multiple matches. input_ = b"S\t\tEL\t 1;\n.quit\n" output = self.write_input(input_) @@ -254,6 +262,103 @@ def test_complete_sql_keywords(self): output = self.write_input(input_) self.assertIn(b".version", output) + def test_complete_table_indexes_triggers_views(self): + input_ = textwrap.dedent("""\ + CREATE TABLE _table (id); + CREATE INDEX _index ON _table (id); + CREATE TRIGGER _trigger BEFORE INSERT + ON _table BEGIN SELECT 1; END; + CREATE VIEW _view AS SELECT 1; + + CREATE TEMP TABLE _temp_table (id); + CREATE INDEX temp._temp_index ON _temp_table (id); + CREATE TEMP TRIGGER _temp_trigger BEFORE INSERT + ON _table BEGIN SELECT 1; END; + CREATE TEMP VIEW _temp_view AS SELECT 1; + + ATTACH ':memory:' AS attached; + CREATE TABLE attached._attached_table (id); + CREATE INDEX attached._attached_index ON _attached_table (id); + CREATE TRIGGER attached._attached_trigger BEFORE INSERT + ON _attached_table BEGIN SELECT 1; END; + CREATE VIEW attached._attached_view AS SELECT 1; + + SELECT id FROM _\t\tta\t; + .quit\n""").encode() + output = self.write_input(input_) + lines = output.decode().splitlines() + indices = [i for i, line in enumerate(lines) + if line.startswith(self.PS1)] + start, end = indices[-3], indices[-2] + candidates = [l.strip() for l in lines[start+1:end]] + self.assertEqual(candidates, + [ + "_attached_index", + "_attached_table", + "_attached_trigger", + "_attached_view", + "_index", + "_table", + "_temp_index", + "_temp_table", + "_temp_trigger", + "_temp_view", + "_trigger", + "_view", + ], + ) + + def test_complete_columns(self): + input_ = textwrap.dedent("""\ + CREATE TABLE _table (_col_table); + CREATE TEMP TABLE _temp_table (_col_temp); + ATTACH ':memory:' AS attached; + CREATE TABLE attached._attached_table (_col_attached); + + SELECT _col_\t\tta\tFROM _table; + .quit\n""").encode() + output = self.write_input(input_) + lines = output.decode().splitlines() + indices = [ + i for i, line in enumerate(lines) if line.startswith(self.PS1) + ] + start, end = indices[-3], indices[-2] + candidates = [l.strip() for l in lines[start+1:end]] + + self.assertEqual( + candidates, ["_col_attached", "_col_table", "_col_temp"] + ) + + def test_complete_functions(self): + input_ = b"SELECT AV\t1);\n.quit\n" + output = self.write_input(input_) + self.assertIn(b"AVG(1);", output) + self.assertIn(b"(1.0,)", output) + + # Functions are completed in upper case for even lower case user input. + input_ = b"SELECT av\t1);\n.quit\n" + output = self.write_input(input_) + self.assertIn(b"AVG(1);", output) + self.assertIn(b"(1.0,)", output) + + def test_complete_schemata(self): + input_ = textwrap.dedent("""\ + ATTACH ':memory:' AS _attached; + CREATE TEMP TABLE _table (id); + + SELECT * FROM \t\t_att\t.sqlite_master; + .quit\n""").encode() + output = self.write_input(input_) + lines = output.decode().splitlines() + indices = [ + i for i, line in enumerate(lines) if line.startswith(self.PS1) + ] + start, end = indices[-3], indices[-2] + candidates = [l.strip() for l in lines[start+1:end]] + self.assertIn("_attached", candidates) + self.assertIn("main", candidates) + self.assertIn("temp", candidates) + @unittest.skipIf(sys.platform.startswith("freebsd"), "Two actual tabs are inserted when there are no matching" " completions in the pseudo-terminal opened by run_pty()" @@ -274,8 +379,6 @@ def test_complete_no_match(self): self.assertEqual(line_num, len(lines)) def test_complete_no_input(self): - from _sqlite3 import SQLITE_KEYWORDS - script = textwrap.dedent(""" import readline from sqlite3.__main__ import main @@ -306,7 +409,7 @@ def test_complete_no_input(self): self.assertEqual(len(indices), 2) start, end = indices candidates = [l.strip() for l in lines[start+1:end]] - self.assertEqual(candidates, sorted(SQLITE_KEYWORDS)) + self.assertEqual(candidates, sorted(candidates)) except: if verbose: print(' PTY output: '.center(30, '-')) diff --git a/Misc/NEWS.d/next/Library/2025-06-29-22-01-00.gh-issue-133390.I1DW_3.rst b/Misc/NEWS.d/next/Library/2025-06-29-22-01-00.gh-issue-133390.I1DW_3.rst new file mode 100644 index 00000000000000..c57f802d4c8a78 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-06-29-22-01-00.gh-issue-133390.I1DW_3.rst @@ -0,0 +1,2 @@ +Support table, index, trigger, view, column, function, and schema completion +for :mod:`sqlite3`'s :ref:`command-line interface `.