Skip to content

Commit 7164755

Browse files
committed
Support table, index, trigger, view, column, function, and schema completion for sqlite3 CLI
1 parent bd928a3 commit 7164755

File tree

3 files changed

+158
-13
lines changed

3 files changed

+158
-13
lines changed

Lib/sqlite3/__main__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ def main(*args):
142142
execute(con, args.sql, suppress_errors=False, theme=theme)
143143
else:
144144
# No SQL provided; start the REPL.
145-
with completer():
145+
with completer(con):
146146
console = SqliteInteractiveConsole(con, use_color=True)
147147
console.interact(banner, exitmsg="")
148148
finally:

Lib/sqlite3/_completer.py

Lines changed: 47 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
from _sqlite3 import OperationalError
12
from contextlib import contextmanager
23

34
try:
@@ -8,29 +9,70 @@
89
_completion_matches = []
910

1011

11-
def _complete(text, state):
12+
def _complete(con, text, state):
1213
global _completion_matches
1314

1415
if state == 0:
1516
text_upper = text.upper()
16-
_completion_matches = [c for c in SQLITE_KEYWORDS if c.startswith(text_upper)]
17+
text_lower = text.lower()
18+
_completion_matches = [c + " " for c in SQLITE_KEYWORDS if c.startswith(text_upper)]
19+
cursor = con.cursor()
20+
schemata = tuple(row[1] for row
21+
in cursor.execute("PRAGMA database_list"))
22+
# tables, indexes, triggers, and views
23+
select_clauses = (f"SELECT name FROM \"{schema}\".sqlite_master"
24+
for schema in schemata)
25+
tables = (row[0] for row
26+
in cursor.execute(" UNION ".join(select_clauses)))
27+
_completion_matches.extend(c + " " for c in tables
28+
if c.lower().startswith(text_lower))
29+
# columns
30+
try:
31+
select_clauses = (f"""\
32+
SELECT pti.name FROM "{schema}".sqlite_master AS sm
33+
JOIN pragma_table_xinfo(sm.name,'{schema}') AS pti
34+
WHERE sm.type='table'""" for schema in schemata)
35+
columns = (row[0] for row
36+
in cursor.execute(" UNION ".join(select_clauses)))
37+
_completion_matches.extend(c + " " for c in columns
38+
if c.lower().startswith(text_lower))
39+
except OperationalError:
40+
# skip on SQLite<3.16.0 where pragma table-valued function is not
41+
# supported yet
42+
pass
43+
# functions
44+
try:
45+
funcs = (row[0] for row in cursor.execute("""\
46+
SELECT DISTINCT UPPER(name) FROM pragma_function_list()
47+
WHERE name NOT IN ('->', '->>')"""))
48+
_completion_matches.extend(c + "(" for c in funcs
49+
if c.startswith(text_upper))
50+
except OperationalError:
51+
# skip on SQLite<3.30.0 where function_list is not supported yet
52+
pass
53+
# schemata
54+
_completion_matches.extend(c for c in schemata
55+
if c.lower().startswith(text_lower))
56+
_completion_matches = sorted(set(_completion_matches))
1757
try:
18-
return _completion_matches[state] + " "
58+
return _completion_matches[state]
1959
except IndexError:
2060
return None
2161

2262

2363
@contextmanager
24-
def completer():
64+
def completer(con):
2565
try:
2666
import readline
2767
except ImportError:
2868
yield
2969
return
3070

3171
old_completer = readline.get_completer()
72+
def complete(text, state):
73+
return _complete(con, text, state)
3274
try:
33-
readline.set_completer(_complete)
75+
readline.set_completer(complete)
3476
if readline.backend == "editline":
3577
# libedit uses "^I" instead of "tab"
3678
command_string = "bind ^I rl_complete"

Lib/test/test_sqlite3/test_cli.py

Lines changed: 110 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -216,10 +216,6 @@ class Completion(unittest.TestCase):
216216

