Skip to content

Commit 9e4666f

Browse files
authored
changes to create demo for dbt docs (#32)
1 parent 8cab847 commit 9e4666f

19 files changed

+459
-6
lines changed
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
dbt-core==1.0.1
2-
dbt-postgres==1.0.1
3-
dbt-redshift==1.0.0
4-
dbt-snowflake==1.0.0
5-
dbt-bigquery==1.0.0
6-
dbt-materialize==1.0.1
1+
dbt-core
2+
dbt-postgres
3+
dbt-redshift
4+
dbt-snowflake
5+
dbt-bigquery
6+
dbt-materialize

dbt-lambda/.env.template

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
export ENV_CODE="DEV"
2+
export PROJ_CODE="ENTECHLOG"
3+
export SNOWSQL_ACCOUNT=""
4+
export SNOWSQL_PWD=""

dbt-lambda/README.md

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
# Lambda Views Demo
2+
3+
This repository demonstrates a **Lambda Architecture** using dbt. It shows how you can combine historical and near real-time data through separate layers.
4+
5+
![dbt Docs](./assets/dbt-docs.png)
6+
7+
## What’s Included
8+
9+
- **Prep Layer:**
10+
Loads and prepares all raw data from the `purchases` seed. This layer supports querying all data from the raw seed.
11+
12+
- **Fact Layer Macro:**
13+
A macro (`fact_purchases_sql`) that transforms the prep data and applies a date filter using the `lambda_filter_by_date` macro.
14+
15+
- **Fact Layer Model:**
16+
A model that calls the fact layer macro to build an incremental fact table from the prep data.
17+
18+
- **OBT/Lambda View:**
19+
A view that uses `lambda_union_by_date` to dynamically union historical fact data with new raw data based on the batch cycle date. It determines whether to pull data from the fact or raw layer.
20+
21+
## Steps to Run the Demo
22+
23+
1. **Seed the Data:**
24+
Run the following command to load the raw seed data:
25+
```bash
26+
dbt seed
27+
```
28+
29+
2. **Build the Models:**
30+
Build all models by running:
31+
```bash
32+
dbt run
33+
```
34+
35+
3. **Initial Check:**
36+
- Query the **fact** model and the **OBT/lambda view**.
37+
- You should see **20 records** in each.
38+
39+
4. **Simulate New Data:**
40+
- Move the data from the backup seed into the raw seed file (this simulates new data arriving).
41+
- Run the seed command again:
42+
```bash
43+
dbt seed
44+
```
45+
46+
5. **Rebuild and Validate:**
47+
Run:
48+
```bash
49+
dbt run
50+
```
51+
- Now, when you query the **fact** model, it should still show **20 records**.
52+
- The **OBT/lambda view** will display **45 records**, with **25 new records** coming from the raw layer.
53+
54+
This demo shows how the Lambda Architecture in dbt can dynamically merge historical data with new incoming data at query time.

dbt-lambda/assets/dbt-docs.png

37.1 KB
Loading

dbt-lambda/dbt_project.yml

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
name: 'dbt_lambda_demo'
version: '1.0.0'
config-version: 2

profile: 'dbt-snowflake'

vars:
  run_type: "daily"
  batch_cycle_date: '2024-01-01'

# 'source-paths' was renamed to 'model-paths' in dbt v1.0; the requirements
# pin dbt-core 1.x, so the old key raises a deprecation error there.
model-paths: ["models"]
macro-paths: ["macros"]
seed-paths: ["seeds"]

models:
  dbt_lambda_demo:
    # '+' prefix used consistently so configs can never be confused with
    # subdirectory names (the file previously mixed '+materialized' with
    # bare 'materialized:'/'database:'/'schema:' keys).
    +materialized: table
    prep:
      dim:
        +database: "{{ env_var('ENV_CODE') | trim }}_{{ env_var('PROJ_CODE') | trim }}_PREP_DB"
        +schema: DIM
        +materialized: view
      fact:
        +database: "{{ env_var('ENV_CODE') | trim }}_{{ env_var('PROJ_CODE') | trim }}_PREP_DB"
        +schema: FACT
        +materialized: view
    dw:
      dim:
        +database: "{{ env_var('ENV_CODE') | trim }}_{{ env_var('PROJ_CODE') | trim }}_DW_DB"
        +schema: DIM
        +materialized: view
      fact:
        +database: "{{ env_var('ENV_CODE') | trim }}_{{ env_var('PROJ_CODE') | trim }}_DW_DB"
        +schema: FACT
        +materialized: view
    # NOTE(review): 'obt' is assumed to be a models/ subdirectory alongside
    # prep/ and dw/ — confirm against the actual models/ layout.
    obt:
      +database: "{{ env_var('ENV_CODE') | trim }}_{{ env_var('PROJ_CODE') | trim }}_DW_DB"
      +schema: OBT

seeds:
  dbt_lambda_demo:
    +database: "{{ env_var('ENV_CODE') | trim }}_{{ env_var('PROJ_CODE') | trim }}_PREP_DB"
    +schema: SEED
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
{% macro lambda_filter_by_date(src_column_name, tgt_column_name) %}
{#-
    Emits the WHERE clause that splits "historical" rows from "new" rows in
    the lambda architecture, depending on how the calling model is built.

    src_column_name -- timestamp column on the model being filtered
    tgt_column_name -- timestamp column on the already-materialized target
-#}

    {% set target_materialization = config.require('materialized') %}
    {#- Optional override: var('lambda_split') pins the split point; it
        defaults to the start time of the current dbt invocation. -#}
    {% set split_point = var('lambda_split', run_started_at) %}

    {% if target_materialization == 'view' %}

        {#- Lambda view: keep only rows at or after the newest row already
            landed in the FACT relation (the view lives in the OBT schema,
            hence the schema swap on `this`). -#}
        where {{ src_column_name }} >= (select max({{ tgt_column_name }}) from {{ this | replace('.OBT.', '.FACT.') }})

    {% elif is_incremental() %}

        {#- Incremental run: load exactly one batch-cycle day. -#}
        where DATE({{ src_column_name }}) = '{{ var("batch_cycle_date") }}'

    {% else %}

        {#- Full build: everything strictly before the split point. -#}
        where {{ src_column_name }} < '{{ split_point }}'

    {% endif %}

{% endmacro %}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
{% macro lambda_union_by_date(historical_relation, model_sql, tgt_timestamp_column) %}
{#-
    Builds the lambda view: historical rows from the materialized relation
    unioned with freshly-computed rows produced by `model_sql`.

    historical_relation  -- relation holding already-materialized fact data
    model_sql            -- rendered SQL producing the "new" (raw) slice
    tgt_timestamp_column -- currently unused; kept for interface stability
-#}

    {% set unique_key = config.get('unique_key', none) %}

    with historical as (

        select *
        from {{ historical_relation }}

    ),

    new_raw as (

        {{ model_sql }}

    ),

    new as (

        select *
        from new_raw

    ),

    unioned as (

        select * from historical
        {% if unique_key %}
        {#- Prefer the freshly-computed version of any key present in both
            slices. NOTE(review): `not in` yields zero rows if `new` ever
            contains a NULL key — confirm the unique key is non-nullable. -#}
        where {{ unique_key }} not in (
            select {{ unique_key }} from new
        )
        {% endif %}

        {#- Plain UNION (not UNION ALL) on purpose: the lambda date filter
            is inclusive on both sides of the split, so boundary rows can
            appear in both slices and must be deduplicated here. -#}
        union

        select * from new

    )

    select * from unioned

{% endmacro %}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
{% macro fact_purchases_sql() %}
{#-
    Shared SELECT for the purchases fact layer: reads the prep model,
    applies the lambda date filter, and projects the fact columns.
    Used both by the incremental fact model and by the OBT lambda view.
-#}

    with filtered_prep as (
        select *
        from {{ ref('prep__fact_purchases') }}
        {{ lambda_filter_by_date(src_column_name='event_timestamp', tgt_column_name='event_timestamp') }}
    ),

    final as (
        select
            event_date,
            event_hour,
            event_timestamp,
            store_key,
            product_key,
            product_name,
            quantity,
            unit_price,
            extended_price,
            payment_method,
            purchase_key
        from filtered_prep
    )

    select *
    from final

{% endmacro %}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
{% macro delete_data(del_key, del_value, this, offset_days=0) %}
{#-
    Pre-load cleanup: removes rows from `this` before a (re)load, with the
    strategy driven by the `run_type` var:

      full-refresh -- truncate the whole relation
      daily        -- delete one partition (del_value, format YYYYMMDD),
                      optionally shifted by offset_days
      backfill     -- delete the [backfill_start_date, backfill_end_date]
                      window (vars in YYYY-MM-DD), optionally shifted

    del_key     -- column the delete predicate filters on
    del_value   -- partition value for daily runs
    this        -- relation being loaded (dbt `this`)
    offset_days -- day shift applied to the deleted date(s).
                   NOTE(review): the timedelta is ADDED, i.e. a positive
                   offset moves the window forward — confirm intended.
-#}

{%- set target_relation = adapter.get_relation(this.database, this.schema, this.table) -%}

{#- No-op on the very first run, before the target relation exists. -#}
{%- if target_relation is not none -%}
    {%- if var("run_type") == 'full-refresh' -%}
        {%- call statement('truncate table ' ~ this, fetch_result=False, auto_begin=True) -%}
            truncate {{ target_relation }}
        {%- endcall -%}
    {%- elif var("run_type") == 'daily' -%}
        {% if offset_days != 0 %}
            {%- set shifted_dt = modules.datetime.datetime.strptime(del_value, "%Y%m%d") + modules.datetime.timedelta(days=offset_days) -%}
            {#- strftime embeds the quotes so the value drops straight into SQL. -#}
            {%- set quoted_shifted_date = shifted_dt.strftime("'%Y%m%d'") -%}
            {%- call statement('delete ' ~ quoted_shifted_date ~ ' records from ' ~ this, fetch_result=False, auto_begin=True) -%}
                delete from {{ del_key and target_relation }} where {{ del_key }} = {{ quoted_shifted_date }}
            {%- endcall -%}
        {% else %}
            {%- call statement('delete ' ~ del_value ~ ' records from ' ~ this, fetch_result=False, auto_begin=True) -%}
                delete from {{ target_relation }} where {{ del_key }} = '{{ del_value }}'
            {%- endcall -%}
        {% endif %}
    {%- elif var("run_type") == 'backfill' -%}
        {% if offset_days > 0 %}
            {#- Backfill vars use YYYY-MM-DD; shift both ends of the window. -#}
            {%- set backfill_start_dt = modules.datetime.datetime.strptime(var("backfill_start_date"), "%Y-%m-%d") + modules.datetime.timedelta(days=offset_days) -%}
            {%- set backfill_end_dt = modules.datetime.datetime.strptime(var("backfill_end_date"), "%Y-%m-%d") + modules.datetime.timedelta(days=offset_days) -%}
            {%- set quoted_start = backfill_start_dt.strftime("'%Y%m%d'") -%}
            {%- set quoted_end = backfill_end_dt.strftime("'%Y%m%d'") -%}
            {%- call statement('delete ' ~ del_value ~ ' records from ' ~ this ~ ' where ' ~ del_key ~ ' between ' ~ quoted_start ~ ' and ' ~ quoted_end, fetch_result=False, auto_begin=True) -%}
                delete from {{ target_relation }} where {{ del_key }} between {{ quoted_start }} and {{ quoted_end }}
            {%- endcall -%}
        {% else %}
            {#- No offset: strip the dashes so the vars match the stored YYYYMMDD format. -#}
            {%- call statement('delete ' ~ del_value ~ ' records from ' ~ this ~ ' where ' ~ del_key ~ ' between ' ~ var("backfill_start_date") ~ ' and ' ~ var("backfill_end_date"), fetch_result=False, auto_begin=True) -%}
                delete from {{ target_relation }} where {{ del_key }} between '{{ var("backfill_start_date") | replace('-', '') }}' and '{{ var("backfill_end_date") | replace('-', '') }}'
            {%- endcall -%}
        {% endif %}
    {% endif %}
{%- endif -%}

{% endmacro %}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{% macro generate_database_name(custom_database_name=none, node=none) -%}
{#-
    Override of dbt's built-in database-name resolution: uses the model's
    configured database when one is set (trimmed, since the names here are
    assembled from env_var values that may carry stray whitespace), and
    falls back to the target's database otherwise.
-#}

    {%- if custom_database_name is not none -%}

        {{ custom_database_name | trim }}

    {%- else -%}

        {{ target.database }}

    {%- endif -%}

{%- endmacro %}

0 commit comments

Comments
 (0)