diff --git a/AUTHORS b/AUTHORS index 771de13f..2ed06b12 100644 --- a/AUTHORS +++ b/AUTHORS @@ -144,6 +144,7 @@ Contributors: * Jay Knight (jay-knight) * fbdb * Charbel Jacquin (charbeljc) + * Diego Creator: -------- diff --git a/changelog.rst b/changelog.rst index 96eefd74..22def80c 100644 --- a/changelog.rst +++ b/changelog.rst @@ -9,6 +9,9 @@ Features: * Support dsn specific init-command in the config file * Add suggestion when setting the search_path * Allow per dsn_alias ssh tunnel selection +* Add support for `single-command` to run a SQL command and exit. + * Command line option `-c` or `--command`. + * You can specify multiple times. Internal: --------- diff --git a/pgcli/main.py b/pgcli/main.py index 0b4b64f5..dbfb7091 100644 --- a/pgcli/main.py +++ b/pgcli/main.py @@ -911,6 +911,25 @@ def _check_ongoing_transaction_and_allow_quitting(self): def run_cli(self): logger = self.logger + # Handle command mode (-c flag) - similar to psql behavior + # Multiple -c options are executed sequentially + if hasattr(self, 'commands') and self.commands: + try: + for command in self.commands: + logger.debug("Running command: %s", command) + # Execute the command using the same logic as interactive mode + self.handle_watch_command(command) + except PgCliQuitError: + # Normal exit from quit command + sys.exit(0) + except Exception as e: + logger.error("Error executing command: %s", e) + logger.error("traceback: %r", traceback.format_exc()) + click.secho(str(e), err=True, fg="red") + sys.exit(1) + # Exit successfully after executing all commands + sys.exit(0) + history_file = self.config["main"]["history_file"] if history_file == "default": history_file = config_location() + "history" @@ -1278,7 +1297,8 @@ def is_too_tall(self, lines): return len(lines) >= (self.prompt_app.output.get_size().rows - 4) def echo_via_pager(self, text, color=None): - if self.pgspecial.pager_config == PAGER_OFF or self.watch_command: + # Disable pager for -c/--command mode and \watch command + if self.pgspecial.pager_config == PAGER_OFF or self.watch_command or (hasattr(self, 'commands') and self.commands): click.echo(text, color=color) elif self.pgspecial.pager_config == PAGER_LONG_OUTPUT and self.table_format != "csv": lines = text.split("\n") @@ -1426,6 +1446,13 @@ def echo_via_pager(self, text, color=None): type=str, help="SQL statement to execute after connecting.", ) +@click.option( + "-c", + "--command", + "commands", + multiple=True, + help="run command (SQL or internal) and exit. Multiple -c options are allowed.", +) @click.argument("dbname", default=lambda: None, envvar="PGDATABASE", nargs=1) @click.argument("username", default=lambda: None, envvar="PGUSER", nargs=1) def cli( @@ -1454,6 +1481,7 @@ def cli( ssh_tunnel: str, init_command: str, log_file: str, + commands: tuple, ): if version: print("Version:", __version__) @@ -1514,6 +1542,9 @@ def cli( log_file=log_file, ) + # Store commands for -c option (can be multiple) + pgcli.commands = commands if commands else None + # Choose which ever one has a valid value. if dbname_opt and dbname: # work as psql: when database is given as option and argument use the argument as user diff --git a/tests/features/command_option.feature b/tests/features/command_option.feature new file mode 100644 index 00000000..38f5fef5 --- /dev/null +++ b/tests/features/command_option.feature @@ -0,0 +1,38 @@ +Feature: run the cli with -c/--command option, + execute a single command, + and exit + + Scenario: run pgcli with -c and a SQL query + When we run pgcli with -c "SELECT 1 as test_column" + then we see the query result + and pgcli exits successfully + + Scenario: run pgcli with --command and a SQL query + When we run pgcli with --command "SELECT 'hello' as greeting" + then we see the query result + and pgcli exits successfully + + Scenario: run pgcli with -c and a special command + When we run pgcli with -c "\dt" + then we see the command output + and pgcli exits successfully + + Scenario: run pgcli with -c and an invalid query + When we run pgcli with -c "SELECT invalid_column FROM nonexistent_table" + then we see an error message + and pgcli exits successfully + + Scenario: run pgcli with -c and multiple statements + When we run pgcli with -c "SELECT 1; SELECT 2" + then we see both query results + and pgcli exits successfully + + Scenario: run pgcli with multiple -c options + When we run pgcli with multiple -c options + then we see all command outputs + and pgcli exits successfully + + Scenario: run pgcli with mixed -c and --command options + When we run pgcli with mixed -c and --command + then we see all command outputs + and pgcli exits successfully diff --git a/tests/features/steps/command_option.py b/tests/features/steps/command_option.py new file mode 100644 index 00000000..48188361 --- /dev/null +++ b/tests/features/steps/command_option.py @@ -0,0 +1,192 @@ +""" +Steps for testing -c/--command option behavioral tests. +""" + +import subprocess +from behave import when, then + + +@when('we run pgcli with -c "{command}"') +def step_run_pgcli_with_c(context, command): + """Run pgcli with -c flag and a command.""" + cmd = [ + "pgcli", + "-h", context.conf["host"], + "-p", str(context.conf["port"]), + "-U", context.conf["user"], + "-d", context.conf["dbname"], + "-c", command + ] + try: + context.cmd_output = subprocess.check_output( + cmd, + cwd=context.package_root, + stderr=subprocess.STDOUT, + timeout=5 + ) + context.exit_code = 0 + except subprocess.CalledProcessError as e: + context.cmd_output = e.output + context.exit_code = e.returncode + except subprocess.TimeoutExpired as e: + context.cmd_output = b"Command timed out" + context.exit_code = -1 + + +@when('we run pgcli with --command "{command}"') +def step_run_pgcli_with_command(context, command): + """Run pgcli with --command flag and a command.""" + cmd = [ + "pgcli", + "-h", context.conf["host"], + "-p", str(context.conf["port"]), + "-U", context.conf["user"], + "-d", context.conf["dbname"], + "--command", command + ] + try: + context.cmd_output = subprocess.check_output( + cmd, + cwd=context.package_root, + stderr=subprocess.STDOUT, + timeout=5 + ) + context.exit_code = 0 + except subprocess.CalledProcessError as e: + context.cmd_output = e.output + context.exit_code = e.returncode + except subprocess.TimeoutExpired as e: + context.cmd_output = b"Command timed out" + context.exit_code = -1 + + +@then("we see the query result") +def step_see_query_result(context): + """Verify that the query result is in the output.""" + output = context.cmd_output.decode('utf-8') + # Check for common query result indicators + assert any([ + "SELECT" in output, + "test_column" in output, + "greeting" in output, + "hello" in output, + "+-" in output, # table border + "|" in output, # table column separator + ]), f"Expected query result in output, but got: {output}" + + +@then("we see both query results") +def step_see_both_query_results(context): + """Verify that both query results are in the output.""" + output = context.cmd_output.decode('utf-8') + # Should contain output from both SELECT statements + assert "SELECT" in output, f"Expected SELECT in output, but got: {output}" + # The output should have multiple result sets + assert output.count("SELECT") >= 2, f"Expected at least 2 SELECT results, but got: {output}" + + +@then("we see the command output") +def step_see_command_output(context): + """Verify that the special command output is present.""" + output = context.cmd_output.decode('utf-8') + # For \dt we should see table-related output + # It might be empty if no tables exist, but shouldn't error + assert context.exit_code == 0, f"Expected exit code 0, but got: {context.exit_code}" + + +@then("we see an error message") +def step_see_error_message(context): + """Verify that an error message is in the output.""" + output = context.cmd_output.decode('utf-8') + assert any([ + "does not exist" in output, + "error" in output.lower(), + "ERROR" in output, + ]), f"Expected error message in output, but got: {output}" + + +@then("pgcli exits successfully") +def step_pgcli_exits_successfully(context): + """Verify that pgcli exited with code 0.""" + assert context.exit_code == 0, f"Expected exit code 0, but got: {context.exit_code}" + # Clean up + context.cmd_output = None + context.exit_code = None + + +@then("pgcli exits with error") +def step_pgcli_exits_with_error(context): + """Verify that pgcli exited with a non-zero code.""" + assert context.exit_code != 0, f"Expected non-zero exit code, but got: {context.exit_code}" + # Clean up + context.cmd_output = None + context.exit_code = None + + +@when("we run pgcli with multiple -c options") +def step_run_pgcli_with_multiple_c(context): + """Run pgcli with multiple -c flags.""" + cmd = [ + "pgcli", + "-h", context.conf["host"], + "-p", str(context.conf["port"]), + "-U", context.conf["user"], + "-d", context.conf["dbname"], + "-c", "SELECT 'first' as result", + "-c", "SELECT 'second' as result", + "-c", "SELECT 'third' as result" + ] + try: + context.cmd_output = subprocess.check_output( + cmd, + cwd=context.package_root, + stderr=subprocess.STDOUT, + timeout=10 + ) + context.exit_code = 0 + except subprocess.CalledProcessError as e: + context.cmd_output = e.output + context.exit_code = e.returncode + except subprocess.TimeoutExpired as e: + context.cmd_output = b"Command timed out" + context.exit_code = -1 + + +@when("we run pgcli with mixed -c and --command") +def step_run_pgcli_with_mixed_options(context): + """Run pgcli with mixed -c and --command flags.""" + cmd = [ + "pgcli", + "-h", context.conf["host"], + "-p", str(context.conf["port"]), + "-U", context.conf["user"], + "-d", context.conf["dbname"], + "-c", "SELECT 'from_c' as source", + "--command", "SELECT 'from_command' as source" + ] + try: + context.cmd_output = subprocess.check_output( + cmd, + cwd=context.package_root, + stderr=subprocess.STDOUT, + timeout=10 + ) + context.exit_code = 0 + except subprocess.CalledProcessError as e: + context.cmd_output = e.output + context.exit_code = e.returncode + except subprocess.TimeoutExpired as e: + context.cmd_output = b"Command timed out" + context.exit_code = -1 + + +@then("we see all command outputs") +def step_see_all_command_outputs(context): + """Verify that all command outputs are present.""" + output = context.cmd_output.decode('utf-8') + # Should contain output from all commands + assert "first" in output or "from_c" in output, f"Expected 'first' or 'from_c' in output, but got: {output}" + assert "second" in output or "from_command" in output, f"Expected 'second' or 'from_command' in output, but got: {output}" + # For the 3-command test, also check for third + if "third" in output or "result" in output: + assert "third" in output, f"Expected 'third' in output for 3-command test, but got: {output}"