217217
@classmethod
218218
def setUpClass(cls):
219-
_sqlite3 = import_module("_sqlite3")
220-
if not hasattr(_sqlite3, "SQLITE_KEYWORDS"):
221-
raise unittest.SkipTest("unable to determine SQLite keywords")
222-
223219
readline = import_module("readline")
224220
if readline.backend == "editline":
225221
raise unittest.SkipTest("libedit readline is not supported")
@@ -229,12 +225,24 @@ def write_input(self, input_, env=None):
229225
import readline
230226
from sqlite3.__main__ import main
231227
228+
# Configure readline to ...:
229+
# - hide control sequences surrounding each candidate
230+
# - hide "Display all xxx possibilities? (y or n)"
231+
# - show candidates one per line
232232
readline.parse_and_bind("set colored-completion-prefix off")
233+
readline.parse_and_bind("set completion-query-items 0")
234+
readline.parse_and_bind("set page-completions off")
235+
readline.parse_and_bind("set completion-display-width 0")
236+
233237
main()
234238
""")
235239
return run_pty(script, input_, env)
236240

237241
def test_complete_sql_keywords(self):
242+
_sqlite3 = import_module("_sqlite3")
243+
if not hasattr(_sqlite3, "SQLITE_KEYWORDS"):
244+
raise unittest.SkipTest("unable to determine SQLite keywords")
245+
238246
# List candidates starting with 'S', there should be multiple matches.
239247
input_ = b"S\t\tEL\t 1;\n.quit\n"
240248
output = self.write_input(input_)
@@ -249,6 +257,103 @@ def test_complete_sql_keywords(self):
249257
self.assertIn(b"SELECT", output)
250258
self.assertIn(b"(1,)", output)
251259

260+
def test_complete_table_indexes_triggers_views(self):
261+
input_ = textwrap.dedent("""\
262+
CREATE TABLE _table (id);
263+
CREATE INDEX _index ON _table (id);
264+
CREATE TRIGGER _trigger BEFORE INSERT
265+
ON _table BEGIN SELECT 1; END;
266+
CREATE VIEW _view AS SELECT 1;
267+
268+
CREATE TEMP TABLE _temp_table (id);
269+
CREATE INDEX temp._temp_index ON _temp_table (id);
270+
CREATE TEMP TRIGGER _temp_trigger BEFORE INSERT
271+
ON _table BEGIN SELECT 1; END;
272+
CREATE TEMP VIEW _temp_view AS SELECT 1;
273+
274+
ATTACH ':memory:' AS attached;
275+
CREATE TABLE attached._attached_table (id);
276+
CREATE INDEX attached._attached_index ON _attached_table (id);
277+
CREATE TRIGGER attached._attached_trigger BEFORE INSERT
278+
ON _attached_table BEGIN SELECT 1; END;
279+
CREATE VIEW attached._attached_view AS SELECT 1;
280+
281+
SELECT id FROM _\t\tta\t;
282+
.quit\n""").encode()
283+
output = self.write_input(input_)
284+
lines = output.decode().splitlines()
285+
indices = [i for i, line in enumerate(lines)
286+
if line.startswith(self.PS1)]
287+
start, end = indices[-3], indices[-2]
288+
candidates = [l.strip() for l in lines[start+1:end]]
289+
self.assertEqual(candidates,
290+
[
291+
"_attached_index",
292+
"_attached_table",
293+
"_attached_trigger",
294+
"_attached_view",
295+
"_index",
296+
"_table",
297+
"_temp_index",
298+
"_temp_table",
299+
"_temp_trigger",
300+
"_temp_view",
301+
"_trigger",
302+
"_view",
303+
],
304+
)
305+
306+
def test_complete_columns(self):
307+
input_ = textwrap.dedent("""\
308+
CREATE TABLE _table (_col_table);
309+
CREATE TEMP TABLE _temp_table (_col_temp);
310+
ATTACH ':memory:' AS attached;
311+
CREATE TABLE attached._attached_table (_col_attached);
312+
313+
SELECT _col_\t\tta\tFROM _table;
314+
.quit\n""").encode()
315+
output = self.write_input(input_)
316+
lines = output.decode().splitlines()
317+
indices = [
318+
i for i, line in enumerate(lines) if line.startswith(self.PS1)
319+
]
320+
start, end = indices[-3], indices[-2]
321+
candidates = [l.strip() for l in lines[start+1:end]]
322+
323+
self.assertEqual(
324+
candidates, ["_col_attached", "_col_table", "_col_temp"]
325+
)
326+
327+
def test_complete_functions(self):
328+
input_ = b"SELECT AV\t1);\n.quit\n"
329+
output = self.write_input(input_)
330+
self.assertIn(b"AVG(1);", output)
331+
self.assertIn(b"(1.0,)", output)
332+
333+
# Functions are completed in upper case for even lower case user input.
334+
input_ = b"SELECT av\t1);\n.quit\n"
335+
output = self.write_input(input_)
336+
self.assertIn(b"AVG(1);", output)
337+
self.assertIn(b"(1.0,)", output)
338+
339+
def test_complete_schemata(self):
340+
input_ = textwrap.dedent("""\
341+
ATTACH ':memory:' AS _attached;
342+
CREATE TEMP TABLE _table (id);
343+
344+
SELECT * FROM \t\t_att\t.sqlite_master;
345+
.quit\n""").encode()
346+
output = self.write_input(input_)
347+
lines = output.decode().splitlines()
348+
indices = [
349+
i for i, line in enumerate(lines) if line.startswith(self.PS1)
350+
]
351+
start, end = indices[-3], indices[-2]
352+
candidates = [l.strip() for l in lines[start+1:end]]
353+
self.assertIn("_attached", candidates)
354+
self.assertIn("main", candidates)
355+
self.assertIn("temp", candidates)
356+
252357
@unittest.skipIf(sys.platform.startswith("freebsd"),
253358
"Two actual tabs are inserted when there are no matching"
254359
" completions in the pseudo-terminal opened by run_pty()"
@@ -269,8 +374,6 @@ def test_complete_no_match(self):
269374
self.assertEqual(line_num, len(lines))
270375

271376
def test_complete_no_input(self):
272-
from _sqlite3 import SQLITE_KEYWORDS
273-
274377
script = textwrap.dedent("""
275378
import readline
276379
from sqlite3.__main__ import main
@@ -301,7 +404,7 @@ def test_complete_no_input(self):
301404
self.assertEqual(len(indices), 2)
302405
start, end = indices
303406
candidates = [l.strip() for l in lines[start+1:end]]
304-
self.assertEqual(candidates, sorted(SQLITE_KEYWORDS))
407+
self.assertEqual(candidates, sorted(candidates))
305408
except:
306409
if verbose:
307410
print(' PTY output: '.center(30, '-'))

0 commit comments

Comments
 (0)