Skip to content

Commit 853043f

Browse files
authored
Merge pull request #537 from dbt-msft/snapshot-issue
2 parents c2cf437 + 4f2370b commit 853043f

File tree

4 files changed

+173
-1
lines changed

4 files changed

+173
-1
lines changed

dbt/include/sqlserver/macros/materializations/snapshot/snapshot.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
{% set temp_snapshot_relation_sql = model['compiled_code'].replace("'", "''") %}
3737
{% call statement('create temp_snapshot_relation') %}
38+
USE [{{ model.database}}];
3839
EXEC('DROP VIEW IF EXISTS {{ temp_snapshot_relation.include(database=False) }};');
3940
EXEC('create view {{ temp_snapshot_relation.include(database=False) }} as {{ temp_snapshot_relation_sql }};');
4041
{% endcall %}

dbt/include/sqlserver/macros/relations/table/create.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
{%- set tmp_relation = relation.incorporate(path={"identifier": relation.identifier ~ '__dbt_tmp_vw'}, type='view') -%}
44

55
{%- do adapter.drop_relation(tmp_relation) -%}
6+
USE [{{ relation.database }}];
67
{{ get_create_view_as_sql(tmp_relation, sql) }}
78

89
{%- set table_name -%}

dbt/include/sqlserver/macros/relations/views/create.sql

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
{% set tst %}
1414
SELECT '1' as col
1515
{% endset %}
16-
16+
USE [{{ relation.database }}];
1717
EXEC('{{- escape_single_quotes(query) -}}')
1818

1919
{% endmacro %}
Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
import pytest
2+
from dbt.tests.util import get_connection, run_dbt
3+
4+
snapshot_sql = """
5+
{% snapshot claims_snapshot %}
6+
7+
{{
8+
config(
9+
target_database='secondary_db',
10+
target_schema='dbo',
11+
unique_key='id',
12+
13+
strategy='timestamp',
14+
updated_at='updated_at',
15+
)
16+
}}
17+
18+
select * from {{source('mysource', 'claims')}}
19+
20+
{% endsnapshot %}
21+
"""
22+
23+
source_csv = """id,updated_date
24+
1,2024-01-01
25+
2,2024-01-01
26+
3,2024-01-01
27+
"""
28+
29+
sources_yml = """
30+
version: 2
31+
sources:
32+
- name: mysource
33+
database: TestDB
34+
tables:
35+
- name: claims
36+
"""
37+
38+
39+
class TestCrossDB:
40+
def create_secondary_db(self, project):
41+
create_sql = """
42+
DECLARE @col NVARCHAR(256)
43+
SET @col = (SELECT CONVERT (varchar(256), SERVERPROPERTY('collation')));
44+
45+
IF NOT EXISTS (SELECT * FROM sys.databases WHERE name='secondary_db')
46+
BEGIN
47+
EXEC ('CREATE DATABASE secondary_db COLLATE ' + @col)
48+
END
49+
"""
50+
51+
with get_connection(project.adapter):
52+
project.adapter.execute(
53+
create_sql.format(database=project.database),
54+
fetch=True,
55+
)
56+
57+
def cleanup_secondary_database(self, project):
58+
drop_sql = "DROP DATABASE IF EXISTS secondary_db"
59+
with get_connection(project.adapter):
60+
project.adapter.execute(
61+
drop_sql.format(database=project.database),
62+
fetch=True,
63+
)
64+
65+
def cleanup_primary_table(self, project):
66+
drop_sql = "DROP TABLE IF EXISTS {database}.mysource.claims"
67+
with get_connection(project.adapter):
68+
project.adapter.execute(
69+
drop_sql.format(database=project.database),
70+
fetch=True,
71+
)
72+
73+
def cleanup_snapshot_table(self, project):
74+
drop_sql = "DROP TABLE IF EXISTS TestDB_Secondary.dbo.claims_snapshot"
75+
with get_connection(project.adapter):
76+
project.adapter.execute(
77+
drop_sql,
78+
fetch=True,
79+
)
80+
81+
def create_source_schema(self, project):
82+
create_sql = """
83+
IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = 'mysource')
84+
BEGIN
85+
EXEC('CREATE SCHEMA mysource')
86+
END
87+
"""
88+
with get_connection(project.adapter):
89+
project.adapter.execute(
90+
create_sql,
91+
fetch=True,
92+
)
93+
94+
def create_primary_table(self, project):
95+
src_query = """
96+
SELECT *
97+
INTO
98+
{database}.mysource.claims
99+
FROM
100+
(
101+
SELECT
102+
1 as id,
103+
CAST('2024-01-01' as DATETIME2(6)) updated_at
104+
105+
UNION ALL
106+
107+
SELECT
108+
2 as id,
109+
CAST('2024-01-01' as DATETIME2(6)) updated_at
110+
111+
UNION ALL
112+
113+
SELECT
114+
3 as id,
115+
CAST('2024-01-01' as DATETIME2(6)) updated_at
116+
) as src_data
117+
"""
118+
with get_connection(project.adapter):
119+
project.adapter.execute(
120+
src_query.format(database=project.database, schema=project.test_schema),
121+
fetch=True,
122+
)
123+
124+
def create_secondary_schema(self, project):
125+
src_query = """
126+
USE [secondary_db]
127+
EXEC ('CREATE SCHEMA {schema}')
128+
"""
129+
with get_connection(project.adapter):
130+
project.adapter.execute(
131+
src_query.format(database=project.database, schema=project.test_schema),
132+
fetch=True,
133+
)
134+
135+
def update_primary_table(self, project):
136+
sql = """
137+
UPDATE [{database}].[mysource].[claims]
138+
SET
139+
updated_at = CAST('2024-02-01' as datetime2(6))
140+
WHERE
141+
id = 3
142+
"""
143+
with get_connection(project.adapter):
144+
project.adapter.execute(
145+
sql.format(database=project.database),
146+
fetch=True,
147+
)
148+
149+
@pytest.fixture(scope="class")
150+
def models(self):
151+
return {"sources.yml": sources_yml}
152+
153+
@pytest.fixture(scope="class")
154+
def snapshots(self):
155+
return {"claims_snapshot.sql": snapshot_sql}
156+
157+
def test_cross_db_snapshot(self, project):
158+
self.create_secondary_db(project)
159+
160+
self.cleanup_primary_table(project)
161+
self.cleanup_snapshot_table(project)
162+
163+
self.create_source_schema(project)
164+
self.create_primary_table(project)
165+
run_dbt(["snapshot"])
166+
self.update_primary_table(project)
167+
run_dbt(["snapshot"])
168+
169+
self.cleanup_snapshot_table(project)
170+
self.cleanup_secondary_database(project)

0 commit comments

Comments
 (0)