Skip to content

Commit b7a1af7

Browse files
committed
fixed logic for snapshot and added tests
1 parent c2cf437 commit b7a1af7

File tree

5 files changed

+145
-1
lines changed

5 files changed

+145
-1
lines changed

dbt/include/sqlserver/macros/materializations/snapshot/snapshot.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
{% set temp_snapshot_relation_sql = model['compiled_code'].replace("'", "''") %}
3737
{% call statement('create temp_snapshot_relation') %}
38+
USE [{{ model.database}}];
3839
EXEC('DROP VIEW IF EXISTS {{ temp_snapshot_relation.include(database=False) }};');
3940
EXEC('create view {{ temp_snapshot_relation.include(database=False) }} as {{ temp_snapshot_relation_sql }};');
4041
{% endcall %}

dbt/include/sqlserver/macros/relations/table/create.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
{%- set tmp_relation = relation.incorporate(path={"identifier": relation.identifier ~ '__dbt_tmp_vw'}, type='view') -%}
44

55
{%- do adapter.drop_relation(tmp_relation) -%}
6+
USE [{{ relation.database }}];
67
{{ get_create_view_as_sql(tmp_relation, sql) }}
78

89
{%- set table_name -%}

dbt/include/sqlserver/macros/relations/views/create.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
{% set tst %}
1414
SELECT '1' as col
1515
{% endset %}
16-
16+
USE [{{ relation.database }}];
1717
EXEC('{{- escape_single_quotes(query) -}}')
1818

1919
{% endmacro %}

devops/scripts/init_db.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ fi
77
for i in {1..50};
88
do
99
/opt/mssql-tools/bin/sqlcmd -C -S localhost -U sa -P "${SA_PASSWORD}" -d master -I -Q "CREATE DATABASE TestDB COLLATE ${COLLATION}"
10+
/opt/mssql-tools/bin/sqlcmd -C -S localhost -U sa -P "${SA_PASSWORD}" -d master -I -Q "CREATE DATABASE TestDB_Secondary COLLATE ${COLLATION}"
1011
if [ $? -eq 0 ]
1112
then
1213
echo "database creation completed"
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import pytest
2+
from dbt.tests.util import get_connection, run_dbt
3+
4+
snapshot_sql = """
5+
{% snapshot claims_snapshot %}
6+
7+
{{
8+
config(
9+
target_database='TestDB_Secondary',
10+
target_schema='dbo',
11+
unique_key='id',
12+
13+
strategy='timestamp',
14+
updated_at='updated_at',
15+
)
16+
}}
17+
18+
select * from {{source('mysource', 'claims')}}
19+
20+
{% endsnapshot %}
21+
"""
22+
23+
source_csv = """id,updated_date
24+
1,2024-01-01
25+
2,2024-01-01
26+
3,2024-01-01
27+
"""
28+
29+
sources_yml = """
30+
version: 2
31+
sources:
32+
- name: mysource
33+
database: TestDB
34+
tables:
35+
- name: claims
36+
"""
37+
38+
39+
class TestCrossDB:
40+
def cleanup_primary_table(self, project):
41+
drop_sql = "DROP TABLE IF EXISTS {database}.mysource.claims"
42+
with get_connection(project.adapter):
43+
project.adapter.execute(
44+
drop_sql.format(database=project.database),
45+
fetch=True,
46+
)
47+
48+
def cleanup_snapshot_table(self, project):
49+
drop_sql = "DROP TABLE IF EXISTS TestDB_Secondary.dbo.claims_snapshot"
50+
with get_connection(project.adapter):
51+
project.adapter.execute(
52+
drop_sql,
53+
fetch=True,
54+
)
55+
56+
def create_source_schema(self, project):
57+
create_sql = """
58+
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = 'mysource')
59+
BEGIN
60+
EXEC('CREATE SCHEMA mysource')
61+
END
62+
"""
63+
with get_connection(project.adapter):
64+
project.adapter.execute(
65+
create_sql,
66+
fetch=True,
67+
)
68+
69+
def create_primary_table(self, project):
70+
src_query = """
71+
SELECT *
72+
INTO
73+
{database}.mysource.claims
74+
FROM
75+
(
76+
SELECT
77+
1 as id,
78+
CAST('2024-01-01' as DATETIME2(6)) updated_at
79+
80+
UNION ALL
81+
82+
SELECT
83+
2 as id,
84+
CAST('2024-01-01' as DATETIME2(6)) updated_at
85+
86+
UNION ALL
87+
88+
SELECT
89+
3 as id,
90+
CAST('2024-01-01' as DATETIME2(6)) updated_at
91+
) as src_data
92+
"""
93+
with get_connection(project.adapter):
94+
project.adapter.execute(
95+
src_query.format(database=project.database, schema=project.test_schema),
96+
fetch=True,
97+
)
98+
99+
def create_secondary_schema(self, project):
100+
src_query = """
101+
USE [TestDB_Secondary]
102+
EXEC ('CREATE SCHEMA {schema}')
103+
"""
104+
with get_connection(project.adapter):
105+
project.adapter.execute(
106+
src_query.format(database=project.database, schema=project.test_schema),
107+
fetch=True,
108+
)
109+
110+
def update_primary_table(self, project):
111+
sql = """
112+
UPDATE [{database}].[mysource].[claims]
113+
SET
114+
updated_at = CAST('2024-02-01' as datetime2(6))
115+
WHERE
116+
id = 3
117+
"""
118+
with get_connection(project.adapter):
119+
project.adapter.execute(
120+
sql.format(database=project.database),
121+
fetch=True,
122+
)
123+
124+
@pytest.fixture(scope="class")
125+
def models(self):
126+
return {"sources.yml": sources_yml}
127+
128+
@pytest.fixture(scope="class")
129+
def snapshots(self):
130+
return {"claims_snapshot.sql": snapshot_sql}
131+
132+
def test_cross_db_snapshot(self, project):
133+
self.cleanup_primary_table(project)
134+
self.cleanup_snapshot_table(project)
135+
136+
self.create_source_schema(project)
137+
self.create_primary_table(project)
138+
# self.create_secondary_schema(project)
139+
run_dbt(["snapshot"])
140+
self.update_primary_table(project)
141+
run_dbt(["snapshot"])

0 commit comments

Comments
 (0)