Skip to content

Commit 5d08a11

Browse files
feat: [SNOW-2326969] implement dcm refresh
1 parent 90cf35f commit 5d08a11

File tree

9 files changed

+400
-31
lines changed

9 files changed

+400
-31
lines changed

src/snowflake/cli/_plugins/dcm/commands.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616

1717
import typer
1818
from snowflake.cli._plugins.dcm.manager import DCMProjectManager
19-
from snowflake.cli._plugins.dcm.utils import format_test_failures
19+
from snowflake.cli._plugins.dcm.utils import (
20+
format_refresh_results,
21+
format_test_failures,
22+
)
2023
from snowflake.cli._plugins.object.command_aliases import add_object_command_aliases
2124
from snowflake.cli._plugins.object.commands import scope_option
2225
from snowflake.cli._plugins.object.manager import ObjectManager
@@ -278,6 +281,33 @@ def test(
278281
return MessageResult(f"All {len(expectations)} expectation(s) passed successfully.")
279282

280283

284+
@app.command(requires_connection=True)
285+
def refresh(
286+
identifier: FQN = dcm_identifier,
287+
**options,
288+
):
289+
"""
290+
Refreshes dynamic tables defined in DCM project.
291+
"""
292+
with cli_console.spinner() as spinner:
293+
spinner.add_task(description=f"Refreshing dcm project {identifier}", total=None)
294+
result = DCMProjectManager().refresh(project_identifier=identifier)
295+
296+
row = result.fetchone()
297+
if not row:
298+
return MessageResult("No data.")
299+
300+
result_data = row[0]
301+
result_json = (
302+
json.loads(result_data) if isinstance(result_data, str) else result_data
303+
)
304+
305+
refreshed_tables = result_json.get("refreshed_tables", [])
306+
message = format_refresh_results(refreshed_tables)
307+
308+
return MessageResult(message)
309+
310+
281311
def _get_effective_stage(identifier: FQN, from_location: Optional[str]):
282312
manager = DCMProjectManager()
283313
if not from_location:

src/snowflake/cli/_plugins/dcm/manager.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,10 @@ def test(self, project_identifier: FQN):
140140
query = f"EXECUTE DCM PROJECT {project_identifier.sql_identifier} TEST ALL"
141141
return self.execute_query(query=query)
142142

143+
def refresh(self, project_identifier: FQN):
144+
query = f"EXECUTE DCM PROJECT {project_identifier.sql_identifier} REFRESH ALL"
145+
return self.execute_query(query=query)
146+
143147
@staticmethod
144148
def sync_local_files(
145149
project_identifier: FQN, source_directory: str | None = None

src/snowflake/cli/_plugins/dcm/utils.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,17 @@ def format_test_failures(
4141
)
4242

4343
return "\n".join(lines)
44+
45+
46+
def format_refresh_results(refreshed_tables: list) -> str:
47+
"""Format refresh results into a concise user-friendly message."""
48+
if not refreshed_tables:
49+
return "No dynamic tables found in the project."
50+
51+
total_tables = len(refreshed_tables)
52+
refreshed_count = sum(
53+
1 for table in refreshed_tables if table.get("refreshed_dt_count", 0) > 0
54+
)
55+
up_to_date_count = total_tables - refreshed_count
56+
57+
return f"{refreshed_count} dynamic table(s) refreshed. {up_to_date_count} dynamic table(s) up-to-date."

src/snowflake/cli/api/output/types.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -146,9 +146,13 @@ def __init__(self, cursor: SnowflakeCursor):
146146
def _prepare_payload(self, cursor):
147147
results = list(QueryResult(cursor).result)
148148
if results:
149-
# Return value of the first tuple
150-
return json.loads(list(results[0].items())[0][1])
151-
return None
149+
# Parse the JSON value from the first tuple
150+
parsed_value = json.loads(list(results[0].items())[0][1])
151+
# If it's a list, yield each element; if it's a dict, yield the single dict
152+
if isinstance(parsed_value, list):
153+
yield from parsed_value
154+
else:
155+
yield parsed_value
152156

153157

154158
class MessageResult(CommandResult):

tests/__snapshots__/test_help_messages.ambr

Lines changed: 154 additions & 27 deletions
Large diffs are not rendered by default.
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# serializer version: 1
2+
# name: TestDCMRefresh.test_refresh_with_fresh_tables
3+
'''
4+
5+
0 dynamic table(s) refreshed. 1 dynamic table(s) up-to-date.
6+
7+
'''
8+
# ---
9+
# name: TestDCMRefresh.test_refresh_with_no_dynamic_tables
10+
'''
11+
12+
No dynamic tables found in the project.
13+
14+
'''
15+
# ---
16+
# name: TestDCMRefresh.test_refresh_with_outdated_tables
17+
'''
18+
19+
1 dynamic table(s) refreshed. 0 dynamic table(s) up-to-date.
20+
21+
'''
22+
# ---

tests/dcm/test_commands.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,3 +799,70 @@ def test_test_with_multiple_failed_expectations(self, mock_pm, runner, mock_curs
799799
mock_pm().test.assert_called_once_with(
800800
project_identifier=FQN.from_string("my_project")
801801
)
802+
803+
804+
class TestDCMRefresh:
805+
@mock.patch(DCMProjectManager)
806+
def test_refresh_with_outdated_tables(self, mock_pm, runner, mock_cursor, snapshot):
807+
refresh_result = {
808+
"refreshed_tables": [
809+
{
810+
"dt_name": "JW_DCM_TESTALL.ANALYTICS.DYNAMIC_EMPLOYEES",
811+
"refreshed_dt_count": 1,
812+
"data_timestamp": "1760357032.175",
813+
"statistics": '{"insertedRows":5,"copiedRows":0,"deletedRows":5}',
814+
}
815+
]
816+
}
817+
mock_pm().refresh.return_value = mock_cursor(
818+
rows=[(json.dumps(refresh_result),)], columns=("result",)
819+
)
820+
821+
result = runner.invoke(["dcm", "refresh", "my_project"])
822+
823+
assert result.exit_code == 0, result.output
824+
assert result.output == snapshot
825+
mock_pm().refresh.assert_called_once_with(
826+
project_identifier=FQN.from_string("my_project")
827+
)
828+
829+
@mock.patch(DCMProjectManager)
830+
def test_refresh_with_fresh_tables(self, mock_pm, runner, mock_cursor, snapshot):
831+
refresh_result = {
832+
"refreshed_tables": [
833+
{
834+
"dt_name": "JW_DCM_TESTALL.ANALYTICS.DYNAMIC_EMPLOYEES",
835+
"refreshed_dt_count": 0,
836+
"data_timestamp": "1760356974.543",
837+
"statistics": "No new data",
838+
}
839+
]
840+
}
841+
mock_pm().refresh.return_value = mock_cursor(
842+
rows=[(json.dumps(refresh_result),)], columns=("result",)
843+
)
844+
845+
result = runner.invoke(["dcm", "refresh", "my_project"])
846+
847+
assert result.exit_code == 0, result.output
848+
assert result.output == snapshot
849+
mock_pm().refresh.assert_called_once_with(
850+
project_identifier=FQN.from_string("my_project")
851+
)
852+
853+
@mock.patch(DCMProjectManager)
854+
def test_refresh_with_no_dynamic_tables(
855+
self, mock_pm, runner, mock_cursor, snapshot
856+
):
857+
refresh_result = {"refreshed_tables": []}
858+
mock_pm().refresh.return_value = mock_cursor(
859+
rows=[(json.dumps(refresh_result),)], columns=("result",)
860+
)
861+
862+
result = runner.invoke(["dcm", "refresh", "my_project"])
863+
864+
assert result.exit_code == 0, result.output
865+
assert result.output == snapshot
866+
mock_pm().refresh.assert_called_once_with(
867+
project_identifier=FQN.from_string("my_project")
868+
)

tests/dcm/test_manager.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,16 @@ def test_test_project(mock_execute_query):
169169
)
170170

171171

172+
@mock.patch(execute_queries)
173+
def test_refresh_project(mock_execute_query):
174+
mgr = DCMProjectManager()
175+
mgr.refresh(project_identifier=TEST_PROJECT)
176+
177+
mock_execute_query.assert_called_once_with(
178+
query="EXECUTE DCM PROJECT IDENTIFIER('my_project') REFRESH ALL"
179+
)
180+
181+
172182
@mock.patch(execute_queries)
173183
def test_plan_project_with_output_path__stage(mock_execute_query, project_directory):
174184
mgr = DCMProjectManager()

tests_integration/test_dcm_project.py

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,3 +642,94 @@ def test_dcm_test_command(
642642
result = runner.invoke_with_connection(["dcm", "test", project_name])
643643
assert result.exit_code == 0, result.output
644644
assert "expectation(s) passed successfully" in result.output
645+
646+
647+
@pytest.mark.qa_only
648+
@pytest.mark.integration
649+
def test_dcm_refresh_command(
650+
runner,
651+
test_database,
652+
project_directory,
653+
object_name_provider,
654+
sql_test_helper,
655+
):
656+
project_name = object_name_provider.create_and_get_next_object_name()
657+
base_table_name = f"{test_database}.PUBLIC.RefreshBaseTable"
658+
dynamic_table_name = f"{test_database}.PUBLIC.RefreshDynamicTable"
659+
660+
with project_directory("dcm_project") as project_root:
661+
result = runner.invoke_with_connection(["dcm", "create", project_name])
662+
assert result.exit_code == 0, result.output
663+
664+
# Deploy the project.
665+
result = runner.invoke_with_connection_json(
666+
[
667+
"dcm",
668+
"deploy",
669+
project_name,
670+
"-D",
671+
f"table_name='{test_database}.PUBLIC.OutputTestTable'",
672+
]
673+
)
674+
assert result.exit_code == 0, result.output
675+
676+
# 1) Without any dynamic tables, run refresh command - should report no dynamic tables.
677+
result = runner.invoke_with_connection(["dcm", "refresh", project_name])
678+
assert result.exit_code == 0, result.output
679+
assert "No dynamic tables found in the project." in result.output
680+
681+
# 2) Define base table and dynamic table with long refresh time.
682+
table_definitions = f"""
683+
define table identifier('{base_table_name}') (
684+
id int, name varchar, email varchar
685+
);
686+
687+
define dynamic table identifier('{dynamic_table_name}')
688+
target_lag = '1000 minutes'
689+
WAREHOUSE = xs
690+
as select * from {base_table_name};
691+
"""
692+
file_a_path = project_root / "file_a.sql"
693+
original_content = file_a_path.read_text()
694+
file_a_path.write_text(original_content + table_definitions)
695+
696+
# Deploy the project.
697+
result = runner.invoke_with_connection_json(
698+
[
699+
"dcm",
700+
"deploy",
701+
project_name,
702+
"-D",
703+
f"table_name='{test_database}.PUBLIC.OutputTestTable'",
704+
]
705+
)
706+
assert result.exit_code == 0, result.output
707+
708+
# 3) Insert data into the base table.
709+
insert_data_sql = f"""
710+
INSERT INTO {base_table_name} (id, name, email) VALUES
711+
(1, 'Alice Johnson', '[email protected]'),
712+
(2, 'Bob Williams', '[email protected]'),
713+
(3, 'Charlie Brown', '[email protected]');
714+
"""
715+
sql_test_helper.execute_single_sql(insert_data_sql)
716+
717+
# 4) Verify that data is NOT yet in the dynamic table (due to long refresh time).
718+
check_dt_sql = f"SELECT COUNT(*) as cnt FROM {dynamic_table_name}"
719+
result = sql_test_helper.execute_single_sql(check_dt_sql)
720+
count_before = result[0]["CNT"]
721+
assert count_before == 0, "Dynamic table should be empty before refresh."
722+
723+
# 5) Run dcm refresh command.
724+
result = runner.invoke_with_connection(["dcm", "refresh", project_name])
725+
assert result.exit_code == 0, result.output
726+
# Should show at least 1 table was refreshed
727+
assert (
728+
"1 dynamic table(s) refreshed" in result.output
729+
or "dynamic table(s) refreshed" in result.output
730+
)
731+
732+
# 6) Verify that data is NOW in the dynamic table.
733+
result = sql_test_helper.execute_single_sql(check_dt_sql)
734+
count_after = result[0]["CNT"]
735+
assert count_after == 3, "Dynamic table should have 3 rows after refresh."

0 commit comments

Comments
 (0)