Skip to content

Commit 3c17ebb

Browse files
feat: [SNOW-1890085] add run_async option
1 parent 584b4ca commit 3c17ebb

File tree

4 files changed

+33
-4
lines changed

4 files changed

+33
-4
lines changed

src/snowflake/cli/_plugins/dbt/commands.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
from snowflake.cli.api.commands.snow_typer import SnowTyperFactory
2727
from snowflake.cli.api.feature_flags import FeatureFlag
2828
from snowflake.cli.api.identifiers import FQN
29-
from snowflake.cli.api.output.types import CommandResult, QueryResult
29+
from snowflake.cli.api.output.types import CommandResult, MessageResult, QueryResult
3030

3131
app = SnowTyperFactory(
3232
name="dbt",
@@ -112,6 +112,9 @@ def deploy_dbt(
112112
@global_options_with_connection
113113
def before_callback(
114114
name: FQN = DBTNameArgument,
115+
run_async: Optional[bool] = typer.Option(
116+
False, help="Run dbt command asynchronously and check it's result later."
117+
),
115118
**options,
116119
):
117120
"""Handles global options passed before the command and takes pipeline name to be accessed through child context later"""
@@ -134,4 +137,10 @@ def _dbt_execute(
134137
dbt_cli_args = ctx.args
135138
dbt_command = ctx.command.name
136139
name = ctx.parent.params["name"]
137-
return QueryResult(DBTManager().execute(dbt_command, name, *dbt_cli_args))
140+
run_async = ctx.parent.params["run_async"]
141+
result = DBTManager().execute(dbt_command, name, run_async, *dbt_cli_args)
142+
if not run_async:
143+
return QueryResult(result)
144+
return MessageResult(
145+
f"Command submitted. You can check the result with `snow sql -q \"select execution_status from table(information_schema.query_history_by_user()) where query_id in ('{result.sfqid}');\"`"
146+
)

src/snowflake/cli/_plugins/dbt/manager.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ def deploy(
7575
query += f"\nWAREHOUSE='{execute_in_warehouse}'"
7676
return self.execute_query(query)
7777

78-
def execute(self, dbt_command: str, name: str, *dbt_cli_args):
78+
def execute(self, dbt_command: str, name: str, run_async: bool, *dbt_cli_args):
7979
if dbt_cli_args:
8080
dbt_command = dbt_command + " " + " ".join([arg for arg in dbt_cli_args])
8181
query = f"EXECUTE DBT PROJECT {name} args='{dbt_command.strip()}'"
82-
return self.execute_query(query)
82+
return self.execute_query(query, _exec_async=run_async)

tests/dbt/test_dbt_commands.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,4 +267,24 @@ def test_dbt_execute(self, mock_connect, runner, args, expected_query):
267267
result = runner.invoke(args)
268268

269269
assert result.exit_code == 0, result.output
270+
assert mock_connect.mocked_ctx.kwargs[0]["_exec_async"] is False
270271
assert mock_connect.mocked_ctx.get_query() == expected_query
272+
273+
def test_execute_async(self, mock_connect, runner):
274+
result = runner.invoke(
275+
[
276+
"dbt",
277+
"execute",
278+
"--run-async",
279+
"pipeline_name",
280+
"compile",
281+
]
282+
)
283+
284+
assert result.exit_code == 0, result.output
285+
assert result.output.startswith("Command submitted")
286+
assert mock_connect.mocked_ctx.kwargs[0]["_exec_async"] is True
287+
assert (
288+
mock_connect.mocked_ctx.get_query()
289+
== "EXECUTE DBT PROJECT pipeline_name args='compile'"
290+
)

tests/testing_utils/fixtures.py

Whitespace-only changes.

0 commit comments

Comments
 (0)